diff options
| author | Brian Anderson <banderson@mozilla.com> | 2013-04-24 20:20:03 -0700 |
|---|---|---|
| committer | Brian Anderson <banderson@mozilla.com> | 2013-05-13 14:24:10 -0700 |
| commit | b2fbd34603c5e209ab7a61a09ca943bd5b15f1a3 (patch) | |
| tree | 6dca7e91760ec1d47acea7742426464aae3055cd /src/libcore/rt | |
| parent | 0b4d4edf8bc6a90c0bcbf06599ddf92fea1ed58f (diff) | |
| download | rust-b2fbd34603c5e209ab7a61a09ca943bd5b15f1a3.tar.gz rust-b2fbd34603c5e209ab7a61a09ca943bd5b15f1a3.zip | |
core::rt: Begin implementing TcpStream
This ended up touching a lot of code related to error handling.
Diffstat (limited to 'src/libcore/rt')
| -rw-r--r-- | src/libcore/rt/io/mod.rs | 11 | ||||
| -rw-r--r-- | src/libcore/rt/io/net/tcp.rs | 164 | ||||
| -rw-r--r-- | src/libcore/rt/local_services.rs | 3 | ||||
| -rw-r--r-- | src/libcore/rt/mod.rs | 12 | ||||
| -rw-r--r-- | src/libcore/rt/rtio.rs | 5 | ||||
| -rw-r--r-- | src/libcore/rt/sched/local_sched.rs | 31 | ||||
| -rw-r--r-- | src/libcore/rt/uv/mod.rs | 60 | ||||
| -rw-r--r-- | src/libcore/rt/uv/net.rs | 14 | ||||
| -rw-r--r-- | src/libcore/rt/uvio.rs | 24 | ||||
| -rw-r--r-- | src/libcore/rt/uvll.rs | 7 |
10 files changed, 274 insertions, 57 deletions
diff --git a/src/libcore/rt/io/mod.rs b/src/libcore/rt/io/mod.rs index d2249aad95e..93daa36dd60 100644 --- a/src/libcore/rt/io/mod.rs +++ b/src/libcore/rt/io/mod.rs @@ -252,7 +252,9 @@ pub use self::stdio::println; pub use self::file::FileStream; pub use self::net::ip::IpAddr; +#[cfg(not(stage0))] pub use self::net::tcp::TcpListener; +#[cfg(not(stage0))] pub use self::net::tcp::TcpStream; pub use self::net::udp::UdpStream; @@ -266,6 +268,7 @@ pub mod file; /// Synchronous, non-blocking network I/O. pub mod net { + #[cfg(not(stage0))] pub mod tcp; pub mod udp; pub mod ip; @@ -326,12 +329,14 @@ pub struct IoError { #[deriving(Eq)] pub enum IoErrorKind { + PreviousIoError, + OtherIoError, + EndOfFile, FileNotFound, - FilePermission, + PermissionDenied, ConnectionFailed, Closed, - OtherIoError, - PreviousIoError + ConnectionRefused, } // XXX: Can't put doc comments on macros diff --git a/src/libcore/rt/io/net/tcp.rs b/src/libcore/rt/io/net/tcp.rs index 00b48738d0b..2ac2ffb60a8 100644 --- a/src/libcore/rt/io/net/tcp.rs +++ b/src/libcore/rt/io/net/tcp.rs @@ -8,63 +8,179 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use prelude::*; -use super::super::*; -use super::ip::IpAddr; +use option::{Option, Some, None}; +use result::{Result, Ok, Err}; +use ops::Drop; +use rt::sched::local_sched::unsafe_borrow_io; +use rt::io::net::ip::IpAddr; +use rt::io::{Reader, Writer, Listener}; +use rt::io::io_error; +use rt::rtio; +use rt::rtio::{IoFactory, TcpListener, Stream}; -pub struct TcpStream; +pub struct TcpStream { + rtstream: ~rtio::StreamObject +} impl TcpStream { - pub fn connect(_addr: IpAddr) -> Option<TcpStream> { - fail!() + fn new(s: ~rtio::StreamObject) -> TcpStream { + TcpStream { + rtstream: s + } + } + + pub fn connect(addr: IpAddr) -> Option<TcpStream> { + let stream = unsafe { + rtdebug!("borrowing io to connect"); + let io = unsafe_borrow_io(); + rtdebug!("about to connect"); + io.connect(addr) + }; + + match stream { + Ok(s) => { + Some(TcpStream::new(s)) + } + Err(ioerr) => { + rtdebug!("failed to connect: %?", ioerr); + io_error::cond.raise(ioerr); + return None; + } + } } } impl Reader for TcpStream { - fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() } + fn read(&mut self, buf: &mut [u8]) -> Option<uint> { + let bytes_read = self.rtstream.read(buf); + match bytes_read { + Ok(read) => Some(read), + Err(_) => { + abort!("TODO"); + } + } + } fn eof(&mut self) -> bool { fail!() } } impl Writer for TcpStream { - fn write(&mut self, _buf: &[u8]) { fail!() } + fn write(&mut self, buf: &[u8]) { + let res = self.rtstream.write(buf); + match res { + Ok(_) => (), + Err(_) => { + abort!("TODO"); + } + } + } fn flush(&mut self) { fail!() } } -pub struct TcpListener; +impl Drop for TcpStream { + fn finalize(&self) { + self.rtstream.close(); + } +} + +pub struct TcpListener { + rtlistener: ~rtio::TcpListenerObject +} impl TcpListener { - pub fn bind(_addr: IpAddr) -> Option<TcpListener> { - fail!() + pub fn bind(addr: IpAddr) -> Option<TcpListener> { + let listener = unsafe { unsafe_borrow_io().bind(addr) }; + match listener { + Ok(l) => { + Some(TcpListener { + rtlistener: l + }) + } + Err(ioerr) => { + io_error::cond.raise(ioerr); + return None; + } + } } } impl Listener<TcpStream> for TcpListener { - fn accept(&mut self) -> Option<TcpStream> { fail!() } + fn accept(&mut self) -> Option<TcpStream> { + let rtstream = self.rtlistener.listen(); + match rtstream { + Some(s) => { + Some(TcpStream::new(s)) + } + None => { + abort!("TODO"); + } + } + } +} + +impl Drop for TcpListener { + fn finalize(&self) { + self.rtlistener.close(); + } } #[cfg(test)] mod test { + use super::*; + use rt::test::*; + use rt::io::net::ip::Ipv4; + use rt::io::*; + + #[test] + fn bind_error() { + do run_in_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == PermissionDenied); + called = true; + }).in { + let addr = Ipv4(0, 0, 0, 0, 1); + let listener = TcpListener::bind(addr); + assert!(listener.is_none()); + } + assert!(called); + } + } + + #[test] + fn connect_error() { + do run_in_newsched_task { + let mut called = false; + do io_error::cond.trap(|e| { + assert!(e.kind == ConnectionRefused); + called = true; + }).in { + let addr = Ipv4(0, 0, 0, 0, 1); + let stream = TcpStream::connect(addr); + assert!(stream.is_none()); + } + assert!(called); + } + } - #[test] #[ignore] + #[test] fn smoke_test() { - /*do run_in_newsched_task { + do run_in_newsched_task { let addr = next_test_ip4(); - do spawn_immediately { - let listener = TcpListener::bind(addr); - do listener.accept() { - let mut buf = [0]; - listener.read(buf); - assert!(buf[0] == 99); - } + do spawntask_immediately { + let mut listener = TcpListener::bind(addr); + let mut stream = listener.accept(); + let mut buf = [0]; + stream.read(buf); + assert!(buf[0] == 99); } - do spawn_immediately { - let stream = TcpStream::connect(addr); + do spawntask_immediately { + let mut stream = TcpStream::connect(addr); stream.write([99]); } - }*/ + } } } diff --git a/src/libcore/rt/local_services.rs b/src/libcore/rt/local_services.rs index 01bef5e2458..47e8669b546 100644 --- a/src/libcore/rt/local_services.rs +++ b/src/libcore/rt/local_services.rs @@ -177,7 +177,8 @@ pub unsafe fn unsafe_borrow_local_services() -> &mut LocalServices { transmute_mut_region(&mut task.local_services) } None => { - fail!(~"no local services for schedulers yet") + // Don't fail. Infinite recursion + abort!("no local services for schedulers yet") } } } diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 25f6c870654..72715ea9b28 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -8,7 +8,15 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -/*! The Rust runtime, including the scheduler and I/O interface */ +/*! The Rust runtime, including the scheduler and I/O interface + +# XXX + +* Unsafe uses of borrowed pointers should just use unsafe pointers +* Unwinding is not wired up correctly + +*/ + #[doc(hidden)]; @@ -16,7 +24,7 @@ use libc::c_char; #[path = "sched/mod.rs"] mod sched; -mod rtio; +pub mod rtio; pub mod uvll; mod uvio; #[path = "uv/mod.rs"] diff --git a/src/libcore/rt/rtio.rs b/src/libcore/rt/rtio.rs index fd64438c61b..961a032607e 100644 --- a/src/libcore/rt/rtio.rs +++ b/src/libcore/rt/rtio.rs @@ -11,6 +11,7 @@ use option::*; use result::*; +use rt::io::IoError; use super::io::net::ip::IpAddr; // XXX: ~object doesn't work currently so these are some placeholder @@ -28,8 +29,8 @@ pub trait EventLoop { } pub trait IoFactory { - fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject>; - fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject>; + fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError>; + fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError>; } pub trait TcpListener { diff --git a/src/libcore/rt/sched/local_sched.rs b/src/libcore/rt/sched/local_sched.rs index a7e02f30e01..c4153381d91 100644 --- a/src/libcore/rt/sched/local_sched.rs +++ b/src/libcore/rt/sched/local_sched.rs @@ -13,18 +13,21 @@ use prelude::*; use ptr::mut_null; use libc::c_void; -use cast::transmute; +use cast; +use cell::Cell; use super::Scheduler; use super::super::rtio::IoFactoryObject; use tls = super::super::thread_local_storage; +use unstable::finally::Finally; + #[cfg(test)] use super::super::uvio::UvEventLoop; /// Give the Scheduler to thread-local storage pub fn put(sched: ~Scheduler) { unsafe { let key = tls_key(); - let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched); + let void_sched: *mut c_void = cast::transmute(sched); tls::set(key, void_sched); } } @@ -34,8 +37,8 @@ pub fn take() -> ~Scheduler { unsafe { let key = tls_key(); let void_sched: *mut c_void = tls::get(key); - assert!(void_sched.is_not_null()); - let sched = transmute::<*mut c_void, ~Scheduler>(void_sched); + rtassert!(void_sched.is_not_null()); + let sched: ~Scheduler = cast::transmute(void_sched); tls::set(key, mut_null()); return sched; } @@ -55,8 +58,18 @@ pub fn exists() -> bool { /// While the scheduler is borrowed it is not available in TLS. pub fn borrow(f: &fn(&mut Scheduler)) { let mut sched = take(); - f(sched); - put(sched); + + // XXX: Need a different abstraction from 'finally' here to avoid unsafety + unsafe { + let unsafe_sched = cast::transmute_mut_region(&mut *sched); + let sched = Cell(sched); + + do (|| { + f(unsafe_sched); + }).finally { + put(sched.take()); + } + } } /// Borrow a mutable reference to the thread-local Scheduler @@ -68,11 +81,11 @@ pub fn borrow(f: &fn(&mut Scheduler)) { pub unsafe fn unsafe_borrow() -> &mut Scheduler { let key = tls_key(); let mut void_sched: *mut c_void = tls::get(key); - assert!(void_sched.is_not_null()); + rtassert!(void_sched.is_not_null()); { let void_sched_ptr = &mut void_sched; let sched: &mut ~Scheduler = { - transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) + cast::transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) }; let sched: &mut Scheduler = &mut **sched; return sched; @@ -91,7 +104,7 @@ fn tls_key() -> tls::Key { fn maybe_tls_key() -> Option<tls::Key> { unsafe { let key: *mut c_void = rust_get_sched_tls_key(); - let key: &mut tls::Key = transmute(key); + let key: &mut tls::Key = cast::transmute(key); let key = *key; // Check that the key has been initialized. diff --git a/src/libcore/rt/uv/mod.rs b/src/libcore/rt/uv/mod.rs index 013a28abf28..87aa7524ed6 100644 --- a/src/libcore/rt/uv/mod.rs +++ b/src/libcore/rt/uv/mod.rs @@ -34,17 +34,22 @@ via `close` and `delete` methods. */ +use libc; +use vec; +use ptr; +use cast; +use str; use option::*; use str::raw::from_c_str; use to_str::ToStr; -use vec; -use ptr; use libc::{c_void, c_int, size_t, malloc, free}; use cast::transmute; use ptr::null; -use super::uvll; use unstable::finally::Finally; +use rt::uvll; +use rt::io::{IoError, FileNotFound}; + #[cfg(test)] use unstable::run_in_bare_thread; pub use self::file::{FsRequest, FsCallback}; @@ -211,6 +216,55 @@ fn error_smoke_test() { assert!(err.to_str() == ~"EOF: end of file"); } +pub fn last_uv_error<H, W: Watcher + NativeHandle<*H>>(watcher: &W) -> UvError { + unsafe { + let loop_ = loop_from_watcher(watcher); + UvError(uvll::last_error(loop_.native_handle())) + } +} + +pub fn uv_error_to_io_error(uverr: UvError) -> IoError { + + // XXX: Could go in str::raw + unsafe fn c_str_to_static_slice(s: *libc::c_char) -> &'static str { + let s = s as *u8; + let mut curr = s, len = 0u; + while *curr != 0u8 { + len += 1u; + curr = ptr::offset(s, len); + } + + str::raw::buf_as_slice(s, len, |d| cast::transmute(d)) + } + + + unsafe { + // Importing error constants + use rt::uvll::*; + use rt::io::*; + + // uv error descriptions are static + let c_desc = uvll::strerror(&*uverr); + let desc = c_str_to_static_slice(c_desc); + + let kind = match uverr.code { + UNKNOWN => OtherIoError, + OK => OtherIoError, + EOF => EndOfFile, + EACCES => PermissionDenied, + ECONNREFUSED => ConnectionRefused, + e => { + abort!("unknown uv error code: %u", e as uint); + } + }; + + IoError { + kind: kind, + desc: desc, + detail: None + } + } +} /// Given a uv handle, convert a callback status to a UvError // XXX: Follow the pattern below by parameterizing over T: Watcher, not T diff --git a/src/libcore/rt/uv/net.rs b/src/libcore/rt/uv/net.rs index 376231e3b27..6d8979e04d6 100644 --- a/src/libcore/rt/uv/net.rs +++ b/src/libcore/rt/uv/net.rs @@ -18,13 +18,14 @@ use super::{Loop, Watcher, Request, UvError, Buf, Callback, NativeHandle, NullCa install_watcher_data, get_watcher_data, drop_watcher_data, vec_to_uv_buf, vec_from_uv_buf}; use super::super::io::net::ip::{IpAddr, Ipv4, Ipv6}; +use rt::uv::last_uv_error; #[cfg(test)] use cell::Cell; #[cfg(test)] use unstable::run_in_bare_thread; #[cfg(test)] use super::super::thread::Thread; #[cfg(test)] use super::super::test::*; -fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) { +fn ip4_as_uv_ip4<T>(addr: IpAddr, f: &fn(*sockaddr_in) -> T) -> T { match addr { Ipv4(a, b, c, d, p) => { unsafe { @@ -34,7 +35,7 @@ fn ip4_as_uv_ip4(addr: IpAddr, f: &fn(*sockaddr_in)) { c as uint, d as uint), p as int); do (|| { - f(addr); + f(addr) }).finally { free_ip4_addr(addr); } @@ -193,15 +194,18 @@ pub impl TcpWatcher { } } - fn bind(&mut self, address: IpAddr) { + fn bind(&mut self, address: IpAddr) -> Result<(), UvError> { match address { Ipv4(*) => { do ip4_as_uv_ip4(address) |addr| { let result = unsafe { uvll::tcp_bind(self.native_handle(), addr) }; - // XXX: bind is likely to fail. need real error handling - assert!(result == 0); + if result == 0 { + Ok(()) + } else { + Err(last_uv_error(self)) + } } } _ => fail!() diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 8f1a6ea0d34..2c4ff37e4be 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -11,6 +11,7 @@ use option::*; use result::*; +use rt::io::IoError; use super::io::net::ip::IpAddr; use super::uv::*; use super::rtio::*; @@ -98,11 +99,11 @@ impl IoFactory for UvIoFactory { // Connect to an address and return a new stream // NB: This blocks the task waiting on the connection. // It would probably be better to return a future - fn connect(&mut self, addr: IpAddr) -> Option<~StreamObject> { + fn connect(&mut self, addr: IpAddr) -> Result<~StreamObject, IoError> { // Create a cell in the task to hold the result. We will fill // the cell before resuming the task. let result_cell = empty_cell(); - let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell; + let result_cell_ptr: *Cell<Result<~StreamObject, IoError>> = &result_cell; let scheduler = local_sched::take(); assert!(scheduler.in_task_context()); @@ -122,11 +123,12 @@ impl IoFactory for UvIoFactory { rtdebug!("connect: in connect callback"); let maybe_stream = if status.is_none() { rtdebug!("status is none"); - Some(~UvStream(stream_watcher)) + Ok(~UvStream(stream_watcher)) } else { rtdebug!("status is some"); + // XXX: Wait for close stream_watcher.close(||()); - None + Err(uv_error_to_io_error(status.get())) }; // Store the stream in the task's stack @@ -142,10 +144,16 @@ impl IoFactory for UvIoFactory { return result_cell.take(); } - fn bind(&mut self, addr: IpAddr) -> Option<~TcpListenerObject> { + fn bind(&mut self, addr: IpAddr) -> Result<~TcpListenerObject, IoError> { let mut watcher = TcpWatcher::new(self.uv_loop()); - watcher.bind(addr); - return Some(~UvTcpListener(watcher)); + match watcher.bind(addr) { + Ok(_) => Ok(~UvTcpListener(watcher)), + Err(uverr) => { + // XXX: Should we wait until close completes? + watcher.as_stream().close(||()); + Err(uv_error_to_io_error(uverr)) + } + } } } @@ -321,7 +329,7 @@ fn test_simple_io_no_connect() { let io = unsafe { local_sched::unsafe_borrow_io() }; let addr = next_test_ip4(); let maybe_chan = io.connect(addr); - assert!(maybe_chan.is_none()); + assert!(maybe_chan.is_err()); } } diff --git a/src/libcore/rt/uvll.rs b/src/libcore/rt/uvll.rs index 4bff3bff7d3..2a2812c6718 100644 --- a/src/libcore/rt/uvll.rs +++ b/src/libcore/rt/uvll.rs @@ -33,6 +33,13 @@ use libc::{size_t, c_int, c_uint, c_void, c_char, uintptr_t}; use libc::{malloc, free}; use prelude::*; +pub static UNKNOWN: c_int = -1; +pub static OK: c_int = 0; +pub static EOF: c_int = 1; +pub static EADDRINFO: c_int = 2; +pub static EACCES: c_int = 3; +pub static ECONNREFUSED: c_int = 12; + pub struct uv_err_t { code: c_int, sys_errno_: c_int |
