diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2013-12-05 18:19:06 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2013-12-16 17:47:11 -0800 |
| commit | 529e268ab900f1b6e731af64ce2aeecda3555f4e (patch) | |
| tree | 7ebb9ed2a7f36455b9550749a442522d45f0dc30 /src/libstd | |
| parent | bfa9064ba2687eb1d95708f72f41ddd9729a6ba1 (diff) | |
| download | rust-529e268ab900f1b6e731af64ce2aeecda3555f4e.tar.gz rust-529e268ab900f1b6e731af64ce2aeecda3555f4e.zip | |
Fallout of rewriting std::comm
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/comm/select.rs | 2 | ||||
| -rw-r--r-- | src/libstd/io/comm_adapters.rs | 48 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 3 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 141 | ||||
| -rw-r--r-- | src/libstd/io/net/udp.rs | 67 | ||||
| -rw-r--r-- | src/libstd/io/net/unix.rs | 25 | ||||
| -rw-r--r-- | src/libstd/io/signal.rs | 11 | ||||
| -rw-r--r-- | src/libstd/io/timer.rs | 4 | ||||
| -rw-r--r-- | src/libstd/lib.rs | 1 | ||||
| -rw-r--r-- | src/libstd/prelude.rs | 2 | ||||
| -rw-r--r-- | src/libstd/rand/os.rs | 10 | ||||
| -rw-r--r-- | src/libstd/rt/comm.rs | 1141 | ||||
| -rw-r--r-- | src/libstd/rt/kill.rs | 37 | ||||
| -rw-r--r-- | src/libstd/rt/local_ptr.rs | 3 | ||||
| -rw-r--r-- | src/libstd/rt/message_queue.rs | 55 | ||||
| -rw-r--r-- | src/libstd/rt/mod.rs | 21 | ||||
| -rw-r--r-- | src/libstd/rt/rtio.rs | 4 | ||||
| -rw-r--r-- | src/libstd/rt/sched.rs | 105 | ||||
| -rw-r--r-- | src/libstd/rt/select.rs | 29 | ||||
| -rw-r--r-- | src/libstd/rt/task.rs | 62 | ||||
| -rw-r--r-- | src/libstd/rt/test.rs | 22 | ||||
| -rw-r--r-- | src/libstd/rt/thread.rs | 187 | ||||
| -rw-r--r-- | src/libstd/run.rs | 5 | ||||
| -rw-r--r-- | src/libstd/select.rs | 306 | ||||
| -rw-r--r-- | src/libstd/task/mod.rs | 72 | ||||
| -rw-r--r-- | src/libstd/task/spawn.rs | 21 | ||||
| -rw-r--r-- | src/libstd/unstable/mod.rs | 13 | ||||
| -rw-r--r-- | src/libstd/unstable/sync.rs | 23 |
28 files changed, 387 insertions, 2033 deletions
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 81a77000bad..2d9bc6e9c12 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -41,6 +41,8 @@ //! } //! ) +#[allow(dead_code)]; + use cast; use iter::Iterator; use kinds::Send; diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index b3e5a9a0c86..7f94af8307e 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -10,7 +10,7 @@ use prelude::*; -use comm::{GenericPort, GenericChan, GenericSmartChan}; +use comm::{Port, Chan}; use cmp; use io; use option::{None, Option, Some}; @@ -30,15 +30,15 @@ use vec::{bytes, CopyableVector, MutableVector, ImmutableVector}; /// None => println!("At the end of the stream!") /// } /// ``` -pub struct PortReader<P> { +pub struct PortReader { priv buf: Option<~[u8]>, // A buffer of bytes received but not consumed. priv pos: uint, // How many of the buffered bytes have already be consumed. - priv port: P, // The port to pull data from. + priv port: Port<~[u8]>, // The port to pull data from. priv closed: bool, // Whether the pipe this port connects to has been closed. } -impl<P: GenericPort<~[u8]>> PortReader<P> { - pub fn new(port: P) -> PortReader<P> { +impl PortReader { + pub fn new(port: Port<~[u8]>) -> PortReader<P> { PortReader { buf: None, pos: 0, @@ -48,7 +48,7 @@ impl<P: GenericPort<~[u8]>> PortReader<P> { } } -impl<P: GenericPort<~[u8]>> Reader for PortReader<P> { +impl Reader for PortReader { fn read(&mut self, buf: &mut [u8]) -> Option<uint> { let mut num_read = 0; loop { @@ -67,7 +67,7 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> { break; } self.pos = 0; - self.buf = self.port.try_recv(); + self.buf = self.port.recv_opt(); self.closed = self.buf.is_none(); } if self.closed && num_read == 0 { @@ -89,17 +89,17 @@ impl<P: GenericPort<~[u8]>> Reader for PortReader<P> { /// let writer = ChanWriter::new(chan); /// writer.write("hello, world".as_bytes()); /// ``` -pub struct ChanWriter<C> { - chan: C, +pub struct ChanWriter { + chan: Chan<~[u8]>, } -impl<C: GenericSmartChan<~[u8]>> ChanWriter<C> { +impl ChanWriter { pub fn new(chan: C) -> ChanWriter<C> { ChanWriter { chan: chan } } } -impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> { +impl Writer for ChanWriter { fn write(&mut self, buf: &[u8]) { if !self.chan.try_send(buf.to_owned()) { io::io_error::cond.raise(io::IoError { @@ -111,28 +111,6 @@ impl<C: GenericSmartChan<~[u8]>> Writer for ChanWriter<C> { } } -pub struct ReaderPort<R>; - -impl<R: Reader> ReaderPort<R> { - pub fn new(_reader: R) -> ReaderPort<R> { fail!() } -} - -impl<R: Reader> GenericPort<~[u8]> for ReaderPort<R> { - fn recv(&self) -> ~[u8] { fail!() } - - fn try_recv(&self) -> Option<~[u8]> { fail!() } -} - -pub struct WriterChan<W>; - -impl<W: Writer> WriterChan<W> { - pub fn new(_writer: W) -> WriterChan<W> { fail!() } -} - -impl<W: Writer> GenericChan<~[u8]> for WriterChan<W> { - fn send(&self, _x: ~[u8]) { fail!() } -} - #[cfg(test)] mod test { @@ -144,7 +122,7 @@ mod test { #[test] fn test_port_reader() { - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); do task::spawn { chan.send(~[1u8, 2u8]); chan.send(~[]); @@ -199,7 +177,7 @@ mod test { #[test] fn test_chan_writer() { - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); let mut writer = ChanWriter::new(chan); writer.write_be_u32(42); diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index c0bdc2a2014..2e9056a6aee 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -318,9 +318,6 @@ mod option; /// Basic stream compression. XXX: Belongs with other flate code pub mod flate; -/// Interop between byte streams and pipes. Not sure where it belongs -pub mod comm_adapters; - /// Extension traits pub mod extensions; diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 3c7582db7b8..a6230ede7e3 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -151,7 +151,6 @@ mod test { use io::net::ip::{Ipv4Addr, SocketAddr}; use io::*; use prelude::*; - use rt::comm::oneshot; #[test] #[ignore] fn bind_error() { @@ -195,7 +194,7 @@ mod test { fn smoke_test_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -206,11 +205,9 @@ mod test { assert!(buf[0] == 99); } - do spawntask { - port.recv(); - let mut stream = TcpStream::connect(addr); - stream.write([99]); - } + port.recv(); + let mut stream = TcpStream::connect(addr); + stream.write([99]); } } @@ -218,7 +215,7 @@ mod test { fn smoke_test_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -229,11 +226,9 @@ mod test { assert!(buf[0] == 99); } - do spawntask { - port.recv(); - let mut stream = TcpStream::connect(addr); - stream.write([99]); - } + port.recv(); + let mut stream = TcpStream::connect(addr); + stream.write([99]); } } @@ -241,7 +236,7 @@ mod test { fn read_eof_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -252,11 +247,9 @@ mod test { assert!(nread.is_none()); } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -264,7 +257,7 @@ mod test { fn read_eof_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -275,11 +268,9 @@ mod test { assert!(nread.is_none()); } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -287,7 +278,7 @@ mod test { fn read_eof_twice_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -308,11 +299,9 @@ mod test { }) } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -320,7 +309,7 @@ mod test { fn read_eof_twice_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -341,11 +330,9 @@ mod test { }) } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -353,7 +340,7 @@ mod test { fn write_close_ip4() { do run_in_mt_newsched_task { let addr = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -377,11 +364,9 @@ mod test { } } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -389,7 +374,7 @@ mod test { fn write_close_ip6() { do run_in_mt_newsched_task { let addr = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -413,11 +398,9 @@ mod test { } } - do spawntask { - port.recv(); - let _stream = TcpStream::connect(addr); - // Close - } + port.recv(); + let _stream = TcpStream::connect(addr); + // Close } } @@ -426,7 +409,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip4(); let max = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -438,13 +421,11 @@ mod test { } } - do spawntask { - port.recv(); - max.times(|| { - let mut stream = TcpStream::connect(addr); - stream.write([99]); - }); - } + port.recv(); + max.times(|| { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + }); } } @@ -453,7 +434,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip6(); let max = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -465,13 +446,11 @@ mod test { } } - do spawntask { - port.recv(); - max.times(|| { - let mut stream = TcpStream::connect(addr); - stream.write([99]); - }); - } + port.recv(); + max.times(|| { + let mut stream = TcpStream::connect(addr); + stream.write([99]); + }); } } @@ -480,7 +459,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -520,7 +499,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip6(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -560,7 +539,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip4(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -599,7 +578,7 @@ mod test { do run_in_mt_newsched_task { let addr = next_test_ip6(); static MAX: int = 10; - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -653,7 +632,7 @@ mod test { #[cfg(test)] fn peer_name(addr: SocketAddr) { do run_in_mt_newsched_task { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = TcpListener::bind(addr).listen(); @@ -662,20 +641,18 @@ mod test { acceptor.accept(); } - do spawntask { - port.recv(); - let stream = TcpStream::connect(addr); + port.recv(); + let stream = TcpStream::connect(addr); - assert!(stream.is_some()); - let mut stream = stream.unwrap(); + assert!(stream.is_some()); + let mut stream = stream.unwrap(); - // Make sure peer_name gives us the - // address/port of the peer we've - // connected to. - let peer_name = stream.peer_name(); - assert!(peer_name.is_some()); - assert_eq!(addr, peer_name.unwrap()); - } + // Make sure peer_name gives us the + // address/port of the peer we've + // connected to. + let peer_name = stream.peer_name(); + assert!(peer_name.is_some()); + assert_eq!(addr, peer_name.unwrap()); } } diff --git a/src/libstd/io/net/udp.rs b/src/libstd/io/net/udp.rs index 87cf59aba3b..1e56f964bea 100644 --- a/src/libstd/io/net/udp.rs +++ b/src/libstd/io/net/udp.rs @@ -107,8 +107,7 @@ mod test { use rt::test::*; use io::net::ip::{Ipv4Addr, SocketAddr}; use io::*; - use option::{Some, None}; - use rt::comm::oneshot; + use prelude::*; #[test] #[ignore] fn bind_error() { @@ -131,7 +130,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -151,14 +150,12 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(ref mut client) => { - port.recv(); - client.sendto([99], server_ip) - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(ref mut client) => { + port.recv(); + client.sendto([99], server_ip) } + None => fail!() } } } @@ -168,7 +165,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -188,14 +185,12 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(ref mut client) => { - port.recv(); - client.sendto([99], server_ip) - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(ref mut client) => { + port.recv(); + client.sendto([99], server_ip) } + None => fail!() } } } @@ -205,7 +200,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip4(); let client_ip = next_test_ip4(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -226,16 +221,14 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(client) => { - let client = ~client; - let mut stream = client.connect(server_ip); - port.recv(); - stream.write([99]); - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(client) => { + let client = ~client; + let mut stream = client.connect(server_ip); + port.recv(); + stream.write([99]); } + None => fail!() } } } @@ -245,7 +238,7 @@ mod test { do run_in_mt_newsched_task { let server_ip = next_test_ip6(); let client_ip = next_test_ip6(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { match UdpSocket::bind(server_ip) { @@ -266,16 +259,14 @@ mod test { } } - do spawntask { - match UdpSocket::bind(client_ip) { - Some(client) => { - let client = ~client; - let mut stream = client.connect(server_ip); - port.recv(); - stream.write([99]); - } - None => fail!() + match UdpSocket::bind(client_ip) { + Some(client) => { + let client = ~client; + let mut stream = client.connect(server_ip); + port.recv(); + stream.write([99]); } + None => fail!() } } } diff --git a/src/libstd/io/net/unix.rs b/src/libstd/io/net/unix.rs index c1f75465d9c..d3fc265cf2a 100644 --- a/src/libstd/io/net/unix.rs +++ b/src/libstd/io/net/unix.rs @@ -152,25 +152,22 @@ mod tests { use super::*; use rt::test::*; use io::*; - use rt::comm::oneshot; fn smalltest(server: proc(UnixStream), client: proc(UnixStream)) { do run_in_mt_newsched_task { let path1 = next_test_unix(); let path2 = path1.clone(); - let (port, chan) = oneshot(); let (client, server) = (client, server); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = UnixListener::bind(&path1).listen(); chan.send(()); - server(acceptor.accept().unwrap()); + server.take()(acceptor.accept().unwrap()); } - do spawntask { - port.recv(); - client(UnixStream::connect(&path2).unwrap()); - } + port.recv(); + client.take()(UnixStream::connect(&path2).unwrap()); } } @@ -251,7 +248,7 @@ mod tests { let times = 10; let path1 = next_test_unix(); let path2 = path1.clone(); - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask { let mut acceptor = UnixListener::bind(&path1).listen(); @@ -264,13 +261,11 @@ mod tests { }) } - do spawntask { - port.recv(); - times.times(|| { - let mut stream = UnixStream::connect(&path2); - stream.write([100]); - }) - } + port.recv(); + times.times(|| { + let mut stream = UnixStream::connect(&path2); + stream.write([100]); + }) } } diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 3b6c6013dd2..c568a19dfa2 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -20,7 +20,7 @@ definitions for a number of signals. */ use clone::Clone; -use comm::{Port, SharedChan, stream}; +use comm::{Port, SharedChan}; use container::{Map, MutableMap}; use hashmap; use io::io_error; @@ -93,9 +93,9 @@ impl Listener { /// Creates a new listener for signals. Once created, signals are bound via /// the `register` method (otherwise nothing will ever be received) pub fn new() -> Listener { - let (port, chan) = stream(); + let (port, chan) = SharedChan::new(); Listener { - chan: SharedChan::new(chan), + chan: chan, port: port, handles: hashmap::HashMap::new(), } @@ -149,7 +149,6 @@ mod test { use libc; use io::timer; use super::{Listener, Interrupt}; - use comm::{GenericPort, Peekable}; // kill is only available on Unixes #[cfg(unix)] @@ -198,9 +197,7 @@ mod test { s2.unregister(Interrupt); sigint(); timer::sleep(10); - if s2.port.peek() { - fail!("Unexpected {:?}", s2.port.recv()); - } + assert!(s2.port.try_recv().is_none()); } #[cfg(windows)] diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 202e02d55d0..5fb64ab3d09 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -38,7 +38,7 @@ loop { */ -use comm::{Port, PortOne}; +use comm::Port; use option::{Option, Some, None}; use result::{Ok, Err}; use io::io_error; @@ -86,7 +86,7 @@ impl Timer { /// Note that this invalidates any previous port which has been created by /// this timer, and that the returned port will be invalidated once the /// timer is destroyed (when it falls out of scope). - pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> { + pub fn oneshot(&mut self, msecs: u64) -> Port<()> { self.obj.oneshot(msecs) } diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs index 6948eb60b1f..53e26e435b7 100644 --- a/src/libstd/lib.rs +++ b/src/libstd/lib.rs @@ -156,7 +156,6 @@ pub mod trie; pub mod task; pub mod comm; -pub mod select; pub mod local_data; diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs index 83439d4c903..201259679cc 100644 --- a/src/libstd/prelude.rs +++ b/src/libstd/prelude.rs @@ -84,7 +84,7 @@ pub use vec::{OwnedVector, OwnedCopyableVector,OwnedEqVector, MutableVector}; pub use vec::{Vector, VectorVector, CopyableVector, ImmutableVector}; // Reexported runtime types -pub use comm::{stream, Port, Chan, GenericChan, GenericSmartChan, GenericPort, Peekable}; +pub use comm::{Port, Chan, SharedChan}; pub use task::spawn; /// Disposes of a value. diff --git a/src/libstd/rand/os.rs b/src/libstd/rand/os.rs index 5558b8b3348..1eaf1a29fa8 100644 --- a/src/libstd/rand/os.rs +++ b/src/libstd/rand/os.rs @@ -135,8 +135,10 @@ impl Drop for OSRng { #[cfg(test)] mod test { + use prelude::*; use super::*; use rand::Rng; + use task; #[test] fn test_os_rng() { @@ -151,16 +153,10 @@ mod test { #[test] fn test_os_rng_tasks() { - use task; - use comm; - use comm::{GenericChan, GenericPort}; - use option::{None, Some}; - use iter::{Iterator, range}; - use vec::{ImmutableVector, OwnedVector}; let mut chans = ~[]; for _ in range(0, 20) { - let (p, c) = comm::stream(); + let (p, c) = Chan::new(); chans.push(c); do task::spawn { // wait until all the tasks are ready to go. diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs deleted file mode 100644 index 2fa34994292..00000000000 --- a/src/libstd/rt/comm.rs +++ /dev/null @@ -1,1141 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Ports and channels. - -use option::*; -use cast; -use ops::Drop; -use rt::kill::BlockedTask; -use kinds::Send; -use rt; -use rt::sched::Scheduler; -use rt::local::Local; -use rt::select::{SelectInner, SelectPortInner}; -use select::{Select, SelectPort}; -use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Relaxed, SeqCst}; -use unstable::sync::UnsafeArc; -use util; -use util::Void; -use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable, SendDeferred}; -use cell::RefCell; -use clone::Clone; -use tuple::ImmutableTuple; - -/// A combined refcount / BlockedTask-as-uint pointer. -/// -/// Can be equal to the following values: -/// -/// * 2 - both endpoints are alive -/// * 1 - either the sender or the receiver is dead, determined by context -/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint) -type State = uint; - -static STATE_BOTH: State = 2; -static STATE_ONE: State = 1; - -/// The heap-allocated structure shared between two endpoints. -struct Packet<T> { - state: AtomicUint, - payload: Option<T>, -} - -// A one-shot channel. -pub struct ChanOne<T> { - priv void_packet: *mut Void, - priv suppress_finalize: bool -} - -/// A one-shot port. -pub struct PortOne<T> { - priv void_packet: *mut Void, - priv suppress_finalize: bool -} - -pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) { - let packet: ~Packet<T> = ~Packet { - state: AtomicUint::new(STATE_BOTH), - payload: None - }; - - unsafe { - let packet: *mut Void = cast::transmute(packet); - let port = PortOne { - void_packet: packet, - suppress_finalize: false - }; - let chan = ChanOne { - void_packet: packet, - suppress_finalize: false - }; - return (port, chan); - } -} - -impl<T: Send> ChanOne<T> { - #[inline] - fn packet(&self) -> *mut Packet<T> { - unsafe { - let p: *mut ~Packet<T> = cast::transmute(&self.void_packet); - let p: *mut Packet<T> = &mut **p; - return p; - } - } - - /// Send a message on the one-shot channel. If a receiver task is blocked - /// waiting for the message, will wake it up and reschedule to it. - pub fn send(self, val: T) { - self.try_send(val); - } - - /// As `send`, but also returns whether or not the receiver endpoint is still open. - pub fn try_send(self, val: T) -> bool { - self.try_send_inner(val, true) - } - - /// Send a message without immediately rescheduling to a blocked receiver. - /// This can be useful in contexts where rescheduling is forbidden, or to - /// optimize for when the sender expects to still have useful work to do. - pub fn send_deferred(self, val: T) { - self.try_send_deferred(val); - } - - /// As `send_deferred` and `try_send` together. - pub fn try_send_deferred(self, val: T) -> bool { - self.try_send_inner(val, false) - } - - // 'do_resched' configures whether the scheduler immediately switches to - // the receiving task, or leaves the sending task still running. - fn try_send_inner(mut self, val: T, do_resched: bool) -> bool { - if do_resched { - rtassert!(!rt::in_sched_context()); - } - - // In order to prevent starvation of other tasks in situations - // where a task sends repeatedly without ever receiving, we - // occassionally yield instead of doing a send immediately. - // Only doing this if we're doing a rescheduling send, - // otherwise the caller is expecting not to context switch. - if do_resched { - // XXX: This TLS hit should be combined with other uses of the scheduler below - let sched: ~Scheduler = Local::take(); - sched.maybe_yield(); - } - - let mut recvr_active = true; - let packet = self.packet(); - - unsafe { - - // Install the payload - rtassert!((*packet).payload.is_none()); - (*packet).payload = Some(val); - - // Atomically swap out the old state to figure out what - // the port's up to, issuing a release barrier to prevent - // reordering of the payload write. This also issues an - // acquire barrier that keeps the subsequent access of the - // ~Task pointer from being reordered. - let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); - - // Suppress the synchronizing actions in the finalizer. We're - // done with the packet. NB: In case of do_resched, this *must* - // happen before waking up a blocked task (or be unkillable), - // because we might get a kill signal during the reschedule. - self.suppress_finalize = true; - - match oldstate { - STATE_BOTH => { - // Port is not waiting yet. Nothing to do - } - STATE_ONE => { - // Port has closed. Need to clean up. - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - recvr_active = false; - } - task_as_state => { - // Port is blocked. Wake it up. - let recvr = BlockedTask::cast_from_uint(task_as_state); - if do_resched { - recvr.wake().map(|woken_task| { - Scheduler::run_task(woken_task); - }); - } else { - let mut sched = Local::borrow(None::<Scheduler>); - sched.get().enqueue_blocked_task(recvr); - } - } - } - } - - return recvr_active; - } -} - -impl<T: Send> PortOne<T> { - fn packet(&self) -> *mut Packet<T> { - unsafe { - let p: *mut ~Packet<T> = cast::transmute(&self.void_packet); - let p: *mut Packet<T> = &mut **p; - return p; - } - } - - /// Wait for a message on the one-shot port. Fails if the send end is closed. - pub fn recv(self) -> T { - match self.try_recv() { - Some(val) => val, - None => { - fail!("receiving on closed channel"); - } - } - } - - /// As `recv`, but returns `None` if the send end is closed rather than failing. - pub fn try_recv(mut self) -> Option<T> { - // Optimistic check. If data was sent already, we don't even need to block. - // No release barrier needed here; we're not handing off our task pointer yet. - if !self.optimistic_check() { - // No data available yet. - // Switch to the scheduler to put the ~Task into the Packet state. - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|sched, task| { - self.block_on(sched, task); - }) - } - - // Task resumes. - self.recv_ready() - } -} - -impl<T: Send> SelectInner for PortOne<T> { - #[inline] #[cfg(not(test))] - fn optimistic_check(&mut self) -> bool { - unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } - } - - #[inline] #[cfg(test)] - fn optimistic_check(&mut self) -> bool { - // The optimistic check is never necessary for correctness. For testing - // purposes, making it randomly return false simulates a racing sender. - use rand::{Rand}; - let mut sched = Local::borrow(None::<Scheduler>); - let actually_check = Rand::rand(&mut sched.get().rng); - if actually_check { - unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE } - } else { - false - } - } - - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - unsafe { - // Atomically swap the task pointer into the Packet state, issuing - // an acquire barrier to prevent reordering of the subsequent read - // of the payload. Also issues a release barrier to prevent - // reordering of any previous writes to the task structure. - let task_as_state = task.cast_to_uint(); - let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst); - match oldstate { - STATE_BOTH => { - // Data has not been sent. Now we're blocked. - rtdebug!("non-rendezvous recv"); - false - } - STATE_ONE => { - // Re-record that we are the only owner of the packet. - // No barrier needed, even if the task gets reawoken - // on a different core -- this is analogous to writing a - // payload; a barrier in enqueueing the task protects it. - // NB(#8132). This *must* occur before the enqueue below. - // FIXME(#6842, #8130) This is usually only needed for the - // assertion in recv_ready, except in the case of select(). - // This won't actually ever have cacheline contention, but - // maybe should be optimized out with a cfg(test) anyway? - (*self.packet()).state.store(STATE_ONE, Relaxed); - - rtdebug!("rendezvous recv"); - - // Channel is closed. Switch back and check the data. - // NB: We have to drop back into the scheduler event loop here - // instead of switching immediately back or we could end up - // triggering infinite recursion on the scheduler's stack. - let recvr = BlockedTask::cast_from_uint(task_as_state); - sched.enqueue_blocked_task(recvr); - true - } - _ => rtabort!("can't block_on; a task is already blocked") - } - } - } - - // This is the only select trait function that's not also used in recv. - fn unblock_from(&mut self) -> bool { - let packet = self.packet(); - unsafe { - // In case the data is available, the acquire barrier here matches - // the release barrier the sender used to release the payload. - match (*packet).state.load(Acquire) { - // Impossible. We removed STATE_BOTH when blocking on it, and - // no self-respecting sender would put it back. - STATE_BOTH => rtabort!("refcount already 2 in unblock_from"), - // Here, a sender already tried to wake us up. Perhaps they - // even succeeded! Data is available. - STATE_ONE => true, - // Still registered as blocked. Need to "unblock" the pointer. - task_as_state => { - // In the window between the load and the CAS, a sender - // might take the pointer and set the refcount to ONE. If - // that happens, we shouldn't clobber that with BOTH! - // Acquire barrier again for the same reason as above. - match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH, - Acquire) { - STATE_BOTH => rtabort!("refcount became 2 in unblock_from"), - STATE_ONE => true, // Lost the race. Data available. - same_ptr => { - // We successfully unblocked our task pointer. - rtassert!(task_as_state == same_ptr); - let handle = BlockedTask::cast_from_uint(task_as_state); - // Because we are already awake, the handle we - // gave to this port shall already be empty. - handle.assert_already_awake(); - false - } - } - } - } - } - } -} - -impl<T: Send> Select for PortOne<T> { } - -impl<T: Send> SelectPortInner<T> for PortOne<T> { - fn recv_ready(mut self) -> Option<T> { - let packet = self.packet(); - - // No further memory barrier is needed here to access the - // payload. Some scenarios: - // - // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine. - // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us - // and ran on its thread. The sending task issued a read barrier when taking the - // pointer to the receiving task. - // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task) - // is pinned to some other scheduler, so the sending task had to give us to - // a different scheduler for resuming. That send synchronized memory. - unsafe { - // See corresponding store() above in block_on for rationale. - // FIXME(#8130) This can happen only in test builds. - // This load is not required for correctness and may be compiled out. - rtassert!((*packet).state.load(Relaxed) == STATE_ONE); - - let payload = (*packet).payload.take(); - - // The sender has closed up shop. Drop the packet. - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - // Suppress the synchronizing actions in the finalizer. We're done with the packet. - self.suppress_finalize = true; - return payload; - } - } -} - -impl<T: Send> SelectPort<T> for PortOne<T> { } - -impl<T: Send> Peekable<T> for PortOne<T> { - fn peek(&self) -> bool { - unsafe { - let packet: *mut Packet<T> = self.packet(); - let oldstate = (*packet).state.load(SeqCst); - match oldstate { - STATE_BOTH => false, - STATE_ONE => (*packet).payload.is_some(), - _ => rtabort!("peeked on a blocked task") - } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for ChanOne<T> { - fn drop(&mut self) { - if self.suppress_finalize { return } - - unsafe { - let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst); - match oldstate { - STATE_BOTH => { - // Port still active. It will destroy the Packet. - }, - STATE_ONE => { - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - }, - task_as_state => { - // The port is blocked waiting for a message we will never send. Wake it. - rtassert!((*self.packet()).payload.is_none()); - let recvr = BlockedTask::cast_from_uint(task_as_state); - recvr.wake().map(|woken_task| { - Scheduler::run_task(woken_task); - }); - } - } - } - } -} - -#[unsafe_destructor] -impl<T: Send> Drop for PortOne<T> { - fn drop(&mut self) { - if self.suppress_finalize { return } - - unsafe { - let oldstate = (*self.packet()).state.swap(STATE_ONE, SeqCst); - match oldstate { - STATE_BOTH => { - // Chan still active. It will destroy the packet. - }, - STATE_ONE => { - let _packet: ~Packet<T> = cast::transmute(self.void_packet); - } - task_as_state => { - // This case occurs during unwinding, when the blocked - // receiver was killed awake. The task can't still be - // blocked (we are it), but we need to free the handle. - let recvr = BlockedTask::cast_from_uint(task_as_state); - recvr.assert_already_awake(); - } - } - } - } -} - -struct StreamPayload<T> { - val: T, - next: PortOne<StreamPayload<T>> -} - -type StreamChanOne<T> = ChanOne<StreamPayload<T>>; -type StreamPortOne<T> = PortOne<StreamPayload<T>>; - -/// A channel with unbounded size. -pub struct Chan<T> { - // FIXME #5372. Using RefCell because we don't take &mut self - next: RefCell<StreamChanOne<T>> -} - -/// An port with unbounded size. -pub struct Port<T> { - // FIXME #5372. Using RefCell because we don't take &mut self - next: RefCell<Option<StreamPortOne<T>>> -} - -pub fn stream<T: Send>() -> (Port<T>, Chan<T>) { - let (pone, cone) = oneshot(); - let port = Port { next: RefCell::new(Some(pone)) }; - let chan = Chan { next: RefCell::new(cone) }; - return (port, chan); -} - -impl<T: Send> Chan<T> { - fn try_send_inner(&self, val: T, do_resched: bool) -> bool { - let (next_pone, mut cone) = oneshot(); - let mut b = self.next.borrow_mut(); - util::swap(&mut cone, b.get()); - cone.try_send_inner(StreamPayload { val: val, next: next_pone }, do_resched) - } -} - -impl<T: Send> GenericChan<T> for Chan<T> { - fn send(&self, val: T) { - self.try_send(val); - } -} - -impl<T: Send> GenericSmartChan<T> for Chan<T> { - fn try_send(&self, val: T) -> bool { - self.try_send_inner(val, true) - } -} - -impl<T: Send> SendDeferred<T> for Chan<T> { - fn send_deferred(&self, val: T) { - self.try_send_deferred(val); - } - fn try_send_deferred(&self, val: T) -> bool { - self.try_send_inner(val, false) - } -} - -impl<T: Send> GenericPort<T> for Port<T> { - fn recv(&self) -> T { - match self.try_recv() { - Some(val) => val, - None => { - fail!("receiving on closed channel"); - } - } - } - - fn try_recv(&self) -> Option<T> { - let mut b = self.next.borrow_mut(); - b.get().take().map_default(None, |pone| { - match pone.try_recv() { - Some(StreamPayload { val, next }) => { - *b.get() = Some(next); - Some(val) - } - None => None - } - }) - } -} - -impl<T: Send> Peekable<T> for Port<T> { - fn peek(&self) -> bool { - self.next.with_mut(|p| p.get_mut_ref().peek()) - } -} - -// XXX: Kind of gross. A Port<T> should be selectable so you can make an array -// of them, but a &Port<T> should also be selectable so you can select2 on it -// alongside a PortOne<U> without passing the port by value in recv_ready. - -impl<'a, T: Send> SelectInner for &'a Port<T> { - #[inline] - fn optimistic_check(&mut self) -> bool { - self.next.with_mut(|pone| { pone.get_mut_ref().optimistic_check() }) - } - - #[inline] - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - let mut b = self.next.borrow_mut(); - b.get().get_mut_ref().block_on(sched, task) - } - - #[inline] - fn unblock_from(&mut self) -> bool { - self.next.with_mut(|pone| { pone.get_mut_ref().unblock_from() }) - } -} - -impl<'a, T: Send> Select for &'a Port<T> { } - -impl<T: Send> SelectInner for Port<T> { - #[inline] - fn optimistic_check(&mut self) -> bool { - (&*self).optimistic_check() - } - - #[inline] - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - (&*self).block_on(sched, task) - } - - #[inline] - fn unblock_from(&mut self) -> bool { - (&*self).unblock_from() - } -} - -impl<T: Send> Select for Port<T> { } - -impl<'a, T: Send> SelectPortInner<T> for &'a Port<T> { - fn recv_ready(self) -> Option<T> { - let mut b = self.next.borrow_mut(); - match b.get().take_unwrap().recv_ready() { - Some(StreamPayload { val, next }) => { - *b.get() = Some(next); - Some(val) - } - None => None - } - } -} - -impl<'a, T: Send> SelectPort<T> for &'a Port<T> { } - -pub struct SharedChan<T> { - // Just like Chan, but a shared AtomicOption - priv next: UnsafeArc<AtomicOption<StreamChanOne<T>>> -} - -impl<T: Send> SharedChan<T> { - pub fn new(chan: Chan<T>) -> SharedChan<T> { - let next = chan.next.unwrap(); - let next = AtomicOption::new(~next); - SharedChan { next: UnsafeArc::new(next) } - } -} - -impl<T: Send> SharedChan<T> { - fn try_send_inner(&self, val: T, do_resched: bool) -> bool { - unsafe { - let (next_pone, next_cone) = oneshot(); - let cone = (*self.next.get()).swap(~next_cone, SeqCst); - cone.unwrap().try_send_inner(StreamPayload { val: val, next: next_pone }, - do_resched) - } - } -} - -impl<T: Send> GenericChan<T> for SharedChan<T> { - fn send(&self, val: T) { - self.try_send(val); - } -} - -impl<T: Send> GenericSmartChan<T> for SharedChan<T> { - fn try_send(&self, val: T) -> bool { - self.try_send_inner(val, true) - } -} - -impl<T: Send> SendDeferred<T> for SharedChan<T> { - fn send_deferred(&self, val: T) { - self.try_send_deferred(val); - } - fn try_send_deferred(&self, val: T) -> bool { - self.try_send_inner(val, false) - } -} - -impl<T: Send> Clone for SharedChan<T> { - fn clone(&self) -> SharedChan<T> { - SharedChan { - next: self.next.clone() - } - } -} - -pub struct SharedPort<T> { - // The next port on which we will receive the next port on which we will receive T - priv next_link: UnsafeArc<AtomicOption<PortOne<StreamPortOne<T>>>> -} - -impl<T: Send> SharedPort<T> { - pub fn new(port: Port<T>) -> SharedPort<T> { - // Put the data port into a new link pipe - let next_data_port = port.next.unwrap().unwrap(); - let (next_link_port, next_link_chan) = oneshot(); - next_link_chan.send(next_data_port); - let next_link = AtomicOption::new(~next_link_port); - SharedPort { next_link: UnsafeArc::new(next_link) } - } -} - -impl<T: Send> GenericPort<T> for SharedPort<T> { - fn recv(&self) -> T { - match self.try_recv() { - Some(val) => val, - None => { - fail!("receiving on a closed channel"); - } - } - } - - fn try_recv(&self) -> Option<T> { - unsafe { - let (next_link_port, next_link_chan) = oneshot(); - let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); - let link_port = link_port.unwrap(); - let data_port = link_port.recv(); - let (next_data_port, res) = match data_port.try_recv() { - Some(StreamPayload { val, next }) => { - (next, Some(val)) - } - None => { - let (next_data_port, _) = oneshot(); - (next_data_port, None) - } - }; - next_link_chan.send(next_data_port); - return res; - } - } -} - -impl<T: Send> Clone for SharedPort<T> { - fn clone(&self) -> SharedPort<T> { - SharedPort { - next_link: self.next_link.clone() - } - } -} - -// FIXME #7760: Need better name -type MegaPipe<T> = (SharedPort<T>, SharedChan<T>); - -pub fn megapipe<T: Send>() -> MegaPipe<T> { - let (port, chan) = stream(); - (SharedPort::new(port), SharedChan::new(chan)) -} - -impl<T: Send> GenericChan<T> for MegaPipe<T> { - fn send(&self, val: T) { - self.second_ref().send(val) - } -} - -impl<T: Send> GenericSmartChan<T> for MegaPipe<T> { - fn try_send(&self, val: T) -> bool { - self.second_ref().try_send(val) - } -} - -impl<T: Send> GenericPort<T> for MegaPipe<T> { - fn recv(&self) -> T { - self.first_ref().recv() - } - - fn try_recv(&self) -> Option<T> { - self.first_ref().try_recv() - } -} - -impl<T: Send> SendDeferred<T> for MegaPipe<T> { - fn send_deferred(&self, val: T) { - self.second_ref().send_deferred(val) - } - fn try_send_deferred(&self, val: T) -> bool { - self.second_ref().try_send_deferred(val) - } -} - -#[cfg(test)] -mod test { - use super::*; - use option::*; - use rt::test::*; - use num::Times; - use rt::util; - - #[test] - fn oneshot_single_thread_close_port_first() { - // Simple test of closing without sending - do run_in_newsched_task { - let (port, _chan) = oneshot::<int>(); - { let _p = port; } - } - } - - #[test] - fn oneshot_single_thread_close_chan_first() { - // Simple test of closing without sending - do run_in_newsched_task { - let (_port, chan) = oneshot::<int>(); - { let _c = chan; } - } - } - - #[test] - fn oneshot_single_thread_send_port_close() { - // Testing that the sender cleans up the payload if receiver is closed - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - { let _p = port; } - chan.send(~0); - } - } - - #[test] - fn oneshot_single_thread_recv_chan_close() { - // Receiving on a closed chan will fail - do run_in_newsched_task { - let res = do spawntask_try { - let (port, chan) = oneshot::<~int>(); - { let _c = chan; } - port.recv(); - }; - // What is our res? - rtdebug!("res is: {:?}", res.is_err()); - assert!(res.is_err()); - } - } - - #[test] - fn oneshot_single_thread_send_then_recv() { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - chan.send(~10); - assert!(port.recv() == ~10); - } - } - - #[test] - fn oneshot_single_thread_try_send_open() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - assert!(chan.try_send(10)); - assert!(port.recv() == 10); - } - } - - #[test] - fn oneshot_single_thread_try_send_closed() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - { let _p = port; } - assert!(!chan.try_send(10)); - } - } - - #[test] - fn oneshot_single_thread_try_recv_open() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - chan.send(10); - assert!(port.try_recv() == Some(10)); - } - } - - #[test] - fn oneshot_single_thread_try_recv_closed() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - { let _c = chan; } - assert!(port.try_recv() == None); - } - } - - #[test] - fn oneshot_single_thread_peek_data() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - assert!(!port.peek()); - chan.send(10); - assert!(port.peek()); - } - } - - #[test] - fn oneshot_single_thread_peek_close() { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - { let _c = chan; } - assert!(!port.peek()); - assert!(!port.peek()); - } - } - - #[test] - fn oneshot_single_thread_peek_open() { - do run_in_newsched_task { - let (port, _) = oneshot::<int>(); - assert!(!port.peek()); - } - } - - #[test] - fn oneshot_multi_task_recv_then_send() { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - do spawntask { - assert!(port.recv() == ~10); - } - - chan.send(~10); - } - } - - #[test] - fn oneshot_multi_task_recv_then_close() { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - do spawntask_later { - let _ = chan; - } - let res = do spawntask_try { - assert!(port.recv() == ~10); - }; - assert!(res.is_err()); - } - } - - #[test] - fn oneshot_multi_thread_close_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - let thread = do spawntask_thread { - let _ = port; - }; - let _chan = chan; - thread.join(); - } - }) - } - - #[test] - fn oneshot_multi_thread_send_close_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - let thread1 = do spawntask_thread { - let _ = port; - }; - let thread2 = do spawntask_thread { - chan.send(1); - }; - thread1.join(); - thread2.join(); - } - }) - } - - #[test] - fn oneshot_multi_thread_recv_close_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<int>(); - let thread1 = do spawntask_thread { - let port = port; - let res = do spawntask_try { - port.recv(); - }; - assert!(res.is_err()); - }; - let thread2 = do spawntask_thread { - let chan = chan; - do spawntask { - let _ = chan; - } - }; - thread1.join(); - thread2.join(); - } - }) - } - - #[test] - fn oneshot_multi_thread_send_recv_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_newsched_task { - let (port, chan) = oneshot::<~int>(); - let thread1 = do spawntask_thread { - chan.send(~10); - }; - let thread2 = do spawntask_thread { - assert!(port.recv() == ~10); - }; - thread1.join(); - thread2.join(); - } - }) - } - - #[test] - fn stream_send_recv_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - stress_factor().times(|| { - do run_in_mt_newsched_task { - let (port, chan) = stream::<~int>(); - - send(chan, 0); - recv(port, 0); - - fn send(chan: Chan<~int>, i: int) { - if i == 10 { return } - - do spawntask_random { - chan.send(~i); - send(chan, i + 1); - } - } - - fn recv(port: Port<~int>, i: int) { - if i == 10 { return } - - do spawntask_random { - assert!(port.recv() == ~i); - recv(port, i + 1); - }; - } - } - }) - } - - #[test] - fn recv_a_lot() { - // Regression test that we don't run out of stack in scheduler context - do run_in_newsched_task { - let (port, chan) = stream(); - 10000.times(|| { chan.send(()) }); - 10000.times(|| { port.recv() }); - } - } - - #[test] - fn shared_chan_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - do run_in_mt_newsched_task { - let (port, chan) = stream(); - let chan = SharedChan::new(chan); - let total = stress_factor() + 100; - total.times(|| { - let chan_clone = chan.clone(); - do spawntask_random { - chan_clone.send(()); - } - }); - - total.times(|| { - port.recv(); - }); - } - } - - #[test] - fn shared_port_stress() { - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - do run_in_mt_newsched_task { - let (end_port, end_chan) = stream(); - let (port, chan) = stream(); - let end_chan = SharedChan::new(end_chan); - let port = SharedPort::new(port); - let total = stress_factor() + 100; - total.times(|| { - let end_chan_clone = end_chan.clone(); - let port_clone = port.clone(); - do spawntask_random { - port_clone.recv(); - end_chan_clone.send(()); - } - }); - - total.times(|| { - chan.send(()); - }); - - total.times(|| { - end_port.recv(); - }); - } - } - - #[test] - fn shared_port_close_simple() { - do run_in_mt_newsched_task { - let (port, chan) = stream::<()>(); - let port = SharedPort::new(port); - { let _chan = chan; } - assert!(port.try_recv().is_none()); - } - } - - #[test] - fn shared_port_close() { - do run_in_mt_newsched_task { - let (end_port, end_chan) = stream::<bool>(); - let (port, chan) = stream::<()>(); - let end_chan = SharedChan::new(end_chan); - let port = SharedPort::new(port); - let chan = SharedChan::new(chan); - let send_total = 10; - let recv_total = 20; - do spawntask_random { - send_total.times(|| { - let chan_clone = chan.clone(); - do spawntask_random { - chan_clone.send(()); - } - }); - } - let end_chan_clone = end_chan.clone(); - do spawntask_random { - recv_total.times(|| { - let port_clone = port.clone(); - let end_chan_clone = end_chan_clone.clone(); - do spawntask_random { - let recvd = port_clone.try_recv().is_some(); - end_chan_clone.send(recvd); - } - }); - } - - let mut recvd = 0; - recv_total.times(|| { - recvd += if end_port.recv() { 1 } else { 0 }; - }); - - assert!(recvd == send_total); - } - } - - #[test] - fn megapipe_stress() { - use rand; - use rand::Rng; - - if util::limit_thread_creation_due_to_osx_and_valgrind() { return; } - - do run_in_mt_newsched_task { - let (end_port, end_chan) = stream::<()>(); - let end_chan = SharedChan::new(end_chan); - let pipe = megapipe(); - let total = stress_factor() + 10; - let mut rng = rand::rng(); - total.times(|| { - let msgs = rng.gen_range(0u, 10); - let pipe_clone = pipe.clone(); - let end_chan_clone = end_chan.clone(); - do spawntask_random { - msgs.times(|| { - pipe_clone.send(()); - }); - msgs.times(|| { - pipe_clone.recv(); - }); - } - - end_chan_clone.send(()); - }); - - total.times(|| { - end_port.recv(); - }); - } - } - - #[test] - fn send_deferred() { - use unstable::sync::atomic; - - // Tests no-rescheduling of send_deferred on all types of channels. - do run_in_newsched_task { - let (pone, cone) = oneshot(); - let (pstream, cstream) = stream(); - let (pshared, cshared) = stream(); - let cshared = SharedChan::new(cshared); - let mp = megapipe(); - - do spawntask { pone.recv(); } - do spawntask { pstream.recv(); } - do spawntask { pshared.recv(); } - let p_mp = mp.clone(); - do spawntask { p_mp.recv(); } - - unsafe { - let _guard = atomic(); - cone.send_deferred(()); - cstream.send_deferred(()); - cshared.send_deferred(()); - mp.send_deferred(()); - } - } - } - -} diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs index e3f9cd09632..f4f128cf5aa 100644 --- a/src/libstd/rt/kill.rs +++ b/src/libstd/rt/kill.rs @@ -153,8 +153,9 @@ There are two known issues with the current scheme for exit code propagation. use cast; use option::{Option, Some, None}; use prelude::*; +use iter; +use task::TaskResult; use rt::task::Task; -use rt::task::UnwindResult; use unstable::atomics::{AtomicUint, SeqCst}; use unstable::sync::UnsafeArc; @@ -169,11 +170,21 @@ pub enum BlockedTask { pub struct Death { // Action to be done with the exit code. If set, also makes the task wait // until all its watched children exit before collecting the status. - on_exit: Option<proc(UnwindResult)>, + on_exit: Option<proc(TaskResult)>, // nesting level counter for unstable::atomically calls (0 == can deschedule). priv wont_sleep: int, } +pub struct BlockedTaskIterator { + priv inner: UnsafeArc<AtomicUint>, +} + +impl Iterator<BlockedTask> for BlockedTaskIterator { + fn next(&mut self) -> Option<BlockedTask> { + Some(Shared(self.inner.clone())) + } +} + impl BlockedTask { /// Returns Some if the task was successfully woken; None if already killed. pub fn wake(self) -> Option<~Task> { @@ -194,19 +205,17 @@ impl BlockedTask { } /// Converts one blocked task handle to a list of many handles to the same. - pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] { - let handles = match self { + pub fn make_selectable(self, num_handles: uint) + -> iter::Take<BlockedTaskIterator> + { + let arc = match self { Owned(task) => { - let flag = unsafe { - AtomicUint::new(cast::transmute(task)) - }; - UnsafeArc::newN(flag, num_handles) + let flag = unsafe { AtomicUint::new(cast::transmute(task)) }; + UnsafeArc::new(flag) } - Shared(arc) => arc.cloneN(num_handles), + Shared(arc) => arc.clone(), }; - // Even if the task was unkillable before, we use 'Killable' because - // multiple pipes will have handles. It does not really mean killable. - handles.move_iter().map(|x| Shared(x)).collect() + BlockedTaskIterator{ inner: arc }.take(num_handles) } // This assertion has two flavours because the wake involves an atomic op. @@ -254,10 +263,10 @@ impl Death { } /// Collect failure exit codes from children and propagate them to a parent. - pub fn collect_failure(&mut self, result: UnwindResult) { + pub fn collect_failure(&mut self, result: TaskResult) { match self.on_exit.take() { + Some(f) => f(result), None => {} - Some(on_exit) => on_exit(result), } } diff --git a/src/libstd/rt/local_ptr.rs b/src/libstd/rt/local_ptr.rs index 66fe9742121..925aa802ad5 100644 --- a/src/libstd/rt/local_ptr.rs +++ b/src/libstd/rt/local_ptr.rs @@ -77,10 +77,9 @@ pub unsafe fn borrow<T>() -> Borrowed<T> { /// it wherever possible. #[cfg(not(windows), not(target_os = "android"))] pub mod compiled { - #[cfg(not(test))] - use libc::c_void; use cast; use option::{Option, Some, None}; + #[cfg(not(test))] use libc::c_void; #[cfg(test)] pub use realstd::rt::shouldnt_be_public::RT_TLS_PTR; diff --git a/src/libstd/rt/message_queue.rs b/src/libstd/rt/message_queue.rs deleted file mode 100644 index 10e457368f0..00000000000 --- a/src/libstd/rt/message_queue.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A concurrent queue that supports multiple producers and a -//! single consumer. - -use kinds::Send; -use vec::OwnedVector; -use option::Option; -use clone::Clone; -use rt::mpsc_queue::Queue; - -pub struct MessageQueue<T> { - priv queue: Queue<T> -} - -impl<T: Send> MessageQueue<T> { - pub fn new() -> MessageQueue<T> { - MessageQueue { - queue: Queue::new() - } - } - - #[inline] - pub fn push(&mut self, value: T) { - self.queue.push(value) - } - - #[inline] - pub fn pop(&mut self) -> Option<T> { - self.queue.pop() - } - - /// A pop that may sometimes miss enqueued elements, but is much faster - /// to give up without doing any synchronization - #[inline] - pub fn casual_pop(&mut self) -> Option<T> { - self.queue.pop() - } -} - -impl<T: Send> Clone for MessageQueue<T> { - fn clone(&self) -> MessageQueue<T> { - MessageQueue { - queue: self.queue.clone() - } - } -} diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index ce8d1ab1983..5d2179e8b96 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -65,7 +65,7 @@ use ptr::RawPtr; use rt::local::Local; use rt::sched::{Scheduler, Shutdown}; use rt::sleeper_list::SleeperList; -use rt::task::UnwindResult; +use task::TaskResult; use rt::task::{Task, SchedTask, GreenTask, Sched}; use send_str::SendStrStatic; use unstable::atomics::{AtomicInt, AtomicBool, SeqCst}; @@ -91,8 +91,6 @@ pub use self::kill::BlockedTask; // XXX: these probably shouldn't be public... #[doc(hidden)] pub mod shouldnt_be_public { - pub use super::select::SelectInner; - pub use super::select::{SelectInner, SelectPortInner}; pub use super::local_ptr::native::maybe_tls_key; #[cfg(not(windows), not(target_os = "android"))] pub use super::local_ptr::compiled::RT_TLS_PTR; @@ -123,11 +121,11 @@ pub mod rtio; /// or task-local storage. pub mod local; -/// A parallel queue. -pub mod message_queue; - /// A mostly lock-free multi-producer, single consumer queue. -mod mpsc_queue; +pub mod mpsc_queue; + +/// A lock-free single-producer, single consumer queue. +pub mod spsc_queue; /// A lock-free multi-producer, multi-consumer bounded queue. mod mpmc_bounded_queue; @@ -169,11 +167,6 @@ pub mod rc; /// scheduler and task context pub mod tube; -/// Simple reimplementation of std::comm -pub mod comm; - -mod select; - /// The runtime needs to be able to put a pointer into thread-local storage. mod local_ptr; @@ -349,7 +342,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int { // When the main task exits, after all the tasks in the main // task tree, shut down the schedulers and set the exit code. let handles = handles; - let on_exit: proc(UnwindResult) = proc(exit_success) { + let on_exit: proc(TaskResult) = proc(exit_success) { unsafe { assert!(!(*exited_already.get()).swap(true, SeqCst), "the runtime already exited"); @@ -361,7 +354,7 @@ fn run_(main: proc(), use_main_sched: bool) -> int { } unsafe { - let exit_code = if exit_success.is_success() { + let exit_code = if exit_success.is_ok() { use rt::util; // If we're exiting successfully, then return the global diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 557d9c998ca..b54231421e3 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -10,7 +10,7 @@ use c_str::CString; use cast; -use comm::{SharedChan, PortOne, Port}; +use comm::{SharedChan, Port}; use libc::c_int; use libc; use ops::Drop; @@ -222,7 +222,7 @@ pub trait RtioUdpSocket : RtioSocket { pub trait RtioTimer { fn sleep(&mut self, msecs: u64); - fn oneshot(&mut self, msecs: u64) -> PortOne<()>; + fn oneshot(&mut self, msecs: u64) -> Port<()>; fn period(&mut self, msecs: u64) -> Port<()>; } diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index fa17efc833b..ac3aeb5a4bb 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -17,7 +17,6 @@ use super::stack::{StackPool}; use super::rtio::EventLoop; use super::context::Context; use super::task::{Task, AnySched, Sched}; -use super::message_queue::MessageQueue; use rt::kill::BlockedTask; use rt::deque; use rt::local_ptr; @@ -29,6 +28,7 @@ use iter::range; use unstable::mutex::Mutex; use vec::{OwnedVector}; +use mpsc = super::mpsc_queue; /// A scheduler is responsible for coordinating the execution of Tasks /// on a single thread. The scheduler runs inside a slightly modified @@ -47,7 +47,9 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - message_queue: MessageQueue<SchedMessage>, + message_queue: mpsc::Consumer<SchedMessage, ()>, + /// Producer used to clone sched handles from + message_producer: mpsc::Producer<SchedMessage, ()>, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -104,7 +106,7 @@ enum EffortLevel { GiveItYourBest } -static MAX_YIELD_CHECKS: uint = 200; +static MAX_YIELD_CHECKS: uint = 20000; fn reset_yield_check(rng: &mut XorShiftRng) -> uint { let r: uint = Rand::rand(rng); @@ -135,9 +137,11 @@ impl Scheduler { friend: Option<SchedHandle>) -> Scheduler { + let (consumer, producer) = mpsc::queue(()); let mut sched = Scheduler { sleeper_list: sleeper_list, - message_queue: MessageQueue::new(), + message_queue: consumer, + message_producer: producer, sleepy: false, no_sleep: false, event_loop: event_loop, @@ -218,7 +222,7 @@ impl Scheduler { // Should not have any messages let message = stask.sched.get_mut_ref().message_queue.pop(); - rtassert!(message.is_none()); + rtassert!(match message { mpsc::Empty => true, _ => false }); stask.destroyed = true; } @@ -315,10 +319,27 @@ impl Scheduler { fn interpret_message_queue(mut ~self, effort: EffortLevel) -> Option<~Scheduler> { let msg = if effort == DontTryTooHard { - // Do a cheap check that may miss messages self.message_queue.casual_pop() } else { - self.message_queue.pop() + // When popping our message queue, we could see an "inconsistent" + // state which means that we *should* be able to pop data, but we + // are unable to at this time. Our options are: + // + // 1. Spin waiting for data + // 2. Ignore this and pretend we didn't find a message + // + // If we choose route 1, then if the pusher in question is currently + // pre-empted, we're going to take up our entire time slice just + // spinning on this queue. If we choose route 2, then the pusher in + // question is still guaranteed to make a send() on its async + // handle, so we will guaranteed wake up and see its message at some + // point. + // + // I have chosen to take route #2. + match self.message_queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty | mpsc::Inconsistent => None + } }; match msg { @@ -793,7 +814,7 @@ impl Scheduler { return SchedHandle { remote: remote, - queue: self.message_queue.clone(), + queue: self.message_producer.clone(), sched_id: self.sched_id() }; } @@ -813,7 +834,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: MessageQueue<SchedMessage>, + priv queue: mpsc::Producer<SchedMessage, ()>, sched_id: uint } @@ -915,17 +936,17 @@ fn new_sched_rng() -> XorShiftRng { #[cfg(test)] mod test { use prelude::*; - use rt::test::*; - use unstable::run_in_bare_thread; + use borrow::to_uint; - use rt::sched::{Scheduler}; use rt::deque::BufferPool; - use rt::thread::Thread; - use rt::task::{Task, Sched}; use rt::basic; + use rt::sched::{Scheduler}; + use rt::task::{Task, Sched}; + use rt::test::*; + use rt::thread::Thread; use rt::util; - use option::{Some}; - use rt::task::UnwindResult; + use task::TaskResult; + use unstable::run_in_bare_thread; #[test] fn trivial_run_in_newsched_task_test() { @@ -1010,8 +1031,8 @@ mod test { assert!(Task::on_appropriate_sched()); }; - let on_exit: proc(UnwindResult) = proc(exit_status) { - rtassert!(exit_status.is_success()) + let on_exit: proc(TaskResult) = proc(exit_status) { + rtassert!(exit_status.is_ok()) }; task.death.on_exit = Some(on_exit); @@ -1027,7 +1048,6 @@ mod test { use rt::sleeper_list::SleeperList; use rt::sched::Shutdown; use borrow; - use rt::comm::*; do run_in_bare_thread { @@ -1089,7 +1109,7 @@ mod test { rtdebug!("task4 id: **{}**", borrow::to_uint(task4)); // Signal from the special task that we are done. - let (port, chan) = oneshot::<()>(); + let (port, chan) = Chan::<()>::new(); let normal_task = ~do Task::new_root(&mut normal_sched.stack_pool, None) { rtdebug!("*about to submit task2*"); @@ -1160,10 +1180,8 @@ mod test { #[test] fn handle() { - use rt::comm::*; - do run_in_bare_thread { - let (port, chan) = oneshot::<()>(); + let (port, chan) = Chan::new(); let thread_one = do Thread::start { let chan = chan; @@ -1230,7 +1248,6 @@ mod test { #[test] fn multithreading() { - use rt::comm::*; use num::Times; use vec::OwnedVector; use container::Container; @@ -1238,7 +1255,7 @@ mod test { do run_in_mt_newsched_task { let mut ports = ~[]; 10.times(|| { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); do spawntask_later { chan.send(()); } @@ -1253,21 +1270,17 @@ mod test { #[test] fn thread_ring() { - use rt::comm::*; - use comm::{GenericPort, GenericChan}; - do run_in_mt_newsched_task { - let (end_port, end_chan) = oneshot(); + let (end_port, end_chan) = Chan::new(); let n_tasks = 10; let token = 2000; - let (p, ch1) = stream(); - let mut p = p; + let (mut p, ch1) = Chan::new(); ch1.send((token, end_chan)); let mut i = 2; while i <= n_tasks { - let (next_p, ch) = stream(); + let (next_p, ch) = Chan::new(); let imm_i = i; let imm_p = p; do spawntask_random { @@ -1276,23 +1289,23 @@ mod test { p = next_p; i += 1; } - let imm_p = p; - let imm_ch = ch1; + let p = p; do spawntask_random { - roundtrip(1, n_tasks, &imm_p, &imm_ch); + roundtrip(1, n_tasks, &p, &ch1); } end_port.recv(); } fn roundtrip(id: int, n_tasks: int, - p: &Port<(int, ChanOne<()>)>, ch: &Chan<(int, ChanOne<()>)>) { + p: &Port<(int, Chan<()>)>, + ch: &Chan<(int, Chan<()>)>) { while (true) { match p.recv() { (1, end_chan) => { - debug!("{}\n", id); - end_chan.send(()); - return; + debug!("{}\n", id); + end_chan.send(()); + return; } (token, end_chan) => { debug!("thread: {} got token: {}", id, token); @@ -1331,16 +1344,14 @@ mod test { // FIXME: #9407: xfail-test fn dont_starve_1() { - use rt::comm::oneshot; - stress_factor().times(|| { do run_in_mt_newsched_task { - let (port, chan) = oneshot(); + let (port, chan) = Chan::new(); // This task should not be able to starve the sender; // The sender should get stolen to another thread. do spawntask { - while !port.peek() { } + while port.try_recv().is_none() { } } chan.send(()); @@ -1350,17 +1361,15 @@ mod test { #[test] fn dont_starve_2() { - use rt::comm::oneshot; - stress_factor().times(|| { do run_in_newsched_task { - let (port, chan) = oneshot(); - let (_port2, chan2) = stream(); + let (port, chan) = Chan::new(); + let (_port2, chan2) = Chan::new(); // This task should not be able to starve the other task. // The sends should eventually yield. do spawntask { - while !port.peek() { + while port.try_recv().is_none() { chan2.send(()); } } diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs deleted file mode 100644 index 6cde0a1f216..00000000000 --- a/src/libstd/rt/select.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! Module for private, abstraction-leaking select traits. Wrapped in std::select. - -use rt::kill::BlockedTask; -use rt::sched::Scheduler; -use option::Option; - -pub trait SelectInner { - // Returns true if data was available. - fn optimistic_check(&mut self) -> bool; - // Returns true if data was available. If so, shall also wake() the task. - fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool; - // Returns true if data was available. - fn unblock_from(&mut self) -> bool; -} - -pub trait SelectPortInner<T> { - fn recv_ready(self) -> Option<T>; -} - diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index 2adc32f33fb..62e012f9f41 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -20,20 +20,22 @@ use prelude::*; use borrow; use cast::transmute; use cleanup; +use io::Writer; use libc::{c_void, uintptr_t, c_char, size_t}; use local_data; use option::{Option, Some, None}; use rt::borrowck::BorrowRecord; use rt::borrowck; +use rt::context; use rt::context::Context; use rt::env; -use io::Writer; use rt::kill::Death; use rt::local::Local; use rt::logging::StdErrLogger; use rt::sched::{Scheduler, SchedHandle}; use rt::stack::{StackSegment, StackPool}; use send_str::SendStr; +use task::TaskResult; use unstable::finally::Finally; use unstable::mutex::Mutex; @@ -90,46 +92,17 @@ pub enum SchedHome { pub struct GarbageCollector; pub struct LocalStorage(Option<local_data::Map>); -/// Represents the reason for the current unwinding process -pub enum UnwindResult { - /// The task is ending successfully - Success, - - /// The Task is failing with reason `~Any` - Failure(~Any), -} - -impl UnwindResult { - /// Returns `true` if this `UnwindResult` is a failure - #[inline] - pub fn is_failure(&self) -> bool { - match *self { - Success => false, - Failure(_) => true - } - } - - /// Returns `true` if this `UnwindResult` is a success - #[inline] - pub fn is_success(&self) -> bool { - match *self { - Success => true, - Failure(_) => false - } - } -} - pub struct Unwinder { unwinding: bool, cause: Option<~Any> } impl Unwinder { - fn to_unwind_result(&mut self) -> UnwindResult { + fn result(&mut self) -> TaskResult { if self.unwinding { - Failure(self.cause.take().unwrap()) + Err(self.cause.take().unwrap()) } else { - Success + Ok(()) } } } @@ -326,7 +299,7 @@ impl Task { // Cleanup the dynamic borrowck debugging info borrowck::clear_task_borrow_list(); - self.death.collect_failure(self.unwinder.to_unwind_result()); + self.death.collect_failure(self.unwinder.result()); self.destroyed = true; } @@ -691,6 +664,7 @@ pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> ! mod test { use super::*; use rt::test::*; + use prelude::*; #[test] fn local_heap() { @@ -744,22 +718,9 @@ mod test { } #[test] - fn comm_oneshot() { - use comm::*; - - do run_in_newsched_task { - let (port, chan) = oneshot(); - chan.send(10); - assert!(port.recv() == 10); - } - } - - #[test] fn comm_stream() { - use comm::*; - do run_in_newsched_task() { - let (port, chan) = stream(); + let (port, chan) = Chan::new(); chan.send(10); assert!(port.recv() == 10); } @@ -767,11 +728,8 @@ mod test { #[test] fn comm_shared_chan() { - use comm::*; - do run_in_newsched_task() { - let (port, chan) = stream(); - let chan = SharedChan::new(chan); + let (port, chan) = SharedChan::new(); chan.send(10); assert!(port.recv() == 10); } diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index 96b80d11129..2b48b396c99 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -21,14 +21,14 @@ use rand::Rng; use rand; use result::{Result, Ok, Err}; use rt::basic; -use rt::comm::oneshot; use rt::deque::BufferPool; +use comm::Chan; use rt::new_event_loop; use rt::sched::Scheduler; use rt::sleeper_list::SleeperList; use rt::task::Task; -use rt::task::UnwindResult; use rt::thread::Thread; +use task::TaskResult; use unstable::{run_in_bare_thread}; use vec; use vec::{OwnedVector, MutableVector, ImmutableVector}; @@ -82,10 +82,10 @@ pub fn run_in_uv_task_core(f: proc()) { let mut sched = ~new_test_uv_sched(); let exit_handle = sched.make_handle(); - let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) { + let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { let mut exit_handle = exit_handle; exit_handle.send(Shutdown); - rtassert!(exit_status.is_success()); + rtassert!(exit_status.is_ok()); }; let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); task.death.on_exit = Some(on_exit); @@ -99,10 +99,10 @@ pub fn run_in_newsched_task_core(f: proc()) { let mut sched = ~new_test_sched(); let exit_handle = sched.make_handle(); - let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) { + let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { let mut exit_handle = exit_handle; exit_handle.send(Shutdown); - rtassert!(exit_status.is_success()); + rtassert!(exit_status.is_ok()); }; let mut task = ~Task::new_root(&mut sched.stack_pool, None, f); task.death.on_exit = Some(on_exit); @@ -240,14 +240,14 @@ pub fn run_in_mt_newsched_task(f: proc()) { } let handles = handles; // Work around not being able to capture mut - let on_exit: proc(UnwindResult) = proc(exit_status: UnwindResult) { + let on_exit: proc(TaskResult) = proc(exit_status: TaskResult) { // Tell schedulers to exit let mut handles = handles; for handle in handles.mut_iter() { handle.send(Shutdown); } - rtassert!(exit_status.is_success()); + rtassert!(exit_status.is_ok()); }; let mut main_task = ~Task::new_root(&mut scheds[0].stack_pool, None, @@ -311,8 +311,8 @@ pub fn spawntask_random(f: proc()) { pub fn spawntask_try(f: proc()) -> Result<(),()> { - let (port, chan) = oneshot(); - let on_exit: proc(UnwindResult) = proc(exit_status) { + let (port, chan) = Chan::new(); + let on_exit: proc(TaskResult) = proc(exit_status) { chan.send(exit_status) }; @@ -322,7 +322,7 @@ pub fn spawntask_try(f: proc()) -> Result<(),()> { Scheduler::run_task(new_task); let exit_status = port.recv(); - if exit_status.is_success() { Ok(()) } else { Err(()) } + if exit_status.is_ok() { Ok(()) } else { Err(()) } } diff --git a/src/libstd/rt/thread.rs b/src/libstd/rt/thread.rs index 9031147f8b1..da02988c75c 100644 --- a/src/libstd/rt/thread.rs +++ b/src/libstd/rt/thread.rs @@ -21,42 +21,32 @@ use kinds::Send; use libc; use ops::Drop; use option::{Option, Some, None}; -use ptr; use uint; -#[cfg(windows)] -use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, - LPVOID, DWORD, LPDWORD, HANDLE}; - -#[cfg(windows)] type rust_thread = HANDLE; -#[cfg(unix)] type rust_thread = libc::pthread_t; -#[cfg(windows)] type rust_thread_return = DWORD; -#[cfg(unix)] type rust_thread_return = *libc::c_void; - -type StartFn = extern "C" fn(*libc::c_void) -> rust_thread_return; +type StartFn = extern "C" fn(*libc::c_void) -> imp::rust_thread_return; /// This struct represents a native thread's state. This is used to join on an /// existing thread created in the join-able state. pub struct Thread<T> { - priv native: rust_thread, + priv native: imp::rust_thread, priv joined: bool, priv packet: ~Option<T>, } -static DEFAULT_STACK_SIZE: libc::size_t = 1024*1024; +static DEFAULT_STACK_SIZE: libc::size_t = 1024 * 1024; // This is the starting point of rust os threads. The first thing we do // is make sure that we don't trigger __morestack (also why this has a // no_split_stack annotation), and then we extract the main function // and invoke it. #[no_split_stack] -extern fn thread_start(main: *libc::c_void) -> rust_thread_return { +extern fn thread_start(main: *libc::c_void) -> imp::rust_thread_return { use rt::context; unsafe { context::record_stack_bounds(0, uint::max_value); let f: ~proc() = cast::transmute(main); (*f)(); - cast::transmute(0 as rust_thread_return) + cast::transmute(0 as imp::rust_thread_return) } } @@ -88,7 +78,7 @@ impl Thread<()> { *cast::transmute::<&~Option<T>, **mut Option<T>>(&packet) }; let main: proc() = proc() unsafe { *packet2 = Some(main()); }; - let native = unsafe { native_thread_create(~main) }; + let native = unsafe { imp::create(~main) }; Thread { native: native, @@ -105,10 +95,16 @@ impl Thread<()> { /// there are detached thread still running around. pub fn spawn(main: proc()) { unsafe { - let handle = native_thread_create(~main); - native_thread_detach(handle); + let handle = imp::create(~main); + imp::detach(handle); } } + + /// Relinquishes the CPU slot that this OS-thread is currently using, + /// allowing another thread to run for awhile. + pub fn yield_now() { + unsafe { imp::yield_now(); } + } } impl<T: Send> Thread<T> { @@ -116,7 +112,7 @@ impl<T: Send> Thread<T> { /// calculation. pub fn join(mut self) -> T { assert!(!self.joined); - unsafe { native_thread_join(self.native) }; + unsafe { imp::join(self.native) }; self.joined = true; assert!(self.packet.is_some()); self.packet.take_unwrap() @@ -129,80 +125,115 @@ impl<T: Send> Drop for Thread<T> { // This is required for correctness. If this is not done then the thread // would fill in a return box which no longer exists. if !self.joined { - unsafe { native_thread_join(self.native) }; + unsafe { imp::join(self.native) }; } } } #[cfg(windows)] -unsafe fn native_thread_create(p: ~proc()) -> rust_thread { - let arg: *mut libc::c_void = cast::transmute(p); - CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, thread_start, - arg, 0, ptr::mut_null()) -} - -#[cfg(windows)] -unsafe fn native_thread_join(native: rust_thread) { - use libc::consts::os::extra::INFINITE; - WaitForSingleObject(native, INFINITE); -} +mod imp { + use libc::types::os::arch::extra::{LPSECURITY_ATTRIBUTES, SIZE_T, BOOL, + LPVOID, DWORD, LPDWORD, HANDLE}; + use libc; + use cast; + use super::DEFAULT_STACK_SIZE; + + pub type rust_thread = HANDLE; + pub type rust_thread_return = DWORD; + + pub unsafe fn create(p: ~proc()) -> rust_thread { + let arg: *mut libc::c_void = cast::transmute(p); + CreateThread(ptr::mut_null(), DEFAULT_STACK_SIZE, super::thread_start, + arg, 0, ptr::mut_null()) + } -#[cfg(windows)] -unsafe fn native_thread_detach(native: rust_thread) { - assert!(libc::CloseHandle(native) != 0); -} + pub unsafe fn join(native: rust_thread) { + use libc::consts::os::extra::INFINITE; + WaitForSingleObject(native, INFINITE); + } -#[cfg(unix)] -unsafe fn native_thread_create(p: ~proc()) -> rust_thread { - use unstable::intrinsics; - use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; + pub unsafe fn detach(native: rust_thread) { + assert!(libc::CloseHandle(native) != 0); + } - let mut native: libc::pthread_t = intrinsics::uninit(); - let mut attr: libc::pthread_attr_t = intrinsics::uninit(); - assert_eq!(pthread_attr_init(&mut attr), 0); - assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); - assert_eq!(pthread_attr_setdetachstate(&mut attr, PTHREAD_CREATE_JOINABLE), 0); + pub unsafe fn yield_now() { + // This function will return 0 if there are no other threads to execute, + // but this also means that the yield was useless so this isn't really a + // case that needs to be worried about. + SwitchToThread(); + } - let arg: *libc::c_void = cast::transmute(p); - assert_eq!(pthread_create(&mut native, &attr, thread_start, arg), 0); - native + extern "system" { + fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, + dwStackSize: SIZE_T, + lpStartAddress: super::StartFn, + lpParameter: LPVOID, + dwCreationFlags: DWORD, + lpThreadId: LPDWORD) -> HANDLE; + fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; + fn SwitchToThread() -> BOOL; + } } #[cfg(unix)] -unsafe fn native_thread_join(native: rust_thread) { - assert_eq!(pthread_join(native, ptr::null()), 0); -} +mod imp { + use cast; + use libc::consts::os::posix01::PTHREAD_CREATE_JOINABLE; + use libc; + use ptr; + use super::DEFAULT_STACK_SIZE; + use unstable::intrinsics; -#[cfg(unix)] -fn native_thread_detach(native: rust_thread) { - unsafe { assert_eq!(pthread_detach(native), 0) } -} + pub type rust_thread = libc::pthread_t; + pub type rust_thread_return = *libc::c_void; + + pub unsafe fn create(p: ~proc()) -> rust_thread { + let mut native: libc::pthread_t = intrinsics::uninit(); + let mut attr: libc::pthread_attr_t = intrinsics::uninit(); + assert_eq!(pthread_attr_init(&mut attr), 0); + assert_eq!(pthread_attr_setstacksize(&mut attr, DEFAULT_STACK_SIZE), 0); + assert_eq!(pthread_attr_setdetachstate(&mut attr, + PTHREAD_CREATE_JOINABLE), 0); + + let arg: *libc::c_void = cast::transmute(p); + assert_eq!(pthread_create(&mut native, &attr, + super::thread_start, arg), 0); + native + } -#[cfg(windows)] -extern "system" { - fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, - dwStackSize: SIZE_T, - lpStartAddress: StartFn, - lpParameter: LPVOID, - dwCreationFlags: DWORD, - lpThreadId: LPDWORD) -> HANDLE; - fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; -} + pub unsafe fn join(native: rust_thread) { + assert_eq!(pthread_join(native, ptr::null()), 0); + } -#[cfg(unix)] -extern { - fn pthread_create(native: *mut libc::pthread_t, - attr: *libc::pthread_attr_t, - f: StartFn, - value: *libc::c_void) -> libc::c_int; - fn pthread_join(native: libc::pthread_t, - value: **libc::c_void) -> libc::c_int; - fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; - fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, - stack_size: libc::size_t) -> libc::c_int; - fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, - state: libc::c_int) -> libc::c_int; - fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; + pub unsafe fn detach(native: rust_thread) { + assert_eq!(pthread_detach(native), 0); + } + + #[cfg(target_os = "macos")] + pub unsafe fn yield_now() { assert_eq!(sched_yield(), 0); } + + #[cfg(not(target_os = "macos"))] + pub unsafe fn yield_now() { assert_eq!(pthread_yield(), 0); } + + extern { + fn pthread_create(native: *mut libc::pthread_t, + attr: *libc::pthread_attr_t, + f: super::StartFn, + value: *libc::c_void) -> libc::c_int; + fn pthread_join(native: libc::pthread_t, + value: **libc::c_void) -> libc::c_int; + fn pthread_attr_init(attr: *mut libc::pthread_attr_t) -> libc::c_int; + fn pthread_attr_setstacksize(attr: *mut libc::pthread_attr_t, + stack_size: libc::size_t) -> libc::c_int; + fn pthread_attr_setdetachstate(attr: *mut libc::pthread_attr_t, + state: libc::c_int) -> libc::c_int; + fn pthread_detach(thread: libc::pthread_t) -> libc::c_int; + + #[cfg(target_os = "macos")] + fn sched_yield() -> libc::c_int; + #[cfg(not(target_os = "macos"))] + fn pthread_yield() -> libc::c_int; + } } #[cfg(test)] diff --git a/src/libstd/run.rs b/src/libstd/run.rs index 14d49df59a4..70ad752ea93 100644 --- a/src/libstd/run.rs +++ b/src/libstd/run.rs @@ -12,7 +12,7 @@ #[allow(missing_doc)]; -use comm::{stream, SharedChan}; +use comm::SharedChan; use io::Reader; use io::process::ProcessExit; use io::process; @@ -220,8 +220,7 @@ impl Process { // in parallel so we don't deadlock while blocking on one // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. - let (p, ch) = stream(); - let ch = SharedChan::new(ch); + let (p, ch) = SharedChan::new(); let ch_clone = ch.clone(); do spawn { diff --git a/src/libstd/select.rs b/src/libstd/select.rs deleted file mode 100644 index cca64244db5..00000000000 --- a/src/libstd/select.rs +++ /dev/null @@ -1,306 +0,0 @@ -// Copyright 2013 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -#[allow(missing_doc)]; - -use comm; -use container::Container; -use iter::{Iterator, DoubleEndedIterator}; -use kinds::Send; -use ops::Drop; -use option::*; -use rt::local::Local; -use rt::rtio::EventLoop; -use rt::sched::Scheduler; -use rt::shouldnt_be_public::{SelectInner, SelectPortInner}; -use vec::{OwnedVector, MutableVector}; - -/// Trait for message-passing primitives that can be select()ed on. -pub trait Select : SelectInner { } - -/// Trait for message-passing primitives that can use the select2() convenience wrapper. -// (This is separate from the above trait to enable heterogeneous lists of ports -// that implement Select on different types to use select().) -pub trait SelectPort<T> : SelectPortInner<T> { } - -/// A helper type that throws away a value on a port. -struct PortGuard<T> { - port: Option<comm::PortOne<T>>, -} - -#[unsafe_destructor] -impl<T:Send> Drop for PortGuard<T> { - fn drop(&mut self) { - let _ = self.port.take_unwrap().recv(); - } -} - -/// Receive a message from any one of many ports at once. Returns the index of the -/// port whose data is ready. (If multiple are ready, returns the lowest index.) -pub fn select<A: Select>(ports: &mut [A]) -> uint { - if ports.is_empty() { - fail!("can't select on an empty list"); - } - - for (index, port) in ports.mut_iter().enumerate() { - if port.optimistic_check() { - return index; - } - } - - // If one of the ports already contains data when we go to block on it, we - // don't bother enqueueing on the rest of them, so we shouldn't bother - // unblocking from it either. This is just for efficiency, not correctness. - // (If not, we need to unblock from all of them. Length is a placeholder.) - let mut ready_index = ports.len(); - - // XXX: We're using deschedule...and_then in an unsafe way here (see #8132), - // in that we need to continue mutating the ready_index in the environment - // after letting the task get woken up. The and_then closure needs to delay - // the task from resuming until all ports have become blocked_on. - let (p,c) = comm::oneshot(); - - { - let _guard = PortGuard { - port: Some(p), - }; - - let mut c = Some(c); - let sched: ~Scheduler = Local::take(); - sched.deschedule_running_task_and_then(|sched, task| { - let task_handles = task.make_selectable(ports.len()); - - for (index, (port, task_handle)) in - ports.mut_iter().zip(task_handles.move_iter()).enumerate() { - // If one of the ports has data by now, it will wake the handle. - if port.block_on(sched, task_handle) { - ready_index = index; - break; - } - } - - let c = c.take_unwrap(); - do sched.event_loop.callback { - c.send_deferred(()) - } - }) - } - - // Task resumes. Now unblock ourselves from all the ports we blocked on. - // If the success index wasn't reset, 'take' will just take all of them. - // Iterate in reverse so the 'earliest' index that's ready gets returned. - for (index, port) in ports.mut_slice(0, ready_index).mut_iter().enumerate().invert() { - if port.unblock_from() { - ready_index = index; - } - } - - assert!(ready_index < ports.len()); - return ready_index; -} - -/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet. - -impl <'a> Select for &'a mut Select { - fn optimistic_check(&mut self) -> bool { self.optimistic_check() } - fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool { - self.block_on(sched, task) - } - fn unblock_from(&mut self) -> bool { self.unblock_from() } -} - -pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B) - -> Either<(Option<TA>, B), (A, Option<TB>)> { - let result = { - let mut ports = [&mut a as &mut Select, &mut b as &mut Select]; - select(ports) - }; - match result { - 0 => Left ((a.recv_ready(), b)), - 1 => Right((a, b.recv_ready())), - x => fail!("impossible case in select2: {:?}", x) - } -} - -*/ - -#[cfg(test)] -mod test { - use super::*; - use clone::Clone; - use num::Times; - use option::*; - use rt::comm::*; - use rt::test::*; - use vec::*; - use comm::GenericChan; - use task; - use iter::{Iterator, range}; - - #[test] #[should_fail] - fn select_doesnt_get_trolled() { - select::<PortOne<()>>([]); - } - - /* non-blocking select tests */ - - #[cfg(test)] - fn select_helper(num_ports: uint, send_on_chans: &[uint]) { - // Unfortunately this does not actually test the block_on early-break - // codepath in select -- racing between the sender and the receiver in - // separate tasks is necessary to get around the optimistic check. - let (ports, chans) = unzip(range(0, num_ports).map(|_| oneshot::<()>())); - let mut dead_chans = ~[]; - let mut ports = ports; - for (i, chan) in chans.move_iter().enumerate() { - if send_on_chans.contains(&i) { - chan.send(()); - } else { - dead_chans.push(chan); - } - } - let ready_index = select(ports); - assert!(send_on_chans.contains(&ready_index)); - assert!(ports.swap_remove(ready_index).recv_ready().is_some()); - let _ = dead_chans; - - // Same thing with streams instead. - // FIXME(#7971): This should be in a macro but borrowck isn't smart enough. - let (ports, chans) = unzip(range(0, num_ports).map(|_| stream::<()>())); - let mut dead_chans = ~[]; - let mut ports = ports; - for (i, chan) in chans.move_iter().enumerate() { - if send_on_chans.contains(&i) { - chan.send(()); - } else { - dead_chans.push(chan); - } - } - let ready_index = select(ports); - assert!(send_on_chans.contains(&ready_index)); - assert!(ports.swap_remove(ready_index).recv_ready().is_some()); - let _ = dead_chans; - } - - #[test] - fn select_one() { - do run_in_uv_task { select_helper(1, [0]) } - } - - #[test] - fn select_two() { - // NB. I would like to have a test that tests the first one that is - // ready is the one that's returned, but that can't be reliably tested - // with the randomized behaviour of optimistic_check. - do run_in_uv_task { select_helper(2, [1]) } - do run_in_uv_task { select_helper(2, [0]) } - do run_in_uv_task { select_helper(2, [1,0]) } - } - - #[test] - fn select_a_lot() { - do run_in_uv_task { select_helper(12, [7,8,9]) } - } - - #[test] - fn select_stream() { - use util; - use comm::GenericChan; - - // Sends 10 buffered packets, and uses select to retrieve them all. - // Puts the port in a different spot in the vector each time. - do run_in_uv_task { - let (ports, _) = unzip(range(0u, 10).map(|_| stream::<int>())); - let (port, chan) = stream(); - 10.times(|| { chan.send(31337); }); - let mut ports = ports; - let mut port = Some(port); - let order = [5u,0,4,3,2,6,9,8,7,1]; - for &index in order.iter() { - // put the port in the vector at any index - util::swap(port.get_mut_ref(), &mut ports[index]); - assert!(select(ports) == index); - // get it back out - util::swap(port.get_mut_ref(), &mut ports[index]); - // NB. Not recv(), because optimistic_check randomly fails. - assert!(port.get_ref().recv_ready().unwrap() == 31337); - } - } - } - - #[test] - fn select_simple() { - do run_in_uv_task { - select_helper(2, [1]) - } - } - - /* blocking select tests */ - - #[test] - fn select_blocking() { - do run_in_uv_task { - let (p1,_c) = oneshot(); - let (p2,c2) = oneshot(); - let mut ports = [p1,p2]; - - let (p3,c3) = oneshot(); - let (p4,c4) = oneshot(); - - do task::spawn { - p3.recv(); // handshake parent - c4.send(()); // normal receive - task::deschedule(); - c2.send(()); // select receive - } - - // Try to block before child sends on c2. - c3.send(()); - p4.recv(); - assert!(select(ports) == 1); - } - } - - #[test] - fn select_racing_senders() { - static NUM_CHANS: uint = 10; - - select_racing_senders_helper(~[0,1,2,3,4,5,6,7,8,9]); - select_racing_senders_helper(~[0,1,2]); - select_racing_senders_helper(~[3,4,5,6]); - select_racing_senders_helper(~[7,8,9]); - - fn select_racing_senders_helper(send_on_chans: ~[uint]) { - use rt::test::spawntask_random; - - do run_in_uv_task { - // A bit of stress, since ordinarily this is just smoke and mirrors. - 4.times(|| { - let send_on_chans = send_on_chans.clone(); - do task::spawn { - let mut ports = ~[]; - for i in range(0u, NUM_CHANS) { - let (p,c) = oneshot(); - ports.push(p); - if send_on_chans.contains(&i) { - do spawntask_random { - task::deschedule(); - c.send(()); - } - } - } - // nondeterministic result, but should succeed - select(ports); - } - }) - } - } - } -} diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 24a24f24818..0e56f42f5b9 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -55,11 +55,10 @@ use prelude::*; -use comm::{stream, Chan, GenericChan, GenericPort, Port, Peekable}; +use comm::{Chan, Port}; use result::{Result, Ok, Err}; use rt::in_green_task_context; use rt::local::Local; -use rt::task::{UnwindResult, Success, Failure}; use send_str::{SendStr, IntoSendStr}; use util; @@ -81,33 +80,6 @@ pub mod spawn; /// children tasks complete, recommend using a result future. pub type TaskResult = Result<(), ~Any>; -pub struct TaskResultPort { - priv port: Port<UnwindResult> -} - -fn to_task_result(res: UnwindResult) -> TaskResult { - match res { - Success => Ok(()), Failure(a) => Err(a), - } -} - -impl GenericPort<TaskResult> for TaskResultPort { - #[inline] - fn recv(&self) -> TaskResult { - to_task_result(self.port.recv()) - } - - #[inline] - fn try_recv(&self) -> Option<TaskResult> { - self.port.try_recv().map(to_task_result) - } -} - -impl Peekable<TaskResult> for TaskResultPort { - #[inline] - fn peek(&self) -> bool { self.port.peek() } -} - /// Scheduler modes #[deriving(Eq)] pub enum SchedMode { @@ -150,7 +122,7 @@ pub struct SchedOpts { */ pub struct TaskOpts { priv watched: bool, - priv notify_chan: Option<Chan<UnwindResult>>, + priv notify_chan: Option<Chan<TaskResult>>, name: Option<SendStr>, sched: SchedOpts, stack_size: Option<uint> @@ -232,7 +204,7 @@ impl TaskBuilder { /// /// # Failure /// Fails if a future_result was already set for this task. - pub fn future_result(&mut self) -> TaskResultPort { + pub fn future_result(&mut self) -> Port<TaskResult> { // FIXME (#3725): Once linked failure and notification are // handled in the library, I can imagine implementing this by just // registering an arbitrary number of task::on_exit handlers and @@ -243,12 +215,12 @@ impl TaskBuilder { } // Construct the future and give it to the caller. - let (notify_pipe_po, notify_pipe_ch) = stream::<UnwindResult>(); + let (notify_pipe_po, notify_pipe_ch) = Chan::new(); // Reconfigure self to use a notify channel. self.opts.notify_chan = Some(notify_pipe_ch); - TaskResultPort { port: notify_pipe_po } + notify_pipe_po } /// Name the task-to-be. Currently the name is used for identification @@ -341,7 +313,7 @@ impl TaskBuilder { * Fails if a future_result was already set for this task. */ pub fn try<T:Send>(mut self, f: proc() -> T) -> Result<T, ~Any> { - let (po, ch) = stream::<T>(); + let (po, ch) = Chan::new(); let result = self.future_result(); @@ -466,7 +438,7 @@ pub fn failing() -> bool { // !!! instead of exiting cleanly. This might wedge the buildbots. !!! #[cfg(test)] -fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); } +fn block_forever() { let (po, _ch) = Chan::<()>::new(); po.recv(); } #[test] fn test_unnamed_task() { @@ -528,9 +500,8 @@ fn test_send_named_task() { #[test] fn test_run_basic() { - let (po, ch) = stream::<()>(); - let builder = task(); - do builder.spawn { + let (po, ch) = Chan::new(); + do task().spawn { ch.send(()); } po.recv(); @@ -543,7 +514,7 @@ struct Wrapper { #[test] fn test_add_wrapper() { - let (po, ch) = stream::<()>(); + let (po, ch) = Chan::new(); let mut b0 = task(); do b0.add_wrapper |body| { let ch = ch; @@ -608,8 +579,7 @@ fn get_sched_id() -> int { #[test] fn test_spawn_sched() { - let (po, ch) = stream::<()>(); - let ch = SharedChan::new(ch); + let (po, ch) = SharedChan::new(); fn f(i: int, ch: SharedChan<()>) { let parent_sched_id = get_sched_id(); @@ -632,14 +602,14 @@ fn test_spawn_sched() { #[test] fn test_spawn_sched_childs_on_default_sched() { - let (po, ch) = stream(); + let (po, ch) = Chan::new(); // Assuming tests run on the default scheduler let default_id = get_sched_id(); do spawn_sched(SingleThreaded) { - let parent_sched_id = get_sched_id(); let ch = ch; + let parent_sched_id = get_sched_id(); do spawn { let child_sched_id = get_sched_id(); assert!(parent_sched_id != child_sched_id); @@ -660,8 +630,8 @@ fn test_spawn_sched_blocking() { // Testing that a task in one scheduler can block in foreign code // without affecting other schedulers 20u.times(|| { - let (start_po, start_ch) = stream(); - let (fin_po, fin_ch) = stream(); + let (start_po, start_ch) = Chan::new(); + let (fin_po, fin_ch) = Chan::new(); let mut lock = Mutex::new(); let lock2 = lock.clone(); @@ -686,14 +656,14 @@ fn test_spawn_sched_blocking() { let mut val = 20; while val > 0 { val = po.recv(); - ch.send(val - 1); + ch.try_send(val - 1); } } - let (setup_po, setup_ch) = stream(); - let (parent_po, parent_ch) = stream(); + let (setup_po, setup_ch) = Chan::new(); + let (parent_po, parent_ch) = Chan::new(); do spawn { - let (child_po, child_ch) = stream(); + let (child_po, child_ch) = Chan::new(); setup_ch.send(child_ch); pingpong(&child_po, &parent_ch); }; @@ -712,12 +682,12 @@ fn test_spawn_sched_blocking() { #[cfg(test)] fn avoid_copying_the_body(spawnfn: |v: proc()|) { - let (p, ch) = stream::<uint>(); + let (p, ch) = Chan::<uint>::new(); let x = ~1; let x_in_parent = ptr::to_unsafe_ptr(&*x) as uint; - do spawnfn || { + do spawnfn { let x_in_child = ptr::to_unsafe_ptr(&*x) as uint; ch.send(x_in_child); } diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 4ab7b74d300..eb3e19f4a5a 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -77,18 +77,15 @@ use prelude::*; -use comm::{GenericChan, oneshot}; +use comm::Chan; use rt::local::Local; use rt::sched::{Scheduler, Shutdown, TaskFromFriend}; use rt::task::{Task, Sched}; -use rt::task::UnwindResult; use rt::thread::Thread; use rt::{in_green_task_context, new_event_loop}; -use task::SingleThreaded; -use task::TaskOpts; +use task::{SingleThreaded, TaskOpts, TaskResult}; #[cfg(test)] use task::default_task_opts; -#[cfg(test)] use comm; #[cfg(test)] use task; pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { @@ -132,7 +129,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { // Create a task that will later be used to join with the new scheduler // thread when it is ready to terminate - let (thread_port, thread_chan) = oneshot(); + let (thread_port, thread_chan) = Chan::new(); let join_task = do Task::build_child(None) { debug!("running join task"); let thread: Thread<()> = thread_port.recv(); @@ -173,7 +170,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { if opts.notify_chan.is_some() { let notify_chan = opts.notify_chan.take_unwrap(); - let on_exit: proc(UnwindResult) = proc(task_result) { + let on_exit: proc(TaskResult) = proc(task_result) { notify_chan.send(task_result) }; task.death.on_exit = Some(on_exit); @@ -187,7 +184,7 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) { #[test] fn test_spawn_raw_simple() { - let (po, ch) = stream(); + let (po, ch) = Chan::new(); do spawn_raw(default_task_opts()) { ch.send(()); } @@ -208,7 +205,7 @@ fn test_spawn_raw_unsupervise() { #[test] fn test_spawn_raw_notify_success() { - let (notify_po, notify_ch) = comm::stream(); + let (notify_po, notify_ch) = Chan::new(); let opts = task::TaskOpts { notify_chan: Some(notify_ch), @@ -216,13 +213,13 @@ fn test_spawn_raw_notify_success() { }; do spawn_raw(opts) { } - assert!(notify_po.recv().is_success()); + assert!(notify_po.recv().is_ok()); } #[test] fn test_spawn_raw_notify_failure() { // New bindings for these - let (notify_po, notify_ch) = comm::stream(); + let (notify_po, notify_ch) = Chan::new(); let opts = task::TaskOpts { watched: false, @@ -232,5 +229,5 @@ fn test_spawn_raw_notify_failure() { do spawn_raw(opts) { fail!(); } - assert!(notify_po.recv().is_failure()); + assert!(notify_po.recv().is_err()); } diff --git a/src/libstd/unstable/mod.rs b/src/libstd/unstable/mod.rs index f8e2ea54f44..043d99eb1b8 100644 --- a/src/libstd/unstable/mod.rs +++ b/src/libstd/unstable/mod.rs @@ -10,10 +10,7 @@ #[doc(hidden)]; -use comm::{GenericChan, GenericPort}; -use comm; use prelude::*; -use task; use libc::uintptr_t; pub mod dynamic_lib; @@ -38,15 +35,7 @@ a normal large stack. */ pub fn run_in_bare_thread(f: proc()) { use rt::thread::Thread; - - let (port, chan) = comm::stream(); - // FIXME #4525: Unfortunate that this creates an extra scheduler but it's - // necessary since rust_raw_thread_join is blocking - do task::spawn_sched(task::SingleThreaded) { - Thread::start(f).join(); - chan.send(()); - } - port.recv(); + Thread::start(f).join() } #[test] diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs index 2dd5515bdbc..50fae1e0239 100644 --- a/src/libstd/unstable/sync.rs +++ b/src/libstd/unstable/sync.rs @@ -9,7 +9,7 @@ // except according to those terms. use cast; -use comm; +use comm::{Chan, Port}; use ptr; use option::{Option,Some,None}; use task; @@ -56,7 +56,7 @@ struct ArcData<T> { // drops the last refcount on an arc. Unfortunately this can't be a proper // pipe protocol because the unwrapper has to access both stages at once. // FIXME(#7544): Maybe use AtomicPtr instead (to avoid xchg in take() later)? - unwrapper: AtomicOption<(comm::ChanOne<()>, comm::PortOne<bool>)>, + unwrapper: AtomicOption<(Chan<()>, Port<bool>)>, // FIXME(#3224) should be able to make this non-option to save memory data: Option<T>, } @@ -70,7 +70,7 @@ unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> { /// A helper object used by `UnsafeArc::unwrap`. struct ChannelAndDataGuard<T> { - channel: Option<comm::ChanOne<bool>>, + channel: Option<Chan<bool>>, data: Option<~ArcData<T>>, } @@ -92,7 +92,7 @@ impl<T> Drop for ChannelAndDataGuard<T> { } impl<T> ChannelAndDataGuard<T> { - fn unwrap(mut self) -> (comm::ChanOne<bool>, ~ArcData<T>) { + fn unwrap(mut self) -> (Chan<bool>, ~ArcData<T>) { (self.channel.take_unwrap(), self.data.take_unwrap()) } } @@ -167,8 +167,8 @@ impl<T: Send> UnsafeArc<T> { // The ~ dtor needs to run if this code succeeds. let mut data: ~ArcData<T> = cast::transmute(this.data); // Set up the unwrap protocol. - let (p1,c1) = comm::oneshot(); // () - let (p2,c2) = comm::oneshot(); // bool + let (p1,c1) = Chan::new(); // () + let (p2,c2) = Chan::new(); // bool // Try to put our server end in the unwrapper slot. // This needs no barrier -- it's protected by the release barrier on // the xadd, and the acquire+release barrier in the destructor's xadd. @@ -269,7 +269,7 @@ impl<T> Drop for UnsafeArc<T>{ // reference. In effect, being here means we're the only // *awake* task with the data. match data.unwrapper.take(Acquire) { - Some(~(message,response)) => { + Some(~(message, response)) => { // Send 'ready' and wait for a response. message.send(()); // Unkillable wait. Message guaranteed to come. @@ -508,7 +508,6 @@ impl<T:Send> Exclusive<T> { #[cfg(test)] mod tests { - use comm; use option::*; use prelude::*; use super::{Exclusive, UnsafeArc, atomic}; @@ -541,10 +540,10 @@ mod tests { for _ in range(0u, num_tasks) { let total = total.clone(); - let (port, chan) = comm::stream(); + let (port, chan) = Chan::new(); futures.push(port); - do task::spawn || { + do task::spawn { for _ in range(0u, count) { total.with(|count| **count += 1); } @@ -552,7 +551,7 @@ mod tests { } }; - for f in futures.iter() { f.recv() } + for f in futures.mut_iter() { f.recv() } total.with(|total| assert!(**total == num_tasks * count)); } @@ -625,7 +624,7 @@ mod tests { // When an unwrap and a try_unwrap race, the unwrapper should always win. let x = UnsafeArc::new(~~"hello"); let x2 = x.clone(); - let (p,c) = comm::stream(); + let (p,c) = Chan::new(); do task::spawn { c.send(()); assert!(x2.unwrap() == ~~"hello"); |
