diff options
| author | Eric Reed <ereed@mozilla.com> | 2013-06-17 12:35:27 -0700 |
|---|---|---|
| committer | Eric Reed <ereed@mozilla.com> | 2013-06-17 12:35:41 -0700 |
| commit | 33ae193a3c1a156e73bf6880366c9785dd4b7393 (patch) | |
| tree | 54972af2c60f84112c7dd277d1c014471cfde406 /src/libstd/rt | |
| parent | e42f28c05cb8e579d06492c49822944946341c9f (diff) | |
| download | rust-33ae193a3c1a156e73bf6880366c9785dd4b7393.tar.gz rust-33ae193a3c1a156e73bf6880366c9785dd4b7393.zip | |
Started to implemented UdpStream
Diffstat (limited to 'src/libstd/rt')
| -rw-r--r-- | src/libstd/rt/uv/uvio.rs | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 0f98ab11513..1274dbc3220 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -433,6 +433,86 @@ impl RtioTcpStream for UvTcpStream { } } +pub struct UvUdpStream { + watcher: UdpWatcher, + address: IpAddr +} + +impl UvUdpStream { + fn watcher(&self) -> UdpWatcher { self.watcher } + fn address(&self) -> IpAddr { self.address } +} + +impl Drop for UvUdpStream { + fn finalize(&self) { + rtdebug!("closing udp stream"); + let watcher = self.watcher(); + let scheduler = Local::take::<Scheduler>(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell(task); + do watcher.close { + let scheduler = Local::take::<Scheduler>(); + scheduler.resume_task_immediately(task_cell.take()); + } + } + } +} + +impl RtioUdpStream for UvUdpStream { + fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> { + let result_cell = empty_cell(); + let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell; + + let scheduler = Local::take::<Scheduler>(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let connection_address = self.address(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |sched, task| { + rtdebug!("read: entered scheduler context"); + assert!(!sched.in_task_context()); + let mut watcher = watcher; + let task_cell = Cell(task); + // XXX: see note in RtioTcpStream implementation for UvTcpStream + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| { + let _ = flags; // TODO actually use flags + + // XXX: see note in RtioTcpStream implementation for UvTcpStream + let mut watcher = watcher; + watcher.recv_stop(); + + let incoming_address = net::uv_ip4_to_ip4(&addr); + let result = if status.is_none() { + assert!(nread >= 0); + if incoming_address != connection_address { + Ok(0u) + } else { + Ok(nread as uint) + } + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::<Scheduler>(); + scheduler.resume_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + let _ = buf; + fail!() + } +} + #[test] fn test_simple_io_no_connect() { do run_in_newsched_task { |
