Diffstat (limited to 'library/std/src')
 library/std/src/io/copy.rs                 |  69
 library/std/src/io/copy/tests.rs           |  11
 library/std/src/io/mod.rs                  | 118
 library/std/src/lib.rs                     |   1
 library/std/src/process.rs                 |   2
 library/std/src/sync/once_lock.rs          |  50
 library/std/src/sys/xous/mod.rs            |   1
 library/std/src/sys/xous/thread_parking.rs |  94
8 files changed, 228 insertions, 118 deletions
diff --git a/library/std/src/io/copy.rs b/library/std/src/io/copy.rs
index 4d51a719f6c..d49866345cb 100644
--- a/library/std/src/io/copy.rs
+++ b/library/std/src/io/copy.rs
@@ -1,7 +1,6 @@
 use super::{BorrowedBuf, BufReader, BufWriter, Read, Result, Write, DEFAULT_BUF_SIZE};
 use crate::alloc::Allocator;
 use crate::cmp;
-use crate::cmp::min;
 use crate::collections::VecDeque;
 use crate::io::IoSlice;
 use crate::mem::MaybeUninit;
@@ -256,79 +255,17 @@ impl<I: Write + ?Sized> BufferedWriterSpec for BufWriter<I> {
     }
 }
 
-impl<A: Allocator> BufferedWriterSpec for Vec<u8, A> {
+impl BufferedWriterSpec for Vec<u8> {
     fn buffer_size(&self) -> usize {
         cmp::max(DEFAULT_BUF_SIZE, self.capacity() - self.len())
     }
 
     fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
-        let mut bytes = 0;
-
-        // avoid inflating empty/small vecs before we have determined that there's anything to read
-        if self.capacity() < DEFAULT_BUF_SIZE {
-            let stack_read_limit = DEFAULT_BUF_SIZE as u64;
-            bytes = stack_buffer_copy(&mut reader.take(stack_read_limit), self)?;
-            // fewer bytes than requested -> EOF reached
-            if bytes < stack_read_limit {
-                return Ok(bytes);
-            }
-        }
-
-        // don't immediately offer the vec's whole spare capacity, otherwise
-        // we might have to fully initialize it if the reader doesn't have a custom read_buf() impl
-        let mut max_read_size = DEFAULT_BUF_SIZE;
-
-        loop {
-            self.reserve(DEFAULT_BUF_SIZE);
-            let mut initialized_spare_capacity = 0;
-
-            loop {
-                let buf = self.spare_capacity_mut();
-                let read_size = min(max_read_size, buf.len());
-                let mut buf = BorrowedBuf::from(&mut buf[..read_size]);
-                // SAFETY: init is either 0 or the init_len from the previous iteration.
-                unsafe {
-                    buf.set_init(initialized_spare_capacity);
-                }
-                match reader.read_buf(buf.unfilled()) {
-                    Ok(()) => {
-                        let bytes_read = buf.len();
-
-                        // EOF
-                        if bytes_read == 0 {
-                            return Ok(bytes);
-                        }
-
-                        // the reader is returning short reads but it doesn't call ensure_init()
-                        if buf.init_len() < buf.capacity() {
-                            max_read_size = usize::MAX;
-                        }
-                        // the reader hasn't returned short reads so far
-                        if bytes_read == buf.capacity() {
-                            max_read_size *= 2;
-                        }
-
-                        initialized_spare_capacity = buf.init_len() - bytes_read;
-                        bytes += bytes_read as u64;
-                        // SAFETY: BorrowedBuf guarantees all of its filled bytes are init
-                        // and the number of read bytes can't exceed the spare capacity since
-                        // that's what the buffer is borrowing from.
-                        unsafe { self.set_len(self.len() + bytes_read) };
-
-                        // spare capacity full, reserve more
-                        if self.len() == self.capacity() {
-                            break;
-                        }
-                    }
-                    Err(e) if e.is_interrupted() => continue,
-                    Err(e) => return Err(e),
-                }
-            }
-        }
+        reader.read_to_end(self).map(|bytes| u64::try_from(bytes).expect("usize overflowed u64"))
     }
 }
 
-fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
+pub fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
     reader: &mut R,
     writer: &mut W,
 ) -> Result<u64> {
diff --git a/library/std/src/io/copy/tests.rs b/library/std/src/io/copy/tests.rs
index af137eaf856..a1f909a3c53 100644
--- a/library/std/src/io/copy/tests.rs
+++ b/library/std/src/io/copy/tests.rs
@@ -82,13 +82,16 @@ fn copy_specializes_bufreader() {
 
 #[test]
 fn copy_specializes_to_vec() {
-    let cap = 123456;
-    let mut source = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
+    let cap = DEFAULT_BUF_SIZE * 10;
+    let mut source = ShortReader { cap, observed_buffer: 0, read_size: DEFAULT_BUF_SIZE };
     let mut sink = Vec::new();
-    assert_eq!(cap as u64, io::copy(&mut source, &mut sink).unwrap());
+    let copied = io::copy(&mut source, &mut sink).unwrap();
+    assert_eq!(cap as u64, copied);
+    assert_eq!(sink.len() as u64, copied);
     assert!(
         source.observed_buffer > DEFAULT_BUF_SIZE,
-        "expected a large buffer to be provided to the reader"
+        "expected a large buffer to be provided to the reader, got {}",
+        source.observed_buffer
     );
 }
 
diff --git a/library/std/src/io/mod.rs b/library/std/src/io/mod.rs
index c8507a956ff..e3aa973741f 100644
--- a/library/std/src/io/mod.rs
+++ b/library/std/src/io/mod.rs
@@ -397,12 +397,16 @@ where
     }
 }
 
-// This uses an adaptive system to extend the vector when it fills. We want to
-// avoid paying to allocate and zero a huge chunk of memory if the reader only
-// has 4 bytes while still making large reads if the reader does have a ton
-// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
-// time is 4,500 times (!) slower than a default reservation size of 32 if the
-// reader has a very small amount of data to return.
+// Here we must serve many masters with conflicting goals:
+//
+// - avoid allocating unless necessary
+// - avoid overallocating if we know the exact size (#89165)
+// - avoid passing large buffers to readers that always initialize the free capacity if they perform short reads (#23815, #23820)
+// - pass large buffers to readers that do not initialize the spare capacity. this can amortize per-call overheads
+// - and finally pass not-too-small and not-too-large buffers to Windows read APIs because they manage to suffer from both problems
+//   at the same time, i.e. small reads suffer from syscall overhead, all reads incur initialization cost
+//   proportional to buffer size (#110650)
+//
 pub(crate) fn default_read_to_end<R: Read + ?Sized>(
     r: &mut R,
     buf: &mut Vec<u8>,
@@ -412,20 +416,58 @@ pub(crate) fn default_read_to_end<R: Read + ?Sized>(
     let start_cap = buf.capacity();
     // Optionally limit the maximum bytes read on each iteration.
     // This adds an arbitrary fiddle factor to allow for more data than we expect.
-    let max_read_size =
-        size_hint.and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE));
+    let mut max_read_size = size_hint
+        .and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE))
+        .unwrap_or(DEFAULT_BUF_SIZE);
 
     let mut initialized = 0; // Extra initialized bytes from previous loop iteration
+
+    const PROBE_SIZE: usize = 32;
+
+    fn small_probe_read<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
+        let mut probe = [0u8; PROBE_SIZE];
+
+        loop {
+            match r.read(&mut probe) {
+                Ok(n) => {
+                    buf.extend_from_slice(&probe[..n]);
+                    return Ok(n);
+                }
+                Err(ref e) if e.is_interrupted() => continue,
+                Err(e) => return Err(e),
+            }
+        }
+    }
+
+    // avoid inflating empty/small vecs before we have determined that there's anything to read
+    if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
+        let read = small_probe_read(r, buf)?;
+
+        if read == 0 {
+            return Ok(0);
+        }
+    }
+
     loop {
+        if buf.len() == buf.capacity() && buf.capacity() == start_cap {
+            // The buffer might be an exact fit. Let's read into a probe buffer
+            // and see if it returns `Ok(0)`. If so, we've avoided an
+            // unnecessary doubling of the capacity. But if not, append the
+            // probe buffer to the primary buffer and let its capacity grow.
+            let read = small_probe_read(r, buf)?;
+
+            if read == 0 {
+                return Ok(buf.len() - start_len);
+            }
+        }
+
         if buf.len() == buf.capacity() {
-            buf.reserve(32); // buf is full, need more space
+            buf.reserve(PROBE_SIZE); // buf is full, need more space
         }
 
         let mut spare = buf.spare_capacity_mut();
-        if let Some(size) = max_read_size {
-            let len = cmp::min(spare.len(), size);
-            spare = &mut spare[..len]
-        }
+        let buf_len = cmp::min(spare.len(), max_read_size);
+        spare = &mut spare[..buf_len];
         let mut read_buf: BorrowedBuf<'_> = spare.into();
 
         // SAFETY: These bytes were initialized but not filled in the previous loop
@@ -434,42 +476,44 @@ pub(crate) fn default_read_to_end<R: Read + ?Sized>(
         }
 
         let mut cursor = read_buf.unfilled();
-        match r.read_buf(cursor.reborrow()) {
-            Ok(()) => {}
-            Err(e) if e.is_interrupted() => continue,
-            Err(e) => return Err(e),
+        loop {
+            match r.read_buf(cursor.reborrow()) {
+                Ok(()) => break,
+                Err(e) if e.is_interrupted() => continue,
+                Err(e) => return Err(e),
+            }
         }
 
-        if cursor.written() == 0 {
+        let unfilled_but_initialized = cursor.init_ref().len();
+        let bytes_read = cursor.written();
+        let was_fully_initialized = read_buf.init_len() == buf_len;
+
+        if bytes_read == 0 {
             return Ok(buf.len() - start_len);
         }
 
         // store how much was initialized but not filled
-        initialized = cursor.init_ref().len();
+        initialized = unfilled_but_initialized;
 
         // SAFETY: BorrowedBuf's invariants mean this much memory is initialized.
         unsafe {
-            let new_len = read_buf.filled().len() + buf.len();
+            let new_len = bytes_read + buf.len();
             buf.set_len(new_len);
         }
 
-        if buf.len() == buf.capacity() && buf.capacity() == start_cap {
-            // The buffer might be an exact fit. Let's read into a probe buffer
-            // and see if it returns `Ok(0)`. If so, we've avoided an
-            // unnecessary doubling of the capacity. But if not, append the
-            // probe buffer to the primary buffer and let its capacity grow.
-            let mut probe = [0u8; 32];
-
-            loop {
-                match r.read(&mut probe) {
-                    Ok(0) => return Ok(buf.len() - start_len),
-                    Ok(n) => {
-                        buf.extend_from_slice(&probe[..n]);
-                        break;
-                    }
-                    Err(ref e) if e.is_interrupted() => continue,
-                    Err(e) => return Err(e),
-                }
+        // Use heuristics to determine the max read size if no initial size hint was provided
+        if size_hint.is_none() {
+            // The reader is returning short reads but it doesn't call ensure_init().
+            // In that case we no longer need to restrict read sizes to avoid
+            // initialization costs.
+            if !was_fully_initialized {
+                max_read_size = usize::MAX;
+            }
+
+            // we have passed a larger buffer than previously and the
+            // reader still hasn't returned a short read
+            if buf_len >= max_read_size && bytes_read == buf_len {
+                max_read_size = max_read_size.saturating_mul(2);
             }
         }
     }
 }
diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs
index d06012c14dc..8dc5b07ce10 100644
--- a/library/std/src/lib.rs
+++ b/library/std/src/lib.rs
@@ -336,6 +336,7 @@
 #![feature(portable_simd)]
 #![feature(prelude_2024)]
 #![feature(ptr_as_uninit)]
+#![feature(ptr_from_ref)]
 #![feature(raw_os_nonzero)]
 #![feature(round_ties_even)]
 #![feature(slice_internals)]
diff --git a/library/std/src/process.rs b/library/std/src/process.rs
index af6bef1a76e..6004ed51bd1 100644
--- a/library/std/src/process.rs
+++ b/library/std/src/process.rs
@@ -1108,7 +1108,7 @@ impl fmt::Debug for Command {
     ///
     /// The default format approximates a shell invocation of the program along with its
     /// arguments. It does not include most of the other command properties. The output is not guaranteed to work
-    /// (e.g. due to lack of shell-escaping or differences in path resolution)
+    /// (e.g. due to lack of shell-escaping or differences in path resolution).
     /// On some platforms you can use [the alternate syntax] to show more fields.
     ///
     /// Note that the debug implementation is platform-specific.
diff --git a/library/std/src/sync/once_lock.rs b/library/std/src/sync/once_lock.rs
index f4963090795..52a43913243 100644
--- a/library/std/src/sync/once_lock.rs
+++ b/library/std/src/sync/once_lock.rs
@@ -13,22 +13,54 @@ use crate::sync::Once;
 ///
 /// # Examples
 ///
+/// Using `OnceCell` to store a function’s previously computed value (a.k.a.
+/// ‘lazy static’ or ‘memoizing’):
+///
+/// ```
+/// use std::collections::HashMap;
+/// use std::sync::OnceLock;
+///
+/// fn hash_map() -> &'static HashMap<u32, char> {
+///     static HASHMAP: OnceLock<HashMap<u32, char>> = OnceLock::new();
+///     HASHMAP.get_or_init(|| {
+///         let mut m = HashMap::new();
+///         m.insert(0, 'a');
+///         m.insert(1, 'b');
+///         m.insert(2, 'c');
+///         m
+///     })
+/// }
+///
+/// // The `HashMap` is built, stored in the `OnceLock`, and returned.
+/// let _ = hash_map();
+///
+/// // The `HashMap` is retrieved from the `OnceLock` and returned.
+/// let _ = hash_map();
+/// ```
+///
+/// Writing to a `OnceLock` from a separate thread:
+///
 /// ```
 /// use std::sync::OnceLock;
 ///
-/// static CELL: OnceLock<String> = OnceLock::new();
+/// static CELL: OnceLock<usize> = OnceLock::new();
+///
+/// // `OnceLock` has not been written to yet.
 /// assert!(CELL.get().is_none());
 ///
+/// // Spawn a thread and write to `OnceLock`.
 /// std::thread::spawn(|| {
-///     let value: &String = CELL.get_or_init(|| {
-///         "Hello, World!".to_string()
-///     });
-///     assert_eq!(value, "Hello, World!");
-/// }).join().unwrap();
+///     let value = CELL.get_or_init(|| 12345);
+///     assert_eq!(value, &12345);
+/// })
+/// .join()
+/// .unwrap();
 ///
-/// let value: Option<&String> = CELL.get();
-/// assert!(value.is_some());
-/// assert_eq!(value.unwrap().as_str(), "Hello, World!");
+/// // `OnceLock` now contains the value.
+/// assert_eq!(
+///     CELL.get(),
+///     Some(&12345),
+/// );
 /// ```
 #[stable(feature = "once_cell", since = "1.70.0")]
 pub struct OnceLock<T> {
diff --git a/library/std/src/sys/xous/mod.rs b/library/std/src/sys/xous/mod.rs
index 6d5c218d195..c2550dcfd83 100644
--- a/library/std/src/sys/xous/mod.rs
+++ b/library/std/src/sys/xous/mod.rs
@@ -28,7 +28,6 @@ pub mod process;
 pub mod stdio;
 pub mod thread;
 pub mod thread_local_key;
-#[path = "../unsupported/thread_parking.rs"]
 pub mod thread_parking;
 pub mod time;
 
diff --git a/library/std/src/sys/xous/thread_parking.rs b/library/std/src/sys/xous/thread_parking.rs
new file mode 100644
index 00000000000..aa39c6d2718
--- /dev/null
+++ b/library/std/src/sys/xous/thread_parking.rs
@@ -0,0 +1,94 @@
+use crate::os::xous::ffi::{blocking_scalar, scalar};
+use crate::os::xous::services::{ticktimer_server, TicktimerScalar};
+use crate::pin::Pin;
+use crate::ptr;
+use crate::sync::atomic::{
+    AtomicI8,
+    Ordering::{Acquire, Release},
+};
+use crate::time::Duration;
+
+const NOTIFIED: i8 = 1;
+const EMPTY: i8 = 0;
+const PARKED: i8 = -1;
+
+pub struct Parker {
+    state: AtomicI8,
+}
+
+impl Parker {
+    pub unsafe fn new_in_place(parker: *mut Parker) {
+        unsafe { parker.write(Parker { state: AtomicI8::new(EMPTY) }) }
+    }
+
+    fn index(&self) -> usize {
+        ptr::from_ref(self).addr()
+    }
+
+    pub unsafe fn park(self: Pin<&Self>) {
+        // Change NOTIFIED to EMPTY and EMPTY to PARKED.
+        let state = self.state.fetch_sub(1, Acquire);
+        if state == NOTIFIED {
+            return;
+        }
+
+        // The state was set to PARKED. Wait until the `unpark` wakes us up.
+        blocking_scalar(
+            ticktimer_server(),
+            TicktimerScalar::WaitForCondition(self.index(), 0).into(),
+        )
+        .expect("failed to send WaitForCondition command");
+
+        self.state.swap(EMPTY, Acquire);
+    }
+
+    pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
+        // Change NOTIFIED to EMPTY and EMPTY to PARKED.
+        let state = self.state.fetch_sub(1, Acquire);
+        if state == NOTIFIED {
+            return;
+        }
+
+        // A value of zero indicates an indefinite wait. Clamp the number of
+        // milliseconds to the allowed range.
+        let millis = usize::max(timeout.as_millis().try_into().unwrap_or(usize::MAX), 1);
+
+        let was_timeout = blocking_scalar(
+            ticktimer_server(),
+            TicktimerScalar::WaitForCondition(self.index(), millis).into(),
+        )
+        .expect("failed to send WaitForCondition command")[0]
+            != 0;
+
+        let state = self.state.swap(EMPTY, Acquire);
+        if was_timeout && state == NOTIFIED {
+            // The state was set to NOTIFIED after we returned from the wait
+            // but before we reset the state. Therefore, a wakeup is on its
+            // way, which we need to consume here.
+            // NOTICE: this is a priority hole.
+            blocking_scalar(
+                ticktimer_server(),
+                TicktimerScalar::WaitForCondition(self.index(), 0).into(),
+            )
+            .expect("failed to send WaitForCondition command");
+        }
+    }
+
+    pub fn unpark(self: Pin<&Self>) {
+        let state = self.state.swap(NOTIFIED, Release);
+        if state == PARKED {
+            // The thread is parked, wake it up.
+            blocking_scalar(
+                ticktimer_server(),
+                TicktimerScalar::NotifyCondition(self.index(), 1).into(),
+            )
+            .expect("failed to send NotifyCondition command");
+        }
+    }
+}
+
+impl Drop for Parker {
+    fn drop(&mut self) {
+        scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok();
+    }
+}
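
The `Vec<u8>` specialization in io/copy.rs now simply delegates to `read_to_end`, whose probe-read-then-grow strategy supplies the adaptive buffer sizing. The following is a simplified, self-contained sketch of that strategy with a hypothetical helper name; the real `default_read_to_end` reads into uninitialized spare capacity through `BorrowedBuf`, retries on `ErrorKind::Interrupted`, and also honors a size hint, none of which is shown here.

use std::io::{self, Read};

const PROBE_SIZE: usize = 32;
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

// Hypothetical, simplified version of the adaptive read-to-end heuristic.
fn read_to_end_adaptive<R: Read>(r: &mut R, out: &mut Vec<u8>) -> io::Result<usize> {
    let start_len = out.len();

    // Small probe read first: an empty or tiny reader never forces the Vec to grow.
    let mut probe = [0u8; PROBE_SIZE];
    let n = r.read(&mut probe)?;
    if n == 0 {
        return Ok(0);
    }
    out.extend_from_slice(&probe[..n]);

    // Adaptive chunk size: double it whenever the reader fills the whole chunk,
    // so large inputs quickly reach big reads while small inputs never pay for a
    // huge zeroed buffer. The 1 MiB cap is arbitrary and only for this sketch.
    let mut chunk_size = DEFAULT_BUF_SIZE;
    let mut chunk = vec![0u8; chunk_size];
    loop {
        let n = r.read(&mut chunk)?;
        if n == 0 {
            return Ok(out.len() - start_len);
        }
        out.extend_from_slice(&chunk[..n]);
        if n == chunk.len() {
            chunk_size = chunk_size.saturating_mul(2).min(1 << 20);
            chunk.resize(chunk_size, 0);
        }
    }
}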
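The new xous `Parker` is the usual three-state park/unpark protocol (EMPTY/PARKED/NOTIFIED) built on the ticktimer's condition-variable calls, keyed by the parker's address. Below is a portable sketch of the same state machine with the syscalls swapped for a std `Mutex`/`Condvar` pair purely for illustration; it is not the xous implementation.

use std::sync::atomic::{AtomicI8, Ordering::{Acquire, Release}};
use std::sync::{Condvar, Mutex};

const NOTIFIED: i8 = 1;
const EMPTY: i8 = 0;
const PARKED: i8 = -1;

// Portable illustration of the EMPTY/PARKED/NOTIFIED protocol; the xous
// version blocks on a ticktimer condition instead of a Mutex/Condvar.
struct Parker {
    state: AtomicI8,
    lock: Mutex<()>,
    cvar: Condvar,
}

impl Parker {
    fn new() -> Self {
        Parker { state: AtomicI8::new(EMPTY), lock: Mutex::new(()), cvar: Condvar::new() }
    }

    fn park(&self) {
        // NOTIFIED -> EMPTY (consume a pending token), EMPTY -> PARKED.
        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
            return;
        }
        // Wait until an unpark() flips the state to NOTIFIED.
        let mut guard = self.lock.lock().unwrap();
        while self.state.load(Acquire) == PARKED {
            guard = self.cvar.wait(guard).unwrap();
        }
        self.state.store(EMPTY, Release);
    }

    fn unpark(&self) {
        // Always leave a token; only wake the thread if it was actually parked.
        if self.state.swap(NOTIFIED, Release) == PARKED {
            let _guard = self.lock.lock().unwrap();
            self.cvar.notify_one();
        }
    }
}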
