diff options
| author | Alex Crichton <alex@alexcrichton.com> | 2015-01-06 15:38:38 -0800 |
|---|---|---|
| committer | Alex Crichton <alex@alexcrichton.com> | 2015-01-06 15:38:38 -0800 |
| commit | 36f5d122b80682de473aeda2e20f14b6ceb86d74 (patch) | |
| tree | 84fff634fa8759f4ef8d90651f8a9c242fb8ea51 /src/libstd | |
| parent | 0631b466c23ffdb1edb2997a8da2702cfe6fcd4a (diff) | |
| parent | caca9b2e7109a148d100a3c6851241d3815da3db (diff) | |
| download | rust-36f5d122b80682de473aeda2e20f14b6ceb86d74.tar.gz rust-36f5d122b80682de473aeda2e20f14b6ceb86d74.zip | |
rollup merge of #20615: aturon/stab-2-thread
This commit takes a first pass at stabilizing `std::thread`: * It removes the `detach` method in favor of two constructors -- `spawn` for detached threads, `scoped` for "scoped" (i.e., must-join) threads. This addresses some of the surprise/frustrating debug sessions with the previous API, in which `spawn` produced a guard that on destruction joined the thread (unless `detach` was called). The reason to have the division in part is that `Send` will soon not imply `'static`, which means that `scoped` thread creation can take a closure over *shared stack data* of the parent thread. On the other hand, this means that the parent must not pop the relevant stack frames while the child thread is running. The `JoinGuard` is used to prevent this from happening by joining on drop (if you have not already explicitly `join`ed.) The APIs around `scoped` are future-proofed for the `Send` changes by taking an additional lifetime parameter. With the current definition of `Send`, this is forced to be `'static`, but when `Send` changes these APIs will gain their full flexibility immediately. Threads that are `spawn`ed, on the other hand, are detached from the start and do not yield an RAII guard. The hope is that, by making `scoped` an explicit opt-in with a very suggestive name, it will be drastically less likely to be caught by a surprising deadlock due to an implicit join at the end of a scope. * The module itself is marked stable. * Existing methods other than `spawn` and `scoped` are marked stable. The migration path is: ```rust Thread::spawn(f).detached() ``` becomes ```rust Thread::spawn(f) ``` while ```rust let res = Thread::spawn(f); res.join() ``` becomes ```rust let res = Thread::scoped(f); res.join() ``` [breaking-change]
Diffstat (limited to 'src/libstd')
| -rw-r--r-- | src/libstd/io/comm_adapters.rs | 6 | ||||
| -rw-r--r-- | src/libstd/io/mod.rs | 10 | ||||
| -rw-r--r-- | src/libstd/io/net/pipe.rs | 12 | ||||
| -rw-r--r-- | src/libstd/io/net/tcp.rs | 26 | ||||
| -rw-r--r-- | src/libstd/io/process.rs | 2 | ||||
| -rw-r--r-- | src/libstd/io/timer.rs | 6 | ||||
| -rw-r--r-- | src/libstd/macros.rs | 4 | ||||
| -rw-r--r-- | src/libstd/path/posix.rs | 6 | ||||
| -rw-r--r-- | src/libstd/path/windows.rs | 6 | ||||
| -rw-r--r-- | src/libstd/rand/os.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/barrier.rs | 4 | ||||
| -rw-r--r-- | src/libstd/sync/condvar.rs | 4 | ||||
| -rw-r--r-- | src/libstd/sync/future.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/mod.rs | 68 | ||||
| -rw-r--r-- | src/libstd/sync/mpsc/mpsc_queue.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/mutex.rs | 12 | ||||
| -rw-r--r-- | src/libstd/sync/once.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/rwlock.rs | 16 | ||||
| -rw-r--r-- | src/libstd/sync/semaphore.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sync/task_pool.rs | 2 | ||||
| -rw-r--r-- | src/libstd/sys/common/helper_thread.rs | 2 | ||||
| -rw-r--r-- | src/libstd/thread.rs | 198 | ||||
| -rw-r--r-- | src/libstd/thread_local/mod.rs | 8 |
23 files changed, 237 insertions, 165 deletions
diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index bcd0c09b77d..bce097e17ef 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -173,7 +173,7 @@ mod test { tx.send(vec![3u8, 4u8]).unwrap(); tx.send(vec![5u8, 6u8]).unwrap(); tx.send(vec![7u8, 8u8]).unwrap(); - }).detach(); + }); let mut reader = ChanReader::new(rx); let mut buf = [0u8; 3]; @@ -216,7 +216,7 @@ mod test { tx.send(b"rld\nhow ".to_vec()).unwrap(); tx.send(b"are you?".to_vec()).unwrap(); tx.send(b"".to_vec()).unwrap(); - }).detach(); + }); let mut reader = ChanReader::new(rx); @@ -235,7 +235,7 @@ mod test { writer.write_be_u32(42).unwrap(); let wanted = vec![0u8, 0u8, 0u8, 42u8]; - let got = match Thread::spawn(move|| { rx.recv().unwrap() }).join() { + let got = match Thread::scoped(move|| { rx.recv().unwrap() }).join() { Ok(got) => got, Err(_) => panic!(), }; diff --git a/src/libstd/io/mod.rs b/src/libstd/io/mod.rs index 42f72fa3259..9ef9081bc3c 100644 --- a/src/libstd/io/mod.rs +++ b/src/libstd/io/mod.rs @@ -120,10 +120,12 @@ //! for stream in acceptor.incoming() { //! match stream { //! Err(e) => { /* connection failed */ } -//! Ok(stream) => Thread::spawn(move|| { -//! // connection succeeded -//! handle_client(stream) -//! }).detach() +//! Ok(stream) => { +//! Thread::spawn(move|| { +//! // connection succeeded +//! handle_client(stream) +//! }); +//! } //! } //! } //! diff --git a/src/libstd/io/net/pipe.rs b/src/libstd/io/net/pipe.rs index 738c70412f7..29295b5751c 100644 --- a/src/libstd/io/net/pipe.rs +++ b/src/libstd/io/net/pipe.rs @@ -608,7 +608,7 @@ mod tests { let mut a = a; let _s = a.accept().unwrap(); let _ = rx.recv(); - }).detach(); + }); let mut b = [0]; let mut s = UnixStream::connect(&addr).unwrap(); @@ -645,7 +645,7 @@ mod tests { let mut a = a; let _s = a.accept().unwrap(); let _ = rx.recv(); - }).detach(); + }); let mut s = UnixStream::connect(&addr).unwrap(); let s2 = s.clone(); @@ -672,7 +672,7 @@ mod tests { rx.recv().unwrap(); assert!(s.write(&[0]).is_ok()); let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); s.set_timeout(Some(20)); @@ -716,7 +716,7 @@ mod tests { } } let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); s.set_read_timeout(Some(20)); @@ -739,7 +739,7 @@ mod tests { rx.recv().unwrap(); assert!(s.write(&[0]).is_ok()); let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); s.set_write_timeout(Some(20)); @@ -766,7 +766,7 @@ mod tests { rx.recv().unwrap(); assert!(s.write(&[0]).is_ok()); let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); let s2 = s.clone(); diff --git a/src/libstd/io/net/tcp.rs b/src/libstd/io/net/tcp.rs index 3e59aaa05ef..b1762ff26fc 100644 --- a/src/libstd/io/net/tcp.rs +++ b/src/libstd/io/net/tcp.rs @@ -146,7 +146,7 @@ impl TcpStream { /// timer::sleep(Duration::seconds(1)); /// let mut stream = stream2; /// stream.close_read(); - /// }).detach(); + /// }); /// /// // wait for some data, will get canceled after one second /// let mut buf = [0]; @@ -295,10 +295,12 @@ impl sys_common::AsInner<TcpStreamImp> for TcpStream { /// for stream in acceptor.incoming() { /// match stream { /// Err(e) => { /* connection failed */ } -/// Ok(stream) => Thread::spawn(move|| { -/// // connection succeeded -/// handle_client(stream) -/// }).detach() +/// Ok(stream) => { +/// Thread::spawn(move|| { +/// // connection succeeded +/// handle_client(stream) +/// }); +/// } /// } /// } /// @@ -432,7 +434,7 @@ impl TcpAcceptor { /// Err(e) => panic!("unexpected error: {}", e), /// } /// } - /// }).detach(); + /// }); /// /// # fn wait_for_sigint() {} /// // Now that our accept loop is running, wait for the program to be @@ -1186,7 +1188,7 @@ mod test { let mut a = a; let _s = a.accept().unwrap(); let _ = rx.recv().unwrap(); - }).detach(); + }); let mut b = [0]; let mut s = TcpStream::connect(addr).unwrap(); @@ -1223,7 +1225,7 @@ mod test { let mut a = a; let _s = a.accept().unwrap(); let _ = rx.recv().unwrap(); - }).detach(); + }); let mut s = TcpStream::connect(addr).unwrap(); let s2 = s.clone(); @@ -1250,7 +1252,7 @@ mod test { rx.recv().unwrap(); assert!(s.write(&[0]).is_ok()); let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); s.set_timeout(Some(20)); @@ -1289,7 +1291,7 @@ mod test { } } let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); s.set_read_timeout(Some(20)); @@ -1312,7 +1314,7 @@ mod test { rx.recv().unwrap(); assert!(s.write(&[0]).is_ok()); let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); s.set_write_timeout(Some(20)); @@ -1340,7 +1342,7 @@ mod test { rx.recv().unwrap(); assert_eq!(s.write(&[0]), Ok(())); let _ = rx.recv(); - }).detach(); + }); let mut s = a.accept().unwrap(); let s2 = s.clone(); diff --git a/src/libstd/io/process.rs b/src/libstd/io/process.rs index efb57341620..55df6330dd3 100644 --- a/src/libstd/io/process.rs +++ b/src/libstd/io/process.rs @@ -720,7 +720,7 @@ impl Process { Thread::spawn(move |:| { let mut stream = stream; tx.send(stream.read_to_end()).unwrap(); - }).detach(); + }); } None => tx.send(Ok(Vec::new())).unwrap() } diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index e073f76af82..8a0445be471 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -358,7 +358,7 @@ mod test { Thread::spawn(move|| { let _ = timer_rx.recv(); - }).detach(); + }); // when we drop the TimerWatcher we're going to destroy the channel, // which must wake up the task on the other end @@ -372,7 +372,7 @@ mod test { Thread::spawn(move|| { let _ = timer_rx.recv(); - }).detach(); + }); timer.oneshot(Duration::milliseconds(1)); } @@ -385,7 +385,7 @@ mod test { Thread::spawn(move|| { let _ = timer_rx.recv(); - }).detach(); + }); timer.sleep(Duration::milliseconds(1)); } diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index be3e49c0b82..43f86033da0 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -303,8 +303,8 @@ macro_rules! try { /// # fn long_running_task() {} /// # fn calculate_the_answer() -> int { 42i } /// -/// Thread::spawn(move|| { long_running_task(); tx1.send(()) }).detach(); -/// Thread::spawn(move|| { tx2.send(calculate_the_answer()) }).detach(); +/// Thread::spawn(move|| { long_running_task(); tx1.send(()).unwrap(); }); +/// Thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); }); /// /// select! ( /// _ = rx1.recv() => println!("the long running task finished first"), diff --git a/src/libstd/path/posix.rs b/src/libstd/path/posix.rs index a307132810e..805f45c61c1 100644 --- a/src/libstd/path/posix.rs +++ b/src/libstd/path/posix.rs @@ -510,17 +510,17 @@ mod tests { #[test] fn test_null_byte() { use thread::Thread; - let result = Thread::spawn(move|| { + let result = Thread::scoped(move|| { Path::new(b"foo/bar\0") }).join(); assert!(result.is_err()); - let result = Thread::spawn(move|| { + let result = Thread::scoped(move|| { Path::new("test").set_filename(b"f\0o") }).join(); assert!(result.is_err()); - let result = Thread::spawn(move|| { + let result = Thread::scoped(move|| { Path::new("test").push(b"f\0o"); }).join(); assert!(result.is_err()); diff --git a/src/libstd/path/windows.rs b/src/libstd/path/windows.rs index ff269b73476..bd3382b4288 100644 --- a/src/libstd/path/windows.rs +++ b/src/libstd/path/windows.rs @@ -1298,17 +1298,17 @@ mod tests { #[test] fn test_null_byte() { use thread::Thread; - let result = Thread::spawn(move|| { + let result = Thread::scoped(move|| { Path::new(b"foo/bar\0") }).join(); assert!(result.is_err()); - let result = Thread::spawn(move|| { + let result = Thread::scoped(move|| { Path::new("test").set_filename(b"f\0o") }).join(); assert!(result.is_err()); - let result = Thread::spawn(move || { + let result = Thread::scoped(move || { Path::new("test").push(b"f\0o"); }).join(); assert!(result.is_err()); diff --git a/src/libstd/rand/os.rs b/src/libstd/rand/os.rs index e697a1adeb5..18d40ecd3eb 100644 --- a/src/libstd/rand/os.rs +++ b/src/libstd/rand/os.rs @@ -394,7 +394,7 @@ mod test { r.fill_bytes(&mut v); Thread::yield_now(); } - }).detach(); + }); } // start all the tasks diff --git a/src/libstd/sync/barrier.rs b/src/libstd/sync/barrier.rs index bf5da3e7cba..70939879400 100644 --- a/src/libstd/sync/barrier.rs +++ b/src/libstd/sync/barrier.rs @@ -26,7 +26,7 @@ use sync::{Mutex, Condvar}; /// println!("before wait"); /// c.wait(); /// println!("after wait"); -/// }).detach(); +/// }); /// } /// ``` #[stable] @@ -126,7 +126,7 @@ mod tests { let tx = tx.clone(); Thread::spawn(move|| { tx.send(c.wait().is_leader()).unwrap(); - }).detach(); + }); } // At this point, all spawned tasks should be blocked, diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index e97be51fdbc..3c0ae71255d 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -48,7 +48,7 @@ use sync::{mutex, MutexGuard}; /// let mut started = lock.lock().unwrap(); /// *started = true; /// cvar.notify_one(); -/// }).detach(); +/// }); /// /// // wait for the thread to start up /// let &(ref lock, ref cvar) = &*pair; @@ -338,7 +338,7 @@ mod tests { cnt = cond.wait(cnt).unwrap(); } tx.send(()).unwrap(); - }).detach(); + }); } drop(tx); diff --git a/src/libstd/sync/future.rs b/src/libstd/sync/future.rs index 4c6adcc04f6..568c24446e7 100644 --- a/src/libstd/sync/future.rs +++ b/src/libstd/sync/future.rs @@ -141,7 +141,7 @@ impl<A:Send> Future<A> { Thread::spawn(move |:| { // Don't panic if the other end has hung up let _ = tx.send(blk()); - }).detach(); + }); Future::from_receiver(rx) } diff --git a/src/libstd/sync/mpsc/mod.rs b/src/libstd/sync/mpsc/mod.rs index 5dc58add665..f24a329a390 100644 --- a/src/libstd/sync/mpsc/mod.rs +++ b/src/libstd/sync/mpsc/mod.rs @@ -60,7 +60,7 @@ //! let (tx, rx) = channel(); //! Thread::spawn(move|| { //! tx.send(10i).unwrap(); -//! }).detach(); +//! }); //! assert_eq!(rx.recv().unwrap(), 10i); //! ``` //! @@ -78,7 +78,7 @@ //! let tx = tx.clone(); //! Thread::spawn(move|| { //! tx.send(i).unwrap(); -//! }).detach() +//! }); //! } //! //! for _ in range(0i, 10i) { @@ -109,7 +109,7 @@ //! Thread::spawn(move|| { //! // This will wait for the parent task to start receiving //! tx.send(53).unwrap(); -//! }).detach(); +//! }); //! rx.recv().unwrap(); //! ``` //! @@ -476,7 +476,7 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// Thread::spawn(move|| { /// # fn expensive_computation() {} /// tx.send(expensive_computation()).unwrap(); -/// }).detach(); +/// }); /// /// // Do some useful work for awhile /// @@ -518,7 +518,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { /// Thread::spawn(move|| { /// // this will block until the previous message has been received /// tx.send(2i).unwrap(); -/// }).detach(); +/// }); /// /// assert_eq!(rx.recv().unwrap(), 1i); /// assert_eq!(rx.recv().unwrap(), 2i); @@ -1144,7 +1144,7 @@ mod test { #[test] fn stress() { let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { + let t = Thread::scoped(move|| { for _ in range(0u, 10000) { tx.send(1i).unwrap(); } }); for _ in range(0u, 10000) { @@ -1159,7 +1159,7 @@ mod test { static NTHREADS: uint = 8; let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { + let t = Thread::scoped(move|| { for _ in range(0, AMT * NTHREADS) { assert_eq!(rx.recv().unwrap(), 1); } @@ -1173,7 +1173,7 @@ mod test { let tx = tx.clone(); Thread::spawn(move|| { for _ in range(0, AMT) { tx.send(1).unwrap(); } - }).detach(); + }); } drop(tx); t.join().ok().unwrap(); @@ -1183,14 +1183,14 @@ mod test { fn send_from_outside_runtime() { let (tx1, rx1) = channel::<()>(); let (tx2, rx2) = channel::<int>(); - let t1 = Thread::spawn(move|| { + let t1 = Thread::scoped(move|| { tx1.send(()).unwrap(); for _ in range(0i, 40) { assert_eq!(rx2.recv().unwrap(), 1); } }); rx1.recv().unwrap(); - let t2 = Thread::spawn(move|| { + let t2 = Thread::scoped(move|| { for _ in range(0i, 40) { tx2.send(1).unwrap(); } @@ -1202,7 +1202,7 @@ mod test { #[test] fn recv_from_outside_runtime() { let (tx, rx) = channel::<int>(); - let t = Thread::spawn(move|| { + let t = Thread::scoped(move|| { for _ in range(0i, 40) { assert_eq!(rx.recv().unwrap(), 1); } @@ -1217,11 +1217,11 @@ mod test { fn no_runtime() { let (tx1, rx1) = channel::<int>(); let (tx2, rx2) = channel::<int>(); - let t1 = Thread::spawn(move|| { + let t1 = Thread::scoped(move|| { assert_eq!(rx1.recv().unwrap(), 1); tx2.send(2).unwrap(); }); - let t2 = Thread::spawn(move|| { + let t2 = Thread::scoped(move|| { tx1.send(1).unwrap(); assert_eq!(rx2.recv().unwrap(), 2); }); @@ -1254,7 +1254,7 @@ mod test { #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = Thread::spawn(move|| { + let res = Thread::scoped(move|| { let (tx, rx) = channel::<int>(); drop(tx); rx.recv().unwrap(); @@ -1336,7 +1336,7 @@ mod test { let _t = Thread::spawn(move|| { drop(tx); }); - let res = Thread::spawn(move|| { + let res = Thread::scoped(move|| { assert!(rx.recv().unwrap() == box 10); }).join(); assert!(res.is_err()); @@ -1360,7 +1360,7 @@ mod test { let _t = Thread::spawn(move|| { drop(rx); }); - let _ = Thread::spawn(move|| { + let _ = Thread::scoped(move|| { tx.send(1).unwrap(); }).join(); } @@ -1371,15 +1371,15 @@ mod test { for _ in range(0, stress_factor()) { let (tx, rx) = channel::<int>(); Thread::spawn(move|| { - let res = Thread::spawn(move|| { + let res = Thread::scoped(move|| { rx.recv().unwrap(); }).join(); assert!(res.is_err()); - }).detach(); + }); let _t = Thread::spawn(move|| { Thread::spawn(move|| { drop(tx); - }).detach(); + }); }); } } @@ -1409,7 +1409,7 @@ mod test { Thread::spawn(move|| { tx.send(box i).unwrap(); send(tx, i + 1); - }).detach(); + }); } fn recv(rx: Receiver<Box<int>>, i: int) { @@ -1418,7 +1418,7 @@ mod test { Thread::spawn(move|| { assert!(rx.recv().unwrap() == box i); recv(rx, i + 1); - }).detach(); + }); } } } @@ -1439,7 +1439,7 @@ mod test { let tx = tx.clone(); Thread::spawn(move|| { tx.send(()).unwrap(); - }).detach(); + }); } for _ in range(0, total) { @@ -1644,7 +1644,7 @@ mod sync_tests { Thread::spawn(move|| { tx.send(1).unwrap(); tx.send(1).unwrap(); - }).detach(); + }); while rx.recv().is_ok() {} } @@ -1653,7 +1653,7 @@ mod sync_tests { let (tx, rx) = sync_channel::<int>(0); Thread::spawn(move|| { for _ in range(0u, 10000) { tx.send(1).unwrap(); } - }).detach(); + }); for _ in range(0u, 10000) { assert_eq!(rx.recv().unwrap(), 1); } @@ -1675,13 +1675,13 @@ mod sync_tests { _ => {} } dtx.send(()).unwrap(); - }).detach(); + }); for _ in range(0, NTHREADS) { let tx = tx.clone(); Thread::spawn(move|| { for _ in range(0, AMT) { tx.send(1).unwrap(); } - }).detach(); + }); } drop(tx); drx.recv().unwrap(); @@ -1712,7 +1712,7 @@ mod sync_tests { #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will panic - let res = Thread::spawn(move|| { + let res = Thread::scoped(move|| { let (tx, rx) = sync_channel::<int>(0); drop(tx); rx.recv().unwrap(); @@ -1800,7 +1800,7 @@ mod sync_tests { let _t = Thread::spawn(move|| { drop(tx); }); - let res = Thread::spawn(move|| { + let res = Thread::scoped(move|| { assert!(rx.recv().unwrap() == box 10); }).join(); assert!(res.is_err()); @@ -1824,7 +1824,7 @@ mod sync_tests { let _t = Thread::spawn(move|| { drop(rx); }); - let _ = Thread::spawn(move || { + let _ = Thread::scoped(move || { tx.send(1).unwrap(); }).join(); } @@ -1835,7 +1835,7 @@ mod sync_tests { for _ in range(0, stress_factor()) { let (tx, rx) = sync_channel::<int>(0); let _t = Thread::spawn(move|| { - let res = Thread::spawn(move|| { + let res = Thread::scoped(move|| { rx.recv().unwrap(); }).join(); assert!(res.is_err()); @@ -1843,7 +1843,7 @@ mod sync_tests { let _t = Thread::spawn(move|| { Thread::spawn(move|| { drop(tx); - }).detach(); + }); }); } } @@ -1873,7 +1873,7 @@ mod sync_tests { Thread::spawn(move|| { tx.send(box i).unwrap(); send(tx, i + 1); - }).detach(); + }); } fn recv(rx: Receiver<Box<int>>, i: int) { @@ -1882,7 +1882,7 @@ mod sync_tests { Thread::spawn(move|| { assert!(rx.recv().unwrap() == box i); recv(rx, i + 1); - }).detach(); + }); } } } @@ -1903,7 +1903,7 @@ mod sync_tests { let tx = tx.clone(); Thread::spawn(move|| { tx.send(()).unwrap(); - }).detach(); + }); } for _ in range(0, total) { diff --git a/src/libstd/sync/mpsc/mpsc_queue.rs b/src/libstd/sync/mpsc/mpsc_queue.rs index 9ad24a5a11e..f8eae1322bf 100644 --- a/src/libstd/sync/mpsc/mpsc_queue.rs +++ b/src/libstd/sync/mpsc/mpsc_queue.rs @@ -188,7 +188,7 @@ mod tests { q.push(i); } tx.send(()).unwrap(); - }).detach(); + }); } let mut i = 0u; diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs index 9756d086193..c1b55c6ff78 100644 --- a/src/libstd/sync/mutex.rs +++ b/src/libstd/sync/mutex.rs @@ -75,7 +75,7 @@ use sys_common::mutex as sys; /// tx.send(()).unwrap(); /// } /// // the lock is unlocked here when `data` goes out of scope. -/// }).detach(); +/// }); /// } /// /// rx.recv().unwrap(); @@ -90,7 +90,7 @@ use sys_common::mutex as sys; /// let lock = Arc::new(Mutex::new(0u)); /// let lock2 = lock.clone(); /// -/// let _ = Thread::spawn(move || -> () { +/// let _ = Thread::scoped(move || -> () { /// // This thread will acquire the mutex first, unwrapping the result of /// // `lock` because the lock has not been poisoned. /// let _lock = lock2.lock().unwrap(); @@ -376,9 +376,9 @@ mod test { let (tx, rx) = channel(); for _ in range(0, K) { let tx2 = tx.clone(); - Thread::spawn(move|| { inc(); tx2.send(()).unwrap(); }).detach(); + Thread::spawn(move|| { inc(); tx2.send(()).unwrap(); }); let tx2 = tx.clone(); - Thread::spawn(move|| { inc(); tx2.send(()).unwrap(); }).detach(); + Thread::spawn(move|| { inc(); tx2.send(()).unwrap(); }); } drop(tx); @@ -453,7 +453,7 @@ mod test { fn test_mutex_arc_poison() { let arc = Arc::new(Mutex::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| { + let _ = Thread::scoped(move|| { let lock = arc2.lock().unwrap(); assert_eq!(*lock, 2); }).join(); @@ -480,7 +480,7 @@ mod test { fn test_mutex_arc_access_in_unwind() { let arc = Arc::new(Mutex::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| -> () { + let _ = Thread::scoped(move|| -> () { struct Unwinder { i: Arc<Mutex<int>>, } diff --git a/src/libstd/sync/once.rs b/src/libstd/sync/once.rs index 15ca4783700..3bf2ae277e0 100644 --- a/src/libstd/sync/once.rs +++ b/src/libstd/sync/once.rs @@ -159,7 +159,7 @@ mod test { assert!(run); } tx.send(()).unwrap(); - }).detach(); + }); } unsafe { diff --git a/src/libstd/sync/rwlock.rs b/src/libstd/sync/rwlock.rs index 36f9d4228b3..7db2111cc46 100644 --- a/src/libstd/sync/rwlock.rs +++ b/src/libstd/sync/rwlock.rs @@ -411,7 +411,7 @@ mod tests { } } drop(tx); - }).detach(); + }); } drop(tx); let _ = rx.recv(); @@ -422,7 +422,7 @@ mod tests { fn test_rw_arc_poison_wr() { let arc = Arc::new(RwLock::new(1i)); let arc2 = arc.clone(); - let _: Result<uint, _> = Thread::spawn(move|| { + let _: Result<uint, _> = Thread::scoped(move|| { let _lock = arc2.write().unwrap(); panic!(); }).join(); @@ -433,7 +433,7 @@ mod tests { fn test_rw_arc_poison_ww() { let arc = Arc::new(RwLock::new(1i)); let arc2 = arc.clone(); - let _: Result<uint, _> = Thread::spawn(move|| { + let _: Result<uint, _> = Thread::scoped(move|| { let _lock = arc2.write().unwrap(); panic!(); }).join(); @@ -444,7 +444,7 @@ mod tests { fn test_rw_arc_no_poison_rr() { let arc = Arc::new(RwLock::new(1i)); let arc2 = arc.clone(); - let _: Result<uint, _> = Thread::spawn(move|| { + let _: Result<uint, _> = Thread::scoped(move|| { let _lock = arc2.read().unwrap(); panic!(); }).join(); @@ -455,7 +455,7 @@ mod tests { fn test_rw_arc_no_poison_rw() { let arc = Arc::new(RwLock::new(1i)); let arc2 = arc.clone(); - let _: Result<uint, _> = Thread::spawn(move|| { + let _: Result<uint, _> = Thread::scoped(move|| { let _lock = arc2.read().unwrap(); panic!() }).join(); @@ -478,13 +478,13 @@ mod tests { *lock = tmp + 1; } tx.send(()).unwrap(); - }).detach(); + }); // Readers try to catch the writer in the act let mut children = Vec::new(); for _ in range(0u, 5) { let arc3 = arc.clone(); - children.push(Thread::spawn(move|| { + children.push(Thread::scoped(move|| { let lock = arc3.read().unwrap(); assert!(*lock >= 0); })); @@ -505,7 +505,7 @@ mod tests { fn test_rw_arc_access_in_unwind() { let arc = Arc::new(RwLock::new(1i)); let arc2 = arc.clone(); - let _ = Thread::spawn(move|| -> () { + let _ = Thread::scoped(move|| -> () { struct Unwinder { i: Arc<RwLock<int>>, } diff --git a/src/libstd/sync/semaphore.rs b/src/libstd/sync/semaphore.rs index 505819fbf8a..8d44084671a 100644 --- a/src/libstd/sync/semaphore.rs +++ b/src/libstd/sync/semaphore.rs @@ -193,7 +193,7 @@ mod tests { tx.send(()).unwrap(); drop(s2.access()); tx.send(()).unwrap(); - }).detach(); + }); rx.recv().unwrap(); // wait for child to come alive } rx.recv().unwrap(); // wait for child to be done diff --git a/src/libstd/sync/task_pool.rs b/src/libstd/sync/task_pool.rs index 088827dc084..278528bdb38 100644 --- a/src/libstd/sync/task_pool.rs +++ b/src/libstd/sync/task_pool.rs @@ -132,7 +132,7 @@ fn spawn_in_pool(jobs: Arc<Mutex<Receiver<Thunk>>>) { } sentinel.cancel(); - }).detach(); + }); } #[cfg(test)] diff --git a/src/libstd/sys/common/helper_thread.rs b/src/libstd/sys/common/helper_thread.rs index bdf1bf3dfd0..f940b6ed368 100644 --- a/src/libstd/sys/common/helper_thread.rs +++ b/src/libstd/sys/common/helper_thread.rs @@ -99,7 +99,7 @@ impl<M: Send> Helper<M> { let _g = self.lock.lock().unwrap(); *self.shutdown.get() = true; self.cond.notify_one() - }).detach(); + }); rt::at_exit(move|:| { self.shutdown() }); *self.initialized.get() = true; diff --git a/src/libstd/thread.rs b/src/libstd/thread.rs index 43abd96e46d..71321ed3b71 100644 --- a/src/libstd/thread.rs +++ b/src/libstd/thread.rs @@ -16,7 +16,7 @@ //! each with their own stack and local state. //! //! Communication between threads can be done through -//! [channels](../../std/comm/index.html), Rust's message-passing +//! [channels](../../std/sync/mpsc/index.html), Rust's message-passing //! types, along with [other forms of thread //! synchronization](../../std/sync/index.html) and shared-memory data //! structures. In particular, types that are guaranteed to be @@ -58,25 +58,45 @@ //! ```rust //! use std::thread::Thread; //! -//! let guard = Thread::spawn(move || { +//! let thread = Thread::spawn(move || { //! println!("Hello, World!"); //! // some computation here //! }); -//! let result = guard.join(); //! ``` //! -//! The `spawn` function doesn't return a `Thread` directly; instead, it returns -//! a *join guard* from which a `Thread` can be extracted. The join guard is an -//! RAII-style guard that will automatically join the child thread (block until -//! it terminates) when it is dropped. You can join the child thread in advance -//! by calling the `join` method on the guard, which will also return the result -//! produced by the thread. +//! The spawned thread is "detached" from the current thread, meaning that it +//! can outlive the thread that spawned it. (Note, however, that when the main +//! thread terminates all detached threads are terminated as well.) The returned +//! `Thread` handle can be used for low-level synchronization as described below. +//! +//! ## Scoped threads //! -//! If you instead wish to *detach* the child thread, allowing it to outlive its -//! parent, you can use the `detach` method on the guard, +//! Often a parent thread uses a child thread to perform some particular task, +//! and at some point must wait for the child to complete before continuing. +//! For this scenario, use the `scoped` constructor: //! -//! A handle to the thread itself is available via the `thread` method on the -//! join guard. +//! ```rust +//! use std::thread::Thread; +//! +//! let guard = Thread::scoped(move || { +//! println!("Hello, World!"); +//! // some computation here +//! }); +//! // do some other work in the meantime +//! let result = guard.join(); +//! ``` +//! +//! The `scoped` function doesn't return a `Thread` directly; instead, it +//! returns a *join guard* from which a `Thread` can be extracted. The join +//! guard is an RAII-style guard that will automatically join the child thread +//! (block until it terminates) when it is dropped. You can join the child +//! thread in advance by calling the `join` method on the guard, which will also +//! return the result produced by the thread. A handle to the thread itself is +//! available via the `thread` method on the join guard. +//! +//! (Note: eventually, the `scoped` constructor will allow the parent and child +//! threads to data that lives on the parent thread's stack, but some language +//! changes are needed before this is possible.) //! //! ## Configuring threads //! @@ -89,7 +109,7 @@ //! //! thread::Builder::new().name("child1".to_string()).spawn(move || { //! println!("Hello, world!") -//! }).detach(); +//! }); //! ``` //! //! ## Blocking support: park and unpark @@ -124,6 +144,8 @@ //! //! * It can be implemented highly efficiently on many platforms. +#![stable] + use any::Any; use boxed::Box; use cell::UnsafeCell; @@ -144,6 +166,7 @@ use sys_common::{stack, thread_info}; /// Thread configuation. Provides detailed control over the properties /// and behavior of new threads. +#[stable] pub struct Builder { // A name for the thread-to-be, for identification in panic messages name: Option<String>, @@ -158,6 +181,7 @@ pub struct Builder { impl Builder { /// Generate the base configuration for spawning a thread, from which /// configuration methods can be chained. + #[stable] pub fn new() -> Builder { Builder { name: None, @@ -169,12 +193,14 @@ impl Builder { /// Name the thread-to-be. Currently the name is used for identification /// only in panic messages. + #[stable] pub fn name(mut self, name: String) -> Builder { self.name = Some(name); self } /// Set the size of the stack for the new thread. + #[stable] pub fn stack_size(mut self, size: uint) -> Builder { self.stack_size = Some(size); self @@ -194,19 +220,41 @@ impl Builder { self } - /// Spawn a new joinable thread, and return a JoinGuard guard for it. + /// Spawn a new detached thread, and return a handle to it. /// /// See `Thead::spawn` and the module doc for more details. - pub fn spawn<T, F>(self, f: F) -> JoinGuard<T> where - T: Send, F: FnOnce() -> T, F: Send - { - self.spawn_inner(Thunk::new(f)) + #[unstable = "may change with specifics of new Send semantics"] + pub fn spawn<F>(self, f: F) -> Thread where F: FnOnce(), F: Send + 'static { + let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(|_| {})); + unsafe { imp::detach(native) }; + thread } - fn spawn_inner<T: Send>(self, f: Thunk<(), T>) -> JoinGuard<T> { + /// Spawn a new child thread that must be joined within a given + /// scope, and return a `JoinGuard`. + /// + /// See `Thead::scoped` and the module doc for more details. + #[unstable = "may change with specifics of new Send semantics"] + pub fn scoped<'a, T, F>(self, f: F) -> JoinGuard<'a, T> where + T: Send + 'a, F: FnOnce() -> T, F: Send + 'a + { let my_packet = Packet(Arc::new(UnsafeCell::new(None))); let their_packet = Packet(my_packet.0.clone()); + let (native, thread) = self.spawn_inner(Thunk::new(f), Thunk::with_arg(move |: ret| unsafe { + *their_packet.0.get() = Some(ret); + })); + JoinGuard { + native: native, + joined: false, + packet: my_packet, + thread: thread, + } + } + + fn spawn_inner<T: Send>(self, f: Thunk<(), T>, finish: Thunk<Result<T>, ()>) + -> (imp::rust_thread, Thread) + { let Builder { name, stack_size, stdout, stderr } = self; let stack_size = stack_size.unwrap_or(rt::min_stack()); @@ -258,21 +306,14 @@ impl Builder { unwind::try(move || *ptr = Some(f.invoke(()))) } }; - unsafe { - *their_packet.0.get() = Some(match (output, try_result) { - (Some(data), Ok(_)) => Ok(data), - (None, Err(cause)) => Err(cause), - _ => unreachable!() - }); - } + finish.invoke(match (output, try_result) { + (Some(data), Ok(_)) => Ok(data), + (None, Err(cause)) => Err(cause), + _ => unreachable!() + }); }; - JoinGuard { - native: unsafe { imp::create(stack_size, Thunk::new(main)) }, - joined: false, - packet: my_packet, - thread: my_thread, - } + (unsafe { imp::create(stack_size, Thunk::new(main)) }, my_thread) } } @@ -285,13 +326,12 @@ struct Inner { unsafe impl Sync for Inner {} #[derive(Clone)] +#[stable] /// A handle to a thread. pub struct Thread { inner: Arc<Inner>, } -unsafe impl Sync for Thread {} - impl Thread { // Used only internally to construct a thread object without spawning fn new(name: Option<String>) -> Thread { @@ -304,30 +344,47 @@ impl Thread { } } - /// Spawn a new joinable thread, returning a `JoinGuard` for it. + /// Spawn a new detached thread, returning a handle to it. + /// + /// The child thread may outlive the parent (unless the parent thread is the + /// main thread; the whole process is terminated when the main thread + /// finishes.) The thread handle can be used for low-level + /// synchronization. See the module documentation for additional details. + #[unstable = "may change with specifics of new Send semantics"] + pub fn spawn<F>(f: F) -> Thread where F: FnOnce(), F: Send + 'static { + Builder::new().spawn(f) + } + + /// Spawn a new *scoped* thread, returning a `JoinGuard` for it. /// /// The join guard can be used to explicitly join the child thread (via /// `join`), returning `Result<T>`, or it will implicitly join the child - /// upon being dropped. To detach the child, allowing it to outlive the - /// current thread, use `detach`. See the module documentation for additional details. - pub fn spawn<T, F>(f: F) -> JoinGuard<T> where - T: Send, F: FnOnce() -> T, F: Send + /// upon being dropped. Because the child thread may refer to data on the + /// current thread's stack (hence the "scoped" name), it cannot be detached; + /// it *must* be joined before the relevant stack frame is popped. See the + /// module documentation for additional details. + #[unstable = "may change with specifics of new Send semantics"] + pub fn scoped<'a, T, F>(f: F) -> JoinGuard<'a, T> where + T: Send + 'a, F: FnOnce() -> T, F: Send + 'a { - Builder::new().spawn(f) + Builder::new().scoped(f) } /// Gets a handle to the thread that invokes it. + #[stable] pub fn current() -> Thread { thread_info::current_thread() } /// Cooperatively give up a timeslice to the OS scheduler. + #[unstable = "name may change"] pub fn yield_now() { unsafe { imp::yield_now() } } /// Determines whether the current thread is panicking. #[inline] + #[stable] pub fn panicking() -> bool { unwind::panicking() } @@ -341,6 +398,7 @@ impl Thread { // future, this will be implemented in a more efficient way, perhaps along the lines of // http://cr.openjdk.java.net/~stefank/6989984.1/raw_files/new/src/os/linux/vm/os_linux.cpp // or futuxes, and in either case may allow spurious wakeups. + #[unstable = "recently introduced"] pub fn park() { let thread = Thread::current(); let mut guard = thread.inner.lock.lock().unwrap(); @@ -353,6 +411,7 @@ impl Thread { /// Atomically makes the handle's token available if it is not already. /// /// See the module doc for more detail. + #[unstable = "recently introduced"] pub fn unpark(&self) { let mut guard = self.inner.lock.lock().unwrap(); if !*guard { @@ -362,6 +421,7 @@ impl Thread { } /// Get the thread's name. + #[stable] pub fn name(&self) -> Option<&str> { self.inner.name.as_ref().map(|s| s.as_slice()) } @@ -375,6 +435,7 @@ impl thread_info::NewThread for Thread { /// Indicates the manner in which a thread exited. /// /// A thread that completes without panicking is considered to exit successfully. +#[stable] pub type Result<T> = ::result::Result<T, Box<Any + Send>>; struct Packet<T>(Arc<UnsafeCell<Option<Result<T>>>>); @@ -382,21 +443,24 @@ struct Packet<T>(Arc<UnsafeCell<Option<Result<T>>>>); unsafe impl<T:'static+Send> Send for Packet<T> {} unsafe impl<T> Sync for Packet<T> {} -#[must_use] /// An RAII-style guard that will block until thread termination when dropped. /// /// The type `T` is the return type for the thread's main function. -pub struct JoinGuard<T> { +#[must_use] +#[unstable = "may change with specifics of new Send semantics"] +pub struct JoinGuard<'a, T: 'a> { native: imp::rust_thread, thread: Thread, joined: bool, packet: Packet<T>, } -unsafe impl<T: Send> Sync for JoinGuard<T> {} +#[stable] +unsafe impl<'a, T: Send + 'a> Sync for JoinGuard<'a, T> {} -impl<T: Send> JoinGuard<T> { +impl<'a, T: Send + 'a> JoinGuard<'a, T> { /// Extract a handle to the thread this guard will join on. + #[stable] pub fn thread(&self) -> &Thread { &self.thread } @@ -406,6 +470,7 @@ impl<T: Send> JoinGuard<T> { /// /// If the child thread panics, `Err` is returned with the parameter given /// to `panic`. + #[stable] pub fn join(mut self) -> Result<T> { assert!(!self.joined); unsafe { imp::join(self.native) }; @@ -414,8 +479,11 @@ impl<T: Send> JoinGuard<T> { (*self.packet.0.get()).take().unwrap() } } +} +impl<T: Send> JoinGuard<'static, T> { /// Detaches the child thread, allowing it to outlive its parent. + #[experimental = "unsure whether this API imposes limitations elsewhere"] pub fn detach(mut self) { unsafe { imp::detach(self.native) }; self.joined = true; // avoid joining in the destructor @@ -424,7 +492,7 @@ impl<T: Send> JoinGuard<T> { #[unsafe_destructor] #[stable] -impl<T: Send> Drop for JoinGuard<T> { +impl<'a, T: Send + 'a> Drop for JoinGuard<'a, T> { fn drop(&mut self) { if !self.joined { unsafe { imp::join(self.native) }; @@ -449,14 +517,14 @@ mod test { #[test] fn test_unnamed_thread() { - Thread::spawn(move|| { + Thread::scoped(move|| { assert!(Thread::current().name().is_none()); }).join().map_err(|_| ()).unwrap(); } #[test] fn test_named_thread() { - Builder::new().name("ada lovelace".to_string()).spawn(move|| { + Builder::new().name("ada lovelace".to_string()).scoped(move|| { assert!(Thread::current().name().unwrap() == "ada lovelace".to_string()); }).join().map_err(|_| ()).unwrap(); } @@ -466,13 +534,13 @@ mod test { let (tx, rx) = channel(); Thread::spawn(move|| { tx.send(()).unwrap(); - }).detach(); + }); rx.recv().unwrap(); } #[test] fn test_join_success() { - match Thread::spawn(move|| -> String { + match Thread::scoped(move|| -> String { "Success!".to_string() }).join().as_ref().map(|s| s.as_slice()) { result::Result::Ok("Success!") => (), @@ -482,7 +550,7 @@ mod test { #[test] fn test_join_panic() { - match Thread::spawn(move|| { + match Thread::scoped(move|| { panic!() }).join() { result::Result::Err(_) => (), @@ -504,7 +572,7 @@ mod test { } else { f(i - 1, tx); } - }).detach(); + }); } f(10, tx); @@ -518,8 +586,8 @@ mod test { Thread::spawn(move|| { Thread::spawn(move|| { tx.send(()).unwrap(); - }).detach(); - }).detach(); + }); + }); rx.recv().unwrap(); } @@ -542,7 +610,7 @@ mod test { #[test] fn test_avoid_copying_the_body_spawn() { avoid_copying_the_body(|v| { - Thread::spawn(move || v.invoke(())).detach(); + Thread::spawn(move || v.invoke(())); }); } @@ -551,14 +619,14 @@ mod test { avoid_copying_the_body(|f| { Thread::spawn(move|| { f.invoke(()); - }).detach(); + }); }) } #[test] fn test_avoid_copying_the_body_join() { avoid_copying_the_body(|f| { - let _ = Thread::spawn(move|| { + let _ = Thread::scoped(move|| { f.invoke(()) }).join(); }) @@ -574,21 +642,21 @@ mod test { fn child_no(x: uint) -> Thunk { return Thunk::new(move|| { if x < GENERATIONS { - Thread::spawn(move|| child_no(x+1).invoke(())).detach(); + Thread::spawn(move|| child_no(x+1).invoke(())); } }); } - Thread::spawn(|| child_no(0).invoke(())).detach(); + Thread::spawn(|| child_no(0).invoke(())); } #[test] fn test_simple_newsched_spawn() { - Thread::spawn(move || {}).detach(); + Thread::spawn(move || {}); } #[test] fn test_try_panic_message_static_str() { - match Thread::spawn(move|| { + match Thread::scoped(move|| { panic!("static string"); }).join() { Err(e) => { @@ -602,7 +670,7 @@ mod test { #[test] fn test_try_panic_message_owned_str() { - match Thread::spawn(move|| { + match Thread::scoped(move|| { panic!("owned string".to_string()); }).join() { Err(e) => { @@ -616,7 +684,7 @@ mod test { #[test] fn test_try_panic_message_any() { - match Thread::spawn(move|| { + match Thread::scoped(move|| { panic!(box 413u16 as Box<Any + Send>); }).join() { Err(e) => { @@ -634,7 +702,7 @@ mod test { fn test_try_panic_message_unit_struct() { struct Juju; - match Thread::spawn(move|| { + match Thread::scoped(move|| { panic!(Juju) }).join() { Err(ref e) if e.is::<Juju>() => {} @@ -648,7 +716,7 @@ mod test { let mut reader = ChanReader::new(rx); let stdout = ChanWriter::new(tx); - let r = Builder::new().stdout(box stdout as Box<Writer + Send>).spawn(move|| { + let r = Builder::new().stdout(box stdout as Box<Writer + Send>).scoped(move|| { print!("Hello, world!"); }).join(); assert!(r.is_ok()); diff --git a/src/libstd/thread_local/mod.rs b/src/libstd/thread_local/mod.rs index 4d47703d30f..e7c4e4ccdfb 100644 --- a/src/libstd/thread_local/mod.rs +++ b/src/libstd/thread_local/mod.rs @@ -86,7 +86,7 @@ pub mod __impl { /// assert_eq!(*f.borrow(), 1); /// *f.borrow_mut() = 3; /// }); -/// }).detach(); +/// }); /// /// // we retain our original value of 2 despite the child thread /// FOO.with(|f| { @@ -580,7 +580,7 @@ mod tests { } thread_local!(static FOO: Foo = foo()); - Thread::spawn(|| { + Thread::scoped(|| { assert!(FOO.state() == State::Uninitialized); FOO.with(|_| { assert!(FOO.state() == State::Valid); @@ -644,7 +644,7 @@ mod tests { } } - Thread::spawn(move|| { + Thread::scoped(move|| { drop(S1); }).join().ok().unwrap(); } @@ -662,7 +662,7 @@ mod tests { } } - Thread::spawn(move|| unsafe { + Thread::scoped(move|| unsafe { K1.with(|s| *s.get() = Some(S1)); }).join().ok().unwrap(); } |
