diff options
Diffstat (limited to 'library/std/src')
78 files changed, 2542 insertions, 546 deletions
diff --git a/library/std/src/collections/hash/map.rs b/library/std/src/collections/hash/map.rs index 1a18721b15e..6f2b4100620 100644 --- a/library/std/src/collections/hash/map.rs +++ b/library/std/src/collections/hash/map.rs @@ -909,8 +909,11 @@ where /// Attempts to get mutable references to `N` values in the map at once. /// /// Returns an array of length `N` with the results of each query. For soundness, at most one - /// mutable reference will be returned to any value. `None` will be returned if any of the - /// keys are duplicates or missing. + /// mutable reference will be returned to any value. `None` will be used if the key is missing. + /// + /// # Panics + /// + /// Panics if any keys are overlapping. /// /// # Examples /// @@ -924,16 +927,23 @@ where /// libraries.insert("Herzogin-Anna-Amalia-Bibliothek".to_string(), 1691); /// libraries.insert("Library of Congress".to_string(), 1800); /// + /// // Get Athenæum and Bodleian Library + /// let [Some(a), Some(b)] = libraries.get_many_mut([ + /// "Athenæum", + /// "Bodleian Library", + /// ]) else { panic!() }; + /// + /// // Assert values of Athenæum and Library of Congress /// let got = libraries.get_many_mut([ /// "Athenæum", /// "Library of Congress", /// ]); /// assert_eq!( /// got, - /// Some([ - /// &mut 1807, - /// &mut 1800, - /// ]), + /// [ + /// Some(&mut 1807), + /// Some(&mut 1800), + /// ], /// ); /// /// // Missing keys result in None @@ -941,18 +951,31 @@ where /// "Athenæum", /// "New York Public Library", /// ]); - /// assert_eq!(got, None); + /// assert_eq!( + /// got, + /// [ + /// Some(&mut 1807), + /// None + /// ] + /// ); + /// ``` + /// + /// ```should_panic + /// #![feature(map_many_mut)] + /// use std::collections::HashMap; /// - /// // Duplicate keys result in None + /// let mut libraries = HashMap::new(); + /// libraries.insert("Athenæum".to_string(), 1807); + /// + /// // Duplicate keys panic! /// let got = libraries.get_many_mut([ /// "Athenæum", /// "Athenæum", /// ]); - /// assert_eq!(got, None); /// ``` #[inline] #[unstable(feature = "map_many_mut", issue = "97601")] - pub fn get_many_mut<Q: ?Sized, const N: usize>(&mut self, ks: [&Q; N]) -> Option<[&'_ mut V; N]> + pub fn get_many_mut<Q: ?Sized, const N: usize>(&mut self, ks: [&Q; N]) -> [Option<&'_ mut V>; N] where K: Borrow<Q>, Q: Hash + Eq, @@ -963,10 +986,10 @@ where /// Attempts to get mutable references to `N` values in the map at once, without validating that /// the values are unique. /// - /// Returns an array of length `N` with the results of each query. `None` will be returned if - /// any of the keys are missing. + /// Returns an array of length `N` with the results of each query. `None` will be used if + /// the key is missing. /// - /// For a safe alternative see [`get_many_mut`](Self::get_many_mut). + /// For a safe alternative see [`get_many_mut`](`HashMap::get_many_mut`). /// /// # Safety /// @@ -987,31 +1010,39 @@ where /// libraries.insert("Herzogin-Anna-Amalia-Bibliothek".to_string(), 1691); /// libraries.insert("Library of Congress".to_string(), 1800); /// - /// let got = libraries.get_many_mut([ + /// // SAFETY: The keys do not overlap. + /// let [Some(a), Some(b)] = (unsafe { libraries.get_many_unchecked_mut([ + /// "Athenæum", + /// "Bodleian Library", + /// ]) }) else { panic!() }; + /// + /// // SAFETY: The keys do not overlap. + /// let got = unsafe { libraries.get_many_unchecked_mut([ /// "Athenæum", /// "Library of Congress", - /// ]); + /// ]) }; /// assert_eq!( /// got, - /// Some([ - /// &mut 1807, - /// &mut 1800, - /// ]), + /// [ + /// Some(&mut 1807), + /// Some(&mut 1800), + /// ], /// ); /// - /// // Missing keys result in None - /// let got = libraries.get_many_mut([ + /// // SAFETY: The keys do not overlap. + /// let got = unsafe { libraries.get_many_unchecked_mut([ /// "Athenæum", /// "New York Public Library", - /// ]); - /// assert_eq!(got, None); + /// ]) }; + /// // Missing keys result in None + /// assert_eq!(got, [Some(&mut 1807), None]); /// ``` #[inline] #[unstable(feature = "map_many_mut", issue = "97601")] pub unsafe fn get_many_unchecked_mut<Q: ?Sized, const N: usize>( &mut self, ks: [&Q; N], - ) -> Option<[&'_ mut V; N]> + ) -> [Option<&'_ mut V>; N] where K: Borrow<Q>, Q: Hash + Eq, @@ -2978,64 +3009,6 @@ impl<'a, K, V> OccupiedEntry<'a, K, V> { pub fn remove(self) -> V { self.base.remove() } - - /// Replaces the entry, returning the old key and value. The new key in the hash map will be - /// the key used to create this entry. - /// - /// # Examples - /// - /// ``` - /// #![feature(map_entry_replace)] - /// use std::collections::hash_map::{Entry, HashMap}; - /// use std::rc::Rc; - /// - /// let mut map: HashMap<Rc<String>, u32> = HashMap::new(); - /// map.insert(Rc::new("Stringthing".to_string()), 15); - /// - /// let my_key = Rc::new("Stringthing".to_string()); - /// - /// if let Entry::Occupied(entry) = map.entry(my_key) { - /// // Also replace the key with a handle to our other key. - /// let (old_key, old_value): (Rc<String>, u32) = entry.replace_entry(16); - /// } - /// - /// ``` - #[inline] - #[unstable(feature = "map_entry_replace", issue = "44286")] - pub fn replace_entry(self, value: V) -> (K, V) { - self.base.replace_entry(value) - } - - /// Replaces the key in the hash map with the key used to create this entry. - /// - /// # Examples - /// - /// ``` - /// #![feature(map_entry_replace)] - /// use std::collections::hash_map::{Entry, HashMap}; - /// use std::rc::Rc; - /// - /// let mut map: HashMap<Rc<String>, u32> = HashMap::new(); - /// let known_strings: Vec<Rc<String>> = Vec::new(); - /// - /// // Initialise known strings, run program, etc. - /// - /// reclaim_memory(&mut map, &known_strings); - /// - /// fn reclaim_memory(map: &mut HashMap<Rc<String>, u32>, known_strings: &[Rc<String>] ) { - /// for s in known_strings { - /// if let Entry::Occupied(entry) = map.entry(Rc::clone(s)) { - /// // Replaces the entry's key with our version of it in `known_strings`. - /// entry.replace_key(); - /// } - /// } - /// } - /// ``` - #[inline] - #[unstable(feature = "map_entry_replace", issue = "44286")] - pub fn replace_key(self) -> K { - self.base.replace_key() - } } impl<'a, K: 'a, V: 'a> VacantEntry<'a, K, V> { diff --git a/library/std/src/collections/hash/map/tests.rs b/library/std/src/collections/hash/map/tests.rs index c28dd7b6b50..fa8ea95b891 100644 --- a/library/std/src/collections/hash/map/tests.rs +++ b/library/std/src/collections/hash/map/tests.rs @@ -274,7 +274,7 @@ fn test_lots_of_insertions() { for _ in 0..loops { assert!(m.is_empty()); - let count = if cfg!(miri) { 101 } else { 1001 }; + let count = if cfg!(miri) { 66 } else { 1001 }; for i in 1..count { assert!(m.insert(i, i).is_none()); @@ -1018,6 +1018,7 @@ mod test_extract_if { } #[test] + #[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn drop_panic_leak() { static PREDS: AtomicUsize = AtomicUsize::new(0); static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -1047,6 +1048,7 @@ mod test_extract_if { } #[test] + #[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn pred_panic_leak() { static PREDS: AtomicUsize = AtomicUsize::new(0); static DROPS: AtomicUsize = AtomicUsize::new(0); @@ -1076,6 +1078,7 @@ mod test_extract_if { // Same as above, but attempt to use the iterator again after the panic in the predicate #[test] + #[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn pred_panic_reuse() { static PREDS: AtomicUsize = AtomicUsize::new(0); static DROPS: AtomicUsize = AtomicUsize::new(0); diff --git a/library/std/src/collections/hash/set.rs b/library/std/src/collections/hash/set.rs index 4a113ddea3a..e69fb0878e7 100644 --- a/library/std/src/collections/hash/set.rs +++ b/library/std/src/collections/hash/set.rs @@ -724,38 +724,6 @@ where self.base.get_or_insert(value) } - /// Inserts an owned copy of the given `value` into the set if it is not - /// present, then returns a reference to the value in the set. - /// - /// # Examples - /// - /// ``` - /// #![feature(hash_set_entry)] - /// - /// use std::collections::HashSet; - /// - /// let mut set: HashSet<String> = ["cat", "dog", "horse"] - /// .iter().map(|&pet| pet.to_owned()).collect(); - /// - /// assert_eq!(set.len(), 3); - /// for &pet in &["cat", "dog", "fish"] { - /// let value = set.get_or_insert_owned(pet); - /// assert_eq!(value, pet); - /// } - /// assert_eq!(set.len(), 4); // a new "fish" was inserted - /// ``` - #[inline] - #[unstable(feature = "hash_set_entry", issue = "60896")] - pub fn get_or_insert_owned<Q: ?Sized>(&mut self, value: &Q) -> &T - where - T: Borrow<Q>, - Q: Hash + Eq + ToOwned<Owned = T>, - { - // Although the raw entry gives us `&mut T`, we only return `&T` to be consistent with - // `get`. Key mutation is "raw" because you're not supposed to affect `Eq` or `Hash`. - self.base.get_or_insert_owned(value) - } - /// Inserts a value computed from `f` into the set if the given `value` is /// not present, then returns a reference to the value in the set. /// diff --git a/library/std/src/collections/hash/set/tests.rs b/library/std/src/collections/hash/set/tests.rs index 7aa2167e213..8ee8a3e8bf6 100644 --- a/library/std/src/collections/hash/set/tests.rs +++ b/library/std/src/collections/hash/set/tests.rs @@ -429,6 +429,7 @@ fn test_extract_if() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn test_extract_if_drop_panic_leak() { static PREDS: AtomicU32 = AtomicU32::new(0); static DROPS: AtomicU32 = AtomicU32::new(0); @@ -459,6 +460,7 @@ fn test_extract_if_drop_panic_leak() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn test_extract_if_pred_panic_leak() { static PREDS: AtomicU32 = AtomicU32::new(0); static DROPS: AtomicU32 = AtomicU32::new(0); diff --git a/library/std/src/env/tests.rs b/library/std/src/env/tests.rs index fc7aee29733..d0217261068 100644 --- a/library/std/src/env/tests.rs +++ b/library/std/src/env/tests.rs @@ -1,7 +1,7 @@ use super::*; #[test] -#[cfg_attr(any(target_os = "emscripten", target_env = "sgx"), ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi", target_env = "sgx"), ignore)] fn test_self_exe_path() { let path = current_exe(); assert!(path.is_ok()); diff --git a/library/std/src/ffi/os_str.rs b/library/std/src/ffi/os_str.rs index 0f905803bb8..2243f100643 100644 --- a/library/std/src/ffi/os_str.rs +++ b/library/std/src/ffi/os_str.rs @@ -9,7 +9,6 @@ use crate::borrow::{Borrow, Cow}; use crate::collections::TryReserveError; use crate::hash::{Hash, Hasher}; use crate::ops::{self, Range}; -use crate::ptr::addr_of_mut; use crate::rc::Rc; use crate::str::FromStr; use crate::sync::Arc; @@ -1272,7 +1271,7 @@ unsafe impl CloneToUninit for OsStr { #[cfg_attr(debug_assertions, track_caller)] unsafe fn clone_to_uninit(&self, dst: *mut Self) { // SAFETY: we're just a wrapper around a platform-specific Slice - unsafe { self.inner.clone_to_uninit(addr_of_mut!((*dst).inner)) } + unsafe { self.inner.clone_to_uninit(&raw mut (*dst).inner) } } } diff --git a/library/std/src/fs.rs b/library/std/src/fs.rs index db7867337dd..124ef121b18 100644 --- a/library/std/src/fs.rs +++ b/library/std/src/fs.rs @@ -8,7 +8,15 @@ #![stable(feature = "rust1", since = "1.0.0")] #![deny(unsafe_op_in_unsafe_fn)] -#[cfg(all(test, not(any(target_os = "emscripten", target_env = "sgx", target_os = "xous"))))] +#[cfg(all( + test, + not(any( + target_os = "emscripten", + target_os = "wasi", + target_env = "sgx", + target_os = "xous" + )) +))] mod tests; use crate::ffi::OsString; diff --git a/library/std/src/fs/tests.rs b/library/std/src/fs/tests.rs index 412603ddea3..0672fe6f771 100644 --- a/library/std/src/fs/tests.rs +++ b/library/std/src/fs/tests.rs @@ -1732,7 +1732,7 @@ fn windows_unix_socket_exists() { let bytes = core::slice::from_raw_parts(bytes.as_ptr().cast::<i8>(), bytes.len()); addr.sun_path[..bytes.len()].copy_from_slice(bytes); let len = mem::size_of_val(&addr) as i32; - let result = c::bind(socket, ptr::addr_of!(addr).cast::<c::SOCKADDR>(), len); + let result = c::bind(socket, (&raw const addr).cast::<c::SOCKADDR>(), len); c::closesocket(socket); assert_eq!(result, 0); } diff --git a/library/std/src/io/buffered/bufreader.rs b/library/std/src/io/buffered/bufreader.rs index fcb3e36027b..8b46738ab8a 100644 --- a/library/std/src/io/buffered/bufreader.rs +++ b/library/std/src/io/buffered/bufreader.rs @@ -357,7 +357,7 @@ impl<R: ?Sized + Read> Read for BufReader<R> { let prev = cursor.written(); let mut rem = self.fill_buf()?; - rem.read_buf(cursor.reborrow())?; + rem.read_buf(cursor.reborrow())?; // actually never fails self.consume(cursor.written() - prev); //slice impl of read_buf known to never unfill buf diff --git a/library/std/src/io/buffered/bufreader/buffer.rs b/library/std/src/io/buffered/bufreader/buffer.rs index 3df7e3971da..52fe49985c6 100644 --- a/library/std/src/io/buffered/bufreader/buffer.rs +++ b/library/std/src/io/buffered/bufreader/buffer.rs @@ -143,11 +143,13 @@ impl Buffer { buf.set_init(self.initialized); } - reader.read_buf(buf.unfilled())?; + let result = reader.read_buf(buf.unfilled()); self.pos = 0; self.filled = buf.len(); self.initialized = buf.init_len(); + + result?; } Ok(self.buffer()) } diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index d89ecd317d6..bff0f823c4b 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -164,6 +164,7 @@ fn test_buffered_reader_stream_position() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn test_buffered_reader_stream_position_panic() { let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(4, io::Cursor::new(inner)); @@ -487,7 +488,7 @@ fn dont_panic_in_drop_on_panicked_flush() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn panic_in_write_doesnt_flush_in_drop() { static WRITES: AtomicUsize = AtomicUsize::new(0); diff --git a/library/std/src/io/error/repr_bitpacked.rs b/library/std/src/io/error/repr_bitpacked.rs index 80ba8455df3..a839a2fbac1 100644 --- a/library/std/src/io/error/repr_bitpacked.rs +++ b/library/std/src/io/error/repr_bitpacked.rs @@ -124,6 +124,7 @@ const TAG_SIMPLE: usize = 0b11; /// is_unwind_safe::<std::io::Error>(); /// ``` #[repr(transparent)] +#[rustc_insignificant_dtor] pub(super) struct Repr(NonNull<()>, PhantomData<ErrorData<Box<Custom>>>); // All the types `Repr` stores internally are Send + Sync, and so is it. diff --git a/library/std/src/io/mod.rs b/library/std/src/io/mod.rs index 0b57d01f273..dd6458c38c6 100644 --- a/library/std/src/io/mod.rs +++ b/library/std/src/io/mod.rs @@ -474,18 +474,28 @@ pub(crate) fn default_read_to_end<R: Read + ?Sized>( } let mut cursor = read_buf.unfilled(); - loop { + let result = loop { match r.read_buf(cursor.reborrow()) { - Ok(()) => break, Err(e) if e.is_interrupted() => continue, - Err(e) => return Err(e), + // Do not stop now in case of error: we might have received both data + // and an error + res => break res, } - } + }; let unfilled_but_initialized = cursor.init_ref().len(); let bytes_read = cursor.written(); let was_fully_initialized = read_buf.init_len() == buf_len; + // SAFETY: BorrowedBuf's invariants mean this much memory is initialized. + unsafe { + let new_len = bytes_read + buf.len(); + buf.set_len(new_len); + } + + // Now that all data is pushed to the vector, we can fail without data loss + result?; + if bytes_read == 0 { return Ok(buf.len() - start_len); } @@ -499,12 +509,6 @@ pub(crate) fn default_read_to_end<R: Read + ?Sized>( // store how much was initialized but not filled initialized = unfilled_but_initialized; - // SAFETY: BorrowedBuf's invariants mean this much memory is initialized. - unsafe { - let new_len = bytes_read + buf.len(); - buf.set_len(new_len); - } - // Use heuristics to determine the max read size if no initial size hint was provided if size_hint.is_none() { // The reader is returning short reads but it doesn't call ensure_init(). @@ -974,6 +978,8 @@ pub trait Read { /// with uninitialized buffers. The new data will be appended to any existing contents of `buf`. /// /// The default implementation delegates to `read`. + /// + /// This method makes it possible to return both data and an error but it is advised against. #[unstable(feature = "read_buf", issue = "78485")] fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> { default_read_buf(|b| self.read(b), buf) @@ -2941,7 +2947,7 @@ impl<T: Read> Read for Take<T> { } let mut cursor = sliced_buf.unfilled(); - self.inner.read_buf(cursor.reborrow())?; + let result = self.inner.read_buf(cursor.reborrow()); let new_init = cursor.init_ref().len(); let filled = sliced_buf.len(); @@ -2956,13 +2962,14 @@ impl<T: Read> Read for Take<T> { } self.limit -= filled as u64; + + result } else { let written = buf.written(); - self.inner.read_buf(buf.reborrow())?; + let result = self.inner.read_buf(buf.reborrow()); self.limit -= (buf.written() - written) as u64; + result } - - Ok(()) } } diff --git a/library/std/src/io/stdio/tests.rs b/library/std/src/io/stdio/tests.rs index ea76a271d12..bf8f3a5adfb 100644 --- a/library/std/src/io/stdio/tests.rs +++ b/library/std/src/io/stdio/tests.rs @@ -25,7 +25,7 @@ fn stderrlock_unwind_safe() { fn assert_unwind_safe<T: UnwindSafe + RefUnwindSafe>() {} #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn panic_doesnt_poison() { thread::spawn(|| { let _a = stdin(); @@ -48,17 +48,17 @@ fn panic_doesnt_poison() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn test_lock_stderr() { test_lock(stderr, || stderr().lock()); } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn test_lock_stdin() { test_lock(stdin, || stdin().lock()); } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn test_lock_stdout() { test_lock(stdout, || stdout().lock()); } diff --git a/library/std/src/io/tests.rs b/library/std/src/io/tests.rs index f551dcd401e..56b71c47dc7 100644 --- a/library/std/src/io/tests.rs +++ b/library/std/src/io/tests.rs @@ -735,6 +735,69 @@ fn read_buf_full_read() { assert_eq!(BufReader::new(FullRead).fill_buf().unwrap().len(), DEFAULT_BUF_SIZE); } +struct DataAndErrorReader(&'static [u8]); + +impl Read for DataAndErrorReader { + fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> { + panic!("We want tests to use `read_buf`") + } + + fn read_buf(&mut self, buf: io::BorrowedCursor<'_>) -> io::Result<()> { + self.0.read_buf(buf).unwrap(); + Err(io::Error::other("error")) + } +} + +#[test] +fn read_buf_data_and_error_take() { + let mut buf = [0; 64]; + let mut buf = io::BorrowedBuf::from(buf.as_mut_slice()); + + let mut r = DataAndErrorReader(&[4, 5, 6]).take(1); + assert!(r.read_buf(buf.unfilled()).is_err()); + assert_eq!(buf.filled(), &[4]); + + assert!(r.read_buf(buf.unfilled()).is_ok()); + assert_eq!(buf.filled(), &[4]); + assert_eq!(r.get_ref().0, &[5, 6]); +} + +#[test] +fn read_buf_data_and_error_buf() { + let mut r = BufReader::new(DataAndErrorReader(&[4, 5, 6])); + + assert!(r.fill_buf().is_err()); + assert_eq!(r.fill_buf().unwrap(), &[4, 5, 6]); +} + +#[test] +fn read_buf_data_and_error_read_to_end() { + let mut r = DataAndErrorReader(&[4, 5, 6]); + + let mut v = Vec::with_capacity(200); + assert!(r.read_to_end(&mut v).is_err()); + + assert_eq!(v, &[4, 5, 6]); +} + +#[test] +fn read_to_end_error() { + struct ErrorReader; + + impl Read for ErrorReader { + fn read(&mut self, _buf: &mut [u8]) -> io::Result<usize> { + Err(io::Error::other("error")) + } + } + + let mut r = [4, 5, 6].chain(ErrorReader); + + let mut v = Vec::with_capacity(200); + assert!(r.read_to_end(&mut v).is_err()); + + assert_eq!(v, &[4, 5, 6]); +} + #[test] // Miri does not support signalling OOM #[cfg_attr(miri, ignore)] diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index b81e7c18abb..65a9aa66c7c 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -153,7 +153,7 @@ //! the [`io`], [`fs`], and [`net`] modules. //! //! The [`thread`] module contains Rust's threading abstractions. [`sync`] -//! contains further primitive shared memory types, including [`atomic`] and +//! contains further primitive shared memory types, including [`atomic`], [`mpmc`] and //! [`mpsc`], which contains the channel types for message passing. //! //! # Use before and after `main()` @@ -177,6 +177,7 @@ //! - after-main use of thread-locals, which also affects additional features: //! - [`thread::current()`] //! - [`thread::scope()`] +//! - [`sync::mpmc`] //! - [`sync::mpsc`] //! - before-main stdio file descriptors are not guaranteed to be open on unix platforms //! @@ -202,6 +203,7 @@ //! [`atomic`]: sync::atomic //! [`for`]: ../book/ch03-05-control-flow.html#looping-through-a-collection-with-for //! [`str`]: prim@str +//! [`mpmc`]: sync::mpmc //! [`mpsc`]: sync::mpsc //! [`std::cmp`]: cmp //! [`std::slice`]: mod@slice diff --git a/library/std/src/net/ip_addr.rs b/library/std/src/net/ip_addr.rs index 8a9426b61f9..4d673a1d66d 100644 --- a/library/std/src/net/ip_addr.rs +++ b/library/std/src/net/ip_addr.rs @@ -1,5 +1,5 @@ // Tests for this module -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", all(target_os = "wasi", target_env = "p1")))))] mod tests; #[stable(feature = "ip_addr", since = "1.7.0")] diff --git a/library/std/src/net/socket_addr.rs b/library/std/src/net/socket_addr.rs index 84922aabdb5..ba9c948a2e9 100644 --- a/library/std/src/net/socket_addr.rs +++ b/library/std/src/net/socket_addr.rs @@ -1,5 +1,5 @@ // Tests for this module -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", all(target_os = "wasi", target_env = "p1")))))] mod tests; #[stable(feature = "rust1", since = "1.0.0")] diff --git a/library/std/src/net/tcp.rs b/library/std/src/net/tcp.rs index 06ed4f6a03d..67a0f7e439d 100644 --- a/library/std/src/net/tcp.rs +++ b/library/std/src/net/tcp.rs @@ -1,6 +1,13 @@ #![deny(unsafe_op_in_unsafe_fn)] -#[cfg(all(test, not(any(target_os = "emscripten", target_os = "xous"))))] +#[cfg(all( + test, + not(any( + target_os = "emscripten", + all(target_os = "wasi", target_env = "p1"), + target_os = "xous" + )) +))] mod tests; use crate::fmt; @@ -561,7 +568,7 @@ impl TcpStream { /// Moves this TCP stream into or out of nonblocking mode. /// - /// This will result in `read`, `write`, `recv` and `send` operations + /// This will result in `read`, `write`, `recv` and `send` system operations /// becoming nonblocking, i.e., immediately returning from their calls. /// If the IO operation is successful, `Ok` is returned and no further /// action is required. If the IO operation could not be completed and needs diff --git a/library/std/src/net/tcp/tests.rs b/library/std/src/net/tcp/tests.rs index d26517d74e4..a7b5cdf4ec0 100644 --- a/library/std/src/net/tcp/tests.rs +++ b/library/std/src/net/tcp/tests.rs @@ -57,6 +57,7 @@ fn connect_timeout_error() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn listen_localhost() { let socket_addr = next_test_ip4(); let listener = t!(TcpListener::bind(&socket_addr)); @@ -73,6 +74,7 @@ fn listen_localhost() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn connect_loopback() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -94,6 +96,7 @@ fn connect_loopback() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn smoke_test() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -114,6 +117,7 @@ fn smoke_test() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn read_eof() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -133,6 +137,7 @@ fn read_eof() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn write_close() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -161,6 +166,7 @@ fn write_close() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn multiple_connect_serial() { each_ip(&mut |addr| { let max = 10; @@ -183,6 +189,7 @@ fn multiple_connect_serial() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn multiple_connect_interleaved_greedy_schedule() { const MAX: usize = 10; each_ip(&mut |addr| { @@ -220,6 +227,7 @@ fn multiple_connect_interleaved_greedy_schedule() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn multiple_connect_interleaved_lazy_schedule() { const MAX: usize = 10; each_ip(&mut |addr| { @@ -255,6 +263,7 @@ fn multiple_connect_interleaved_lazy_schedule() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn socket_and_peer_name() { each_ip(&mut |addr| { let listener = t!(TcpListener::bind(&addr)); @@ -270,6 +279,7 @@ fn socket_and_peer_name() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn partial_read() { each_ip(&mut |addr| { let (tx, rx) = channel(); @@ -291,6 +301,7 @@ fn partial_read() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn read_buf() { each_ip(&mut |addr| { let srv = t!(TcpListener::bind(&addr)); @@ -389,6 +400,7 @@ fn double_bind() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn tcp_clone_smoke() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -420,6 +432,7 @@ fn tcp_clone_smoke() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn tcp_clone_two_read() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -454,6 +467,7 @@ fn tcp_clone_two_read() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn tcp_clone_two_write() { each_ip(&mut |addr| { let acceptor = t!(TcpListener::bind(&addr)); @@ -483,6 +497,7 @@ fn tcp_clone_two_write() { #[test] // FIXME: https://github.com/fortanix/rust-sgx/issues/110 #[cfg_attr(target_env = "sgx", ignore)] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn shutdown_smoke() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); @@ -505,6 +520,7 @@ fn shutdown_smoke() { #[test] // FIXME: https://github.com/fortanix/rust-sgx/issues/110 #[cfg_attr(target_env = "sgx", ignore)] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn close_readwrite_smoke() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); @@ -547,6 +563,7 @@ fn close_readwrite_smoke() { #[cfg_attr(target_env = "sgx", ignore)] // On windows, shutdown will not wake up blocking I/O operations. #[cfg_attr(windows, ignore)] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn close_read_wakes_up() { each_ip(&mut |addr| { let listener = t!(TcpListener::bind(&addr)); @@ -574,6 +591,7 @@ fn close_read_wakes_up() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn clone_while_reading() { each_ip(&mut |addr| { let accept = t!(TcpListener::bind(&addr)); @@ -614,6 +632,7 @@ fn clone_while_reading() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn clone_accept_smoke() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); @@ -632,6 +651,7 @@ fn clone_accept_smoke() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn clone_accept_concurrent() { each_ip(&mut |addr| { let a = t!(TcpListener::bind(&addr)); @@ -670,10 +690,10 @@ fn debug() { addr.to_string() } + #[cfg(any(unix, target_os = "wasi"))] + use crate::os::fd::AsRawFd; #[cfg(target_env = "sgx")] use crate::os::fortanix_sgx::io::AsRawFd; - #[cfg(unix)] - use crate::os::unix::io::AsRawFd; #[cfg(not(windows))] fn render_inner(addr: &dyn AsRawFd) -> impl fmt::Debug { addr.as_raw_fd() @@ -714,6 +734,7 @@ fn debug() { ignore )] #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +#[cfg_attr(target_os = "wasi", ignore)] // timeout not supported #[test] fn timeouts() { let addr = next_test_ip4(); @@ -742,6 +763,7 @@ fn timeouts() { #[test] #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +#[cfg_attr(target_os = "wasi", ignore)] // timeout not supported fn test_read_timeout() { let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); @@ -763,6 +785,7 @@ fn test_read_timeout() { #[test] #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +#[cfg_attr(target_os = "wasi", ignore)] // timeout not supported fn test_read_with_timeout() { let addr = next_test_ip4(); let listener = t!(TcpListener::bind(&addr)); @@ -810,6 +833,7 @@ fn test_timeout_zero_duration() { #[test] #[cfg_attr(target_env = "sgx", ignore)] +#[cfg_attr(target_os = "wasi", ignore)] // linger not supported fn linger() { let addr = next_test_ip4(); let _listener = t!(TcpListener::bind(&addr)); @@ -879,6 +903,7 @@ fn set_nonblocking() { #[test] #[cfg_attr(target_env = "sgx", ignore)] // FIXME: https://github.com/fortanix/rust-sgx/issues/31 +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn peek() { each_ip(&mut |addr| { let (txdone, rxdone) = channel(); diff --git a/library/std/src/net/udp.rs b/library/std/src/net/udp.rs index d4252cb87ac..6df47d7b0e0 100644 --- a/library/std/src/net/udp.rs +++ b/library/std/src/net/udp.rs @@ -1,4 +1,12 @@ -#[cfg(all(test, not(any(target_os = "emscripten", target_env = "sgx", target_os = "xous"))))] +#[cfg(all( + test, + not(any( + target_os = "emscripten", + all(target_os = "wasi", target_env = "p1"), + target_env = "sgx", + target_os = "xous" + )) +))] mod tests; use crate::fmt; @@ -579,8 +587,8 @@ impl UdpSocket { /// This function specifies a new multicast group for this socket to join. /// The address must be a valid multicast address, and `interface` is the /// address of the local interface with which the system should join the - /// multicast group. If it's equal to `INADDR_ANY` then an appropriate - /// interface is chosen by the system. + /// multicast group. If it's equal to [`UNSPECIFIED`](Ipv4Addr::UNSPECIFIED) + /// then an appropriate interface is chosen by the system. #[stable(feature = "net2_mutators", since = "1.9.0")] pub fn join_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { self.0.join_multicast_v4(multiaddr, interface) @@ -764,7 +772,7 @@ impl UdpSocket { /// Moves this UDP socket into or out of nonblocking mode. /// - /// This will result in `recv`, `recv_from`, `send`, and `send_to` + /// This will result in `recv`, `recv_from`, `send`, and `send_to` system /// operations becoming nonblocking, i.e., immediately returning from their /// calls. If the IO operation is successful, `Ok` is returned and no /// further action is required. If the IO operation could not be completed diff --git a/library/std/src/net/udp/tests.rs b/library/std/src/net/udp/tests.rs index 0cf99366452..1c8c58d1879 100644 --- a/library/std/src/net/udp/tests.rs +++ b/library/std/src/net/udp/tests.rs @@ -27,6 +27,7 @@ fn bind_error() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn socket_smoke_test_ip4() { each_ip(&mut |server_ip, client_ip| { let (tx1, rx1) = channel(); @@ -69,6 +70,7 @@ fn socket_peer() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn udp_clone_smoke() { each_ip(&mut |addr1, addr2| { let sock1 = t!(UdpSocket::bind(&addr1)); @@ -98,6 +100,7 @@ fn udp_clone_smoke() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn udp_clone_two_read() { each_ip(&mut |addr1, addr2| { let sock1 = t!(UdpSocket::bind(&addr1)); @@ -130,6 +133,7 @@ fn udp_clone_two_read() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // no threads fn udp_clone_two_write() { each_ip(&mut |addr1, addr2| { let sock1 = t!(UdpSocket::bind(&addr1)); @@ -183,6 +187,7 @@ fn debug() { any(target_os = "netbsd", target_os = "openbsd", target_os = "vxworks", target_os = "nto"), ignore )] +#[cfg_attr(target_os = "wasi", ignore)] // timeout not supported #[test] fn timeouts() { let addr = next_test_ip4(); @@ -208,6 +213,7 @@ fn timeouts() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // timeout not supported fn test_read_timeout() { let addr = next_test_ip4(); @@ -232,6 +238,7 @@ fn test_read_timeout() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // timeout not supported fn test_read_with_timeout() { let addr = next_test_ip4(); @@ -291,6 +298,7 @@ fn connect_send_recv() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // peek not supported fn connect_send_peek_recv() { each_ip(&mut |addr, _| { let socket = t!(UdpSocket::bind(&addr)); @@ -313,6 +321,7 @@ fn connect_send_peek_recv() { } #[test] +#[cfg_attr(target_os = "wasi", ignore)] // peek_from not supported fn peek_from() { each_ip(&mut |addr, _| { let socket = t!(UdpSocket::bind(&addr)); diff --git a/library/std/src/os/unix/net/addr.rs b/library/std/src/os/unix/net/addr.rs index 79f2c365025..253e1503cf7 100644 --- a/library/std/src/os/unix/net/addr.rs +++ b/library/std/src/os/unix/net/addr.rs @@ -15,15 +15,12 @@ mod libc { pub type socklen_t = u32; pub struct sockaddr; #[derive(Clone)] - pub struct sockaddr_un; + pub struct sockaddr_un { + pub sun_path: [u8; 1], + } } -fn sun_path_offset(addr: &libc::sockaddr_un) -> usize { - // Work with an actual instance of the type since using a null pointer is UB - let base = (addr as *const libc::sockaddr_un).addr(); - let path = core::ptr::addr_of!(addr.sun_path).addr(); - path - base -} +const SUN_PATH_OFFSET: usize = mem::offset_of!(libc::sockaddr_un, sun_path); pub(super) fn sockaddr_un(path: &Path) -> io::Result<(libc::sockaddr_un, libc::socklen_t)> { // SAFETY: All zeros is a valid representation for `sockaddr_un`. @@ -53,7 +50,7 @@ pub(super) fn sockaddr_un(path: &Path) -> io::Result<(libc::sockaddr_un, libc::s ptr::copy_nonoverlapping(bytes.as_ptr(), addr.sun_path.as_mut_ptr().cast(), bytes.len()) }; - let mut len = sun_path_offset(&addr) + bytes.len(); + let mut len = SUN_PATH_OFFSET + bytes.len(); match bytes.get(0) { Some(&0) | None => {} Some(_) => len += 1, @@ -98,7 +95,7 @@ impl SocketAddr { unsafe { let mut addr: libc::sockaddr_un = mem::zeroed(); let mut len = mem::size_of::<libc::sockaddr_un>() as libc::socklen_t; - cvt(f(core::ptr::addr_of_mut!(addr) as *mut _, &mut len))?; + cvt(f((&raw mut addr) as *mut _, &mut len))?; SocketAddr::from_parts(addr, len) } } @@ -114,13 +111,13 @@ impl SocketAddr { let sun_path: &[u8] = unsafe { mem::transmute::<&[libc::c_char], &[u8]>(&addr.sun_path) }; len = core::slice::memchr::memchr(0, sun_path) - .map_or(len, |new_len| (new_len + sun_path_offset(&addr)) as libc::socklen_t); + .map_or(len, |new_len| (new_len + SUN_PATH_OFFSET) as libc::socklen_t); } if len == 0 { // When there is a datagram from unnamed unix socket // linux returns zero bytes of address - len = sun_path_offset(&addr) as libc::socklen_t; // i.e., zero-length address + len = SUN_PATH_OFFSET as libc::socklen_t; // i.e., zero-length address } else if addr.sun_family != libc::AF_UNIX as libc::sa_family_t { return Err(io::const_io_error!( io::ErrorKind::InvalidInput, @@ -238,7 +235,7 @@ impl SocketAddr { } fn address(&self) -> AddressKind<'_> { - let len = self.len as usize - sun_path_offset(&self.addr); + let len = self.len as usize - SUN_PATH_OFFSET; let path = unsafe { mem::transmute::<&[libc::c_char], &[u8]>(&self.addr.sun_path) }; // macOS seems to return a len of 16 and a zeroed sun_path for unnamed addresses @@ -287,7 +284,7 @@ impl linux_ext::addr::SocketAddrExt for SocketAddr { addr.sun_path.as_mut_ptr().add(1) as *mut u8, name.len(), ); - let len = (sun_path_offset(&addr) + 1 + name.len()) as libc::socklen_t; + let len = (SUN_PATH_OFFSET + 1 + name.len()) as libc::socklen_t; SocketAddr::from_parts(addr, len) } } diff --git a/library/std/src/os/unix/net/ancillary.rs b/library/std/src/os/unix/net/ancillary.rs index c34a3b4e184..36967fc3f98 100644 --- a/library/std/src/os/unix/net/ancillary.rs +++ b/library/std/src/os/unix/net/ancillary.rs @@ -37,7 +37,7 @@ pub(super) fn recv_vectored_with_ancillary_from( unsafe { let mut msg_name: libc::sockaddr_un = zeroed(); let mut msg: libc::msghdr = zeroed(); - msg.msg_name = core::ptr::addr_of_mut!(msg_name) as *mut _; + msg.msg_name = (&raw mut msg_name) as *mut _; msg.msg_namelen = size_of::<libc::sockaddr_un>() as libc::socklen_t; msg.msg_iov = bufs.as_mut_ptr().cast(); msg.msg_iovlen = bufs.len() as _; @@ -70,7 +70,7 @@ pub(super) fn send_vectored_with_ancillary_to( if let Some(path) = path { sockaddr_un(path)? } else { (zeroed(), 0) }; let mut msg: libc::msghdr = zeroed(); - msg.msg_name = core::ptr::addr_of_mut!(msg_name) as *mut _; + msg.msg_name = (&raw mut msg_name) as *mut _; msg.msg_namelen = msg_namelen; msg.msg_iov = bufs.as_ptr() as *mut _; msg.msg_iovlen = bufs.len() as _; diff --git a/library/std/src/os/unix/net/datagram.rs b/library/std/src/os/unix/net/datagram.rs index 48aaddd2d52..82446ea107f 100644 --- a/library/std/src/os/unix/net/datagram.rs +++ b/library/std/src/os/unix/net/datagram.rs @@ -100,7 +100,7 @@ impl UnixDatagram { let socket = UnixDatagram::unbound()?; let (addr, len) = sockaddr_un(path.as_ref())?; - cvt(libc::bind(socket.as_raw_fd(), core::ptr::addr_of!(addr) as *const _, len as _))?; + cvt(libc::bind(socket.as_raw_fd(), (&raw const addr) as *const _, len as _))?; Ok(socket) } @@ -133,7 +133,7 @@ impl UnixDatagram { let socket = UnixDatagram::unbound()?; cvt(libc::bind( socket.as_raw_fd(), - core::ptr::addr_of!(socket_addr.addr) as *const _, + (&raw const socket_addr.addr) as *const _, socket_addr.len as _, ))?; Ok(socket) @@ -215,7 +215,7 @@ impl UnixDatagram { unsafe { let (addr, len) = sockaddr_un(path.as_ref())?; - cvt(libc::connect(self.as_raw_fd(), core::ptr::addr_of!(addr) as *const _, len))?; + cvt(libc::connect(self.as_raw_fd(), (&raw const addr) as *const _, len))?; } Ok(()) } @@ -247,7 +247,7 @@ impl UnixDatagram { unsafe { cvt(libc::connect( self.as_raw_fd(), - core::ptr::addr_of!(socket_addr.addr) as *const _, + (&raw const socket_addr.addr) as *const _, socket_addr.len, ))?; } @@ -514,7 +514,7 @@ impl UnixDatagram { buf.as_ptr() as *const _, buf.len(), MSG_NOSIGNAL, - core::ptr::addr_of!(addr) as *const _, + (&raw const addr) as *const _, len, ))?; Ok(count as usize) @@ -549,7 +549,7 @@ impl UnixDatagram { buf.as_ptr() as *const _, buf.len(), MSG_NOSIGNAL, - core::ptr::addr_of!(socket_addr.addr) as *const _, + (&raw const socket_addr.addr) as *const _, socket_addr.len, ))?; Ok(count as usize) diff --git a/library/std/src/os/unix/net/listener.rs b/library/std/src/os/unix/net/listener.rs index 440408eb13f..be236317d04 100644 --- a/library/std/src/os/unix/net/listener.rs +++ b/library/std/src/os/unix/net/listener.rs @@ -103,11 +103,7 @@ impl UnixListener { )))] const backlog: libc::c_int = libc::SOMAXCONN; - cvt(libc::bind( - inner.as_inner().as_raw_fd(), - core::ptr::addr_of!(addr) as *const _, - len as _, - ))?; + cvt(libc::bind(inner.as_inner().as_raw_fd(), (&raw const addr) as *const _, len as _))?; cvt(libc::listen(inner.as_inner().as_raw_fd(), backlog))?; Ok(UnixListener(inner)) @@ -147,7 +143,7 @@ impl UnixListener { const backlog: core::ffi::c_int = 128; cvt(libc::bind( inner.as_raw_fd(), - core::ptr::addr_of!(socket_addr.addr) as *const _, + (&raw const socket_addr.addr) as *const _, socket_addr.len as _, ))?; cvt(libc::listen(inner.as_raw_fd(), backlog))?; @@ -182,7 +178,7 @@ impl UnixListener { pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { let mut storage: libc::sockaddr_un = unsafe { mem::zeroed() }; let mut len = mem::size_of_val(&storage) as libc::socklen_t; - let sock = self.0.accept(core::ptr::addr_of_mut!(storage) as *mut _, &mut len)?; + let sock = self.0.accept((&raw mut storage) as *mut _, &mut len)?; let addr = SocketAddr::from_parts(storage, len)?; Ok((UnixStream(sock), addr)) } diff --git a/library/std/src/os/unix/net/stream.rs b/library/std/src/os/unix/net/stream.rs index 4967c5b89ec..cb210b41eae 100644 --- a/library/std/src/os/unix/net/stream.rs +++ b/library/std/src/os/unix/net/stream.rs @@ -84,7 +84,7 @@ impl UnixStream { let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?; let (addr, len) = sockaddr_un(path.as_ref())?; - cvt(libc::connect(inner.as_raw_fd(), core::ptr::addr_of!(addr) as *const _, len))?; + cvt(libc::connect(inner.as_raw_fd(), (&raw const addr) as *const _, len))?; Ok(UnixStream(inner)) } } @@ -118,7 +118,7 @@ impl UnixStream { let inner = Socket::new_raw(libc::AF_UNIX, libc::SOCK_STREAM)?; cvt(libc::connect( inner.as_raw_fd(), - core::ptr::addr_of!(socket_addr.addr) as *const _, + (&raw const socket_addr.addr) as *const _, socket_addr.len, ))?; Ok(UnixStream(inner)) diff --git a/library/std/src/os/unix/net/ucred.rs b/library/std/src/os/unix/net/ucred.rs index c818bd05858..e1014a4f296 100644 --- a/library/std/src/os/unix/net/ucred.rs +++ b/library/std/src/os/unix/net/ucred.rs @@ -60,7 +60,7 @@ mod impl_linux { socket.as_raw_fd(), SOL_SOCKET, SO_PEERCRED, - core::ptr::addr_of_mut!(ucred) as *mut c_void, + (&raw mut ucred) as *mut c_void, &mut ucred_size, ); @@ -121,7 +121,7 @@ mod impl_apple { socket.as_raw_fd(), SOL_LOCAL, LOCAL_PEERPID, - core::ptr::addr_of_mut!(pid) as *mut c_void, + (&raw mut pid) as *mut c_void, &mut pid_size, ); diff --git a/library/std/src/panic.rs b/library/std/src/panic.rs index 015cab89485..d649357a56d 100644 --- a/library/std/src/panic.rs +++ b/library/std/src/panic.rs @@ -288,45 +288,55 @@ pub use core::panic::abort_unwind; /// Invokes a closure, capturing the cause of an unwinding panic if one occurs. /// -/// This function will return `Ok` with the closure's result if the closure -/// does not panic, and will return `Err(cause)` if the closure panics. The -/// `cause` returned is the object with which panic was originally invoked. +/// This function will return `Ok` with the closure's result if the closure does +/// not panic, and will return `Err(cause)` if the closure panics. The `cause` +/// returned is the object with which panic was originally invoked. /// -/// It is currently undefined behavior to unwind from Rust code into foreign -/// code, so this function is particularly useful when Rust is called from -/// another language (normally C). This can run arbitrary Rust code, capturing a -/// panic and allowing a graceful handling of the error. +/// Rust functions that are expected to be called from foreign code that does +/// not support unwinding (such as C compiled with `-fno-exceptions`) should be +/// defined using `extern "C"`, which ensures that if the Rust code panics, it +/// is automatically caught and the process is aborted. If this is the desired +/// behavior, it is not necessary to use `catch_unwind` explicitly. This +/// function should instead be used when more graceful error-handling is needed. /// /// It is **not** recommended to use this function for a general try/catch /// mechanism. The [`Result`] type is more appropriate to use for functions that /// can fail on a regular basis. Additionally, this function is not guaranteed /// to catch all panics, see the "Notes" section below. /// -/// The closure provided is required to adhere to the [`UnwindSafe`] trait to ensure -/// that all captured variables are safe to cross this boundary. The purpose of -/// this bound is to encode the concept of [exception safety][rfc] in the type -/// system. Most usage of this function should not need to worry about this -/// bound as programs are naturally unwind safe without `unsafe` code. If it -/// becomes a problem the [`AssertUnwindSafe`] wrapper struct can be used to quickly -/// assert that the usage here is indeed unwind safe. +/// The closure provided is required to adhere to the [`UnwindSafe`] trait to +/// ensure that all captured variables are safe to cross this boundary. The +/// purpose of this bound is to encode the concept of [exception safety][rfc] in +/// the type system. Most usage of this function should not need to worry about +/// this bound as programs are naturally unwind safe without `unsafe` code. If +/// it becomes a problem the [`AssertUnwindSafe`] wrapper struct can be used to +/// quickly assert that the usage here is indeed unwind safe. /// /// [rfc]: https://github.com/rust-lang/rfcs/blob/master/text/1236-stabilize-catch-panic.md /// /// # Notes /// -/// Note that this function **might not catch all panics** in Rust. A panic in -/// Rust is not always implemented via unwinding, but can be implemented by -/// aborting the process as well. This function *only* catches unwinding panics, -/// not those that abort the process. +/// This function **might not catch all Rust panics**. A Rust panic is not +/// always implemented via unwinding, but can be implemented by aborting the +/// process as well. This function *only* catches unwinding panics, not those +/// that abort the process. /// -/// Note that if a custom panic hook has been set, it will be invoked before -/// the panic is caught, before unwinding. +/// If a custom panic hook has been set, it will be invoked before the panic is +/// caught, before unwinding. /// -/// Also note that unwinding into Rust code with a foreign exception (e.g. -/// an exception thrown from C++ code) is undefined behavior. +/// Although unwinding into Rust code with a foreign exception (e.g. an +/// exception thrown from C++ code, or a `panic!` in Rust code compiled or +/// linked with a different runtime) via an appropriate ABI (e.g. `"C-unwind"`) +/// is permitted, catching such an exception using this function will have one +/// of two behaviors, and it is unspecified which will occur: /// -/// Finally, be **careful in how you drop the result of this function**. -/// If it is `Err`, it contains the panic payload, and dropping that may in turn panic! +/// * The process aborts, after executing all destructors of `f` and the +/// functions it called. +/// * The function returns a `Result::Err` containing an opaque type. +/// +/// Finally, be **careful in how you drop the result of this function**. If it +/// is `Err`, it contains the panic payload, and dropping that may in turn +/// panic! /// /// # Examples /// diff --git a/library/std/src/panicking.rs b/library/std/src/panicking.rs index 336e34d7b95..ac1f547c914 100644 --- a/library/std/src/panicking.rs +++ b/library/std/src/panicking.rs @@ -23,8 +23,8 @@ use crate::mem::{self, ManuallyDrop}; use crate::panic::{BacktraceStyle, PanicHookInfo}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::{PoisonError, RwLock}; +use crate::sys::backtrace; use crate::sys::stdio::panic_output; -use crate::sys::{backtrace, dbg}; use crate::{fmt, intrinsics, process, thread}; // Binary interface to the panic runtime that the standard library depends on. @@ -506,7 +506,7 @@ pub unsafe fn r#try<R, F: FnOnce() -> R>(f: F) -> Result<R, Box<dyn Any + Send>> // method of calling a catch panic whilst juggling ownership. let mut data = Data { f: ManuallyDrop::new(f) }; - let data_ptr = core::ptr::addr_of_mut!(data) as *mut u8; + let data_ptr = (&raw mut data) as *mut u8; // SAFETY: // // Access to the union's fields: this is `std` and we know that the `r#try` @@ -859,14 +859,6 @@ pub fn rust_panic_without_hook(payload: Box<dyn Any + Send>) -> ! { #[cfg_attr(not(test), rustc_std_internal_symbol)] #[cfg(not(feature = "panic_immediate_abort"))] fn rust_panic(msg: &mut dyn PanicPayload) -> ! { - // Break into the debugger if it is attached. - // The return value is not currently used. - // - // This function isn't used anywhere else, and - // using inside `#[panic_handler]` doesn't seem - // to count, so a warning is issued. - let _ = dbg::breakpoint_if_debugging(); - let code = unsafe { __rust_start_panic(msg) }; rtabort!("failed to initiate panic, error {code}") } @@ -874,14 +866,6 @@ fn rust_panic(msg: &mut dyn PanicPayload) -> ! { #[cfg_attr(not(test), rustc_std_internal_symbol)] #[cfg(feature = "panic_immediate_abort")] fn rust_panic(_: &mut dyn PanicPayload) -> ! { - // Break into the debugger if it is attached. - // The return value is not currently used. - // - // This function isn't used anywhere else, and - // using inside `#[panic_handler]` doesn't seem - // to count, so a warning is issued. - let _ = dbg::breakpoint_if_debugging(); - unsafe { crate::intrinsics::abort(); } diff --git a/library/std/src/path.rs b/library/std/src/path.rs index e3ff7d199cc..63edfdb82f3 100644 --- a/library/std/src/path.rs +++ b/library/std/src/path.rs @@ -3144,7 +3144,7 @@ unsafe impl CloneToUninit for Path { #[cfg_attr(debug_assertions, track_caller)] unsafe fn clone_to_uninit(&self, dst: *mut Self) { // SAFETY: Path is just a wrapper around OsStr - unsafe { self.inner.clone_to_uninit(core::ptr::addr_of_mut!((*dst).inner)) } + unsafe { self.inner.clone_to_uninit(&raw mut (*dst).inner) } } } diff --git a/library/std/src/path/tests.rs b/library/std/src/path/tests.rs index 6436872087d..b75793d2bc9 100644 --- a/library/std/src/path/tests.rs +++ b/library/std/src/path/tests.rs @@ -139,7 +139,7 @@ fn test_pathbuf_leak() { } #[test] -#[cfg(unix)] +#[cfg(any(unix, target_os = "wasi"))] pub fn test_decompositions_unix() { t!("", iter: [], @@ -1201,7 +1201,10 @@ pub fn test_push() { }); ); - if cfg!(unix) || cfg!(all(target_env = "sgx", target_vendor = "fortanix")) { + if cfg!(unix) + || cfg!(target_os = "wasi") + || cfg!(all(target_env = "sgx", target_vendor = "fortanix")) + { tp!("", "foo", "foo"); tp!("foo", "bar", "foo/bar"); tp!("foo/", "bar", "foo/bar"); @@ -1358,7 +1361,10 @@ pub fn test_set_file_name() { tfn!("foo", "bar", "bar"); tfn!("foo", "", ""); tfn!("", "foo", "foo"); - if cfg!(unix) || cfg!(all(target_env = "sgx", target_vendor = "fortanix")) { + if cfg!(unix) + || cfg!(target_os = "wasi") + || cfg!(all(target_env = "sgx", target_vendor = "fortanix")) + { tfn!(".", "foo", "./foo"); tfn!("foo/", "bar", "bar"); tfn!("foo/.", "bar", "bar"); @@ -1758,7 +1764,7 @@ fn test_components_debug() { assert_eq!(expected, actual); } -#[cfg(unix)] +#[cfg(any(unix, target_os = "wasi"))] #[test] fn test_iter_debug() { let path = Path::new("/tmp"); @@ -1859,7 +1865,7 @@ fn test_ord() { } #[test] -#[cfg(unix)] +#[cfg(any(unix, target_os = "wasi"))] fn test_unix_absolute() { use crate::path::absolute; diff --git a/library/std/src/process.rs b/library/std/src/process.rs index c84a5c65263..f24fe353e55 100644 --- a/library/std/src/process.rs +++ b/library/std/src/process.rs @@ -148,7 +148,15 @@ #![stable(feature = "process", since = "1.0.0")] #![deny(unsafe_op_in_unsafe_fn)] -#[cfg(all(test, not(any(target_os = "emscripten", target_env = "sgx", target_os = "xous"))))] +#[cfg(all( + test, + not(any( + target_os = "emscripten", + target_os = "wasi", + target_env = "sgx", + target_os = "xous" + )) +))] mod tests; use crate::convert::Infallible; diff --git a/library/std/src/sync/barrier/tests.rs b/library/std/src/sync/barrier/tests.rs index c5620cd91d8..0fbcd998812 100644 --- a/library/std/src/sync/barrier/tests.rs +++ b/library/std/src/sync/barrier/tests.rs @@ -3,7 +3,7 @@ use crate::sync::{Arc, Barrier}; use crate::thread; #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn test_barrier() { const N: usize = 10; diff --git a/library/std/src/sync/condvar/tests.rs b/library/std/src/sync/condvar/tests.rs index 12d13a6b20b..f9e9066bc92 100644 --- a/library/std/src/sync/condvar/tests.rs +++ b/library/std/src/sync/condvar/tests.rs @@ -12,7 +12,7 @@ fn smoke() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn notify_one() { let m = Arc::new(Mutex::new(())); let m2 = m.clone(); @@ -29,7 +29,7 @@ fn notify_one() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn notify_all() { const N: usize = 10; @@ -66,7 +66,7 @@ fn notify_all() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn wait_while() { let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair2 = pair.clone(); @@ -87,7 +87,7 @@ fn wait_while() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported fn wait_timeout_wait() { let m = Arc::new(Mutex::new(())); let c = Arc::new(Condvar::new()); @@ -106,7 +106,7 @@ fn wait_timeout_wait() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported fn wait_timeout_while_wait() { let m = Arc::new(Mutex::new(())); let c = Arc::new(Condvar::new()); @@ -118,7 +118,7 @@ fn wait_timeout_while_wait() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // condvar wait not supported fn wait_timeout_while_instant_satisfy() { let m = Arc::new(Mutex::new(())); let c = Arc::new(Condvar::new()); @@ -130,7 +130,7 @@ fn wait_timeout_while_instant_satisfy() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn wait_timeout_while_wake() { let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair_copy = pair.clone(); @@ -153,7 +153,7 @@ fn wait_timeout_while_wake() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn wait_timeout_wake() { let m = Arc::new(Mutex::new(())); let c = Arc::new(Condvar::new()); diff --git a/library/std/src/sync/lazy_lock/tests.rs b/library/std/src/sync/lazy_lock/tests.rs index 94044368305..7d7dde54349 100644 --- a/library/std/src/sync/lazy_lock/tests.rs +++ b/library/std/src/sync/lazy_lock/tests.rs @@ -34,6 +34,7 @@ fn lazy_default() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn lazy_poisoning() { let x: LazyCell<String> = LazyCell::new(|| panic!("kaboom")); for _ in 0..2 { @@ -43,7 +44,7 @@ fn lazy_poisoning() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn sync_lazy_new() { static CALLED: AtomicUsize = AtomicUsize::new(0); static SYNC_LAZY: LazyLock<i32> = LazyLock::new(|| { @@ -90,7 +91,7 @@ fn sync_lazy_default() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn static_sync_lazy() { static XS: LazyLock<Vec<i32>> = LazyLock::new(|| { let mut xs = Vec::new(); @@ -123,6 +124,7 @@ fn static_sync_lazy_via_fn() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn sync_lazy_poisoning() { let x: LazyLock<String> = LazyLock::new(|| panic!("kaboom")); for _ in 0..2 { diff --git a/library/std/src/sync/mod.rs b/library/std/src/sync/mod.rs index 0fb8e669bf8..0fb77331293 100644 --- a/library/std/src/sync/mod.rs +++ b/library/std/src/sync/mod.rs @@ -133,6 +133,11 @@ //! inter-thread synchronisation mechanism, at the cost of some //! extra memory. //! +//! - [`mpmc`]: Multi-producer, multi-consumer queues, used for +//! message-based communication. Can provide a lightweight +//! inter-thread synchronisation mechanism, at the cost of some +//! extra memory. +//! //! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at //! most one thread at a time is able to access some data. //! @@ -153,6 +158,7 @@ //! [`Arc`]: crate::sync::Arc //! [`Barrier`]: crate::sync::Barrier //! [`Condvar`]: crate::sync::Condvar +//! [`mpmc`]: crate::sync::mpmc //! [`mpsc`]: crate::sync::mpsc //! [`Mutex`]: crate::sync::Mutex //! [`Once`]: crate::sync::Once @@ -193,12 +199,13 @@ pub use self::rwlock::{MappedRwLockReadGuard, MappedRwLockWriteGuard}; #[stable(feature = "rust1", since = "1.0.0")] pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +#[unstable(feature = "mpmc_channel", issue = "126840")] +pub mod mpmc; pub mod mpsc; mod barrier; mod condvar; mod lazy_lock; -mod mpmc; mod mutex; pub(crate) mod once; mod once_lock; diff --git a/library/std/src/sync/mpmc/error.rs b/library/std/src/sync/mpmc/error.rs index e3aec7e7623..e34b56d0831 100644 --- a/library/std/src/sync/mpmc/error.rs +++ b/library/std/src/sync/mpmc/error.rs @@ -7,6 +7,7 @@ use crate::{error, fmt}; /// /// [`send_timeout`]: super::Sender::send_timeout #[derive(PartialEq, Eq, Clone, Copy)] +#[unstable(feature = "mpmc_channel", issue = "126840")] pub enum SendTimeoutError<T> { /// The message could not be sent because the channel is full and the operation timed out. /// @@ -18,12 +19,14 @@ pub enum SendTimeoutError<T> { Disconnected(T), } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> fmt::Debug for SendTimeoutError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { "SendTimeoutError(..)".fmt(f) } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> fmt::Display for SendTimeoutError<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { @@ -33,8 +36,10 @@ impl<T> fmt::Display for SendTimeoutError<T> { } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> error::Error for SendTimeoutError<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> From<SendError<T>> for SendTimeoutError<T> { fn from(err: SendError<T>) -> SendTimeoutError<T> { match err { diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs index c640e07348e..44e146a89ba 100644 --- a/library/std/src/sync/mpmc/mod.rs +++ b/library/std/src/sync/mpmc/mod.rs @@ -1,8 +1,114 @@ -//! Multi-producer multi-consumer channels. +//! Multi-producer, multi-consumer FIFO queue communication primitives. +//! +//! This module provides message-based communication over channels, concretely +//! defined by two types: +//! +//! * [`Sender`] +//! * [`Receiver`] +//! +//! [`Sender`]s are used to send data to a set of [`Receiver`]s. Both +//! sender and receiver are cloneable (multi-producer) such that many threads can send +//! simultaneously to receivers (multi-consumer). +//! +//! These channels come in two flavors: +//! +//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function +//! will return a `(Sender, Receiver)` tuple where all sends will be +//! **asynchronous** (they never block). The channel conceptually has an +//! infinite buffer. +//! +//! 2. A synchronous, bounded channel. The [`sync_channel`] function will +//! return a `(SyncSender, Receiver)` tuple where the storage for pending +//! messages is a pre-allocated buffer of a fixed size. All sends will be +//! **synchronous** by blocking until there is buffer space available. Note +//! that a bound of 0 is allowed, causing the channel to become a "rendezvous" +//! channel where each sender atomically hands off a message to a receiver. +//! +//! [`send`]: Sender::send +//! +//! ## Disconnection +//! +//! The send and receive operations on channels will all return a [`Result`] +//! indicating whether the operation succeeded or not. An unsuccessful operation +//! is normally indicative of the other half of a channel having "hung up" by +//! being dropped in its corresponding thread. +//! +//! Once half of a channel has been deallocated, most operations can no longer +//! continue to make progress, so [`Err`] will be returned. Many applications +//! will continue to [`unwrap`] the results returned from this module, +//! instigating a propagation of failure among threads if one unexpectedly dies. +//! +//! [`unwrap`]: Result::unwrap +//! +//! # Examples +//! +//! Simple usage: +//! +//! ``` +//! #![feature(mpmc_channel)] +//! +//! use std::thread; +//! use std::sync::mpmc::channel; +//! +//! // Create a simple streaming channel +//! let (tx, rx) = channel(); +//! thread::spawn(move || { +//! tx.send(10).unwrap(); +//! }); +//! assert_eq!(rx.recv().unwrap(), 10); +//! ``` +//! +//! Shared usage: +//! +//! ``` +//! #![feature(mpmc_channel)] +//! +//! use std::thread; +//! use std::sync::mpmc::channel; +//! +//! thread::scope(|s| { +//! // Create a shared channel that can be sent along from many threads +//! // where tx is the sending half (tx for transmission), and rx is the receiving +//! // half (rx for receiving). +//! let (tx, rx) = channel(); +//! for i in 0..10 { +//! let tx = tx.clone(); +//! s.spawn(move || { +//! tx.send(i).unwrap(); +//! }); +//! } +//! +//! for _ in 0..5 { +//! let rx1 = rx.clone(); +//! let rx2 = rx.clone(); +//! s.spawn(move || { +//! let j = rx1.recv().unwrap(); +//! assert!(0 <= j && j < 10); +//! }); +//! s.spawn(move || { +//! let j = rx2.recv().unwrap(); +//! assert!(0 <= j && j < 10); +//! }); +//! } +//! }) +//! ``` +//! +//! Propagating panics: +//! +//! ``` +//! #![feature(mpmc_channel)] +//! +//! use std::sync::mpmc::channel; +//! +//! // The call to recv() will return an error because the channel has already +//! // hung up (or been deallocated) +//! let (tx, rx) = channel::<i32>(); +//! drop(tx); +//! assert!(rx.recv().is_err()); +//! ``` -// This module is not currently exposed publicly, but is used -// as the implementation for the channels in `sync::mpsc`. The -// implementation comes from the crossbeam-channel crate: +// This module is used as the implementation for the channels in `sync::mpsc`. +// The implementation comes from the crossbeam-channel crate: // // Copyright (c) 2019 The Crossbeam Project Developers // @@ -46,9 +152,47 @@ use crate::fmt; use crate::panic::{RefUnwindSafe, UnwindSafe}; use crate::time::{Duration, Instant}; -/// Creates a channel of unbounded capacity. +/// Creates a new asynchronous channel, returning the sender/receiver halves. +/// All data sent on the [`Sender`] will become available on the [`Receiver`] in +/// the same order as it was sent, and no [`send`] will block the calling thread +/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will +/// block after its buffer limit is reached). [`recv`] will block until a message +/// is available while there is at least one [`Sender`] alive (including clones). /// -/// This channel has a growable buffer that can hold any number of messages at a time. +/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times. +/// The [`Receiver`] also can be cloned to have multi receivers. +/// +/// If the [`Receiver`] is disconnected while trying to [`send`] with the +/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the +/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will +/// return a [`RecvError`]. +/// +/// [`send`]: Sender::send +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ``` +/// #![feature(mpmc_channel)] +/// +/// use std::sync::mpmc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// +/// // Spawn off an expensive computation +/// thread::spawn(move || { +/// # fn expensive_computation() {} +/// sender.send(expensive_computation()).unwrap(); +/// }); +/// +/// // Do some useful work for awhile +/// +/// // Let's see what that answer was +/// println!("{:?}", receiver.recv().unwrap()); +/// ``` +#[must_use] +#[unstable(feature = "mpmc_channel", issue = "126840")] pub fn channel<T>() -> (Sender<T>, Receiver<T>) { let (s, r) = counter::new(list::Channel::new()); let s = Sender { flavor: SenderFlavor::List(s) }; @@ -56,12 +200,50 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) { (s, r) } -/// Creates a channel of bounded capacity. +/// Creates a new synchronous, bounded channel. +/// All data sent on the [`Sender`] will become available on the [`Receiver`] +/// in the same order as it was sent. Like asynchronous [`channel`]s, the +/// [`Receiver`] will block until a message becomes available. `sync_channel` +/// differs greatly in the semantics of the sender, however. +/// +/// This channel has an internal buffer on which messages will be queued. +/// `bound` specifies the buffer size. When the internal buffer becomes full, +/// future sends will *block* waiting for the buffer to open up. Note that a +/// buffer size of 0 is valid, in which case this becomes "rendezvous channel" +/// where each [`send`] will not return until a [`recv`] is paired with it. +/// +/// The [`Sender`] can be cloned to [`send`] to the same channel multiple +/// times. The [`Receiver`] also can be cloned to have multi receivers. +/// +/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying +/// to [`send`] with the [`Sender`], the [`send`] method will return a +/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying +/// to [`recv`], the [`recv`] method will return a [`RecvError`]. +/// +/// [`send`]: Sender::send +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ``` +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// let (sender, receiver) = sync_channel(1); /// -/// This channel has a buffer that can hold at most `cap` messages at a time. +/// // this returns immediately +/// sender.send(1).unwrap(); /// -/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and -/// receive operations must appear at the same time in order to pair up and pass the message over. +/// thread::spawn(move || { +/// // this will block until the previous message has been received +/// sender.send(2).unwrap(); +/// }); +/// +/// assert_eq!(receiver.recv().unwrap(), 1); +/// assert_eq!(receiver.recv().unwrap(), 2); +/// ``` +#[must_use] +#[unstable(feature = "mpmc_channel", issue = "126840")] pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) { if cap == 0 { let (s, r) = counter::new(zero::Channel::new()); @@ -76,7 +258,42 @@ pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) { } } -/// The sending side of a channel. +/// The sending-half of Rust's synchronous [`channel`] type. +/// +/// Messages can be sent through this channel with [`send`]. +/// +/// Note: all senders (the original and its clones) need to be dropped for the receiver +/// to stop blocking to receive messages with [`Receiver::recv`]. +/// +/// [`send`]: Sender::send +/// +/// # Examples +/// +/// ```rust +/// #![feature(mpmc_channel)] +/// +/// use std::sync::mpmc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// let sender2 = sender.clone(); +/// +/// // First thread owns sender +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// }); +/// +/// // Second thread owns sender2 +/// thread::spawn(move || { +/// sender2.send(2).unwrap(); +/// }); +/// +/// let msg = receiver.recv().unwrap(); +/// let msg2 = receiver.recv().unwrap(); +/// +/// assert_eq!(3, msg + msg2); +/// ``` +#[unstable(feature = "mpmc_channel", issue = "126840")] pub struct Sender<T> { flavor: SenderFlavor<T>, } @@ -93,10 +310,14 @@ enum SenderFlavor<T> { Zero(counter::Sender<zero::Channel<T>>), } +#[unstable(feature = "mpmc_channel", issue = "126840")] unsafe impl<T: Send> Send for Sender<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] unsafe impl<T: Send> Sync for Sender<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> UnwindSafe for Sender<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> RefUnwindSafe for Sender<T> {} impl<T> Sender<T> { @@ -107,6 +328,19 @@ impl<T> Sender<T> { /// /// If called on a zero-capacity channel, this method will send the message only if there /// happens to be a receive operation on the other side of the channel at the same time. + /// + /// # Examples + /// + /// ```rust + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::{channel, Receiver, Sender}; + /// + /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel(); + /// + /// assert!(sender.try_send(1).is_ok()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { match &self.flavor { SenderFlavor::Array(chan) => chan.try_send(msg), @@ -115,14 +349,36 @@ impl<T> Sender<T> { } } - /// Blocks the current thread until a message is sent or the channel is disconnected. + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. /// - /// If the channel is full and not disconnected, this call will block until the send operation - /// can proceed. If the channel becomes disconnected, this call will wake up and return an - /// error. The returned error contains the original message. + /// A successful send occurs when it is determined that the other end of + /// the channel has not hung up already. An unsuccessful send would be one + /// where the corresponding receiver has already been deallocated. Note + /// that a return value of [`Err`] means that the data will never be + /// received, but a return value of [`Ok`] does *not* mean that the data + /// will be received. It is possible for the corresponding receiver to + /// hang up immediately after this function returns [`Ok`]. /// - /// If called on a zero-capacity channel, this method will wait for a receive operation to - /// appear on the other side of the channel. + /// This method will never block the current thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::channel; + /// + /// let (tx, rx) = channel(); + /// + /// // This send is always successful + /// tx.send(1).unwrap(); + /// + /// // This send will fail because the receiver is gone + /// drop(rx); + /// assert!(tx.send(1).is_err()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn send(&self, msg: T) -> Result<(), SendError<T>> { match &self.flavor { SenderFlavor::Array(chan) => chan.send(msg, None), @@ -136,10 +392,6 @@ impl<T> Sender<T> { } } -// The methods below are not used by `sync::mpsc`, but -// are useful and we'll likely want to expose them -// eventually -#[allow(unused)] impl<T> Sender<T> { /// Waits for a message to be sent into the channel, but only for a limited time. /// @@ -149,6 +401,20 @@ impl<T> Sender<T> { /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::channel; + /// use std::time::Duration; + /// + /// let (tx, rx) = channel(); + /// + /// tx.send_timeout(1, Duration::from_millis(400)).unwrap(); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> { match Instant::now().checked_add(timeout) { Some(deadline) => self.send_deadline(msg, deadline), @@ -165,6 +431,21 @@ impl<T> Sender<T> { /// /// If called on a zero-capacity channel, this method will wait for a receive operation to /// appear on the other side of the channel. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::channel; + /// use std::time::{Duration, Instant}; + /// + /// let (tx, rx) = channel(); + /// + /// let t = Instant::now() + Duration::from_millis(400); + /// tx.send_deadline(1, t).unwrap(); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { match &self.flavor { SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), @@ -176,6 +457,31 @@ impl<T> Sender<T> { /// Returns `true` if the channel is empty. /// /// Note: Zero-capacity channels are always empty. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, _recv) = mpmc::channel(); + /// + /// let tx1 = send.clone(); + /// let tx2 = send.clone(); + /// + /// assert!(tx1.is_empty()); + /// + /// let handle = thread::spawn(move || { + /// tx2.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert!(!tx1.is_empty()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn is_empty(&self) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.is_empty(), @@ -187,6 +493,29 @@ impl<T> Sender<T> { /// Returns `true` if the channel is full. /// /// Note: Zero-capacity channels are always full. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, _recv) = mpmc::sync_channel(1); + /// + /// let (tx1, tx2) = (send.clone(), send.clone()); + /// assert!(!tx1.is_full()); + /// + /// let handle = thread::spawn(move || { + /// tx2.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert!(tx1.is_full()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn is_full(&self) -> bool { match &self.flavor { SenderFlavor::Array(chan) => chan.is_full(), @@ -196,6 +525,29 @@ impl<T> Sender<T> { } /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, _recv) = mpmc::channel(); + /// let (tx1, tx2) = (send.clone(), send.clone()); + /// + /// assert_eq!(tx1.len(), 0); + /// + /// let handle = thread::spawn(move || { + /// tx2.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(tx1.len(), 1); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn len(&self) -> usize { match &self.flavor { SenderFlavor::Array(chan) => chan.len(), @@ -205,6 +557,29 @@ impl<T> Sender<T> { } /// If the channel is bounded, returns its capacity. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, _recv) = mpmc::sync_channel(3); + /// let (tx1, tx2) = (send.clone(), send.clone()); + /// + /// assert_eq!(tx1.capacity(), Some(3)); + /// + /// let handle = thread::spawn(move || { + /// tx2.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(tx1.capacity(), Some(3)); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn capacity(&self) -> Option<usize> { match &self.flavor { SenderFlavor::Array(chan) => chan.capacity(), @@ -214,6 +589,21 @@ impl<T> Sender<T> { } /// Returns `true` if senders belong to the same channel. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// + /// let (tx1, _) = mpmc::channel::<i32>(); + /// let (tx2, _) = mpmc::channel::<i32>(); + /// + /// assert!(tx1.same_channel(&tx1)); + /// assert!(!tx1.same_channel(&tx2)); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn same_channel(&self, other: &Sender<T>) -> bool { match (&self.flavor, &other.flavor) { (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, @@ -224,6 +614,7 @@ impl<T> Sender<T> { } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> Drop for Sender<T> { fn drop(&mut self) { unsafe { @@ -236,6 +627,7 @@ impl<T> Drop for Sender<T> { } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> Clone for Sender<T> { fn clone(&self) -> Self { let flavor = match &self.flavor { @@ -248,17 +640,216 @@ impl<T> Clone for Sender<T> { } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> fmt::Debug for Sender<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Sender { .. }") } } -/// The receiving side of a channel. +/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type. +/// Different threads can share this [`Sender`] by cloning it. +/// +/// Messages sent to the channel can be retrieved using [`recv`]. +/// +/// [`recv`]: Receiver::recv +/// +/// # Examples +/// +/// ```rust +/// #![feature(mpmc_channel)] +/// +/// use std::sync::mpmc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (send, recv) = channel(); +/// +/// let tx_thread = thread::spawn(move || { +/// send.send("Hello world!").unwrap(); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// send.send("Delayed for 2 seconds").unwrap(); +/// }); +/// +/// let (rx1, rx2) = (recv.clone(), recv.clone()); +/// let rx_thread_1 = thread::spawn(move || { +/// println!("{}", rx1.recv().unwrap()); // Received immediately +/// }); +/// let rx_thread_2 = thread::spawn(move || { +/// println!("{}", rx2.recv().unwrap()); // Received after 2 seconds +/// }); +/// +/// tx_thread.join().unwrap(); +/// rx_thread_1.join().unwrap(); +/// rx_thread_2.join().unwrap(); +/// ``` +#[unstable(feature = "mpmc_channel", issue = "126840")] pub struct Receiver<T> { flavor: ReceiverFlavor<T>, } +/// An iterator over messages on a [`Receiver`], created by [`iter`]. +/// +/// This iterator will block whenever [`next`] is called, +/// waiting for a new message, and [`None`] will be returned +/// when the corresponding channel has hung up. +/// +/// [`iter`]: Receiver::iter +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ```rust +/// #![feature(mpmc_channel)] +/// +/// use std::sync::mpmc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[unstable(feature = "mpmc_channel", issue = "126840")] +#[derive(Debug)] +pub struct Iter<'a, T: 'a> { + rx: &'a Receiver<T>, +} + +/// An iterator that attempts to yield all pending values for a [`Receiver`], +/// created by [`try_iter`]. +/// +/// [`None`] will be returned when there are no pending values remaining or +/// if the corresponding channel has hung up. +/// +/// This iterator will never block the caller in order to wait for data to +/// become available. Instead, it will return [`None`]. +/// +/// [`try_iter`]: Receiver::try_iter +/// +/// # Examples +/// +/// ```rust +/// #![feature(mpmc_channel)] +/// +/// use std::sync::mpmc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (sender, receiver) = channel(); +/// +/// // Nothing is in the buffer yet +/// assert!(receiver.try_iter().next().is_none()); +/// println!("Nothing in the buffer..."); +/// +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// sender.send(2).unwrap(); +/// sender.send(3).unwrap(); +/// }); +/// +/// println!("Going to sleep..."); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// +/// for x in receiver.try_iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[unstable(feature = "mpmc_channel", issue = "126840")] +#[derive(Debug)] +pub struct TryIter<'a, T: 'a> { + rx: &'a Receiver<T>, +} + +/// An owning iterator over messages on a [`Receiver`], +/// created by [`into_iter`]. +/// +/// This iterator will block whenever [`next`] +/// is called, waiting for a new message, and [`None`] will be +/// returned if the corresponding channel has hung up. +/// +/// [`into_iter`]: Receiver::into_iter +/// [`next`]: Iterator::next +/// +/// # Examples +/// +/// ```rust +/// #![feature(mpmc_channel)] +/// +/// use std::sync::mpmc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.into_iter() { +/// println!("Got: {x}"); +/// } +/// ``` +#[unstable(feature = "mpmc_channel", issue = "126840")] +#[derive(Debug)] +pub struct IntoIter<T> { + rx: Receiver<T>, +} + +#[unstable(feature = "mpmc_channel", issue = "126840")] +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } +} + +#[unstable(feature = "mpmc_channel", issue = "126840")] +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.rx.try_recv().ok() + } +} + +#[unstable(feature = "mpmc_channel", issue = "126840")] +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +#[unstable(feature = "mpmc_channel", issue = "126840")] +impl<T> Iterator for IntoIter<T> { + type Item = T; + fn next(&mut self) -> Option<T> { + self.rx.recv().ok() + } +} + +#[unstable(feature = "mpmc_channel", issue = "126840")] +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> IntoIter<T> { + IntoIter { rx: self } + } +} + /// Receiver flavors. enum ReceiverFlavor<T> { /// Bounded channel based on a preallocated array. @@ -271,20 +862,46 @@ enum ReceiverFlavor<T> { Zero(counter::Receiver<zero::Channel<T>>), } +#[unstable(feature = "mpmc_channel", issue = "126840")] unsafe impl<T: Send> Send for Receiver<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] unsafe impl<T: Send> Sync for Receiver<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> UnwindSafe for Receiver<T> {} +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> RefUnwindSafe for Receiver<T> {} impl<T> Receiver<T> { /// Attempts to receive a message from the channel without blocking. /// - /// This method will either receive a message from the channel immediately or return an error - /// if the channel is empty. + /// This method will never block the caller in order to wait for data to + /// become available. Instead, this will always return immediately with a + /// possible option of pending data on the channel. /// /// If called on a zero-capacity channel, this method will receive a message only if there /// happens to be a send operation on the other side of the channel at the same time. + /// + /// This is useful for a flavor of "optimistic check" before deciding to + /// block on a receiver. + /// + /// Compared with [`recv`], this function has two failure cases instead of one + /// (one for disconnection, one for an empty buffer). + /// + /// [`recv`]: Self::recv + /// + /// # Examples + /// + /// ```rust + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::{Receiver, channel}; + /// + /// let (_, receiver): (_, Receiver<i32>) = channel(); + /// + /// assert!(receiver.try_recv().is_err()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn try_recv(&self) -> Result<T, TryRecvError> { match &self.flavor { ReceiverFlavor::Array(chan) => chan.try_recv(), @@ -293,15 +910,64 @@ impl<T> Receiver<T> { } } - /// Blocks the current thread until a message is received or the channel is empty and - /// disconnected. + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up. /// - /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed. If the channel is empty and becomes disconnected, this call will - /// wake up and return an error. + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent (at least one sender + /// still exists). Once a message is sent to the corresponding [`Sender`], + /// this receiver will wake up and return that message. /// - /// If called on a zero-capacity channel, this method will wait for a send operation to appear - /// on the other side of the channel. + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, recv) = mpmc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// ``` + /// + /// Buffering behavior: + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// use std::sync::mpmc::RecvError; + /// + /// let (send, recv) = mpmc::channel(); + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// drop(send); + /// }); + /// + /// // wait for the thread to join so we ensure the sender is dropped + /// handle.join().unwrap(); + /// + /// assert_eq!(Ok(1), recv.recv()); + /// assert_eq!(Ok(2), recv.recv()); + /// assert_eq!(Ok(3), recv.recv()); + /// assert_eq!(Err(RecvError), recv.recv()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn recv(&self) -> Result<T, RecvError> { match &self.flavor { ReceiverFlavor::Array(chan) => chan.recv(None), @@ -311,14 +977,65 @@ impl<T> Receiver<T> { .map_err(|_| RecvError) } - /// Waits for a message to be received from the channel, but only for a limited time. + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if it waits more than `timeout`. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent (at least one sender + /// still exists). Once a message is sent to the corresponding [`Sender`], + /// this receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// # Examples + /// + /// Successfully receiving value before encountering timeout: /// - /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed or the operation times out. If the channel is empty and becomes - /// disconnected, this call will wake up and return an error. + /// ```no_run + /// #![feature(mpmc_channel)] /// - /// If called on a zero-capacity channel, this method will wait for a send operation to appear - /// on the other side of the channel. + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpmc; + /// + /// let (send, recv) = mpmc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching timeout: + /// + /// ```no_run + /// #![feature(mpmc_channel)] + /// + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpmc; + /// + /// let (send, recv) = mpmc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Err(mpmc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { match Instant::now().checked_add(timeout) { Some(deadline) => self.recv_deadline(deadline), @@ -327,14 +1044,65 @@ impl<T> Receiver<T> { } } - /// Waits for a message to be received from the channel, but only for a limited time. + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if `deadline` is reached. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`], then this receiver will wake up + /// and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// # Examples + /// + /// Successfully receiving value before reaching deadline: + /// + /// ```no_run + /// #![feature(mpmc_channel)] + /// + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpmc; /// - /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed or the operation times out. If the channel is empty and becomes - /// disconnected, this call will wake up and return an error. + /// let (send, recv) = mpmc::channel(); /// - /// If called on a zero-capacity channel, this method will wait for a send operation to appear - /// on the other side of the channel. + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching deadline: + /// + /// ```no_run + /// #![feature(mpmc_channel)] + /// + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpmc; + /// + /// let (send, recv) = mpmc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Err(mpmc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { match &self.flavor { ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), @@ -342,16 +1110,77 @@ impl<T> Receiver<T> { ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), } } + + /// Returns an iterator that will attempt to yield all pending values. + /// It will return `None` if there are no more pending values or if the + /// channel has hung up. The iterator will never [`panic!`] or block the + /// user by waiting for values. + /// + /// # Examples + /// + /// ```no_run + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (sender, receiver) = channel(); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// sender.send(1).unwrap(); + /// sender.send(2).unwrap(); + /// sender.send(3).unwrap(); + /// }); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// // block for two seconds + /// thread::sleep(Duration::from_secs(2)); + /// + /// let mut iter = receiver.try_iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] + pub fn try_iter(&self) -> TryIter<'_, T> { + TryIter { rx: self } + } } -// The methods below are not used by `sync::mpsc`, but -// are useful and we'll likely want to expose them -// eventually -#[allow(unused)] impl<T> Receiver<T> { /// Returns `true` if the channel is empty. /// /// Note: Zero-capacity channels are always empty. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, recv) = mpmc::channel(); + /// + /// assert!(recv.is_empty()); + /// + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert!(!recv.is_empty()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn is_empty(&self) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.is_empty(), @@ -363,6 +1192,28 @@ impl<T> Receiver<T> { /// Returns `true` if the channel is full. /// /// Note: Zero-capacity channels are always full. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, recv) = mpmc::sync_channel(1); + /// + /// assert!(!recv.is_full()); + /// + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert!(recv.is_full()); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn is_full(&self) -> bool { match &self.flavor { ReceiverFlavor::Array(chan) => chan.is_full(), @@ -372,6 +1223,28 @@ impl<T> Receiver<T> { } /// Returns the number of messages in the channel. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, recv) = mpmc::channel(); + /// + /// assert_eq!(recv.len(), 0); + /// + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(recv.len(), 1); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn len(&self) -> usize { match &self.flavor { ReceiverFlavor::Array(chan) => chan.len(), @@ -381,6 +1254,28 @@ impl<T> Receiver<T> { } /// If the channel is bounded, returns its capacity. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// use std::thread; + /// + /// let (send, recv) = mpmc::sync_channel(3); + /// + /// assert_eq!(recv.capacity(), Some(3)); + /// + /// let handle = thread::spawn(move || { + /// send.send(1u8).unwrap(); + /// }); + /// + /// handle.join().unwrap(); + /// + /// assert_eq!(recv.capacity(), Some(3)); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn capacity(&self) -> Option<usize> { match &self.flavor { ReceiverFlavor::Array(chan) => chan.capacity(), @@ -390,6 +1285,21 @@ impl<T> Receiver<T> { } /// Returns `true` if receivers belong to the same channel. + /// + /// # Examples + /// + /// ``` + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc; + /// + /// let (_, rx1) = mpmc::channel::<i32>(); + /// let (_, rx2) = mpmc::channel::<i32>(); + /// + /// assert!(rx1.same_channel(&rx1)); + /// assert!(!rx1.same_channel(&rx2)); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] pub fn same_channel(&self, other: &Receiver<T>) -> bool { match (&self.flavor, &other.flavor) { (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, @@ -398,8 +1308,39 @@ impl<T> Receiver<T> { _ => false, } } + + /// Returns an iterator that will block waiting for messages, but never + /// [`panic!`]. It will return [`None`] when the channel has hung up. + /// + /// # Examples + /// + /// ```rust + /// #![feature(mpmc_channel)] + /// + /// use std::sync::mpmc::channel; + /// use std::thread; + /// + /// let (send, recv) = channel(); + /// + /// thread::spawn(move || { + /// send.send(1).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// }); + /// + /// let mut iter = recv.iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` + #[unstable(feature = "mpmc_channel", issue = "126840")] + pub fn iter(&self) -> Iter<'_, T> { + Iter { rx: self } + } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> Drop for Receiver<T> { fn drop(&mut self) { unsafe { @@ -412,6 +1353,7 @@ impl<T> Drop for Receiver<T> { } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> Clone for Receiver<T> { fn clone(&self) -> Self { let flavor = match &self.flavor { @@ -424,6 +1366,7 @@ impl<T> Clone for Receiver<T> { } } +#[unstable(feature = "mpmc_channel", issue = "126840")] impl<T> fmt::Debug for Receiver<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Receiver { .. }") diff --git a/library/std/src/sync/mpmc/tests.rs b/library/std/src/sync/mpmc/tests.rs new file mode 100644 index 00000000000..ab14050df6c --- /dev/null +++ b/library/std/src/sync/mpmc/tests.rs @@ -0,0 +1,728 @@ +use super::*; +use crate::{env, thread}; + +pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } +} + +#[test] +fn smoke() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn drop_full() { + let (tx, _rx) = channel::<Box<isize>>(); + tx.send(Box::new(1)).unwrap(); +} + +#[test] +fn drop_full_shared() { + let (tx, _rx) = channel::<Box<isize>>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(Box::new(1)).unwrap(); +} + +#[test] +fn smoke_shared() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn smoke_threads() { + let (tx, rx) = channel::<i32>(); + let t1 = thread::spawn(move || { + for i in 0..2 { + tx.send(i).unwrap(); + } + }); + let t2 = thread::spawn(move || { + assert_eq!(rx.recv().unwrap(), 0); + assert_eq!(rx.recv().unwrap(), 1); + }); + t1.join().unwrap(); + t2.join().unwrap(); +} + +#[test] +fn smoke_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()); +} + +#[test] +fn smoke_shared_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()) +} + +#[test] +fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<i32>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); +} + +#[test] +fn port_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} +} + +#[test] +fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<i32>(); + let tx2 = tx.clone(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} +} + +#[test] +fn smoke_chan_gone() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); +} + +#[test] +fn chan_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} +} + +#[test] +fn stress() { + let count = if cfg!(miri) { 100 } else { 10000 }; + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..count { + tx.send(1).unwrap(); + } + }); + for _ in 0..count { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().expect("thread panicked"); +} + +#[test] +fn stress_shared() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + match rx.try_recv() { + Ok(..) => panic!(), + _ => {} + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + } + drop(tx); + t.join().ok().expect("thread panicked"); +} + +#[test] +fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); +} + +#[test] +fn recv_from_outside_runtime() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().expect("thread panicked"); +} + +#[test] +fn no_runtime() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().expect("thread panicked"); + t2.join().ok().expect("thread panicked"); +} + +#[test] +fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<i32>(); + drop(rx); +} + +#[test] +fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<i32>(); + drop(tx); +} + +#[test] +fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<i32>>(); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); +} + +#[test] +fn oneshot_single_thread_recv_chan_close() { + // Receiving on a closed chan will panic + let res = thread::spawn(move || { + let (tx, rx) = channel::<i32>(); + drop(tx); + rx.recv().unwrap(); + }) + .join(); + // What is our res? + assert!(res.is_err()); +} + +#[test] +fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<i32>>(); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<i32>(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); +} + +#[test] +fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(10).is_err()); +} + +#[test] +fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<i32>(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); +} + +#[test] +fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); +} + +#[test] +fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<i32>>(); + let _t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); +} + +#[test] +fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<i32>>(); + let _t = thread::spawn(move || { + drop(tx); + }); + let res = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }) + .join(); + assert!(res.is_err()); +} + +#[test] +fn oneshot_multi_thread_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + drop(rx); + }); + drop(tx); + } +} + +#[test] +fn oneshot_multi_thread_send_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + let _t = thread::spawn(move || { + drop(rx); + }); + let _ = thread::spawn(move || { + tx.send(1).unwrap(); + }) + .join(); + } +} + +#[test] +fn oneshot_multi_thread_recv_close_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<i32>(); + thread::spawn(move || { + let res = thread::spawn(move || { + rx.recv().unwrap(); + }) + .join(); + assert!(res.is_err()); + }); + let _t = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + } +} + +#[test] +fn oneshot_multi_thread_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel::<Box<isize>>(); + let _t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + assert!(*rx.recv().unwrap() == 10); + } +} + +#[test] +fn stream_send_recv_stress() { + for _ in 0..stress_factor() { + let (tx, rx) = channel(); + + send(tx, 0); + recv(rx, 0); + + fn send(tx: Sender<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + }); + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) { + if i == 10 { + return; + } + + thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + }); + } + } +} + +#[test] +fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); +} + +#[test] +fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); +} + +#[test] +fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); +} + +#[test] +fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + for i in 0..stress { + let tx = tx.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); +} + +#[test] +fn very_long_recv_timeout_wont_panic() { + let (tx, rx) = channel::<()>(); + let join_handle = thread::spawn(move || rx.recv_timeout(Duration::from_secs(u64::MAX))); + thread::sleep(Duration::from_secs(1)); + assert!(tx.send(()).is_ok()); + assert_eq!(join_handle.join().unwrap(), Ok(())); +} + +#[test] +fn recv_a_lot() { + let count = if cfg!(miri) { 1000 } else { 10000 }; + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..count { + tx.send(()).unwrap(); + } + for _ in 0..count { + rx.recv().unwrap(); + } +} + +#[test] +fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout)); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); +} + +#[test] +fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + for _ in 0..total { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + } + + for _ in 0..total { + rx.recv().unwrap(); + } +} + +#[test] +fn test_nested_recv_iter() { + let (tx, rx) = channel::<i32>(); + let (total_tx, total_rx) = channel::<i32>(); + + let _t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); +} + +#[test] +fn test_recv_iter_break() { + let (tx, rx) = channel::<i32>(); + let (count_tx, count_rx) = channel(); + + let _t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); +} + +#[test] +fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); +} + +#[test] +fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert_eq!(iter.next().is_none(), true); +} + +#[test] +fn try_recv_states() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let _t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); +} + +// This bug used to end up in a livelock inside of the Receiver destructor +// because the internal state of the Shared packet was corrupted +#[test] +fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let _t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let t = tx.clone(); + drop(tx); + t.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); +} + +#[test] +fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); +} + +#[test] +fn issue_39364() { + let (tx, rx) = channel::<()>(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(300)); + let _ = tx.clone(); + // Don't drop; hand back to caller. + tx + }); + + let _ = rx.recv_timeout(Duration::from_millis(500)); + let _tx = t.join().unwrap(); // delay dropping until end of test + let _ = rx.recv_timeout(Duration::from_millis(500)); +} diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs index 2451d7b79d1..446881291e6 100644 --- a/library/std/src/sync/mpmc/zero.rs +++ b/library/std/src/sync/mpmc/zero.rs @@ -185,11 +185,7 @@ impl<T> Channel<T> { // Prepare for blocking until a receiver wakes us up. let oper = Operation::hook(token); let mut packet = Packet::<T>::message_on_stack(msg); - inner.senders.register_with_packet( - oper, - core::ptr::addr_of_mut!(packet) as *mut (), - cx, - ); + inner.senders.register_with_packet(oper, (&raw mut packet) as *mut (), cx); inner.receivers.notify(); drop(inner); @@ -256,11 +252,7 @@ impl<T> Channel<T> { // Prepare for blocking until a sender wakes us up. let oper = Operation::hook(token); let mut packet = Packet::<T>::empty_on_stack(); - inner.receivers.register_with_packet( - oper, - core::ptr::addr_of_mut!(packet) as *mut (), - cx, - ); + inner.receivers.register_with_packet(oper, (&raw mut packet) as *mut (), cx); inner.senders.notify(); drop(inner); diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs index 26d5b9515a2..83a93a06369 100644 --- a/library/std/src/sync/mpsc/mod.rs +++ b/library/std/src/sync/mpsc/mod.rs @@ -137,10 +137,10 @@ #![stable(feature = "rust1", since = "1.0.0")] -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod sync_tests; // MPSC channels are built as a wrapper around MPMC channels, which diff --git a/library/std/src/sync/mutex.rs b/library/std/src/sync/mutex.rs index f3de1f7bf49..fe2aca031a2 100644 --- a/library/std/src/sync/mutex.rs +++ b/library/std/src/sync/mutex.rs @@ -1,4 +1,4 @@ -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; use crate::cell::UnsafeCell; diff --git a/library/std/src/sync/once.rs b/library/std/src/sync/once.rs index 5a1cd7d0b8b..993df9314fc 100644 --- a/library/std/src/sync/once.rs +++ b/library/std/src/sync/once.rs @@ -3,7 +3,7 @@ //! This primitive is meant to be used to run one-time initialization. An //! example use case would be for initializing an FFI library. -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; use crate::fmt; diff --git a/library/std/src/sync/once_lock/tests.rs b/library/std/src/sync/once_lock/tests.rs index 1fff3273d20..5113d436c3c 100644 --- a/library/std/src/sync/once_lock/tests.rs +++ b/library/std/src/sync/once_lock/tests.rs @@ -9,7 +9,7 @@ fn spawn_and_wait<R: Send + 'static>(f: impl FnOnce() -> R + Send + 'static) -> } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn sync_once_cell() { static ONCE_CELL: OnceLock<i32> = OnceLock::new(); @@ -43,7 +43,7 @@ fn sync_once_cell_get_unchecked() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn sync_once_cell_drop() { static DROP_CNT: AtomicUsize = AtomicUsize::new(0); struct Dropper; @@ -81,6 +81,7 @@ fn clone() { } #[test] +#[cfg_attr(not(panic = "unwind"), ignore = "test requires unwinding support")] fn get_or_try_init() { let cell: OnceLock<String> = OnceLock::new(); assert!(cell.get().is_none()); @@ -154,7 +155,7 @@ fn eval_once_macro() { } #[test] -#[cfg_attr(target_os = "emscripten", ignore)] +#[cfg_attr(any(target_os = "emscripten", target_os = "wasi"), ignore)] // no threads fn sync_once_cell_does_not_leak_partially_constructed_boxes() { static ONCE_CELL: OnceLock<String> = OnceLock::new(); diff --git a/library/std/src/sync/reentrant_lock.rs b/library/std/src/sync/reentrant_lock.rs index 39f23a14441..0140e0d2129 100644 --- a/library/std/src/sync/reentrant_lock.rs +++ b/library/std/src/sync/reentrant_lock.rs @@ -1,4 +1,4 @@ -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; use cfg_if::cfg_if; diff --git a/library/std/src/sync/rwlock.rs b/library/std/src/sync/rwlock.rs index 143bdef736d..da2da6f9dfc 100644 --- a/library/std/src/sync/rwlock.rs +++ b/library/std/src/sync/rwlock.rs @@ -1,4 +1,4 @@ -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; use crate::cell::UnsafeCell; diff --git a/library/std/src/sys/dbg.rs b/library/std/src/sys/dbg.rs deleted file mode 100644 index 7266a739e78..00000000000 --- a/library/std/src/sys/dbg.rs +++ /dev/null @@ -1,152 +0,0 @@ -//! Debugging aids. - -/// Presence of a debugger. The debugger being concerned -/// is expected to use the OS API to debug this process. -#[derive(Copy, Clone, Debug)] -#[allow(unused)] -pub(crate) enum DebuggerPresence { - /// The debugger is attached to this process. - Detected, - /// The debugger is not attached to this process. - NotDetected, -} - -#[cfg(target_os = "windows")] -mod os { - use super::DebuggerPresence; - - #[link(name = "kernel32")] - extern "system" { - fn IsDebuggerPresent() -> i32; - } - - pub(super) fn is_debugger_present() -> Option<DebuggerPresence> { - // SAFETY: No state is shared between threads. The call reads - // a field from the Thread Environment Block using the OS API - // as required by the documentation. - if unsafe { IsDebuggerPresent() } != 0 { - Some(DebuggerPresence::Detected) - } else { - Some(DebuggerPresence::NotDetected) - } - } -} - -#[cfg(any(target_vendor = "apple", target_os = "freebsd"))] -mod os { - use libc::{CTL_KERN, KERN_PROC, KERN_PROC_PID, c_int, sysctl}; - - use super::DebuggerPresence; - use crate::io::{Cursor, Read, Seek, SeekFrom}; - use crate::process; - - const P_TRACED: i32 = 0x00000800; - - // The assumption is that the kernel structures available to the - // user space may not shrink or repurpose the existing fields over - // time. The kernels normally adhere to that for the backward - // compatibility of the user space. - - // The macOS 14.5 SDK comes with a header `MacOSX14.5.sdk/usr/include/sys/sysctl.h` - // that defines `struct kinfo_proc` be of `648` bytes on the 64-bit system. That has - // not changed since macOS 10.13 (released in 2017) at least, validated by building - // a C program in XCode while changing the build target. Apple provides this example - // for reference: https://developer.apple.com/library/archive/qa/qa1361/_index.html. - #[cfg(target_vendor = "apple")] - const KINFO_PROC_SIZE: usize = if cfg!(target_pointer_width = "64") { 648 } else { 492 }; - #[cfg(target_vendor = "apple")] - const KINFO_PROC_FLAGS_OFFSET: u64 = if cfg!(target_pointer_width = "64") { 32 } else { 16 }; - - // Works for FreeBSD stable (13.3, 13.4) and current (14.0, 14.1). - // The size of the structure has stayed the same for a long time, - // at least since 2005: - // https://lists.freebsd.org/pipermail/freebsd-stable/2005-November/019899.html - #[cfg(target_os = "freebsd")] - const KINFO_PROC_SIZE: usize = if cfg!(target_pointer_width = "64") { 1088 } else { 768 }; - #[cfg(target_os = "freebsd")] - const KINFO_PROC_FLAGS_OFFSET: u64 = if cfg!(target_pointer_width = "64") { 368 } else { 296 }; - - pub(super) fn is_debugger_present() -> Option<DebuggerPresence> { - debug_assert_ne!(KINFO_PROC_SIZE, 0); - - let mut flags = [0u8; 4]; // `ki_flag` under FreeBSD and `p_flag` under macOS. - let mut mib = [CTL_KERN, KERN_PROC, KERN_PROC_PID, process::id() as c_int]; - let mut info_size = KINFO_PROC_SIZE; - let mut kinfo_proc = [0u8; KINFO_PROC_SIZE]; - - // SAFETY: No state is shared with other threads. The sysctl call - // is safe according to the documentation. - if unsafe { - sysctl( - mib.as_mut_ptr(), - mib.len() as u32, - kinfo_proc.as_mut_ptr().cast(), - &mut info_size, - core::ptr::null_mut(), - 0, - ) - } != 0 - { - return None; - } - debug_assert_eq!(info_size, KINFO_PROC_SIZE); - - let mut reader = Cursor::new(kinfo_proc); - reader.seek(SeekFrom::Start(KINFO_PROC_FLAGS_OFFSET)).ok()?; - reader.read_exact(&mut flags).ok()?; - // Just in case, not limiting this to the little-endian systems. - let flags = i32::from_ne_bytes(flags); - - if flags & P_TRACED != 0 { - Some(DebuggerPresence::Detected) - } else { - Some(DebuggerPresence::NotDetected) - } - } -} - -#[cfg(not(any(target_os = "windows", target_vendor = "apple", target_os = "freebsd")))] -mod os { - pub(super) fn is_debugger_present() -> Option<super::DebuggerPresence> { - None - } -} - -/// Detect the debugger presence. -/// -/// The code does not try to detect the debugger at all costs (e.g., when anti-debugger -/// tricks are at play), it relies on the interfaces provided by the OS. -/// -/// Return value: -/// * `None`: it's not possible to conclude whether the debugger is attached to this -/// process or not. When checking for the presence of the debugger, the detection logic -/// encountered an issue, such as the OS API throwing an error or the feature not being -/// implemented. -/// * `Some(DebuggerPresence::Detected)`: yes, the debugger is attached -/// to this process. -/// * `Some(DebuggerPresence::NotDetected)`: no, the debugger is not -/// attached to this process. -pub(crate) fn is_debugger_present() -> Option<DebuggerPresence> { - if cfg!(miri) { None } else { os::is_debugger_present() } -} - -/// Execute the breakpoint instruction if the debugger presence is detected. -/// Useful for breaking into the debugger without the need to set a breakpoint -/// in the debugger. -/// -/// Note that there is a race between attaching or detaching the debugger, and running the -/// breakpoint instruction. This is nonetheless memory-safe, like [`crate::process::abort`] -/// is. In case the debugger is attached and the function is about -/// to run the breakpoint instruction yet right before that the debugger detaches, the -/// process will crash due to running the breakpoint instruction and the debugger not -/// handling the trap exception. -pub(crate) fn breakpoint_if_debugging() -> Option<DebuggerPresence> { - let debugger_present = is_debugger_present(); - if let Some(DebuggerPresence::Detected) = debugger_present { - // SAFETY: Executing the breakpoint instruction. No state is shared - // or modified by this code. - unsafe { core::intrinsics::breakpoint() }; - } - - debugger_present -} diff --git a/library/std/src/sys/mod.rs b/library/std/src/sys/mod.rs index df25b84fbbe..f17dd47dece 100644 --- a/library/std/src/sys/mod.rs +++ b/library/std/src/sys/mod.rs @@ -11,7 +11,6 @@ mod personality; pub mod anonymous_pipe; pub mod backtrace; pub mod cmath; -pub mod dbg; pub mod exit_guard; pub mod os_str; pub mod path; diff --git a/library/std/src/sys/os_str/bytes.rs b/library/std/src/sys/os_str/bytes.rs index 992767211d0..8e0609fe48c 100644 --- a/library/std/src/sys/os_str/bytes.rs +++ b/library/std/src/sys/os_str/bytes.rs @@ -2,7 +2,6 @@ //! systems: just a `Vec<u8>`/`[u8]`. use core::clone::CloneToUninit; -use core::ptr::addr_of_mut; use crate::borrow::Cow; use crate::collections::TryReserveError; @@ -355,6 +354,6 @@ unsafe impl CloneToUninit for Slice { #[cfg_attr(debug_assertions, track_caller)] unsafe fn clone_to_uninit(&self, dst: *mut Self) { // SAFETY: we're just a wrapper around [u8] - unsafe { self.inner.clone_to_uninit(addr_of_mut!((*dst).inner)) } + unsafe { self.inner.clone_to_uninit(&raw mut (*dst).inner) } } } diff --git a/library/std/src/sys/os_str/wtf8.rs b/library/std/src/sys/os_str/wtf8.rs index 6fbbec7a945..b3834388df6 100644 --- a/library/std/src/sys/os_str/wtf8.rs +++ b/library/std/src/sys/os_str/wtf8.rs @@ -1,7 +1,6 @@ //! The underlying OsString/OsStr implementation on Windows is a //! wrapper around the "WTF-8" encoding; see the `wtf8` module for more. use core::clone::CloneToUninit; -use core::ptr::addr_of_mut; use crate::borrow::Cow; use crate::collections::TryReserveError; @@ -278,6 +277,6 @@ unsafe impl CloneToUninit for Slice { #[cfg_attr(debug_assertions, track_caller)] unsafe fn clone_to_uninit(&self, dst: *mut Self) { // SAFETY: we're just a wrapper around Wtf8 - unsafe { self.inner.clone_to_uninit(addr_of_mut!((*dst).inner)) } + unsafe { self.inner.clone_to_uninit(&raw mut (*dst).inner) } } } diff --git a/library/std/src/sys/pal/hermit/net.rs b/library/std/src/sys/pal/hermit/net.rs index 416469c0037..d9baa091a23 100644 --- a/library/std/src/sys/pal/hermit/net.rs +++ b/library/std/src/sys/pal/hermit/net.rs @@ -192,7 +192,7 @@ impl Socket { buf.as_mut_ptr(), buf.len(), flags, - core::ptr::addr_of_mut!(storage) as *mut _, + (&raw mut storage) as *mut _, &mut addrlen, ) })?; @@ -298,7 +298,7 @@ impl Socket { netc::ioctl( self.as_raw_fd(), netc::FIONBIO, - core::ptr::addr_of_mut!(nonblocking) as *mut core::ffi::c_void, + (&raw mut nonblocking) as *mut core::ffi::c_void, ) }) .map(drop) diff --git a/library/std/src/sys/pal/hermit/time.rs b/library/std/src/sys/pal/hermit/time.rs index 99166b15602..e0b6eb76b03 100644 --- a/library/std/src/sys/pal/hermit/time.rs +++ b/library/std/src/sys/pal/hermit/time.rs @@ -107,8 +107,7 @@ pub struct Instant(Timespec); impl Instant { pub fn now() -> Instant { let mut time: Timespec = Timespec::zero(); - let _ = - unsafe { hermit_abi::clock_gettime(CLOCK_MONOTONIC, core::ptr::addr_of_mut!(time.t)) }; + let _ = unsafe { hermit_abi::clock_gettime(CLOCK_MONOTONIC, &raw mut time.t) }; Instant(time) } @@ -209,8 +208,7 @@ impl SystemTime { pub fn now() -> SystemTime { let mut time: Timespec = Timespec::zero(); - let _ = - unsafe { hermit_abi::clock_gettime(CLOCK_REALTIME, core::ptr::addr_of_mut!(time.t)) }; + let _ = unsafe { hermit_abi::clock_gettime(CLOCK_REALTIME, &raw mut time.t) }; SystemTime(time) } diff --git a/library/std/src/sys/pal/unix/fs.rs b/library/std/src/sys/pal/unix/fs.rs index 86342c2add0..39aabf0b2d6 100644 --- a/library/std/src/sys/pal/unix/fs.rs +++ b/library/std/src/sys/pal/unix/fs.rs @@ -740,7 +740,7 @@ impl Iterator for ReadDir { // // Like for uninitialized contents, converting entry_ptr to `&dirent64` // would not be legal. However, unique to dirent64 is that we don't even - // get to use `addr_of!((*entry_ptr).d_name)` because that operation + // get to use `&raw const (*entry_ptr).d_name` because that operation // requires the full extent of *entry_ptr to be in bounds of the same // allocation, which is not necessarily the case here. // @@ -754,7 +754,7 @@ impl Iterator for ReadDir { } else { #[allow(deref_nullptr)] { - ptr::addr_of!((*ptr::null::<dirent64>()).$field) + &raw const (*ptr::null::<dirent64>()).$field } } }}; @@ -1385,7 +1385,7 @@ impl File { } cvt(unsafe { libc::fsetattrlist( self.as_raw_fd(), - core::ptr::addr_of!(attrlist).cast::<libc::c_void>().cast_mut(), + (&raw const attrlist).cast::<libc::c_void>().cast_mut(), buf.as_ptr().cast::<libc::c_void>().cast_mut(), num_times * mem::size_of::<libc::timespec>(), 0 @@ -1944,7 +1944,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result<u64> { libc::copyfile_state_get( state.0, libc::COPYFILE_STATE_COPIED as u32, - core::ptr::addr_of_mut!(bytes_copied) as *mut libc::c_void, + (&raw mut bytes_copied) as *mut libc::c_void, ) })?; Ok(bytes_copied as u64) diff --git a/library/std/src/sys/pal/unix/net.rs b/library/std/src/sys/pal/unix/net.rs index 0f2e015bbcd..6a67bb0a101 100644 --- a/library/std/src/sys/pal/unix/net.rs +++ b/library/std/src/sys/pal/unix/net.rs @@ -329,7 +329,7 @@ impl Socket { buf.as_mut_ptr() as *mut c_void, buf.len(), flags, - core::ptr::addr_of_mut!(storage) as *mut _, + (&raw mut storage) as *mut _, &mut addrlen, ) })?; diff --git a/library/std/src/sys/pal/unix/os.rs b/library/std/src/sys/pal/unix/os.rs index d99bde2f9a5..f983d174ed6 100644 --- a/library/std/src/sys/pal/unix/os.rs +++ b/library/std/src/sys/pal/unix/os.rs @@ -612,7 +612,7 @@ pub unsafe fn environ() -> *mut *const *const c_char { extern "C" { static mut environ: *const *const c_char; } - ptr::addr_of_mut!(environ) + &raw mut environ } static ENV_LOCK: RwLock<()> = RwLock::new(()); diff --git a/library/std/src/sys/pal/unix/process/process_fuchsia.rs b/library/std/src/sys/pal/unix/process/process_fuchsia.rs index 34ff464aa37..5d0110cf55d 100644 --- a/library/std/src/sys/pal/unix/process/process_fuchsia.rs +++ b/library/std/src/sys/pal/unix/process/process_fuchsia.rs @@ -178,7 +178,7 @@ impl Process { zx_cvt(zx_object_get_info( self.handle.raw(), ZX_INFO_PROCESS, - core::ptr::addr_of_mut!(proc_info) as *mut libc::c_void, + (&raw mut proc_info) as *mut libc::c_void, mem::size_of::<zx_info_process_t>(), &mut actual, &mut avail, @@ -215,7 +215,7 @@ impl Process { zx_cvt(zx_object_get_info( self.handle.raw(), ZX_INFO_PROCESS, - core::ptr::addr_of_mut!(proc_info) as *mut libc::c_void, + (&raw mut proc_info) as *mut libc::c_void, mem::size_of::<zx_info_process_t>(), &mut actual, &mut avail, diff --git a/library/std/src/sys/pal/unix/process/process_unix.rs b/library/std/src/sys/pal/unix/process/process_unix.rs index d812aa0e02c..5d30f388da1 100644 --- a/library/std/src/sys/pal/unix/process/process_unix.rs +++ b/library/std/src/sys/pal/unix/process/process_unix.rs @@ -788,15 +788,15 @@ impl Command { let mut iov = [IoSlice::new(b"")]; let mut msg: libc::msghdr = mem::zeroed(); - msg.msg_iov = core::ptr::addr_of_mut!(iov) as *mut _; + msg.msg_iov = (&raw mut iov) as *mut _; msg.msg_iovlen = 1; // only attach cmsg if we successfully acquired the pidfd if pidfd >= 0 { msg.msg_controllen = mem::size_of_val(&cmsg.buf) as _; - msg.msg_control = core::ptr::addr_of_mut!(cmsg.buf) as *mut _; + msg.msg_control = (&raw mut cmsg.buf) as *mut _; - let hdr = CMSG_FIRSTHDR(core::ptr::addr_of_mut!(msg) as *mut _); + let hdr = CMSG_FIRSTHDR((&raw mut msg) as *mut _); (*hdr).cmsg_level = SOL_SOCKET; (*hdr).cmsg_type = SCM_RIGHTS; (*hdr).cmsg_len = CMSG_LEN(SCM_MSG_LEN as _) as _; @@ -838,17 +838,17 @@ impl Command { let mut msg: libc::msghdr = mem::zeroed(); - msg.msg_iov = core::ptr::addr_of_mut!(iov) as *mut _; + msg.msg_iov = (&raw mut iov) as *mut _; msg.msg_iovlen = 1; msg.msg_controllen = mem::size_of::<Cmsg>() as _; - msg.msg_control = core::ptr::addr_of_mut!(cmsg) as *mut _; + msg.msg_control = (&raw mut cmsg) as *mut _; match cvt_r(|| libc::recvmsg(sock.as_raw(), &mut msg, libc::MSG_CMSG_CLOEXEC)) { Err(_) => return -1, Ok(_) => {} } - let hdr = CMSG_FIRSTHDR(core::ptr::addr_of_mut!(msg) as *mut _); + let hdr = CMSG_FIRSTHDR((&raw mut msg) as *mut _); if hdr.is_null() || (*hdr).cmsg_level != SOL_SOCKET || (*hdr).cmsg_type != SCM_RIGHTS diff --git a/library/std/src/sys/pal/unix/stack_overflow.rs b/library/std/src/sys/pal/unix/stack_overflow.rs index e0a0d0973c6..ac0858e1de8 100644 --- a/library/std/src/sys/pal/unix/stack_overflow.rs +++ b/library/std/src/sys/pal/unix/stack_overflow.rs @@ -426,8 +426,8 @@ mod imp { match sysctlbyname.get() { Some(fcn) if unsafe { fcn(oid.as_ptr(), - ptr::addr_of_mut!(guard).cast(), - ptr::addr_of_mut!(size), + (&raw mut guard).cast(), + &raw mut size, ptr::null_mut(), 0) == 0 } => guard, diff --git a/library/std/src/sys/pal/unix/thread.rs b/library/std/src/sys/pal/unix/thread.rs index cb6133274d9..2f2d6e6add3 100644 --- a/library/std/src/sys/pal/unix/thread.rs +++ b/library/std/src/sys/pal/unix/thread.rs @@ -258,7 +258,7 @@ impl Thread { tv_nsec: nsecs, }; secs -= ts.tv_sec as u64; - let ts_ptr = core::ptr::addr_of_mut!(ts); + let ts_ptr = &raw mut ts; if libc::nanosleep(ts_ptr, ts_ptr) == -1 { assert_eq!(os::errno(), libc::EINTR); secs += ts.tv_sec as u64; @@ -447,8 +447,8 @@ pub fn available_parallelism() -> io::Result<NonZero<usize>> { libc::sysctl( mib.as_mut_ptr(), 2, - core::ptr::addr_of_mut!(cpus) as *mut _, - core::ptr::addr_of_mut!(cpus_size) as *mut _, + (&raw mut cpus) as *mut _, + (&raw mut cpus_size) as *mut _, ptr::null_mut(), 0, ) diff --git a/library/std/src/sys/pal/wasip2/mod.rs b/library/std/src/sys/pal/wasip2/mod.rs index 17b26543bd7..320712fdcc9 100644 --- a/library/std/src/sys/pal/wasip2/mod.rs +++ b/library/std/src/sys/pal/wasip2/mod.rs @@ -20,7 +20,6 @@ pub mod futex; #[path = "../wasi/io.rs"] pub mod io; -#[path = "../wasi/net.rs"] pub mod net; #[path = "../wasi/os.rs"] pub mod os; diff --git a/library/std/src/sys/pal/wasip2/net.rs b/library/std/src/sys/pal/wasip2/net.rs new file mode 100644 index 00000000000..c40eb229ba9 --- /dev/null +++ b/library/std/src/sys/pal/wasip2/net.rs @@ -0,0 +1,379 @@ +#![deny(unsafe_op_in_unsafe_fn)] + +use libc::{c_int, c_void, size_t}; + +use super::fd::WasiFd; +use crate::ffi::CStr; +use crate::io::{self, BorrowedBuf, BorrowedCursor, IoSlice, IoSliceMut}; +use crate::net::{Shutdown, SocketAddr}; +use crate::os::wasi::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; +use crate::sys::unsupported; +use crate::sys_common::net::{TcpListener, getsockopt, setsockopt, sockaddr_to_addr}; +use crate::sys_common::{AsInner, FromInner, IntoInner}; +use crate::time::{Duration, Instant}; +use crate::{cmp, mem, str}; + +pub extern crate libc as netc; + +#[allow(non_camel_case_types)] +pub type wrlen_t = size_t; + +#[doc(hidden)] +pub trait IsMinusOne { + fn is_minus_one(&self) -> bool; +} + +macro_rules! impl_is_minus_one { + ($($t:ident)*) => ($(impl IsMinusOne for $t { + fn is_minus_one(&self) -> bool { + *self == -1 + } + })*) +} + +impl_is_minus_one! { i8 i16 i32 i64 isize } + +pub fn cvt<T: IsMinusOne>(t: T) -> crate::io::Result<T> { + if t.is_minus_one() { Err(crate::io::Error::last_os_error()) } else { Ok(t) } +} + +pub fn cvt_r<T, F>(mut f: F) -> crate::io::Result<T> +where + T: IsMinusOne, + F: FnMut() -> T, +{ + loop { + match cvt(f()) { + Err(ref e) if e.is_interrupted() => {} + other => return other, + } + } +} + +pub fn cvt_gai(err: c_int) -> io::Result<()> { + if err == 0 { + return Ok(()); + } + + if err == netc::EAI_SYSTEM { + return Err(io::Error::last_os_error()); + } + + let detail = unsafe { + str::from_utf8(CStr::from_ptr(netc::gai_strerror(err)).to_bytes()).unwrap().to_owned() + }; + + Err(io::Error::new( + io::ErrorKind::Uncategorized, + &format!("failed to lookup address information: {detail}")[..], + )) +} + +pub fn init() {} + +pub struct Socket(WasiFd); + +impl Socket { + pub fn new(addr: &SocketAddr, ty: c_int) -> io::Result<Socket> { + let fam = match *addr { + SocketAddr::V4(..) => netc::AF_INET, + SocketAddr::V6(..) => netc::AF_INET6, + }; + Socket::new_raw(fam, ty) + } + + pub fn new_raw(fam: c_int, ty: c_int) -> io::Result<Socket> { + let fd = cvt(unsafe { netc::socket(fam, ty, 0) })?; + Ok(unsafe { Self::from_raw_fd(fd) }) + } + + pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> { + let (addr, len) = addr.into_inner(); + cvt_r(|| unsafe { netc::connect(self.as_raw_fd(), addr.as_ptr(), len) })?; + Ok(()) + } + + pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> { + self.set_nonblocking(true)?; + let r = self.connect(addr); + self.set_nonblocking(false)?; + + match r { + Ok(_) => return Ok(()), + // there's no ErrorKind for EINPROGRESS + Err(ref e) if e.raw_os_error() == Some(netc::EINPROGRESS) => {} + Err(e) => return Err(e), + } + + let mut pollfd = netc::pollfd { fd: self.as_raw_fd(), events: netc::POLLOUT, revents: 0 }; + + if timeout.as_secs() == 0 && timeout.subsec_nanos() == 0 { + return Err(io::Error::ZERO_TIMEOUT); + } + + let start = Instant::now(); + + loop { + let elapsed = start.elapsed(); + if elapsed >= timeout { + return Err(io::const_io_error!(io::ErrorKind::TimedOut, "connection timed out")); + } + + let timeout = timeout - elapsed; + let mut timeout = timeout + .as_secs() + .saturating_mul(1_000) + .saturating_add(timeout.subsec_nanos() as u64 / 1_000_000); + if timeout == 0 { + timeout = 1; + } + + let timeout = cmp::min(timeout, c_int::MAX as u64) as c_int; + + match unsafe { netc::poll(&mut pollfd, 1, timeout) } { + -1 => { + let err = io::Error::last_os_error(); + if !err.is_interrupted() { + return Err(err); + } + } + 0 => {} + _ => { + // WASI poll does not return POLLHUP or POLLERR in revents. Check if the + // connnection actually succeeded and return ok only when the socket is + // ready and no errors were found. + if let Some(e) = self.take_error()? { + return Err(e); + } + + return Ok(()); + } + } + } + } + + pub fn accept( + &self, + storage: *mut netc::sockaddr, + len: *mut netc::socklen_t, + ) -> io::Result<Socket> { + let fd = cvt_r(|| unsafe { netc::accept(self.as_raw_fd(), storage, len) })?; + Ok(unsafe { Self::from_raw_fd(fd) }) + } + + pub fn duplicate(&self) -> io::Result<Socket> { + unsupported() + } + + fn recv_with_flags(&self, mut buf: BorrowedCursor<'_>, flags: c_int) -> io::Result<()> { + let ret = cvt(unsafe { + netc::recv( + self.as_raw_fd(), + buf.as_mut().as_mut_ptr() as *mut c_void, + buf.capacity(), + flags, + ) + })?; + unsafe { + buf.advance_unchecked(ret as usize); + } + Ok(()) + } + + pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { + let mut buf = BorrowedBuf::from(buf); + self.recv_with_flags(buf.unfilled(), 0)?; + Ok(buf.len()) + } + + pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + let mut buf = BorrowedBuf::from(buf); + self.recv_with_flags(buf.unfilled(), netc::MSG_PEEK)?; + Ok(buf.len()) + } + + pub fn read_buf(&self, buf: BorrowedCursor<'_>) -> io::Result<()> { + self.recv_with_flags(buf, 0) + } + + pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + io::default_read_vectored(|b| self.read(b), bufs) + } + + #[inline] + pub fn is_read_vectored(&self) -> bool { + false + } + + fn recv_from_with_flags( + &self, + buf: &mut [u8], + flags: c_int, + ) -> io::Result<(usize, SocketAddr)> { + let mut storage: netc::sockaddr_storage = unsafe { mem::zeroed() }; + let mut addrlen = mem::size_of_val(&storage) as netc::socklen_t; + + let n = cvt(unsafe { + netc::recvfrom( + self.as_raw_fd(), + buf.as_mut_ptr() as *mut c_void, + buf.len(), + flags, + core::ptr::addr_of_mut!(storage) as *mut _, + &mut addrlen, + ) + })?; + Ok((n as usize, sockaddr_to_addr(&storage, addrlen as usize)?)) + } + + pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.recv_from_with_flags(buf, 0) + } + + pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.recv_from_with_flags(buf, netc::MSG_PEEK) + } + + fn write(&self, buf: &[u8]) -> io::Result<usize> { + let len = cmp::min(buf.len(), <wrlen_t>::MAX as usize) as wrlen_t; + let ret = cvt(unsafe { + netc::send(self.as_raw(), buf.as_ptr() as *const c_void, len, netc::MSG_NOSIGNAL) + })?; + Ok(ret as usize) + } + + pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + io::default_write_vectored(|b| self.write(b), bufs) + } + + #[inline] + pub fn is_write_vectored(&self) -> bool { + false + } + + pub fn set_timeout(&self, dur: Option<Duration>, kind: c_int) -> io::Result<()> { + let timeout = match dur { + Some(dur) => { + if dur.as_secs() == 0 && dur.subsec_nanos() == 0 { + return Err(io::Error::ZERO_TIMEOUT); + } + + let secs = dur.as_secs().try_into().unwrap_or(netc::time_t::MAX); + let mut timeout = netc::timeval { + tv_sec: secs, + tv_usec: dur.subsec_micros() as netc::suseconds_t, + }; + if timeout.tv_sec == 0 && timeout.tv_usec == 0 { + timeout.tv_usec = 1; + } + timeout + } + None => netc::timeval { tv_sec: 0, tv_usec: 0 }, + }; + setsockopt(self, netc::SOL_SOCKET, kind, timeout) + } + + pub fn timeout(&self, kind: c_int) -> io::Result<Option<Duration>> { + let raw: netc::timeval = getsockopt(self, netc::SOL_SOCKET, kind)?; + if raw.tv_sec == 0 && raw.tv_usec == 0 { + Ok(None) + } else { + let sec = raw.tv_sec as u64; + let nsec = (raw.tv_usec as u32) * 1000; + Ok(Some(Duration::new(sec, nsec))) + } + } + + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + let how = match how { + Shutdown::Write => netc::SHUT_WR, + Shutdown::Read => netc::SHUT_RD, + Shutdown::Both => netc::SHUT_RDWR, + }; + cvt(unsafe { netc::shutdown(self.as_raw_fd(), how) })?; + Ok(()) + } + + pub fn set_linger(&self, _linger: Option<Duration>) -> io::Result<()> { + unsupported() + } + + pub fn linger(&self) -> io::Result<Option<Duration>> { + unsupported() + } + + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + setsockopt(self, netc::IPPROTO_TCP, netc::TCP_NODELAY, nodelay as c_int) + } + + pub fn nodelay(&self) -> io::Result<bool> { + let raw: c_int = getsockopt(self, netc::IPPROTO_TCP, netc::TCP_NODELAY)?; + Ok(raw != 0) + } + + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + let mut nonblocking = nonblocking as c_int; + cvt(unsafe { netc::ioctl(self.as_raw_fd(), netc::FIONBIO, &mut nonblocking) }).map(drop) + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + let raw: c_int = getsockopt(self, netc::SOL_SOCKET, netc::SO_ERROR)?; + if raw == 0 { Ok(None) } else { Ok(Some(io::Error::from_raw_os_error(raw as i32))) } + } + + // This is used by sys_common code to abstract over Windows and Unix. + pub fn as_raw(&self) -> RawFd { + self.as_raw_fd() + } +} + +impl AsInner<WasiFd> for Socket { + #[inline] + fn as_inner(&self) -> &WasiFd { + &self.0 + } +} + +impl IntoInner<WasiFd> for Socket { + fn into_inner(self) -> WasiFd { + self.0 + } +} + +impl FromInner<WasiFd> for Socket { + fn from_inner(inner: WasiFd) -> Socket { + Socket(inner) + } +} + +impl AsFd for Socket { + fn as_fd(&self) -> BorrowedFd<'_> { + self.0.as_fd() + } +} + +impl AsRawFd for Socket { + #[inline] + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +impl IntoRawFd for Socket { + fn into_raw_fd(self) -> RawFd { + self.0.into_raw_fd() + } +} + +impl FromRawFd for Socket { + unsafe fn from_raw_fd(raw_fd: RawFd) -> Self { + unsafe { Self(FromRawFd::from_raw_fd(raw_fd)) } + } +} + +impl AsInner<Socket> for TcpListener { + #[inline] + fn as_inner(&self) -> &Socket { + &self.socket() + } +} diff --git a/library/std/src/sys/pal/windows/api.rs b/library/std/src/sys/pal/windows/api.rs index 9e336ff2d47..ebe207fde93 100644 --- a/library/std/src/sys/pal/windows/api.rs +++ b/library/std/src/sys/pal/windows/api.rs @@ -30,7 +30,6 @@ //! should go in sys/pal/windows/mod.rs rather than here. See `IoResult` as an example. use core::ffi::c_void; -use core::ptr::addr_of; use super::c; @@ -186,7 +185,7 @@ unsafe trait SizedSetFileInformation: Sized { unsafe impl<T: SizedSetFileInformation> SetFileInformation for T { const CLASS: i32 = T::CLASS; fn as_ptr(&self) -> *const c_void { - addr_of!(*self).cast::<c_void>() + (&raw const *self).cast::<c_void>() } fn size(&self) -> u32 { win32_size_of::<Self>() diff --git a/library/std/src/sys/pal/windows/fs.rs b/library/std/src/sys/pal/windows/fs.rs index be26356bb40..aab471e28ea 100644 --- a/library/std/src/sys/pal/windows/fs.rs +++ b/library/std/src/sys/pal/windows/fs.rs @@ -1,5 +1,3 @@ -use core::ptr::addr_of; - use super::api::{self, WinError}; use super::{IoResult, to_u16s}; use crate::borrow::Cow; @@ -325,7 +323,7 @@ impl File { let result = c::SetFileInformationByHandle( handle.as_raw_handle(), c::FileEndOfFileInfo, - ptr::addr_of!(eof).cast::<c_void>(), + (&raw const eof).cast::<c_void>(), mem::size_of::<c::FILE_END_OF_FILE_INFO>() as u32, ); if result == 0 { @@ -364,7 +362,7 @@ impl File { cvt(c::GetFileInformationByHandleEx( self.handle.as_raw_handle(), c::FileAttributeTagInfo, - ptr::addr_of_mut!(attr_tag).cast(), + (&raw mut attr_tag).cast(), mem::size_of::<c::FILE_ATTRIBUTE_TAG_INFO>().try_into().unwrap(), ))?; if attr_tag.FileAttributes & c::FILE_ATTRIBUTE_REPARSE_POINT != 0 { @@ -396,7 +394,7 @@ impl File { cvt(c::GetFileInformationByHandleEx( self.handle.as_raw_handle(), c::FileBasicInfo, - core::ptr::addr_of_mut!(info) as *mut c_void, + (&raw mut info) as *mut c_void, size as u32, ))?; let mut attr = FileAttr { @@ -428,7 +426,7 @@ impl File { cvt(c::GetFileInformationByHandleEx( self.handle.as_raw_handle(), c::FileStandardInfo, - core::ptr::addr_of_mut!(info) as *mut c_void, + (&raw mut info) as *mut c_void, size as u32, ))?; attr.file_size = info.AllocationSize as u64; @@ -438,7 +436,7 @@ impl File { cvt(c::GetFileInformationByHandleEx( self.handle.as_raw_handle(), c::FileAttributeTagInfo, - ptr::addr_of_mut!(attr_tag).cast(), + (&raw mut attr_tag).cast(), mem::size_of::<c::FILE_ATTRIBUTE_TAG_INFO>().try_into().unwrap(), ))?; if attr_tag.FileAttributes & c::FILE_ATTRIBUTE_REPARSE_POINT != 0 { @@ -545,22 +543,20 @@ impl File { unsafe { let (path_buffer, subst_off, subst_len, relative) = match (*buf).ReparseTag { c::IO_REPARSE_TAG_SYMLINK => { - let info: *mut c::SYMBOLIC_LINK_REPARSE_BUFFER = - ptr::addr_of_mut!((*buf).rest).cast(); + let info: *mut c::SYMBOLIC_LINK_REPARSE_BUFFER = (&raw mut (*buf).rest).cast(); assert!(info.is_aligned()); ( - ptr::addr_of_mut!((*info).PathBuffer).cast::<u16>(), + (&raw mut (*info).PathBuffer).cast::<u16>(), (*info).SubstituteNameOffset / 2, (*info).SubstituteNameLength / 2, (*info).Flags & c::SYMLINK_FLAG_RELATIVE != 0, ) } c::IO_REPARSE_TAG_MOUNT_POINT => { - let info: *mut c::MOUNT_POINT_REPARSE_BUFFER = - ptr::addr_of_mut!((*buf).rest).cast(); + let info: *mut c::MOUNT_POINT_REPARSE_BUFFER = (&raw mut (*buf).rest).cast(); assert!(info.is_aligned()); ( - ptr::addr_of_mut!((*info).PathBuffer).cast::<u16>(), + (&raw mut (*info).PathBuffer).cast::<u16>(), (*info).SubstituteNameOffset / 2, (*info).SubstituteNameLength / 2, false, @@ -643,7 +639,7 @@ impl File { cvt(c::GetFileInformationByHandleEx( self.handle.as_raw_handle(), c::FileBasicInfo, - core::ptr::addr_of_mut!(info) as *mut c_void, + (&raw mut info) as *mut c_void, size as u32, ))?; Ok(info) @@ -790,11 +786,11 @@ impl<'a> Iterator for DirBuffIter<'a> { // it does not seem that reality is so kind, and assuming this // caused crashes in some cases (https://github.com/rust-lang/rust/issues/104530) // presumably, this can be blamed on buggy filesystem drivers, but who knows. - let next_entry = ptr::addr_of!((*info).NextEntryOffset).read_unaligned() as usize; - let length = ptr::addr_of!((*info).FileNameLength).read_unaligned() as usize; - let attrs = ptr::addr_of!((*info).FileAttributes).read_unaligned(); + let next_entry = (&raw const (*info).NextEntryOffset).read_unaligned() as usize; + let length = (&raw const (*info).FileNameLength).read_unaligned() as usize; + let attrs = (&raw const (*info).FileAttributes).read_unaligned(); let name = from_maybe_unaligned( - ptr::addr_of!((*info).FileName).cast::<u16>(), + (&raw const (*info).FileName).cast::<u16>(), length / size_of::<u16>(), ); let is_directory = (attrs & c::FILE_ATTRIBUTE_DIRECTORY) != 0; @@ -1326,7 +1322,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result<u64> { pfrom.as_ptr(), pto.as_ptr(), Some(callback), - core::ptr::addr_of_mut!(size) as *mut _, + (&raw mut size) as *mut _, ptr::null_mut(), 0, ) @@ -1405,7 +1401,7 @@ pub fn junction_point(original: &Path, link: &Path) -> io::Result<()> { cvt(c::DeviceIoControl( d.as_raw_handle(), c::FSCTL_SET_REPARSE_POINT, - addr_of!(header).cast::<c_void>(), + (&raw const header).cast::<c_void>(), data_len as u32 + 8, ptr::null_mut(), 0, diff --git a/library/std/src/sys/pal/windows/futex.rs b/library/std/src/sys/pal/windows/futex.rs index f16a9f534a3..4d6c4df9a5a 100644 --- a/library/std/src/sys/pal/windows/futex.rs +++ b/library/std/src/sys/pal/windows/futex.rs @@ -57,7 +57,7 @@ pub fn wait_on_address<W: Waitable>( unsafe { let addr = ptr::from_ref(address).cast::<c_void>(); let size = mem::size_of::<W>(); - let compare_addr = ptr::addr_of!(compare).cast::<c_void>(); + let compare_addr = (&raw const compare).cast::<c_void>(); let timeout = timeout.map(dur2timeout).unwrap_or(c::INFINITE); c::WaitOnAddress(addr, compare_addr, size, timeout) == c::TRUE } diff --git a/library/std/src/sys/pal/windows/io.rs b/library/std/src/sys/pal/windows/io.rs index 785a3f6768b..1e7d02908f6 100644 --- a/library/std/src/sys/pal/windows/io.rs +++ b/library/std/src/sys/pal/windows/io.rs @@ -122,7 +122,7 @@ fn msys_tty_on(handle: BorrowedHandle<'_>) -> bool { c::GetFileInformationByHandleEx( handle.as_raw_handle(), c::FileNameInfo, - core::ptr::addr_of_mut!(name_info) as *mut c_void, + (&raw mut name_info) as *mut c_void, size_of::<FILE_NAME_INFO>() as u32, ) }; diff --git a/library/std/src/sys/pal/windows/net.rs b/library/std/src/sys/pal/windows/net.rs index 61a4504cf65..fd62d1f407c 100644 --- a/library/std/src/sys/pal/windows/net.rs +++ b/library/std/src/sys/pal/windows/net.rs @@ -390,7 +390,7 @@ impl Socket { buf.as_mut_ptr() as *mut _, length, flags, - core::ptr::addr_of_mut!(storage) as *mut _, + (&raw mut storage) as *mut _, &mut addrlen, ) }; diff --git a/library/std/src/sys/pal/windows/pipe.rs b/library/std/src/sys/pal/windows/pipe.rs index d8200ef9ca4..a8f6617c9dc 100644 --- a/library/std/src/sys/pal/windows/pipe.rs +++ b/library/std/src/sys/pal/windows/pipe.rs @@ -375,7 +375,7 @@ impl AnonPipe { let mut overlapped: c::OVERLAPPED = unsafe { crate::mem::zeroed() }; // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`. // Therefore the documentation suggests using it to smuggle a pointer to the callback. - overlapped.hEvent = core::ptr::addr_of_mut!(async_result) as *mut _; + overlapped.hEvent = (&raw mut async_result) as *mut _; // Asynchronous read of the pipe. // If successful, `callback` will be called once it completes. diff --git a/library/std/src/sys/pal/windows/process.rs b/library/std/src/sys/pal/windows/process.rs index 93a6c45ce30..95b51e704f9 100644 --- a/library/std/src/sys/pal/windows/process.rs +++ b/library/std/src/sys/pal/windows/process.rs @@ -368,10 +368,10 @@ impl Command { StartupInfo: si, lpAttributeList: proc_thread_attribute_list.0.as_mut_ptr() as _, }; - si_ptr = core::ptr::addr_of_mut!(si_ex) as _; + si_ptr = (&raw mut si_ex) as _; } else { si.cb = mem::size_of::<c::STARTUPINFOW>() as u32; - si_ptr = core::ptr::addr_of_mut!(si) as _; + si_ptr = (&raw mut si) as _; } unsafe { @@ -953,7 +953,7 @@ fn make_proc_thread_attribute_list( // It's theoretically possible for the attribute count to exceed a u32 value. // Therefore, we ensure that we don't add more attributes than the buffer was initialized for. for (&attribute, value) in attributes.iter().take(attribute_count as usize) { - let value_ptr = core::ptr::addr_of!(*value.data) as _; + let value_ptr = (&raw const *value.data) as _; cvt(unsafe { c::UpdateProcThreadAttribute( proc_thread_attribute_list.0.as_mut_ptr() as _, diff --git a/library/std/src/sys/sync/thread_parking/pthread.rs b/library/std/src/sys/sync/thread_parking/pthread.rs index c64600e9e29..5f195d0bb0c 100644 --- a/library/std/src/sys/sync/thread_parking/pthread.rs +++ b/library/std/src/sys/sync/thread_parking/pthread.rs @@ -3,7 +3,6 @@ use crate::cell::UnsafeCell; use crate::marker::PhantomPinned; use crate::pin::Pin; -use crate::ptr::addr_of_mut; use crate::sync::atomic::AtomicUsize; use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; #[cfg(not(target_os = "nto"))] @@ -101,8 +100,8 @@ impl Parker { // This could lead to undefined behaviour when deadlocking. This is avoided // by not deadlocking. Note in particular the unlocking operation before any // panic, as code after the panic could try to park again. - addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY)); - addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER)); + (&raw mut (*parker).state).write(AtomicUsize::new(EMPTY)); + (&raw mut (*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER)); cfg_if::cfg_if! { if #[cfg(any( @@ -112,9 +111,9 @@ impl Parker { target_os = "vita", target_vendor = "apple", ))] { - addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER)); + (&raw mut (*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER)); } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] { - let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null()); + let r = libc::pthread_cond_init((&raw mut (*parker).cvar).cast(), crate::ptr::null()); assert_eq!(r, 0); } else { use crate::mem::MaybeUninit; @@ -123,7 +122,7 @@ impl Parker { assert_eq!(r, 0); let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC); assert_eq!(r, 0); - let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr()); + let r = libc::pthread_cond_init((&raw mut (*parker).cvar).cast(), attr.as_ptr()); assert_eq!(r, 0); let r = libc::pthread_condattr_destroy(attr.as_mut_ptr()); assert_eq!(r, 0); diff --git a/library/std/src/sys/sync/thread_parking/windows7.rs b/library/std/src/sys/sync/thread_parking/windows7.rs index cdd59757fe2..8f7e66c46ef 100644 --- a/library/std/src/sys/sync/thread_parking/windows7.rs +++ b/library/std/src/sys/sync/thread_parking/windows7.rs @@ -178,7 +178,7 @@ impl Parker { } fn ptr(&self) -> *const c_void { - core::ptr::addr_of!(self.state).cast::<c_void>() + (&raw const self.state).cast::<c_void>() } } diff --git a/library/std/src/sys/thread_local/destructors/linux_like.rs b/library/std/src/sys/thread_local/destructors/linux_like.rs index c381be0bf8c..f473dc4d79d 100644 --- a/library/std/src/sys/thread_local/destructors/linux_like.rs +++ b/library/std/src/sys/thread_local/destructors/linux_like.rs @@ -47,7 +47,7 @@ pub unsafe fn register(t: *mut u8, dtor: unsafe extern "C" fn(*mut u8)) { dtor, ), t.cast(), - core::ptr::addr_of!(__dso_handle) as *mut _, + (&raw const __dso_handle) as *mut _, ); } } else { diff --git a/library/std/src/sys_common/io.rs b/library/std/src/sys_common/io.rs index e386c955f37..6f6f282d432 100644 --- a/library/std/src/sys_common/io.rs +++ b/library/std/src/sys_common/io.rs @@ -3,7 +3,7 @@ pub const DEFAULT_BUF_SIZE: usize = if cfg!(target_os = "espidf") { 512 } else { 8 * 1024 }; #[cfg(test)] -#[allow(dead_code)] // not used on emscripten +#[allow(dead_code)] // not used on emscripten and wasi pub mod test { use rand::RngCore; diff --git a/library/std/src/sys_common/mod.rs b/library/std/src/sys_common/mod.rs index 1c884f107be..aa27886ff6f 100644 --- a/library/std/src/sys_common/mod.rs +++ b/library/std/src/sys_common/mod.rs @@ -32,7 +32,8 @@ cfg_if::cfg_if! { all(unix, not(target_os = "l4re")), windows, target_os = "hermit", - target_os = "solid_asp3" + target_os = "solid_asp3", + all(target_os = "wasi", target_env = "p2") ))] { pub mod net; } else { diff --git a/library/std/src/sys_common/net.rs b/library/std/src/sys_common/net.rs index 57f07d05cae..5a0ad907581 100644 --- a/library/std/src/sys_common/net.rs +++ b/library/std/src/sys_common/net.rs @@ -74,7 +74,7 @@ pub fn setsockopt<T>( sock.as_raw(), level, option_name, - core::ptr::addr_of!(option_value) as *const _, + (&raw const option_value) as *const _, mem::size_of::<T>() as c::socklen_t, ))?; Ok(()) @@ -89,7 +89,7 @@ pub fn getsockopt<T: Copy>(sock: &Socket, level: c_int, option_name: c_int) -> i sock.as_raw(), level, option_name, - core::ptr::addr_of_mut!(option_value) as *mut _, + (&raw mut option_value) as *mut _, &mut option_len, ))?; Ok(option_value) @@ -103,7 +103,7 @@ where unsafe { let mut storage: c::sockaddr_storage = mem::zeroed(); let mut len = mem::size_of_val(&storage) as c::socklen_t; - cvt(f(core::ptr::addr_of_mut!(storage) as *mut _, &mut len))?; + cvt(f((&raw mut storage) as *mut _, &mut len))?; sockaddr_to_addr(&storage, len as usize) } } @@ -452,7 +452,7 @@ impl TcpListener { pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let mut storage: c::sockaddr_storage = unsafe { mem::zeroed() }; let mut len = mem::size_of_val(&storage) as c::socklen_t; - let sock = self.inner.accept(core::ptr::addr_of_mut!(storage) as *mut _, &mut len)?; + let sock = self.inner.accept((&raw mut storage) as *mut _, &mut len)?; let addr = sockaddr_to_addr(&storage, len as usize)?; Ok((TcpStream { inner: sock }, addr)) } diff --git a/library/std/src/sys_common/wtf8.rs b/library/std/src/sys_common/wtf8.rs index 554e07c1e59..19d4c94f450 100644 --- a/library/std/src/sys_common/wtf8.rs +++ b/library/std/src/sys_common/wtf8.rs @@ -26,7 +26,6 @@ use crate::borrow::Cow; use crate::collections::TryReserveError; use crate::hash::{Hash, Hasher}; use crate::iter::FusedIterator; -use crate::ptr::addr_of_mut; use crate::rc::Rc; use crate::sync::Arc; use crate::sys_common::AsInner; @@ -1055,6 +1054,6 @@ unsafe impl CloneToUninit for Wtf8 { #[cfg_attr(debug_assertions, track_caller)] unsafe fn clone_to_uninit(&self, dst: *mut Self) { // SAFETY: we're just a wrapper around [u8] - unsafe { self.bytes.clone_to_uninit(addr_of_mut!((*dst).bytes)) } + unsafe { self.bytes.clone_to_uninit(&raw mut (*dst).bytes) } } } diff --git a/library/std/src/thread/local.rs b/library/std/src/thread/local.rs index f147c5fdcd1..88bf186700f 100644 --- a/library/std/src/thread/local.rs +++ b/library/std/src/thread/local.rs @@ -2,7 +2,7 @@ #![unstable(feature = "thread_local_internals", issue = "none")] -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; #[cfg(test)] diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs index a53e3565dfe..22d65583365 100644 --- a/library/std/src/thread/mod.rs +++ b/library/std/src/thread/mod.rs @@ -155,7 +155,7 @@ // Under `test`, `__FastLocalKeyInner` seems unused. #![cfg_attr(test, allow(dead_code))] -#[cfg(all(test, not(target_os = "emscripten")))] +#[cfg(all(test, not(any(target_os = "emscripten", target_os = "wasi"))))] mod tests; use crate::any::Any; @@ -165,7 +165,6 @@ use crate::marker::PhantomData; use crate::mem::{self, ManuallyDrop, forget}; use crate::num::NonZero; use crate::pin::Pin; -use crate::ptr::addr_of_mut; use crate::sync::Arc; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sys::sync::Parker; @@ -665,6 +664,19 @@ impl Builder { /// println!("{result}"); /// ``` /// +/// # Notes +/// +/// This function has the same minimal guarantee regarding "foreign" unwinding operations (e.g. +/// an exception thrown from C++ code, or a `panic!` in Rust code compiled or linked with a +/// different runtime) as [`catch_unwind`]; namely, if the thread created with `thread::spawn` +/// unwinds all the way to the root with such an exception, one of two behaviors are possible, +/// and it is unspecified which will occur: +/// +/// * The process aborts. +/// * The process does not abort, and [`join`] will return a `Result::Err` +/// containing an opaque type. +/// +/// [`catch_unwind`]: ../../std/panic/fn.catch_unwind.html /// [`channels`]: crate::sync::mpsc /// [`join`]: JoinHandle::join /// [`Err`]: crate::result::Result::Err @@ -1386,9 +1398,9 @@ impl Thread { let inner = unsafe { let mut arc = Arc::<Inner>::new_uninit(); let ptr = Arc::get_mut_unchecked(&mut arc).as_mut_ptr(); - addr_of_mut!((*ptr).name).write(name); - addr_of_mut!((*ptr).id).write(ThreadId::new()); - Parker::new_in_place(addr_of_mut!((*ptr).parker)); + (&raw mut (*ptr).name).write(name); + (&raw mut (*ptr).id).write(ThreadId::new()); + Parker::new_in_place(&raw mut (*ptr).parker); Pin::new_unchecked(arc.assume_init()) }; @@ -1785,7 +1797,7 @@ impl<T> JoinHandle<T> { /// operations that happen after `join` returns. /// /// If the associated thread panics, [`Err`] is returned with the parameter given - /// to [`panic!`]. + /// to [`panic!`] (though see the Notes below). /// /// [`Err`]: crate::result::Result::Err /// [atomic memory orderings]: crate::sync::atomic @@ -1807,6 +1819,18 @@ impl<T> JoinHandle<T> { /// }).unwrap(); /// join_handle.join().expect("Couldn't join on the associated thread"); /// ``` + /// + /// # Notes + /// + /// If a "foreign" unwinding operation (e.g. an exception thrown from C++ + /// code, or a `panic!` in Rust code compiled or linked with a different + /// runtime) unwinds all the way to the thread root, the process may be + /// aborted; see the Notes on [`thread::spawn`]. If the process is not + /// aborted, this function will return a `Result::Err` containing an opaque + /// type. + /// + /// [`catch_unwind`]: ../../std/panic/fn.catch_unwind.html + /// [`thread::spawn`]: spawn #[stable(feature = "rust1", since = "1.0.0")] pub fn join(self) -> Result<T> { self.0.join() |
