about summary refs log tree commit diff
path: root/library/std/src/sys
diff options
context:
space:
mode:
authorbors <bors@rust-lang.org>2022-04-29 03:06:45 +0000
committerbors <bors@rust-lang.org>2022-04-29 03:06:45 +0000
commitddb7fbe8434be481607ae199fe2aee976ee2fc2e (patch)
tree627e0d2da7fcaa67116ee3ec175754f900f1cae1 /library/std/src/sys
parentbaaa3b682986879c7784b5733ecea942e9ae7de3 (diff)
parent1e7c15634d3b81b595d669382e45e6e136c730e1 (diff)
downloadrust-ddb7fbe8434be481607ae199fe2aee976ee2fc2e.tar.gz
rust-ddb7fbe8434be481607ae199fe2aee976ee2fc2e.zip
Auto merge of #96441 - ChrisDenton:sync-pipes, r=m-ou-se
Windows: Make stdin pipes synchronous

Stdin pipes do not need to be used asynchronously within the standard library. This is a first step in making pipes mostly synchronous.

r? `@m-ou-se`
Diffstat (limited to 'library/std/src/sys')
-rw-r--r--library/std/src/sys/windows/c.rs6
-rw-r--r--library/std/src/sys/windows/handle.rs4
-rw-r--r--library/std/src/sys/windows/pipe.rs90
-rw-r--r--library/std/src/sys/windows/process.rs38
4 files changed, 107 insertions, 31 deletions
diff --git a/library/std/src/sys/windows/c.rs b/library/std/src/sys/windows/c.rs
index 5f14edaf067..0692da1d795 100644
--- a/library/std/src/sys/windows/c.rs
+++ b/library/std/src/sys/windows/c.rs
@@ -1022,6 +1022,12 @@ extern "system" {
         bWaitAll: BOOL,
         dwMilliseconds: DWORD,
     ) -> DWORD;
+    pub fn CreatePipe(
+        hReadPipe: *mut HANDLE,
+        hWritePipe: *mut HANDLE,
+        lpPipeAttributes: *const SECURITY_ATTRIBUTES,
+        nSize: DWORD,
+    ) -> BOOL;
     pub fn CreateNamedPipeW(
         lpName: LPCWSTR,
         dwOpenMode: DWORD,
diff --git a/library/std/src/sys/windows/handle.rs b/library/std/src/sys/windows/handle.rs
index ef9a8bd6900..3b609825a79 100644
--- a/library/std/src/sys/windows/handle.rs
+++ b/library/std/src/sys/windows/handle.rs
@@ -221,6 +221,10 @@ impl Handle {
         Ok(Self(self.0.duplicate(access, inherit, options)?))
     }
 
+    pub(crate) fn set_inheritable(&self) -> io::Result<()> {
+        self.0.set_inheritable()
+    }
+
     /// Performs a synchronous read.
     ///
     /// If the handle is opened for asynchronous I/O then this abort the process.
diff --git a/library/std/src/sys/windows/pipe.rs b/library/std/src/sys/windows/pipe.rs
index 013c776c476..928bf2439c3 100644
--- a/library/std/src/sys/windows/pipe.rs
+++ b/library/std/src/sys/windows/pipe.rs
@@ -18,13 +18,20 @@ use crate::sys_common::IntoInner;
 // Anonymous pipes
 ////////////////////////////////////////////////////////////////////////////////
 
-pub struct AnonPipe {
-    inner: Handle,
+// A 64kb pipe capacity is the same as a typical Linux default.
+const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
+
+pub enum AnonPipe {
+    Sync(Handle),
+    Async(Handle),
 }
 
 impl IntoInner<Handle> for AnonPipe {
     fn into_inner(self) -> Handle {
-        self.inner
+        match self {
+            Self::Sync(handle) => handle,
+            Self::Async(handle) => handle,
+        }
     }
 }
 
@@ -32,6 +39,35 @@ pub struct Pipes {
     pub ours: AnonPipe,
     pub theirs: AnonPipe,
 }
+impl Pipes {
+    /// Create a new pair of pipes where both pipes are synchronous.
+    ///
+    /// These must not be used asynchronously.
+    pub fn new_synchronous(
+        ours_readable: bool,
+        their_handle_inheritable: bool,
+    ) -> io::Result<Self> {
+        unsafe {
+            // If `CreatePipe` succeeds, these will be our pipes.
+            let mut read = ptr::null_mut();
+            let mut write = ptr::null_mut();
+
+            if c::CreatePipe(&mut read, &mut write, ptr::null(), PIPE_BUFFER_CAPACITY) == 0 {
+                Err(io::Error::last_os_error())
+            } else {
+                let (ours, theirs) = if ours_readable { (read, write) } else { (write, read) };
+                let ours = Handle::from_raw_handle(ours);
+                let theirs = Handle::from_raw_handle(theirs);
+
+                if their_handle_inheritable {
+                    theirs.set_inheritable()?;
+                }
+
+                Ok(Pipes { ours: AnonPipe::Sync(ours), theirs: AnonPipe::Sync(theirs) })
+            }
+        }
+    }
+}
 
 /// Although this looks similar to `anon_pipe` in the Unix module it's actually
 /// subtly different. Here we'll return two pipes in the `Pipes` return value,
@@ -53,9 +89,6 @@ pub struct Pipes {
 /// with `OVERLAPPED` instances, but also works out ok if it's only ever used
 /// once at a time (which we do indeed guarantee).
 pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
-    // A 64kb pipe capacity is the same as a typical Linux default.
-    const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
-
     // Note that we specifically do *not* use `CreatePipe` here because
     // unfortunately the anonymous pipes returned do not support overlapped
     // operations. Instead, we create a "hopefully unique" name and create a
@@ -156,12 +189,9 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
         };
         opts.security_attributes(&mut sa);
         let theirs = File::open(Path::new(&name), &opts)?;
-        let theirs = AnonPipe { inner: theirs.into_inner() };
+        let theirs = AnonPipe::Sync(theirs.into_inner());
 
-        Ok(Pipes {
-            ours: AnonPipe { inner: ours },
-            theirs: AnonPipe { inner: theirs.into_inner() },
-        })
+        Ok(Pipes { ours: AnonPipe::Async(ours), theirs })
     }
 }
 
@@ -171,12 +201,12 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
 /// This is achieved by creating a new set of pipes and spawning a thread that
 /// relays messages between the source and the synchronous pipe.
 pub fn spawn_pipe_relay(
-    source: &AnonPipe,
+    source: &Handle,
     ours_readable: bool,
     their_handle_inheritable: bool,
 ) -> io::Result<AnonPipe> {
     // We need this handle to live for the lifetime of the thread spawned below.
-    let source = source.duplicate()?;
+    let source = AnonPipe::Async(source.duplicate(0, true, c::DUPLICATE_SAME_ACCESS)?);
 
     // create a new pair of anon pipes.
     let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?;
@@ -227,19 +257,24 @@ type AlertableIoFn = unsafe extern "system" fn(
 
 impl AnonPipe {
     pub fn handle(&self) -> &Handle {
-        &self.inner
+        match self {
+            Self::Async(ref handle) => handle,
+            Self::Sync(ref handle) => handle,
+        }
     }
     pub fn into_handle(self) -> Handle {
-        self.inner
-    }
-    fn duplicate(&self) -> io::Result<Self> {
-        self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| AnonPipe { inner })
+        self.into_inner()
     }
 
     pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
         let result = unsafe {
             let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
-            self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
+            match self {
+                Self::Sync(ref handle) => handle.read(buf),
+                Self::Async(_) => {
+                    self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
+                }
+            }
         };
 
         match result {
@@ -253,28 +288,33 @@ impl AnonPipe {
     }
 
     pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
-        self.inner.read_vectored(bufs)
+        io::default_read_vectored(|buf| self.read(buf), bufs)
     }
 
     #[inline]
     pub fn is_read_vectored(&self) -> bool {
-        self.inner.is_read_vectored()
+        false
     }
 
     pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
         unsafe {
             let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
-            self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
+            match self {
+                Self::Sync(ref handle) => handle.write(buf),
+                Self::Async(_) => {
+                    self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
+                }
+            }
         }
     }
 
     pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
-        self.inner.write_vectored(bufs)
+        io::default_write_vectored(|buf| self.write(buf), bufs)
     }
 
     #[inline]
     pub fn is_write_vectored(&self) -> bool {
-        self.inner.is_write_vectored()
+        false
     }
 
     /// Synchronizes asynchronous reads or writes using our anonymous pipe.
@@ -346,7 +386,7 @@ impl AnonPipe {
 
         // Asynchronous read of the pipe.
         // If successful, `callback` will be called once it completes.
-        let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
+        let result = io(self.handle().as_handle(), buf, len, &mut overlapped, callback);
         if result == c::FALSE {
             // We can return here because the call failed.
             // After this we must not return until the I/O completes.
diff --git a/library/std/src/sys/windows/process.rs b/library/std/src/sys/windows/process.rs
index e6f01df2627..cc29d1a72fb 100644
--- a/library/std/src/sys/windows/process.rs
+++ b/library/std/src/sys/windows/process.rs
@@ -24,7 +24,7 @@ use crate::sys::cvt;
 use crate::sys::fs::{File, OpenOptions};
 use crate::sys::handle::Handle;
 use crate::sys::path;
-use crate::sys::pipe::{self, AnonPipe};
+use crate::sys::pipe::{self, AnonPipe, Pipes};
 use crate::sys::stdio;
 use crate::sys_common::mutex::StaticMutex;
 use crate::sys_common::process::{CommandEnv, CommandEnvs};
@@ -173,7 +173,7 @@ pub enum Stdio {
     Inherit,
     Null,
     MakePipe,
-    Pipe(AnonPipe),
+    AsyncPipe(Handle),
     Handle(Handle),
 }
 
@@ -527,13 +527,33 @@ impl Stdio {
             },
 
             Stdio::MakePipe => {
-                let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
-                let pipes = pipe::anon_pipe(ours_readable, true)?;
+                // Handles that are passed to a child process must be synchronous
+                // because they will be read synchronously (see #95759).
+                // Therefore we prefer to make both ends of a pipe synchronous
+                // just in case our end of the pipe is passed to another process.
+                //
+                // However, we may need to read from both the child's stdout and
+                // stderr simultaneously when waiting for output. This requires
+                // async reads so as to avoid blocking either pipe.
+                //
+                // The solution used here is to make handles synchronous
+                // except for our side of the stdout and sterr pipes.
+                // If our side of those pipes do end up being given to another
+                // process then we use a "pipe relay" to synchronize access
+                // (see `Stdio::AsyncPipe` below).
+                let pipes = if stdio_id == c::STD_INPUT_HANDLE {
+                    // For stdin both sides of the pipe are synchronous.
+                    Pipes::new_synchronous(false, true)?
+                } else {
+                    // For stdout/stderr our side of the pipe is async and their side is synchronous.
+                    pipe::anon_pipe(true, true)?
+                };
                 *pipe = Some(pipes.ours);
                 Ok(pipes.theirs.into_handle())
             }
 
-            Stdio::Pipe(ref source) => {
+            Stdio::AsyncPipe(ref source) => {
+                // We need to synchronize asynchronous pipes by using a pipe relay.
                 let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
                 pipe::spawn_pipe_relay(source, ours_readable, true).map(AnonPipe::into_handle)
             }
@@ -562,7 +582,13 @@ impl Stdio {
 
 impl From<AnonPipe> for Stdio {
     fn from(pipe: AnonPipe) -> Stdio {
-        Stdio::Pipe(pipe)
+        // Note that it's very important we don't give async handles to child processes.
+        // Therefore if the pipe is asynchronous we must have a way to turn it synchronous.
+        // See #95759.
+        match pipe {
+            AnonPipe::Sync(handle) => Stdio::Handle(handle),
+            AnonPipe::Async(handle) => Stdio::AsyncPipe(handle),
+        }
     }
 }