diff options
| author | Steve Klabnik <steve@steveklabnik.com> | 2015-01-02 12:21:00 -0500 |
|---|---|---|
| committer | Steve Klabnik <steve@steveklabnik.com> | 2015-01-05 17:35:16 -0500 |
| commit | f031671c6ea79391eeb3e1ad8f06fe0e436103fb (patch) | |
| tree | da7e40b4a1864bfe9949197e12f1e7dbde674cc6 /src/libstd/sync | |
| parent | 03268bbf35d3ff2350d987fe7b60375839abdf2e (diff) | |
| download | rust-f031671c6ea79391eeb3e1ad8f06fe0e436103fb.tar.gz rust-f031671c6ea79391eeb3e1ad8f06fe0e436103fb.zip | |
Remove i suffix in docs
Diffstat (limited to 'src/libstd/sync')
| -rw-r--r-- | src/libstd/sync/mpsc/mod.rs | 1420 | ||||
| -rw-r--r-- | src/libstd/sync/rwlock.rs | 2 |
2 files changed, 696 insertions, 726 deletions
diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 6bc3f561bb3..338cadafff7 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -8,7 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -//! Multi-producer, single-consumer communication primitives threads +//! Communication primitives for concurrent tasks +//! +//! Rust makes it very difficult to share data among tasks to prevent race +//! conditions and to improve parallelism, but there is often a need for +//! communication between concurrent tasks. The primitives defined in this +//! module are the building blocks for synchronization in rust. //! //! This module provides message-based communication over channels, concretely //! defined among three types: @@ -18,10 +23,12 @@ //! * `Receiver` //! //! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both -//! senders are clone-able (multi-producer) such that many threads can send -//! simultaneously to one receiver (single-consumer). +//! senders are clone-able such that many tasks can send simultaneously to one +//! receiver. These channels are *task blocking*, not *thread blocking*. This +//! means that if one task is blocked on a channel, other tasks can continue to +//! make progress. //! -//! These channels come in two flavors: +//! Rust channels come in one of two flavors: //! //! 1. An asynchronous, infinitely buffered channel. The `channel()` function //! will return a `(Sender, Receiver)` tuple where all sends will be @@ -36,39 +43,36 @@ //! "rendezvous" channel where each sender atomically hands off a message to //! a receiver. //! -//! ## Disconnection +//! ## Panic Propagation //! -//! The send and receive operations on channels will all return a `Result` -//! indicating whether the operation succeeded or not. An unsuccessful operation -//! is normally indicative of the other half of a channel having "hung up" by -//! being dropped in its corresponding thread. +//! In addition to being a core primitive for communicating in rust, channels +//! are the points at which panics are propagated among tasks. Whenever the one +//! half of channel is closed, the other half will have its next operation +//! `panic!`. The purpose of this is to allow propagation of panics among tasks +//! that are linked to one another via channels. //! -//! Once half of a channel has been deallocated, most operations can no longer -//! continue to make progress, so `Err` will be returned. Many applications will -//! continue to `unwrap()` the results returned from this module, instigating a -//! propagation of failure among threads if one unexpectedly dies. +//! There are methods on both of senders and receivers to perform their +//! respective operations without panicking, however. //! -//! # Examples +//! # Example //! //! Simple usage: //! //! ``` //! use std::thread::Thread; -//! use std::sync::mpsc::channel; //! //! // Create a simple streaming channel //! let (tx, rx) = channel(); //! Thread::spawn(move|| { -//! tx.send(10i).unwrap(); +//! tx.send(10); //! }).detach(); -//! assert_eq!(rx.recv().unwrap(), 10i); +//! assert_eq!(rx.recv(), 10); //! ``` //! //! Shared usage: //! //! ``` //! use std::thread::Thread; -//! use std::sync::mpsc::channel; //! //! // Create a shared channel that can be sent along from many threads //! // where tx is the sending half (tx for transmission), and rx is the receiving @@ -77,40 +81,37 @@ //! for i in range(0i, 10i) { //! let tx = tx.clone(); //! Thread::spawn(move|| { -//! tx.send(i).unwrap(); +//! tx.send(i); //! }).detach() //! } //! //! for _ in range(0i, 10i) { -//! let j = rx.recv().unwrap(); +//! let j = rx.recv(); //! assert!(0 <= j && j < 10); //! } //! ``` //! //! Propagating panics: //! -//! ``` -//! use std::sync::mpsc::channel; -//! -//! // The call to recv() will return an error because the channel has already -//! // hung up (or been deallocated) +//! ```should_fail +//! // The call to recv() will panic!() because the channel has already hung +//! // up (or been deallocated) //! let (tx, rx) = channel::<int>(); //! drop(tx); -//! assert!(rx.recv().is_err()); +//! rx.recv(); //! ``` //! //! Synchronous channels: //! //! ``` //! use std::thread::Thread; -//! use std::sync::mpsc::sync_channel; //! //! let (tx, rx) = sync_channel::<int>(0); //! Thread::spawn(move|| { //! // This will wait for the parent task to start receiving -//! tx.send(53).unwrap(); +//! tx.send(53); //! }).detach(); -//! rx.recv().unwrap(); +//! rx.recv(); //! ``` //! //! Reading from a channel with a timeout requires to use a Timer together @@ -119,7 +120,6 @@ //! after 10 seconds no matter what: //! //! ```no_run -//! use std::sync::mpsc::channel; //! use std::io::timer::Timer; //! use std::time::Duration; //! @@ -129,8 +129,8 @@ //! //! loop { //! select! { -//! val = rx.recv() => println!("Received {}", val.unwrap()), -//! _ = timeout.recv() => { +//! val = rx.recv() => println!("Received {}", val), +//! () = timeout.recv() => { //! println!("timed out, total time was more than 10 seconds"); //! break; //! } @@ -143,7 +143,6 @@ //! has been inactive for 5 seconds: //! //! ```no_run -//! use std::sync::mpsc::channel; //! use std::io::timer::Timer; //! use std::time::Duration; //! @@ -154,8 +153,8 @@ //! let timeout = timer.oneshot(Duration::seconds(5)); //! //! select! { -//! val = rx.recv() => println!("Received {}", val.unwrap()), -//! _ = timeout.recv() => { +//! val = rx.recv() => println!("Received {}", val), +//! () = timeout.recv() => { //! println!("timed out, no message received in 5 seconds"); //! break; //! } @@ -313,19 +312,38 @@ // And now that you've seen all the races that I found and attempted to fix, // here's the code for you to find some more! -use prelude::v1::*; +use core::prelude::*; -use sync::Arc; -use fmt; -use kinds::marker; -use mem; -use cell::UnsafeCell; +pub use self::TryRecvError::*; +pub use self::TrySendError::*; + +use alloc::arc::Arc; +use core::kinds; +use core::kinds::marker; +use core::mem; +use core::cell::UnsafeCell; pub use self::select::{Select, Handle}; use self::select::StartResult; use self::select::StartResult::*; use self::blocking::SignalToken; +macro_rules! test { + { fn $name:ident() $b:block $(#[$a:meta])*} => ( + mod $name { + #![allow(unused_imports)] + + use super::*; + use comm::*; + use thread::Thread; + use prelude::{Ok, Err, spawn, range, drop, Box, Some, None, Option}; + use prelude::{Vec, Buffer, from_str, Clone}; + + $(#[$a])* #[test] fn f() { $b } + } + ) +} + mod blocking; mod oneshot; mod select; @@ -337,7 +355,7 @@ mod spsc_queue; /// The receiving-half of Rust's channel type. This half can only be owned by /// one task -#[stable] +#[unstable] pub struct Receiver<T> { inner: UnsafeCell<Flavor<T>>, } @@ -349,14 +367,14 @@ unsafe impl<T:Send> Send for Receiver<T> { } /// An iterator over messages on a receiver, this iterator will block /// whenever `next` is called, waiting for a new message, and `None` will be /// returned when the corresponding channel has hung up. -#[stable] -pub struct Iter<'a, T:'a> { +#[unstable] +pub struct Messages<'a, T:'a> { rx: &'a Receiver<T> } /// The sending-half of Rust's asynchronous channel type. This half can only be /// owned by one task, but it can be cloned to send to other tasks. -#[stable] +#[unstable] pub struct Sender<T> { inner: UnsafeCell<Flavor<T>>, } @@ -367,50 +385,30 @@ unsafe impl<T:Send> Send for Sender<T> { } /// The sending-half of Rust's synchronous channel type. This half can only be /// owned by one task, but it can be cloned to send to other tasks. -#[stable] +#[unstable = "this type may be renamed, but it will always exist"] pub struct SyncSender<T> { inner: Arc<RacyCell<sync::Packet<T>>>, // can't share in an arc _marker: marker::NoSync, } -/// An error returned from the `send` function on channels. -/// -/// A `send` operation can only fail if the receiving end of a channel is -/// disconnected, implying that the data could never be received. The error -/// contains the data being sent as a payload so it can be recovered. -#[derive(PartialEq, Eq)] -#[stable] -pub struct SendError<T>(pub T); - -/// An error returned from the `recv` function on a `Receiver`. -/// -/// The `recv` operation can only fail if the sending half of a channel is -/// disconnected, implying that no further messages will ever be received. -#[derive(PartialEq, Eq, Clone, Copy)] -#[stable] -pub struct RecvError; - /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. -#[derive(PartialEq, Clone, Copy)] -#[stable] +#[deriving(PartialEq, Clone, Copy, Show)] +#[experimental = "this is likely to be removed in changing try_recv()"] pub enum TryRecvError { /// This channel is currently empty, but the sender(s) have not yet /// disconnected, so data may yet become available. - #[stable] Empty, - /// This channel's sending half has become disconnected, and there will /// never be any more data received on this channel - #[stable] Disconnected, } /// This enumeration is the list of the possible error outcomes for the /// `SyncSender::try_send` method. -#[derive(PartialEq, Clone)] -#[stable] +#[deriving(PartialEq, Clone, Show)] +#[experimental = "this is likely to be removed in changing try_send()"] pub enum TrySendError<T> { /// The data could not be sent on the channel because it would require that /// the callee block to send the data. @@ -418,13 +416,10 @@ pub enum TrySendError<T> { /// If this is a buffered channel, then the buffer is full at this time. If /// this is not a buffered channel, then there is no receiver available to /// acquire the data. - #[stable] Full(T), - /// This channel's receiving half has disconnected, so the data could not be /// sent. The data is returned back to the callee in this case. - #[stable] - Disconnected(T), + RecvDisconnected(T), } enum Flavor<T> { @@ -463,7 +458,6 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// # Example /// /// ``` -/// use std::sync::mpsc::channel; /// use std::thread::Thread; /// /// // tx is is the sending half (tx for transmission), and rx is the receiving @@ -473,15 +467,15 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// // Spawn off an expensive computation /// Thread::spawn(move|| { /// # fn expensive_computation() {} -/// tx.send(expensive_computation()).unwrap(); +/// tx.send(expensive_computation()); /// }).detach(); /// /// // Do some useful work for awhile /// /// // Let's see what that answer was -/// println!("{}", rx.recv().unwrap()); +/// println!("{}", rx.recv()); /// ``` -#[stable] +#[unstable] pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { let a = Arc::new(RacyCell::new(oneshot::Packet::new())); (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a))) @@ -505,23 +499,23 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { /// # Example /// /// ``` -/// use std::sync::mpsc::sync_channel; /// use std::thread::Thread; /// /// let (tx, rx) = sync_channel(1); /// /// // this returns immediately -/// tx.send(1i).unwrap(); +/// tx.send(1); /// /// Thread::spawn(move|| { /// // this will block until the previous message has been received -/// tx.send(2i).unwrap(); +/// tx.send(2); /// }).detach(); /// -/// assert_eq!(rx.recv().unwrap(), 1i); -/// assert_eq!(rx.recv().unwrap(), 2i); +/// assert_eq!(rx.recv(), 1); +/// assert_eq!(rx.recv(), 2); /// ``` -#[stable] +#[unstable = "this function may be renamed to more accurately reflect the type \ + of channel that is is creating"] pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { let a = Arc::new(RacyCell::new(sync::Packet::new(bound))); (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a))) @@ -538,6 +532,33 @@ impl<T: Send> Sender<T> { } } + /// Sends a value along this channel to be received by the corresponding + /// receiver. + /// + /// Rust channels are infinitely buffered so this method will never block. + /// + /// # Panics + /// + /// This function will panic if the other end of the channel has hung up. + /// This means that if the corresponding receiver has fallen out of scope, + /// this function will trigger a panic message saying that a message is + /// being sent on a closed channel. + /// + /// Note that if this function does *not* panic, it does not mean that the + /// data will be successfully received. All sends are placed into a queue, + /// so it is possible for a send to succeed (the other end is alive), but + /// then the other end could immediately disconnect. + /// + /// The purpose of this functionality is to propagate panics among tasks. + /// If a panic is not desired, then consider using the `send_opt` method + #[experimental = "this function is being considered candidate for removal \ + to adhere to the general guidelines of rust"] + pub fn send(&self, t: T) { + if self.send_opt(t).is_err() { + panic!("sending on a closed channel"); + } + } + /// Attempts to send a value on this channel, returning it back if it could /// not be sent. /// @@ -549,34 +570,37 @@ impl<T: Send> Sender<T> { /// will be received. It is possible for the corresponding receiver to /// hang up immediately after this function returns `Ok`. /// - /// This method will never block the current thread. + /// Like `send`, this method will never block. + /// + /// # Panics + /// + /// This method will never panic, it will return the message back to the + /// caller if the other end is disconnected /// /// # Example /// /// ``` - /// use std::sync::mpsc::channel; - /// /// let (tx, rx) = channel(); /// /// // This send is always successful - /// tx.send(1i).unwrap(); + /// assert_eq!(tx.send_opt(1), Ok(())); /// /// // This send will fail because the receiver is gone /// drop(rx); - /// assert_eq!(tx.send(1i).err().unwrap().0, 1); + /// assert_eq!(tx.send_opt(1), Err(1)); /// ``` - pub fn send(&self, t: T) -> Result<(), SendError<T>> { + #[unstable = "this function may be renamed to send() in the future"] + pub fn send_opt(&self, t: T) -> Result<(), T> { let (new_inner, ret) = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { unsafe { let p = p.get(); if !(*p).sent() { - return (*p).send(t).map_err(SendError); + return (*p).send(t); } else { let a = Arc::new(RacyCell::new(stream::Packet::new())); - let rx = Receiver::new(Flavor::Stream(a.clone())); - match (*p).upgrade(rx) { + match (*p).upgrade(Receiver::new(Flavor::Stream(a.clone()))) { oneshot::UpSuccess => { let ret = (*a.get()).send(t); (a, ret) @@ -594,12 +618,8 @@ impl<T: Send> Sender<T> { } } } - Flavor::Stream(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, - Flavor::Shared(ref p) => return unsafe { - (*p.get()).send(t).map_err(SendError) - }, + Flavor::Stream(ref p) => return unsafe { (*p.get()).send(t) }, + Flavor::Shared(ref p) => return unsafe { (*p.get()).send(t) }, Flavor::Sync(..) => unreachable!(), }; @@ -607,7 +627,7 @@ impl<T: Send> Sender<T> { let tmp = Sender::new(Flavor::Stream(new_inner)); mem::swap(self.inner_mut(), tmp.inner_mut()); } - ret.map_err(SendError) + return ret; } } @@ -619,8 +639,7 @@ impl<T: Send> Clone for Sender<T> { let a = Arc::new(RacyCell::new(shared::Packet::new())); unsafe { let guard = (*a.get()).postinit_lock(); - let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + match (*p.get()).upgrade(Receiver::new(Flavor::Shared(a.clone()))) { oneshot::UpSuccess | oneshot::UpDisconnected => (a, None, guard), oneshot::UpWoke(task) => (a, Some(task), guard) @@ -631,8 +650,7 @@ impl<T: Send> Clone for Sender<T> { let a = Arc::new(RacyCell::new(shared::Packet::new())); unsafe { let guard = (*a.get()).postinit_lock(); - let rx = Receiver::new(Flavor::Shared(a.clone())); - match (*p.get()).upgrade(rx) { + match (*p.get()).upgrade(Receiver::new(Flavor::Shared(a.clone()))) { stream::UpSuccess | stream::UpDisconnected => (a, None, guard), stream::UpWoke(task) => (a, Some(task), guard), @@ -683,29 +701,59 @@ impl<T: Send> SyncSender<T> { /// available or a receiver is available to hand off the message to. /// /// Note that a successful send does *not* guarantee that the receiver will - /// ever see the data if there is a buffer on this channel. Items may be + /// ever see the data if there is a buffer on this channel. Messages may be /// enqueued in the internal buffer for the receiver to receive at a later /// time. If the buffer size is 0, however, it can be guaranteed that the /// receiver has indeed received the data if this function returns success. /// - /// This function will never panic, but it may return `Err` if the - /// `Receiver` has disconnected and is no longer able to receive - /// information. - #[stable] - pub fn send(&self, t: T) -> Result<(), SendError<T>> { - unsafe { (*self.inner.get()).send(t).map_err(SendError) } + /// # Panics + /// + /// Similarly to `Sender::send`, this function will panic if the + /// corresponding `Receiver` for this channel has disconnected. This + /// behavior is used to propagate panics among tasks. + /// + /// If a panic is not desired, you can achieve the same semantics with the + /// `SyncSender::send_opt` method which will not panic if the receiver + /// disconnects. + #[experimental = "this function is being considered candidate for removal \ + to adhere to the general guidelines of rust"] + pub fn send(&self, t: T) { + if self.send_opt(t).is_err() { + panic!("sending on a closed channel"); + } + } + + /// Send a value on a channel, returning it back if the receiver + /// disconnected + /// + /// This method will *block* to send the value `t` on the channel, but if + /// the value could not be sent due to the receiver disconnecting, the value + /// is returned back to the callee. This function is similar to `try_send`, + /// except that it will block if the channel is currently full. + /// + /// # Panics + /// + /// This function cannot panic. + #[unstable = "this function may be renamed to send() in the future"] + pub fn send_opt(&self, t: T) -> Result<(), T> { + unsafe { (*self.inner.get()).send(t) } } /// Attempts to send a value on this channel without blocking. /// - /// This method differs from `send` by returning immediately if the + /// This method differs from `send_opt` by returning immediately if the /// channel's buffer is full or no receiver is waiting to acquire some - /// data. Compared with `send`, this function has two failure cases + /// data. Compared with `send_opt`, this function has two failure cases /// instead of one (one for disconnection, one for a full buffer). /// /// See `SyncSender::send` for notes about guarantees of whether the /// receiver has received the data or not if this function is successful. - #[stable] + /// + /// # Panics + /// + /// This function cannot panic + #[unstable = "the return type of this function is candidate for \ + modification"] pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { unsafe { (*self.inner.get()).try_send(t) } } @@ -735,6 +783,34 @@ impl<T: Send> Receiver<T> { Receiver { inner: UnsafeCell::new(inner) } } + /// Blocks waiting for a value on this receiver + /// + /// This function will block if necessary to wait for a corresponding send + /// on the channel from its paired `Sender` structure. This receiver will + /// be woken up when data is ready, and the data will be returned. + /// + /// # Panics + /// + /// Similar to channels, this method will trigger a task panic if the + /// other end of the channel has hung up (been deallocated). The purpose of + /// this is to propagate panics among tasks. + /// + /// If a panic is not desired, then there are two options: + /// + /// * If blocking is still desired, the `recv_opt` method will return `None` + /// when the other end hangs up + /// + /// * If blocking is not desired, then the `try_recv` method will attempt to + /// peek at a value on this receiver. + #[experimental = "this function is being considered candidate for removal \ + to adhere to the general guidelines of rust"] + pub fn recv(&self) -> T { + match self.recv_opt() { + Ok(t) => t, + Err(()) => panic!("receiving on a closed channel"), + } + } + /// Attempts to return a pending value on this receiver without blocking /// /// This method will never block the caller in order to wait for data to @@ -743,46 +819,42 @@ impl<T: Send> Receiver<T> { /// /// This is useful for a flavor of "optimistic check" before deciding to /// block on a receiver. - #[stable] + /// + /// # Panics + /// + /// This function cannot panic. + #[unstable = "the return type of this function may be altered"] pub fn try_recv(&self) -> Result<T, TryRecvError> { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { match unsafe { (*p.get()).try_recv() } { Ok(t) => return Ok(t), - Err(oneshot::Empty) => return Err(TryRecvError::Empty), - Err(oneshot::Disconnected) => { - return Err(TryRecvError::Disconnected) - } + Err(oneshot::Empty) => return Err(Empty), + Err(oneshot::Disconnected) => return Err(Disconnected), Err(oneshot::Upgraded(rx)) => rx, } } Flavor::Stream(ref p) => { match unsafe { (*p.get()).try_recv() } { Ok(t) => return Ok(t), - Err(stream::Empty) => return Err(TryRecvError::Empty), - Err(stream::Disconnected) => { - return Err(TryRecvError::Disconnected) - } + Err(stream::Empty) => return Err(Empty), + Err(stream::Disconnected) => return Err(Disconnected), Err(stream::Upgraded(rx)) => rx, } } Flavor::Shared(ref p) => { match unsafe { (*p.get()).try_recv() } { Ok(t) => return Ok(t), - Err(shared::Empty) => return Err(TryRecvError::Empty), - Err(shared::Disconnected) => { - return Err(TryRecvError::Disconnected) - } + Err(shared::Empty) => return Err(Empty), + Err(shared::Disconnected) => return Err(Disconnected), } } Flavor::Sync(ref p) => { match unsafe { (*p.get()).try_recv() } { Ok(t) => return Ok(t), - Err(sync::Empty) => return Err(TryRecvError::Empty), - Err(sync::Disconnected) => { - return Err(TryRecvError::Disconnected) - } + Err(sync::Empty) => return Err(Empty), + Err(sync::Disconnected) => return Err(Disconnected), } } }; @@ -793,26 +865,27 @@ impl<T: Send> Receiver<T> { } } - /// Attempt to wait for a value on this receiver, returning an error if the + /// Attempt to wait for a value on this receiver, but does not panic if the /// corresponding channel has hung up. /// - /// This function will always block the current thread if there is no data - /// available and it's possible for more data to be sent. Once a message is - /// sent to the corresponding `Sender`, then this receiver will wake up and - /// return that message. + /// This implementation of iterators for ports will always block if there is + /// not data available on the receiver, but it will not panic in the case + /// that the channel has been deallocated. /// - /// If the corresponding `Sender` has disconnected, or it disconnects while - /// this call is blocking, this call will wake up and return `Err` to - /// indicate that no more messages can ever be received on this channel. - #[stable] - pub fn recv(&self) -> Result<T, RecvError> { + /// In other words, this function has the same semantics as the `recv` + /// method except for the panic aspect. + /// + /// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of + /// the value found on the receiver is returned. + #[unstable = "this function may be renamed to recv()"] + pub fn recv_opt(&self) -> Result<T, ()> { loop { let new_port = match *unsafe { self.inner() } { Flavor::Oneshot(ref p) => { match unsafe { (*p.get()).recv() } { Ok(t) => return Ok(t), Err(oneshot::Empty) => return unreachable!(), - Err(oneshot::Disconnected) => return Err(RecvError), + Err(oneshot::Disconnected) => return Err(()), Err(oneshot::Upgraded(rx)) => rx, } } @@ -820,7 +893,7 @@ impl<T: Send> Receiver<T> { match unsafe { (*p.get()).recv() } { Ok(t) => return Ok(t), Err(stream::Empty) => return unreachable!(), - Err(stream::Disconnected) => return Err(RecvError), + Err(stream::Disconnected) => return Err(()), Err(stream::Upgraded(rx)) => rx, } } @@ -828,12 +901,10 @@ impl<T: Send> Receiver<T> { match unsafe { (*p.get()).recv() } { Ok(t) => return Ok(t), Err(shared::Empty) => return unreachable!(), - Err(shared::Disconnected) => return Err(RecvError), + Err(shared::Disconnected) => return Err(()), } } - Flavor::Sync(ref p) => return unsafe { - (*p.get()).recv().map_err(|()| RecvError) - } + Flavor::Sync(ref p) => return unsafe { (*p.get()).recv() } }; unsafe { mem::swap(self.inner_mut(), new_port.inner_mut()); @@ -843,9 +914,9 @@ impl<T: Send> Receiver<T> { /// Returns an iterator that will block waiting for messages, but never /// `panic!`. It will return `None` when the channel has hung up. - #[stable] - pub fn iter(&self) -> Iter<T> { - Iter { rx: self } + #[unstable] + pub fn iter<'a>(&'a self) -> Messages<'a, T> { + Messages { rx: self } } } @@ -936,10 +1007,8 @@ impl<T: Send> select::Packet for Receiver<T> { } #[unstable] -impl<'a, T: Send> Iterator for Iter<'a, T> { - type Item = T; - - fn next(&mut self) -> Option<T> { self.rx.recv().ok() } +impl<'a, T: Send> Iterator<T> for Messages<'a, T> { + fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() } } #[unsafe_destructor] @@ -972,425 +1041,368 @@ impl<T> RacyCell<T> { unsafe impl<T:Send> Send for RacyCell<T> { } -unsafe impl<T> Sync for RacyCell<T> { } // Oh dear - -impl<T> fmt::Show for SendError<T> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - "sending on a closed channel".fmt(f) - } -} - -impl<T> fmt::Show for TrySendError<T> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - TrySendError::Full(..) => { - "sending on a full channel".fmt(f) - } - TrySendError::Disconnected(..) => { - "sending on a closed channel".fmt(f) - } - } - } -} - -impl fmt::Show for RecvError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - "receiving on a closed channel".fmt(f) - } -} +unsafe impl<T> kinds::Sync for RacyCell<T> { } // Oh dear -impl fmt::Show for TryRecvError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - TryRecvError::Empty => { - "receiving on an empty channel".fmt(f) - } - TryRecvError::Disconnected => { - "receiving on a closed channel".fmt(f) - } - } - } -} #[cfg(test)] mod test { - use prelude::v1::*; - - use os; use super::*; - use thread::Thread; + use prelude::{spawn, range, Some, None, from_str, Clone, Str}; + use os; pub fn stress_factor() -> uint { match os::getenv("RUST_TEST_STRESS") { - Some(val) => val.parse().unwrap(), + Some(val) => from_str::<uint>(val.as_slice()).unwrap(), None => 1, } } - #[test] - fn smoke() { + test! { fn smoke() { let (tx, rx) = channel::<int>(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } + tx.send(1); + assert_eq!(rx.recv(), 1); + } } - #[test] - fn drop_full() { + test! { fn drop_full() { let (tx, _rx) = channel(); - tx.send(box 1i).unwrap(); - } + tx.send(box 1i); + } } - #[test] - fn drop_full_shared() { + test! { fn drop_full_shared() { let (tx, _rx) = channel(); drop(tx.clone()); drop(tx.clone()); - tx.send(box 1i).unwrap(); - } + tx.send(box 1i); + } } - #[test] - fn smoke_shared() { + test! { fn smoke_shared() { let (tx, rx) = channel::<int>(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); + tx.send(1); + assert_eq!(rx.recv(), 1); let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } + tx.send(1); + assert_eq!(rx.recv(), 1); + } } - #[test] - fn smoke_threads() { + test! { fn smoke_threads() { let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { - tx.send(1).unwrap(); + spawn(move|| { + tx.send(1); }); - assert_eq!(rx.recv().unwrap(), 1); - } + assert_eq!(rx.recv(), 1); + } } - #[test] - fn smoke_port_gone() { + test! { fn smoke_port_gone() { let (tx, rx) = channel::<int>(); drop(rx); - assert!(tx.send(1).is_err()); - } + tx.send(1); + } #[should_fail] } - #[test] - fn smoke_shared_port_gone() { + test! { fn smoke_shared_port_gone() { let (tx, rx) = channel::<int>(); drop(rx); - assert!(tx.send(1).is_err()) - } + tx.send(1); + } #[should_fail] } - #[test] - fn smoke_shared_port_gone2() { + test! { fn smoke_shared_port_gone2() { let (tx, rx) = channel::<int>(); drop(rx); let tx2 = tx.clone(); drop(tx); - assert!(tx2.send(1).is_err()); - } + tx2.send(1); + } #[should_fail] } - #[test] - fn port_gone_concurrent() { + test! { fn port_gone_concurrent() { let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { - rx.recv().unwrap(); + spawn(move|| { + rx.recv(); }); - while tx.send(1).is_ok() {} - } + loop { tx.send(1) } + } #[should_fail] } - #[test] - fn port_gone_concurrent_shared() { + test! { fn port_gone_concurrent_shared() { let (tx, rx) = channel::<int>(); let tx2 = tx.clone(); - let _t = Thread::spawn(move|| { - rx.recv().unwrap(); + spawn(move|| { + rx.recv(); }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} - } + loop { + tx.send(1); + tx2.send(1); + } + } #[should_fail] } - #[test] - fn smoke_chan_gone() { + test! { fn smoke_chan_gone() { let (tx, rx) = channel::<int>(); drop(tx); - assert!(rx.recv().is_err()); - } + rx.recv(); + } #[should_fail] } - #[test] - fn smoke_chan_gone_shared() { + test! { fn smoke_chan_gone_shared() { let (tx, rx) = channel::<()>(); let tx2 = tx.clone(); drop(tx); drop(tx2); - assert!(rx.recv().is_err()); - } + rx.recv(); + } #[should_fail] } - #[test] - fn chan_gone_concurrent() { + test! { fn chan_gone_concurrent() { let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { - tx.send(1).unwrap(); - tx.send(1).unwrap(); + spawn(move|| { + tx.send(1); + tx.send(1); }); - while rx.recv().is_ok() {} - } + loop { rx.recv(); } + } #[should_fail] } - #[test] - fn stress() { + test! { fn stress() { let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { - for _ in range(0u, 10000) { tx.send(1i).unwrap(); } + spawn(move|| { + for _ in range(0u, 10000) { tx.send(1i); } }); for _ in range(0u, 10000) { - assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv(), 1); } - t.join().ok().unwrap(); - } + } } - #[test] - fn stress_shared() { + test! { fn stress_shared() { static AMT: uint = 10000; static NTHREADS: uint = 8; let (tx, rx) = channel::<int>(); + let (dtx, drx) = channel::<()>(); - let t = Thread::spawn(move|| { + spawn(move|| { for _ in range(0, AMT * NTHREADS) { - assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv(), 1); } match rx.try_recv() { Ok(..) => panic!(), _ => {} } + dtx.send(()); }); for _ in range(0, NTHREADS) { let tx = tx.clone(); - Thread::spawn(move|| { - for _ in range(0, AMT) { tx.send(1).unwrap(); } - }).detach(); + spawn(move|| { + for _ in range(0, AMT) { tx.send(1); } + }); } drop(tx); - t.join().ok().unwrap(); - } + drx.recv(); + } } #[test] fn send_from_outside_runtime() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<int>(); - let t1 = Thread::spawn(move|| { - tx1.send(()).unwrap(); + let (tx3, rx3) = channel::<()>(); + let tx4 = tx3.clone(); + spawn(move|| { + tx1.send(()); for _ in range(0i, 40) { - assert_eq!(rx2.recv().unwrap(), 1); + assert_eq!(rx2.recv(), 1); } + tx3.send(()); }); - rx1.recv().unwrap(); - let t2 = Thread::spawn(move|| { + rx1.recv(); + spawn(move|| { for _ in range(0i, 40) { - tx2.send(1).unwrap(); + tx2.send(1); } + tx4.send(()); }); - t1.join().ok().unwrap(); - t2.join().ok().unwrap(); + rx3.recv(); + rx3.recv(); } #[test] fn recv_from_outside_runtime() { let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { + let (dtx, drx) = channel(); + spawn(move|| { for _ in range(0i, 40) { - assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv(), 1); } + dtx.send(()); }); for _ in range(0u, 40) { - tx.send(1).unwrap(); + tx.send(1); } - t.join().ok().unwrap(); + drx.recv(); } #[test] fn no_runtime() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); - let t1 = Thread::spawn(move|| { - assert_eq!(rx1.recv().unwrap(), 1); - tx2.send(2).unwrap(); + let (tx3, rx3) = channel::<()>(); + let tx4 = tx3.clone(); + spawn(move|| { + assert_eq!(rx1.recv(), 1); + tx2.send(2); + tx4.send(()); }); - let t2 = Thread::spawn(move|| { - tx1.send(1).unwrap(); - assert_eq!(rx2.recv().unwrap(), 2); + spawn(move|| { + tx1.send(1); + assert_eq!(rx2.recv(), 2); + tx3.send(()); }); - t1.join().ok().unwrap(); - t2.join().ok().unwrap(); + rx3.recv(); + rx3.recv(); } - #[test] - fn oneshot_single_thread_close_port_first() { + test! { fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending let (_tx, rx) = channel::<int>(); drop(rx); - } + } } - #[test] - fn oneshot_single_thread_close_chan_first() { + test! { fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending let (tx, _rx) = channel::<int>(); drop(tx); - } + } } - #[test] - fn oneshot_single_thread_send_port_close() { + test! { fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed let (tx, rx) = channel::<Box<int>>(); drop(rx); - assert!(tx.send(box 0).is_err()); - } + tx.send(box 0); + } #[should_fail] } - #[test] - fn oneshot_single_thread_recv_chan_close() { + test! { fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic let res = Thread::spawn(move|| { let (tx, rx) = channel::<int>(); drop(tx); - rx.recv().unwrap(); + rx.recv(); }).join(); // What is our res? assert!(res.is_err()); - } + } } - #[test] - fn oneshot_single_thread_send_then_recv() { + test! { fn oneshot_single_thread_send_then_recv() { let (tx, rx) = channel::<Box<int>>(); - tx.send(box 10).unwrap(); - assert!(rx.recv().unwrap() == box 10); - } + tx.send(box 10); + assert!(rx.recv() == box 10); + } } - #[test] - fn oneshot_single_thread_try_send_open() { + test! { fn oneshot_single_thread_try_send_open() { let (tx, rx) = channel::<int>(); - assert!(tx.send(10).is_ok()); - assert!(rx.recv().unwrap() == 10); - } + assert!(tx.send_opt(10).is_ok()); + assert!(rx.recv() == 10); + } } - #[test] - fn oneshot_single_thread_try_send_closed() { + test! { fn oneshot_single_thread_try_send_closed() { let (tx, rx) = channel::<int>(); drop(rx); - assert!(tx.send(10).is_err()); - } + assert!(tx.send_opt(10).is_err()); + } } - #[test] - fn oneshot_single_thread_try_recv_open() { + test! { fn oneshot_single_thread_try_recv_open() { let (tx, rx) = channel::<int>(); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); - } + tx.send(10); + assert!(rx.recv_opt() == Ok(10)); + } } - #[test] - fn oneshot_single_thread_try_recv_closed() { + test! { fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = channel::<int>(); drop(tx); - assert!(rx.recv().is_err()); - } + assert!(rx.recv_opt() == Err(())); + } } - #[test] - fn oneshot_single_thread_peek_data() { + test! { fn oneshot_single_thread_peek_data() { let (tx, rx) = channel::<int>(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Err(Empty)); + tx.send(10); assert_eq!(rx.try_recv(), Ok(10)); - } + } } - #[test] - fn oneshot_single_thread_peek_close() { + test! { fn oneshot_single_thread_peek_close() { let (tx, rx) = channel::<int>(); drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); + } } - #[test] - fn oneshot_single_thread_peek_open() { + test! { fn oneshot_single_thread_peek_open() { let (_tx, rx) = channel::<int>(); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } + assert_eq!(rx.try_recv(), Err(Empty)); + } } - #[test] - fn oneshot_multi_task_recv_then_send() { + test! { fn oneshot_multi_task_recv_then_send() { let (tx, rx) = channel::<Box<int>>(); - let _t = Thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + spawn(move|| { + assert!(rx.recv() == box 10); }); - tx.send(box 10).unwrap(); - } + tx.send(box 10); + } } - #[test] - fn oneshot_multi_task_recv_then_close() { + test! { fn oneshot_multi_task_recv_then_close() { let (tx, rx) = channel::<Box<int>>(); - let _t = Thread::spawn(move|| { + spawn(move|| { drop(tx); }); let res = Thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + assert!(rx.recv() == box 10); }).join(); assert!(res.is_err()); - } + } } - #[test] - fn oneshot_multi_thread_close_stress() { + test! { fn oneshot_multi_thread_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { + spawn(move|| { drop(rx); }); drop(tx); } - } + } } - #[test] - fn oneshot_multi_thread_send_close_stress() { + test! { fn oneshot_multi_thread_send_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - let _t = Thread::spawn(move|| { + spawn(move|| { drop(rx); }); let _ = Thread::spawn(move|| { - tx.send(1).unwrap(); + tx.send(1); }).join(); } - } + } } - #[test] - fn oneshot_multi_thread_recv_close_stress() { + test! { fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); - Thread::spawn(move|| { + spawn(move|| { let res = Thread::spawn(move|| { - rx.recv().unwrap(); + rx.recv(); }).join(); assert!(res.is_err()); - }).detach(); - let _t = Thread::spawn(move|| { - Thread::spawn(move|| { + }); + spawn(move|| { + spawn(move|| { drop(tx); - }).detach(); + }); }); } - } + } } - #[test] - fn oneshot_multi_thread_send_recv_stress() { + test! { fn oneshot_multi_thread_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel(); - let _t = Thread::spawn(move|| { - tx.send(box 10i).unwrap(); + spawn(move|| { + tx.send(box 10i); + }); + spawn(move|| { + assert!(rx.recv() == box 10i); }); - assert!(rx.recv().unwrap() == box 10i); } - } + } } - #[test] - fn stream_send_recv_stress() { + test! { fn stream_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = channel(); @@ -1400,73 +1412,69 @@ mod test { fn send(tx: Sender<Box<int>>, i: int) { if i == 10 { return } - Thread::spawn(move|| { - tx.send(box i).unwrap(); + spawn(move|| { + tx.send(box i); send(tx, i + 1); - }).detach(); + }); } fn recv(rx: Receiver<Box<int>>, i: int) { if i == 10 { return } - Thread::spawn(move|| { - assert!(rx.recv().unwrap() == box i); + spawn(move|| { + assert!(rx.recv() == box i); recv(rx, i + 1); - }).detach(); + }); } } - } + } } - #[test] - fn recv_a_lot() { + test! { fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = channel(); - for _ in range(0i, 10000) { tx.send(()).unwrap(); } - for _ in range(0i, 10000) { rx.recv().unwrap(); } - } + for _ in range(0i, 10000) { tx.send(()); } + for _ in range(0i, 10000) { rx.recv(); } + } } - #[test] - fn shared_chan_stress() { + test! { fn shared_chan_stress() { let (tx, rx) = channel(); let total = stress_factor() + 100; for _ in range(0, total) { let tx = tx.clone(); - Thread::spawn(move|| { - tx.send(()).unwrap(); - }).detach(); + spawn(move|| { + tx.send(()); + }); } for _ in range(0, total) { - rx.recv().unwrap(); + rx.recv(); } - } + } } - #[test] - fn test_nested_recv_iter() { + test! { fn test_nested_recv_iter() { let (tx, rx) = channel::<int>(); let (total_tx, total_rx) = channel::<int>(); - let _t = Thread::spawn(move|| { + spawn(move|| { let mut acc = 0; for x in rx.iter() { acc += x; } - total_tx.send(acc).unwrap(); + total_tx.send(acc); }); - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); + tx.send(3); + tx.send(1); + tx.send(2); drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); - } + assert_eq!(total_rx.recv(), 6); + } } - #[test] - fn test_recv_iter_break() { + test! { fn test_recv_iter_break() { let (tx, rx) = channel::<int>(); let (count_tx, count_rx) = channel(); - let _t = Thread::spawn(move|| { + spawn(move|| { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -1475,51 +1483,49 @@ mod test { count += x; } } - count_tx.send(count).unwrap(); + count_tx.send(count); }); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - let _ = tx.send(2); + tx.send(2); + tx.send(2); + tx.send(2); + let _ = tx.send_opt(2); drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); - } + assert_eq!(count_rx.recv(), 4); + } } - #[test] - fn try_recv_states() { + test! { fn try_recv_states() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<()>(); let (tx3, rx3) = channel::<()>(); - let _t = Thread::spawn(move|| { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); + spawn(move|| { + rx2.recv(); + tx1.send(1); + tx3.send(()); + rx2.recv(); drop(tx1); - tx3.send(()).unwrap(); + tx3.send(()); }); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); - } + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Err(Disconnected)); + } } // This bug used to end up in a livelock inside of the Receiver destructor // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { + test! { fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = channel(); let (tx2, rx2) = channel(); - let _t = Thread::spawn(move|| { - rx.recv().unwrap(); // wait on a oneshot + spawn(move|| { + rx.recv(); // wait on a oneshot drop(rx); // destroy a shared - tx2.send(()).unwrap(); + tx2.send(()); }); // make sure the other task has gone to sleep for _ in range(0u, 5000) { Thread::yield_now(); } @@ -1527,334 +1533,303 @@ mod test { // upgrade to a shared chan and send a message let t = tx.clone(); drop(tx); - t.send(()).unwrap(); + t.send(()); // wait for the child task to exit before we exit - rx2.recv().unwrap(); - } + rx2.recv(); + }} } #[cfg(test)] mod sync_tests { - use prelude::v1::*; - + use prelude::*; use os; - use thread::Thread; - use super::*; pub fn stress_factor() -> uint { match os::getenv("RUST_TEST_STRESS") { - Some(val) => val.parse().unwrap(), + Some(val) => from_str::<uint>(val.as_slice()).unwrap(), None => 1, } } - #[test] - fn smoke() { + test! { fn smoke() { let (tx, rx) = sync_channel::<int>(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } + tx.send(1); + assert_eq!(rx.recv(), 1); + } } - #[test] - fn drop_full() { + test! { fn drop_full() { let (tx, _rx) = sync_channel(1); - tx.send(box 1i).unwrap(); - } + tx.send(box 1i); + } } - #[test] - fn smoke_shared() { + test! { fn smoke_shared() { let (tx, rx) = sync_channel::<int>(1); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); + tx.send(1); + assert_eq!(rx.recv(), 1); let tx = tx.clone(); - tx.send(1).unwrap(); - assert_eq!(rx.recv().unwrap(), 1); - } + tx.send(1); + assert_eq!(rx.recv(), 1); + } } - #[test] - fn smoke_threads() { + test! { fn smoke_threads() { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - tx.send(1).unwrap(); + spawn(move|| { + tx.send(1); }); - assert_eq!(rx.recv().unwrap(), 1); - } + assert_eq!(rx.recv(), 1); + } } - #[test] - fn smoke_port_gone() { + test! { fn smoke_port_gone() { let (tx, rx) = sync_channel::<int>(0); drop(rx); - assert!(tx.send(1).is_err()); - } + tx.send(1); + } #[should_fail] } - #[test] - fn smoke_shared_port_gone2() { + test! { fn smoke_shared_port_gone2() { let (tx, rx) = sync_channel::<int>(0); drop(rx); let tx2 = tx.clone(); drop(tx); - assert!(tx2.send(1).is_err()); - } + tx2.send(1); + } #[should_fail] } - #[test] - fn port_gone_concurrent() { + test! { fn port_gone_concurrent() { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { - rx.recv().unwrap(); + spawn(move|| { + rx.recv(); }); - while tx.send(1).is_ok() {} - } + loop { tx.send(1) } + } #[should_fail] } - #[test] - fn port_gone_concurrent_shared() { + test! { fn port_gone_concurrent_shared() { let (tx, rx) = sync_channel::<int>(0); let tx2 = tx.clone(); - let _t = Thread::spawn(move|| { - rx.recv().unwrap(); + spawn(move|| { + rx.recv(); }); - while tx.send(1).is_ok() && tx2.send(1).is_ok() {} - } + loop { + tx.send(1); + tx2.send(1); + } + } #[should_fail] } - #[test] - fn smoke_chan_gone() { + test! { fn smoke_chan_gone() { let (tx, rx) = sync_channel::<int>(0); drop(tx); - assert!(rx.recv().is_err()); - } + rx.recv(); + } #[should_fail] } - #[test] - fn smoke_chan_gone_shared() { + test! { fn smoke_chan_gone_shared() { let (tx, rx) = sync_channel::<()>(0); let tx2 = tx.clone(); drop(tx); drop(tx2); - assert!(rx.recv().is_err()); - } + rx.recv(); + } #[should_fail] } - #[test] - fn chan_gone_concurrent() { + test! { fn chan_gone_concurrent() { let (tx, rx) = sync_channel::<int>(0); - Thread::spawn(move|| { - tx.send(1).unwrap(); - tx.send(1).unwrap(); - }).detach(); - while rx.recv().is_ok() {} - } + spawn(move|| { + tx.send(1); + tx.send(1); + }); + loop { rx.recv(); } + } #[should_fail] } - #[test] - fn stress() { + test! { fn stress() { let (tx, rx) = sync_channel::<int>(0); - Thread::spawn(move|| { - for _ in range(0u, 10000) { tx.send(1).unwrap(); } - }).detach(); + spawn(move|| { + for _ in range(0u, 10000) { tx.send(1); } + }); for _ in range(0u, 10000) { - assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv(), 1); } - } + } } - #[test] - fn stress_shared() { + test! { fn stress_shared() { static AMT: uint = 1000; static NTHREADS: uint = 8; let (tx, rx) = sync_channel::<int>(0); let (dtx, drx) = sync_channel::<()>(0); - Thread::spawn(move|| { + spawn(move|| { for _ in range(0, AMT * NTHREADS) { - assert_eq!(rx.recv().unwrap(), 1); + assert_eq!(rx.recv(), 1); } match rx.try_recv() { Ok(..) => panic!(), _ => {} } - dtx.send(()).unwrap(); - }).detach(); + dtx.send(()); + }); for _ in range(0, NTHREADS) { let tx = tx.clone(); - Thread::spawn(move|| { - for _ in range(0, AMT) { tx.send(1).unwrap(); } - }).detach(); + spawn(move|| { + for _ in range(0, AMT) { tx.send(1); } + }); } drop(tx); - drx.recv().unwrap(); - } + drx.recv(); + } } - #[test] - fn oneshot_single_thread_close_port_first() { + test! { fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending let (_tx, rx) = sync_channel::<int>(0); drop(rx); - } + } } - #[test] - fn oneshot_single_thread_close_chan_first() { + test! { fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending let (tx, _rx) = sync_channel::<int>(0); drop(tx); - } + } } - #[test] - fn oneshot_single_thread_send_port_close() { + test! { fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed let (tx, rx) = sync_channel::<Box<int>>(0); drop(rx); - assert!(tx.send(box 0).is_err()); - } + tx.send(box 0); + } #[should_fail] } - #[test] - fn oneshot_single_thread_recv_chan_close() { + test! { fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic let res = Thread::spawn(move|| { let (tx, rx) = sync_channel::<int>(0); drop(tx); - rx.recv().unwrap(); + rx.recv(); }).join(); // What is our res? assert!(res.is_err()); - } + } } - #[test] - fn oneshot_single_thread_send_then_recv() { + test! { fn oneshot_single_thread_send_then_recv() { let (tx, rx) = sync_channel::<Box<int>>(1); - tx.send(box 10).unwrap(); - assert!(rx.recv().unwrap() == box 10); - } + tx.send(box 10); + assert!(rx.recv() == box 10); + } } - #[test] - fn oneshot_single_thread_try_send_open() { + test! { fn oneshot_single_thread_try_send_open() { let (tx, rx) = sync_channel::<int>(1); assert_eq!(tx.try_send(10), Ok(())); - assert!(rx.recv().unwrap() == 10); - } + assert!(rx.recv() == 10); + } } - #[test] - fn oneshot_single_thread_try_send_closed() { + test! { fn oneshot_single_thread_try_send_closed() { let (tx, rx) = sync_channel::<int>(0); drop(rx); - assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); - } + assert_eq!(tx.try_send(10), Err(RecvDisconnected(10))); + } } - #[test] - fn oneshot_single_thread_try_send_closed2() { + test! { fn oneshot_single_thread_try_send_closed2() { let (tx, _rx) = sync_channel::<int>(0); - assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); - } + assert_eq!(tx.try_send(10), Err(Full(10))); + } } - #[test] - fn oneshot_single_thread_try_recv_open() { + test! { fn oneshot_single_thread_try_recv_open() { let (tx, rx) = sync_channel::<int>(1); - tx.send(10).unwrap(); - assert!(rx.recv() == Ok(10)); - } + tx.send(10); + assert!(rx.recv_opt() == Ok(10)); + } } - #[test] - fn oneshot_single_thread_try_recv_closed() { + test! { fn oneshot_single_thread_try_recv_closed() { let (tx, rx) = sync_channel::<int>(0); drop(tx); - assert!(rx.recv().is_err()); - } + assert!(rx.recv_opt() == Err(())); + } } - #[test] - fn oneshot_single_thread_peek_data() { + test! { fn oneshot_single_thread_peek_data() { let (tx, rx) = sync_channel::<int>(1); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Err(Empty)); + tx.send(10); assert_eq!(rx.try_recv(), Ok(10)); - } + } } - #[test] - fn oneshot_single_thread_peek_close() { + test! { fn oneshot_single_thread_peek_close() { let (tx, rx) = sync_channel::<int>(0); drop(tx); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); - } + assert_eq!(rx.try_recv(), Err(Disconnected)); + assert_eq!(rx.try_recv(), Err(Disconnected)); + } } - #[test] - fn oneshot_single_thread_peek_open() { + test! { fn oneshot_single_thread_peek_open() { let (_tx, rx) = sync_channel::<int>(0); - assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); - } + assert_eq!(rx.try_recv(), Err(Empty)); + } } - #[test] - fn oneshot_multi_task_recv_then_send() { + test! { fn oneshot_multi_task_recv_then_send() { let (tx, rx) = sync_channel::<Box<int>>(0); - let _t = Thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + spawn(move|| { + assert!(rx.recv() == box 10); }); - tx.send(box 10).unwrap(); - } + tx.send(box 10); + } } - #[test] - fn oneshot_multi_task_recv_then_close() { + test! { fn oneshot_multi_task_recv_then_close() { let (tx, rx) = sync_channel::<Box<int>>(0); - let _t = Thread::spawn(move|| { + spawn(move|| { drop(tx); }); let res = Thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + assert!(rx.recv() == box 10); }).join(); assert!(res.is_err()); - } + } } - #[test] - fn oneshot_multi_thread_close_stress() { + test! { fn oneshot_multi_thread_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { + spawn(move|| { drop(rx); }); drop(tx); } - } + } } - #[test] - fn oneshot_multi_thread_send_close_stress() { + test! { fn oneshot_multi_thread_send_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { + spawn(move|| { drop(rx); }); let _ = Thread::spawn(move || { - tx.send(1).unwrap(); + tx.send(1); }).join(); } - } + } } - #[test] - fn oneshot_multi_thread_recv_close_stress() { + test! { fn oneshot_multi_thread_recv_close_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { + spawn(move|| { let res = Thread::spawn(move|| { - rx.recv().unwrap(); + rx.recv(); }).join(); assert!(res.is_err()); }); - let _t = Thread::spawn(move|| { - Thread::spawn(move|| { + spawn(move|| { + spawn(move|| { drop(tx); - }).detach(); + }); }); } - } + } } - #[test] - fn oneshot_multi_thread_send_recv_stress() { + test! { fn oneshot_multi_thread_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<Box<int>>(0); - let _t = Thread::spawn(move|| { - tx.send(box 10i).unwrap(); + spawn(move|| { + tx.send(box 10i); + }); + spawn(move|| { + assert!(rx.recv() == box 10i); }); - assert!(rx.recv().unwrap() == box 10i); } - } + } } - #[test] - fn stream_send_recv_stress() { + test! { fn stream_send_recv_stress() { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<Box<int>>(0); @@ -1864,73 +1839,69 @@ mod sync_tests { fn send(tx: SyncSender<Box<int>>, i: int) { if i == 10 { return } - Thread::spawn(move|| { - tx.send(box i).unwrap(); + spawn(move|| { + tx.send(box i); send(tx, i + 1); - }).detach(); + }); } fn recv(rx: Receiver<Box<int>>, i: int) { if i == 10 { return } - Thread::spawn(move|| { - assert!(rx.recv().unwrap() == box i); + spawn(move|| { + assert!(rx.recv() == box i); recv(rx, i + 1); - }).detach(); + }); } } - } + } } - #[test] - fn recv_a_lot() { + test! { fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context let (tx, rx) = sync_channel(10000); - for _ in range(0u, 10000) { tx.send(()).unwrap(); } - for _ in range(0u, 10000) { rx.recv().unwrap(); } - } + for _ in range(0u, 10000) { tx.send(()); } + for _ in range(0u, 10000) { rx.recv(); } + } } - #[test] - fn shared_chan_stress() { + test! { fn shared_chan_stress() { let (tx, rx) = sync_channel(0); let total = stress_factor() + 100; for _ in range(0, total) { let tx = tx.clone(); - Thread::spawn(move|| { - tx.send(()).unwrap(); - }).detach(); + spawn(move|| { + tx.send(()); + }); } for _ in range(0, total) { - rx.recv().unwrap(); + rx.recv(); } - } + } } - #[test] - fn test_nested_recv_iter() { + test! { fn test_nested_recv_iter() { let (tx, rx) = sync_channel::<int>(0); let (total_tx, total_rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { + spawn(move|| { let mut acc = 0; for x in rx.iter() { acc += x; } - total_tx.send(acc).unwrap(); + total_tx.send(acc); }); - tx.send(3).unwrap(); - tx.send(1).unwrap(); - tx.send(2).unwrap(); + tx.send(3); + tx.send(1); + tx.send(2); drop(tx); - assert_eq!(total_rx.recv().unwrap(), 6); - } + assert_eq!(total_rx.recv(), 6); + } } - #[test] - fn test_recv_iter_break() { + test! { fn test_recv_iter_break() { let (tx, rx) = sync_channel::<int>(0); let (count_tx, count_rx) = sync_channel(0); - let _t = Thread::spawn(move|| { + spawn(move|| { let mut count = 0; for x in rx.iter() { if count >= 3 { @@ -1939,51 +1910,49 @@ mod sync_tests { count += x; } } - count_tx.send(count).unwrap(); + count_tx.send(count); }); - tx.send(2).unwrap(); - tx.send(2).unwrap(); - tx.send(2).unwrap(); + tx.send(2); + tx.send(2); + tx.send(2); let _ = tx.try_send(2); drop(tx); - assert_eq!(count_rx.recv().unwrap(), 4); - } + assert_eq!(count_rx.recv(), 4); + } } - #[test] - fn try_recv_states() { + test! { fn try_recv_states() { let (tx1, rx1) = sync_channel::<int>(1); let (tx2, rx2) = sync_channel::<()>(1); let (tx3, rx3) = sync_channel::<()>(1); - let _t = Thread::spawn(move|| { - rx2.recv().unwrap(); - tx1.send(1).unwrap(); - tx3.send(()).unwrap(); - rx2.recv().unwrap(); + spawn(move|| { + rx2.recv(); + tx1.send(1); + tx3.send(()); + rx2.recv(); drop(tx1); - tx3.send(()).unwrap(); + tx3.send(()); }); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); assert_eq!(rx1.try_recv(), Ok(1)); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); - tx2.send(()).unwrap(); - rx3.recv().unwrap(); - assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); - } + assert_eq!(rx1.try_recv(), Err(Empty)); + tx2.send(()); + rx3.recv(); + assert_eq!(rx1.try_recv(), Err(Disconnected)); + } } // This bug used to end up in a livelock inside of the Receiver destructor // because the internal state of the Shared packet was corrupted - #[test] - fn destroy_upgraded_shared_port_when_sender_still_active() { + test! { fn destroy_upgraded_shared_port_when_sender_still_active() { let (tx, rx) = sync_channel::<()>(0); let (tx2, rx2) = sync_channel::<()>(0); - let _t = Thread::spawn(move|| { - rx.recv().unwrap(); // wait on a oneshot + spawn(move|| { + rx.recv(); // wait on a oneshot drop(rx); // destroy a shared - tx2.send(()).unwrap(); + tx2.send(()); }); // make sure the other task has gone to sleep for _ in range(0u, 5000) { Thread::yield_now(); } @@ -1991,91 +1960,92 @@ mod sync_tests { // upgrade to a shared chan and send a message let t = tx.clone(); drop(tx); - t.send(()).unwrap(); + t.send(()); // wait for the child task to exit before we exit - rx2.recv().unwrap(); - } + rx2.recv(); + } } - #[test] - fn send1() { + test! { fn send_opt1() { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { rx.recv().unwrap(); }); - assert_eq!(tx.send(1), Ok(())); - } + spawn(move|| { rx.recv(); }); + assert_eq!(tx.send_opt(1), Ok(())); + } } - #[test] - fn send2() { + test! { fn send_opt2() { let (tx, rx) = sync_channel::<int>(0); - let _t = Thread::spawn(move|| { drop(rx); }); - assert!(tx.send(1).is_err()); - } + spawn(move|| { drop(rx); }); + assert_eq!(tx.send_opt(1), Err(1)); + } } - #[test] - fn send3() { + test! { fn send_opt3() { let (tx, rx) = sync_channel::<int>(1); - assert_eq!(tx.send(1), Ok(())); - let _t =Thread::spawn(move|| { drop(rx); }); - assert!(tx.send(1).is_err()); - } + assert_eq!(tx.send_opt(1), Ok(())); + spawn(move|| { drop(rx); }); + assert_eq!(tx.send_opt(1), Err(1)); + } } - #[test] - fn send4() { + test! { fn send_opt4() { let (tx, rx) = sync_channel::<int>(0); let tx2 = tx.clone(); let (done, donerx) = channel(); let done2 = done.clone(); - let _t = Thread::spawn(move|| { - assert!(tx.send(1).is_err()); - done.send(()).unwrap(); + spawn(move|| { + assert_eq!(tx.send_opt(1), Err(1)); + done.send(()); }); - let _t = Thread::spawn(move|| { - assert!(tx2.send(2).is_err()); - done2.send(()).unwrap(); + spawn(move|| { + assert_eq!(tx2.send_opt(2), Err(2)); + done2.send(()); }); drop(rx); - donerx.recv().unwrap(); - donerx.recv().unwrap(); - } + donerx.recv(); + donerx.recv(); + } } - #[test] - fn try_send1() { + test! { fn try_send1() { let (tx, _rx) = sync_channel::<int>(0); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); - } + assert_eq!(tx.try_send(1), Err(Full(1))); + } } - #[test] - fn try_send2() { + test! { fn try_send2() { let (tx, _rx) = sync_channel::<int>(1); assert_eq!(tx.try_send(1), Ok(())); - assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); - } + assert_eq!(tx.try_send(1), Err(Full(1))); + } } - #[test] - fn try_send3() { + test! { fn try_send3() { let (tx, rx) = sync_channel::<int>(1); assert_eq!(tx.try_send(1), Ok(())); drop(rx); - assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); - } + assert_eq!(tx.try_send(1), Err(RecvDisconnected(1))); + } } - #[test] - fn issue_15761() { + test! { fn try_send4() { + let (tx, rx) = sync_channel::<int>(0); + spawn(move|| { + for _ in range(0u, 1000) { Thread::yield_now(); } + assert_eq!(tx.try_send(1), Ok(())); + }); + assert_eq!(rx.recv(), 1); + } #[ignore(reason = "flaky on libnative")] } + + test! { fn issue_15761() { fn repro() { let (tx1, rx1) = sync_channel::<()>(3); let (tx2, rx2) = sync_channel::<()>(3); - let _t = Thread::spawn(move|| { - rx1.recv().unwrap(); + spawn(move|| { + rx1.recv(); tx2.try_send(()).unwrap(); }); tx1.try_send(()).unwrap(); - rx2.recv().unwrap(); + rx2.recv(); } for _ in range(0u, 100) { repro() } - } + } } } diff --git a/src/libstd/sync/rwlock.rs b/src/libstd/sync/rwlock.rs index b2367ff8352..bd98b09d779 100644 --- a/src/libstd/sync/rwlock.rs +++ b/src/libstd/sync/rwlock.rs @@ -41,7 +41,7 @@ use sys_common::rwlock as sys; /// ``` /// use std::sync::RWLock; /// -/// let lock = RWLock::new(5i); +/// let lock = RWLock::new(5); /// /// // many reader locks can be held at once /// { |
