about summary refs log tree commit diff
path: root/src/libstd/rt
diff options
context:
space:
mode:
authorEric Reed <ereed@mozilla.com>2013-06-17 12:35:27 -0700
committerEric Reed <ereed@mozilla.com>2013-06-17 12:35:41 -0700
commit33ae193a3c1a156e73bf6880366c9785dd4b7393 (patch)
tree54972af2c60f84112c7dd277d1c014471cfde406 /src/libstd/rt
parente42f28c05cb8e579d06492c49822944946341c9f (diff)
downloadrust-33ae193a3c1a156e73bf6880366c9785dd4b7393.tar.gz
rust-33ae193a3c1a156e73bf6880366c9785dd4b7393.zip
Started to implemented UdpStream
Diffstat (limited to 'src/libstd/rt')
-rw-r--r--src/libstd/rt/uv/uvio.rs80
1 files changed, 80 insertions, 0 deletions
diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs
index 0f98ab11513..1274dbc3220 100644
--- a/src/libstd/rt/uv/uvio.rs
+++ b/src/libstd/rt/uv/uvio.rs
@@ -433,6 +433,86 @@ impl RtioTcpStream for UvTcpStream {
     }
 }
 
+pub struct UvUdpStream {
+    watcher: UdpWatcher,
+    address: IpAddr
+}
+
+impl UvUdpStream {
+    fn watcher(&self) -> UdpWatcher { self.watcher }
+    fn address(&self) -> IpAddr { self.address }
+}
+
+impl Drop for UvUdpStream {
+    fn finalize(&self) {
+        rtdebug!("closing udp stream");
+        let watcher = self.watcher();
+        let scheduler = Local::take::<Scheduler>();
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell(task);
+            do watcher.close {
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
+    }
+}
+
+impl RtioUdpStream for UvUdpStream {
+    fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
+        let result_cell = empty_cell();
+        let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
+
+        let scheduler = Local::take::<Scheduler>();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let connection_address = self.address();
+        let buf_ptr: *&mut [u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |sched, task| {
+            rtdebug!("read: entered scheduler context");
+            assert!(!sched.in_task_context());
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            // XXX: see note in RtioTcpStream implementation for UvTcpStream
+            let alloc: AllocCallback = |_| unsafe {
+                slice_to_uv_buf(*buf_ptr)
+            };
+            do watcher.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| {
+                let _ = flags; // TODO actually use flags
+
+                // XXX: see note in RtioTcpStream implementation for UvTcpStream
+                let mut watcher = watcher;
+                watcher.recv_stop();
+
+                let incoming_address = net::uv_ip4_to_ip4(&addr);
+                let result = if status.is_none() {
+                    assert!(nread >= 0);
+                    if incoming_address != connection_address {
+                        Ok(0u)
+                    } else {
+                        Ok(nread as uint)
+                    }
+                } else {
+                    Err(uv_error_to_io_error(status.unwrap()))
+                };
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Local::take::<Scheduler>();
+                scheduler.resume_task_immediately(task_cell.take());
+            }
+        }
+
+        assert!(!result_cell.is_empty());
+        return result_cell.take();
+    }
+
+    fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { 
+        let _ = buf;
+        fail!() 
+    }
+}
+
 #[test]
 fn test_simple_io_no_connect() {
     do run_in_newsched_task {