about summary refs log tree commit diff
path: root/src/libcore
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-04-24 20:20:03 -0700
committerBrian Anderson <banderson@mozilla.com>2013-05-13 14:24:10 -0700
commitb2fbd34603c5e209ab7a61a09ca943bd5b15f1a3 (patch)
tree6dca7e91760ec1d47acea7742426464aae3055cd /src/libcore
parent0b4d4edf8bc6a90c0bcbf06599ddf92fea1ed58f (diff)
downloadrust-b2fbd34603c5e209ab7a61a09ca943bd5b15f1a3.tar.gz
rust-b2fbd34603c5e209ab7a61a09ca943bd5b15f1a3.zip
core::rt: Begin implementing TcpStream
This ended up touching a lot of code related to error handling.
Diffstat (limited to 'src/libcore')
-rw-r--r--src/libcore/macros.rs8
-rw-r--r--src/libcore/rt/io/mod.rs11
-rw-r--r--src/libcore/rt/io/net/tcp.rs164
-rw-r--r--src/libcore/rt/local_services.rs3
-rw-r--r--src/libcore/rt/mod.rs12
-rw-r--r--src/libcore/rt/rtio.rs5
-rw-r--r--src/libcore/rt/sched/local_sched.rs31
-rw-r--r--src/libcore/rt/uv/mod.rs60
-rw-r--r--src/libcore/rt/uv/net.rs14
-rw-r--r--src/libcore/rt/uvio.rs24
-rw-r--r--src/libcore/rt/uvll.rs7
11 files changed, 282 insertions, 57 deletions
diff --git a/src/libcore/macros.rs b/src/libcore/macros.rs
index b19a753b715..b2e94f327c8 100644
--- a/src/libcore/macros.rs
+++ b/src/libcore/macros.rs
@@ -30,6 +30,14 @@ macro_rules! rtdebug (
     ($( $arg:expr),+) => ( $(let _ = $arg)*; )
 )
 
+macro_rules! rtassert (
+    ( $arg:expr ) => ( {
+        if !$arg {
+            abort!("assertion failed: %s", stringify!($arg));
+        }
+    } )
+)
+
 macro_rules! abort(
     ($( $msg:expr),+) => ( {
         rtdebug!($($msg),+);
diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs
index d2249aad95e..93daa36dd60 100644
--- a/src/libcore/rt/io/mod.rs
+++ b/src/libcore/rt/io/mod.rs
@@ -252,7 +252,9 @@ pub use self::stdio::println;
 
 pub use self::file::FileStream;
 pub use self::net::ip::IpAddr;
+#[cfg(not(stage0))]
 pub use self::net::tcp::TcpListener;
+#[cfg(not(stage0))]
 pub use self::net::tcp::TcpStream;
 pub use self::net::udp::UdpStream;
 
@@ -266,6 +268,7 @@ pub mod file;
 
 /// Synchronous, non-blocking network I/O.
 pub mod net {
+    #[cfg(not(stage0))]
     pub mod tcp;
     pub mod udp;
     pub mod ip;
@@ -326,12 +329,14 @@ pub struct IoError {
 
 #[deriving(Eq)]
 pub enum IoErrorKind {
+    PreviousIoError,
+    OtherIoError,
+    EndOfFile,
     FileNotFound,
-    FilePermission,
+    PermissionDenied,
     ConnectionFailed,
     Closed,
-    OtherIoError,
-    PreviousIoError
+    ConnectionRefused,
 }
 
 // XXX: Can't put doc comments on macros
diff --git a/src/libcore/rt/io/net/tcp.rs b/src/libcore/rt/io/net/tcp.rs
index 00b48738d0b..2ac2ffb60a8 100644
--- a/src/libcore/rt/io/net/tcp.rs
+++ b/src/libcore/rt/io/net/tcp.rs
@@ -8,63 +8,179 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use prelude::*;
-use super::super::*;
-use super::ip::IpAddr;
+use option::{Option, Some, None};
+use result::{Result, Ok, Err};
+use ops::Drop;
+use rt::sched::local_sched::unsafe_borrow_io;
+use rt::io::net::ip::IpAddr;
+use rt::io::{Reader, Writer, Listener};
+use rt::io::io_error;
+use rt::rtio;
+use rt::rtio::{IoFactory, TcpListener, Stream};
 
-pub struct TcpStream;
+pub struct TcpStream {
+    rtstream: ~rtio::StreamObject
+}
 
 impl TcpStream {
-    pub fn connect(_addr: IpAddr) -> Option<TcpStream> {
-        fail!()
+    fn new(s: ~rtio::StreamObject) -> TcpStream {
+        TcpStream {
+            rtstream: s
+        }
+    }
+
+    pub fn connect(addr: IpAddr) -> Option<TcpStream> {
+        let stream = unsafe {
+            rtdebug!("borrowing io to connect");
+            let io = unsafe_borrow_io();
+            rtdebug!("about to connect");
+            io.connect(addr)
+        };
+
+        match stream {
+            Ok(s) => {
+                Some(TcpStream::new(s))
+            }
+            Err(ioerr) => {
+                rtdebug!("failed to connect: %?", ioerr);
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
     }
 }
 
 impl Reader for TcpStream {
-    fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
+    fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
+        let bytes_read = self.rtstream.read(buf);
+        match bytes_read {
+            Ok(read) => Some(read),
+            Err(_) => {
+                abort!("TODO");
+            }
+        }
+    }
 
     fn eof(&mut self) -> bool { fail!() }
 }
 
 impl Writer for TcpStream {
-    fn write(&mut self, _buf: &[u8]) { fail!() }
+    fn write(&mut self, buf: &[u8]) {
+        let res = self.rtstream.write(buf);
+        match res {
+            Ok(_) => (),
+            Err(_) => {
+                abort!("TODO");
+            }
+        }
+    }
 
     fn flush(&mut self) { fail!() }
 }
 
-pub struct TcpListener;
+impl Drop for TcpStream {
+    fn finalize(&self) {
+        self.rtstream.close();
+    }
+}
+
+pub struct TcpListener {
+    rtlistener: ~rtio::TcpListenerObject
+}
 
 impl TcpListener {
-    pub fn bind(_addr: IpAddr) -> Option<TcpListener> {
-        fail!()
+    pub fn bind(addr: IpAddr) -> Option<TcpListener> {
+        let listener = unsafe { unsafe_borrow_io().bind(addr) };
+        match listener {
+            Ok(l) => {
+                Some(TcpListener {
+                    rtlistener: l
+                })
+            }
+            Err(ioerr) => {
+                io_error::cond.raise(ioerr);
+                return None;
+            }
+        }
     }
 }
 
 impl Listener<TcpStream> for TcpListener {
-    fn accept(&mut self) -> Option<TcpStream> { fail!() }
+    fn accept(&mut self) -> Option<TcpStream> {
+        let rtstream = self.rtlistener.listen();
+        match rtstream {
+            Some(s) => {
+                Some(TcpStream::new(s))
+            }
+            None => {
+                abort!("TODO");
+            }
+        }
+    }
+}
+
+impl Drop for TcpListener {
+    fn finalize(&self) {
+        self.rtlistener.close();
+    }
 }
 
 #[cfg(test)]
 mod test {
+    use super::*;
+    use rt::test::*;
+    use rt::io::net::ip::Ipv4;
+    use rt::io::*;
+
+    #[test]
+    fn bind_error() {
+        do run_in_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == PermissionDenied);
+                called = true;
+            }).in {
+                let addr = Ipv4(0, 0, 0, 0, 1);
+                let listener = TcpListener::bind(addr);
+                assert!(listener.is_none());
+            }
+            assert!(called);
+        }
+    }
+
+    #[test]
+    fn connect_error() {
+        do run_in_newsched_task {
+            let mut called = false;
+            do io_error::cond.trap(|e| {
+                assert!(e.kind == ConnectionRefused);
+                called = true;
+            }).in {
+                let addr = Ipv4(0, 0, 0, 0, 1);
+                let stream = TcpStream::connect(addr);
+                assert!(stream.is_none());
+            }
+            assert!(called);
+        }
+    }
 
-    #[test] #[ignore]
+    #[test]
     fn smoke_test() {
-        /*do run_in_newsched_task {
+        do run_in_newsched_task {
             let addr = next_test_ip4();
 
-            do spawn_immediately {
-                let listener = TcpListener::bind(addr);
-                do listener.accept() {
-                    let mut buf = [0];
-                    listener.read(buf);
-                    assert!(buf[0] == 99);
-                }
+            do spawntask_immediately {
+                let mut listener = TcpListener::bind(addr);
+                let mut stream = listener.accept();
+                let mut buf = [0];
+                stream.read(buf);
+                assert!(buf[0] == 99);
             }
 
-            do spawn_immediately {
-                let stream = TcpStream::connect(addr);
+            do spawntask_immediately {
+                let mut stream = TcpStream::connect(addr);
                 stream.write([99]);
             }
-        }*/
+        }
     }
 }
diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs
index 01bef5e2458..47e8669b546 100644
--- a/src/libcore/rt/local_services.rs
+++ b/src/libcore/rt/local_services.rs
@@ -177,7 +177,8 @@ pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices {
             transmute_mut_region(&mut task.local_services)
         }
         None => {
-            fail!(~"no local services for schedulers yet")
+            // Don't fail. Infinite recursion
+            abort!("no local services for schedulers yet")
         }
     }
 }
diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs
index 25f6c870654..72715ea9b28 100644
--- a/src/libcore/rt/mod.rs
+++ b/src/libcore/rt/mod.rs
@@ -8,7 +8,15 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-/*! The Rust runtime, including the scheduler and I/O interface */
+/*! The Rust runtime, including the scheduler and I/O interface
+
+# XXX
+
+* Unsafe uses of borrowed pointers should just use unsafe pointers
+* Unwinding is not wired up correctly
+
+*/
+
 
 #[doc(hidden)];
 
@@ -16,7 +24,7 @@ use libc::c_char;
 
 #[path = "sched/mod.rs"]
 mod sched;
-mod rtio;
+pub mod rtio;
 pub mod uvll;
 mod uvio;
 #[path = "uv/mod.rs"]
diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs
index fd64438c61b..961a032607e 100644
--- a/src/libcore/rt/rtio.rs
+++ b/src/libcore/rt/rtio.rs
@@ -11,6 +11,7 @@
 use option::*;
 use result::*;
 
+use rt::io::IoError;
 use super::io::net::ip::IpAddr;
 
 // XXX: ~object doesn't work currently so these are some placeholder
@@ -28,8 +29,8 @@ pub trait EventLoop {
 }
 
 pub trait IoFactory {
-    fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>;
-    fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>;
+    fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError>;
+    fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError>;
 }
 
 pub trait TcpListener {
diff --git a/src/libcore/rt/sched/local_sched.rs b/src/libcore/rt/sched/local_sched.rs
index a7e02f30e01..c4153381d91 100644
--- a/src/libcore/rt/sched/local_sched.rs
+++ b/src/libcore/rt/sched/local_sched.rs
@@ -13,18 +13,21 @@
 use prelude::*;
 use ptr::mut_null;
 use libc::c_void;
-use cast::transmute;
+use cast;
+use cell::Cell;
 
 use super::Scheduler;
 use super::super::rtio::IoFactoryObject;
 use tls = super::super::thread_local_storage;
+use unstable::finally::Finally;
+
 #[cfg(test)] use super::super::uvio::UvEventLoop;
 
 /// Give the Scheduler to thread-local storage
 pub fn put(sched: ~Scheduler) {
     unsafe {
         let key = tls_key();
-        let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched);
+        let void_sched: *mut c_void = cast::transmute(sched);
         tls::set(key, void_sched);
     }
 }
@@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler {
     unsafe {
         let key = tls_key();
         let void_sched: *mut c_void = tls::get(key);
-        assert!(void_sched.is_not_null());
-        let sched = transmute::<*mut c_void, ~Scheduler>(void_sched);
+        rtassert!(void_sched.is_not_null());
+        let sched: ~Scheduler = cast::transmute(void_sched);
         tls::set(key, mut_null());
         return sched;
     }
@@ -55,8 +58,18 @@ pub fn exists() -> bool {
 /// While the scheduler is borrowed it is not available in TLS.
 pub fn borrow(f: &fn(&mut Scheduler)) {
     let mut sched = take();
-    f(sched);
-    put(sched);
+
+    // XXX: Need a different abstraction from 'finally' here to avoid unsafety
+    unsafe {
+        let unsafe_sched = cast::transmute_mut_region(&mut *sched);
+        let sched = Cell(sched);
+        
+        do (|| {
+            f(unsafe_sched);
+        }).finally {
+            put(sched.take());
+        }
+    }
 }
 
 /// Borrow a mutable reference to the thread-local Scheduler
@@ -68,11 +81,11 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
 pub unsafe fn unsafe_borrow() -> &mut Scheduler {
     let key = tls_key();
     let mut void_sched: *mut c_void = tls::get(key);
-    assert!(void_sched.is_not_null());
+    rtassert!(void_sched.is_not_null());
     {
         let void_sched_ptr = &mut void_sched;
         let sched: &mut ~Scheduler = {
-            transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
+            cast::transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
         };
         let sched: &mut Scheduler = &mut **sched;
         return sched;
@@ -91,7 +104,7 @@ fn tls_key() -> tls::Key {
 fn maybe_tls_key() -> Option<tls::Key> {
     unsafe {
         let key: *mut c_void = rust_get_sched_tls_key();
-        let key: &mut tls::Key = transmute(key);
+        let key: &mut tls::Key = cast::transmute(key);
         let key = *key;
         // Check that the key has been initialized.
 
diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs
index 013a28abf28..87aa7524ed6 100644
--- a/src/libcore/rt/uv/mod.rs
+++ b/src/libcore/rt/uv/mod.rs
@@ -34,17 +34,22 @@ via `close` and `delete` methods.
 
 */
 
+use libc;
+use vec;
+use ptr;
+use cast;
+use str;
 use option::*;
 use str::raw::from_c_str;
 use to_str::ToStr;
-use vec;
-use ptr;
 use libc::{c_void, c_int, size_t, malloc, free};
 use cast::transmute;
 use ptr::null;
-use super::uvll;
 use unstable::finally::Finally;
 
+use rt::uvll;
+use rt::io::{IoError, FileNotFound};
+
 #[cfg(test)] use unstable::run_in_bare_thread;
 
 pub use self::file::{FsRequest, FsCallback};
@@ -211,6 +216,55 @@ fn error_smoke_test() {
     assert!(err.to_str() == ~"EOF: end of file");
 }
 
+pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError {
+    unsafe {
+        let loop_ = loop_from_watcher(watcher);
+        UvError(uvll::last_error(loop_.native_handle()))
+    }
+}
+
+pub fn uv_error_to_io_error(uverr: UvError) -> IoError {
+
+    // XXX: Could go in str::raw
+    unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str {
+        let s = s as *u8;
+        let mut curr = s, len = 0u;
+        while *curr != 0u8 {
+            len += 1u;
+            curr = ptr::offset(s, len);
+        }
+
+        str::raw::buf_as_slice(s, len, |d| cast::transmute(d))
+    }
+
+
+    unsafe {
+        // Importing error constants
+        use rt::uvll::*;
+        use rt::io::*;
+
+        // uv error descriptions are static
+        let c_desc = uvll::strerror(&*uverr);
+        let desc = c_str_to_static_slice(c_desc);
+
+        let kind = match uverr.code {
+            UNKNOWN => OtherIoError,
+            OK => OtherIoError,
+            EOF => EndOfFile,
+            EACCES => PermissionDenied,
+            ECONNREFUSED => ConnectionRefused,
+            e => {
+                abort!("unknown uv error code: %u", e as uint);
+            }
+        };
+
+        IoError {
+            kind: kind,
+            desc: desc,
+            detail: None
+        }
+    }
+}
 
 /// Given a uv handle, convert a callback status to a UvError
 // XXX: Follow the pattern below by parameterizing over T: Watcher, not T
diff --git a/src/libcore/rt/uv/net.rs b/src/libcore/rt/uv/net.rs
index 376231e3b27..6d8979e04d6 100644
--- a/src/libcore/rt/uv/net.rs
+++ b/src/libcore/rt/uv/net.rs
@@ -18,13 +18,14 @@ use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCa
             install_watcher_data, get_watcher_data, drop_watcher_data,
             vec_to_uv_buf, vec_from_uv_buf};
 use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6};
+use rt::uv::last_uv_error;
 
 #[cfg(test)] use cell::Cell;
 #[cfg(test)] use unstable::run_in_bare_thread;
 #[cfg(test)] use super::super::thread::Thread;
 #[cfg(test)] use super::super::test::*;
 
-fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
+fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T {
     match addr {
         Ipv4(a, b, c, d, p) => {
             unsafe {
@@ -34,7 +35,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) {
                                                 c as uint,
                                                 d as uint), p as int);
                 do (|| {
-                    f(addr);
+                    f(addr)
                 }).finally {
                     free_ip4_addr(addr);
                 }
@@ -193,15 +194,18 @@ pub impl TcpWatcher {
         }
     }
 
-    fn bind(&mut self, address: IpAddr) {
+    fn bind(&mut self, address: IpAddr) -> Result<(), UvError> {
         match address {
             Ipv4(*) => {
                 do ip4_as_uv_ip4(address) |addr| {
                     let result = unsafe {
                         uvll::tcp_bind(self.native_handle(), addr)
                     };
-                    // XXX: bind is likely to fail. need real error handling
-                    assert!(result == 0);
+                    if result == 0 {
+                        Ok(())
+                    } else {
+                        Err(last_uv_error(self))
+                    }
                 }
             }
             _ => fail!()
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs
index 8f1a6ea0d34..2c4ff37e4be 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uvio.rs
@@ -11,6 +11,7 @@
 use option::*;
 use result::*;
 
+use rt::io::IoError;
 use super::io::net::ip::IpAddr;
 use super::uv::*;
 use super::rtio::*;
@@ -98,11 +99,11 @@ impl IoFactory for UvIoFactory {
     // Connect to an address and return a new stream
     // NB: This blocks the task waiting on the connection.
     // It would probably be better to return a future
-    fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> {
+    fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> {
         // Create a cell in the task to hold the result. We will fill
         // the cell before resuming the task.
         let result_cell = empty_cell();
-        let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
+        let result_cell_ptr: *Cell<Result<~StreamObject, IoError>> = &result_cell;
 
         let scheduler = local_sched::take();
         assert!(scheduler.in_task_context());
@@ -122,11 +123,12 @@ impl IoFactory for UvIoFactory {
                 rtdebug!("connect: in connect callback");
                 let maybe_stream = if status.is_none() {
                     rtdebug!("status is none");
-                    Some(~UvStream(stream_watcher))
+                    Ok(~UvStream(stream_watcher))
                 } else {
                     rtdebug!("status is some");
+                    // XXX: Wait for close
                     stream_watcher.close(||());
-                    None
+                    Err(uv_error_to_io_error(status.get()))
                 };
 
                 // Store the stream in the task's stack
@@ -142,10 +144,16 @@ impl IoFactory for UvIoFactory {
         return result_cell.take();
     }
 
-    fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> {
+    fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError> {
         let mut watcher = TcpWatcher::new(self.uv_loop());
-        watcher.bind(addr);
-        return Some(~UvTcpListener(watcher));
+        match watcher.bind(addr) {
+            Ok(_) => Ok(~UvTcpListener(watcher)),
+            Err(uverr) => {
+                // XXX: Should we wait until close completes?
+                watcher.as_stream().close(||());
+                Err(uv_error_to_io_error(uverr))
+            }
+        }
     }
 }
 
@@ -321,7 +329,7 @@ fn test_simple_io_no_connect() {
         let io = unsafe { local_sched::unsafe_borrow_io() };
         let addr = next_test_ip4();
         let maybe_chan = io.connect(addr);
-        assert!(maybe_chan.is_none());
+        assert!(maybe_chan.is_err());
     }
 }
 
diff --git a/src/libcore/rt/uvll.rs b/src/libcore/rt/uvll.rs
index 4bff3bff7d3..2a2812c6718 100644
--- a/src/libcore/rt/uvll.rs
+++ b/src/libcore/rt/uvll.rs
@@ -33,6 +33,13 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t};
 use libc::{malloc, free};
 use prelude::*;
 
+pub static UNKNOWN: c_int = -1;
+pub static OK: c_int = 0;
+pub static EOF: c_int = 1;
+pub static EADDRINFO: c_int = 2;
+pub static EACCES: c_int = 3;
+pub static ECONNREFUSED: c_int = 12;
+
 pub struct uv_err_t {
     code: c_int,
     sys_errno_: c_int