about summary refs log tree commit diff
diff options
context:
space:
mode:
authorRalf Jung <post@ralfj.de>2024-11-24 21:40:15 +0000
committerGitHub <noreply@github.com>2024-11-24 21:40:15 +0000
commitd12594570aa81a2156098a7c49a16968293afb5d (patch)
treed668a9b7d3556ca4a8578976f99a0b3a375dd91b
parent0761f07dc84d49290310d63e3650f7edb7465666 (diff)
parent440080407219f4ad413fcc4819323d0ba920c440 (diff)
downloadrust-d12594570aa81a2156098a7c49a16968293afb5d.tar.gz
rust-d12594570aa81a2156098a7c49a16968293afb5d.zip
Merge pull request #4037 from tiif/blockpipe
Refactor AnonSocket::read/write for blocking socketpair
-rw-r--r--src/tools/miri/src/shims/unix/unnamed_socket.rs124
1 files changed, 80 insertions, 44 deletions
diff --git a/src/tools/miri/src/shims/unix/unnamed_socket.rs b/src/tools/miri/src/shims/unix/unnamed_socket.rs
index 8ccce7c1986..36575f4b5fb 100644
--- a/src/tools/miri/src/shims/unix/unnamed_socket.rs
+++ b/src/tools/miri/src/shims/unix/unnamed_socket.rs
@@ -146,8 +146,7 @@ impl FileDescription for AnonSocket {
             // corresponding ErrorKind variant.
             throw_unsup_format!("reading from the write end of a pipe");
         };
-        let mut readbuf = readbuf.borrow_mut();
-        if readbuf.buf.is_empty() {
+        if readbuf.borrow().buf.is_empty() {
             if self.peer_fd().upgrade().is_none() {
                 // Socketpair with no peer and empty buffer.
                 // 0 bytes successfully read indicates end-of-file.
@@ -167,31 +166,8 @@ impl FileDescription for AnonSocket {
                 }
             }
         }
-
-        // Synchronize with all previous writes to this buffer.
-        // FIXME: this over-synchronizes; a more precise approach would be to
-        // only sync with the writes whose data we will read.
-        ecx.acquire_clock(&readbuf.clock);
-
-        // Do full read / partial read based on the space available.
-        // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
-        let actual_read_size = readbuf.buf.read(&mut bytes).unwrap();
-
-        // Need to drop before others can access the readbuf again.
-        drop(readbuf);
-
-        // A notification should be provided for the peer file description even when it can
-        // only write 1 byte. This implementation is not compliant with the actual Linux kernel
-        // implementation. For optimization reasons, the kernel will only mark the file description
-        // as "writable" when it can write more than a certain number of bytes. Since we
-        // don't know what that *certain number* is, we will provide a notification every time
-        // a read is successful. This might result in our epoll emulation providing more
-        // notifications than the real system.
-        if let Some(peer_fd) = self.peer_fd().upgrade() {
-            ecx.check_and_update_readiness(&peer_fd)?;
-        }
-
-        ecx.return_read_success(ptr, &bytes, actual_read_size, dest)
+        // TODO: We might need to decide what to do if peer_fd is closed when read is blocked.
+        anonsocket_read(self, self.peer_fd().upgrade(), &mut bytes, ptr, dest, ecx)
     }
 
     fn write<'tcx>(
@@ -221,9 +197,8 @@ impl FileDescription for AnonSocket {
             // corresponding ErrorKind variant.
             throw_unsup_format!("writing to the reading end of a pipe");
         };
-        let mut writebuf = writebuf.borrow_mut();
-        let data_size = writebuf.buf.len();
-        let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
+        let available_space =
+            MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
         if available_space == 0 {
             if self.is_nonblock {
                 // Non-blocking socketpair with a full buffer.
@@ -233,24 +208,85 @@ impl FileDescription for AnonSocket {
                 throw_unsup_format!("socketpair/pipe/pipe2 write: blocking isn't supported yet");
             }
         }
-        // Remember this clock so `read` can synchronize with us.
-        ecx.release_clock(|clock| {
-            writebuf.clock.join(clock);
-        });
-        // Do full write / partial write based on the space available.
-        let actual_write_size = len.min(available_space);
-        let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
-        writebuf.buf.extend(&bytes[..actual_write_size]);
+        anonsocket_write(available_space, &peer_fd, ptr, len, dest, ecx)
+    }
+}
 
-        // Need to stop accessing peer_fd so that it can be notified.
-        drop(writebuf);
+/// Write to AnonSocket based on the space available and return the written byte size.
+fn anonsocket_write<'tcx>(
+    available_space: usize,
+    peer_fd: &FileDescriptionRef,
+    ptr: Pointer,
+    len: usize,
+    dest: &MPlaceTy<'tcx>,
+    ecx: &mut MiriInterpCx<'tcx>,
+) -> InterpResult<'tcx> {
+    let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
+        // FIXME: This should return EBADF, but there's no nice way to do that as there's no
+        // corresponding ErrorKind variant.
+        throw_unsup_format!("writing to the reading end of a pipe")
+    };
+    let mut writebuf = writebuf.borrow_mut();
+
+    // Remember this clock so `read` can synchronize with us.
+    ecx.release_clock(|clock| {
+        writebuf.clock.join(clock);
+    });
+    // Do full write / partial write based on the space available.
+    let actual_write_size = len.min(available_space);
+    let bytes = ecx.read_bytes_ptr_strip_provenance(ptr, Size::from_bytes(len))?;
+    writebuf.buf.extend(&bytes[..actual_write_size]);
+
+    // Need to stop accessing peer_fd so that it can be notified.
+    drop(writebuf);
+
+    // Notification should be provided for peer fd as it became readable.
+    // The kernel does this even if the fd was already readable before, so we follow suit.
+    ecx.check_and_update_readiness(peer_fd)?;
+
+    ecx.return_write_success(actual_write_size, dest)
+}
 
-        // Notification should be provided for peer fd as it became readable.
-        // The kernel does this even if the fd was already readable before, so we follow suit.
+/// Read from AnonSocket and return the number of bytes read.
+fn anonsocket_read<'tcx>(
+    anonsocket: &AnonSocket,
+    peer_fd: Option<FileDescriptionRef>,
+    bytes: &mut [u8],
+    ptr: Pointer,
+    dest: &MPlaceTy<'tcx>,
+    ecx: &mut MiriInterpCx<'tcx>,
+) -> InterpResult<'tcx> {
+    let Some(readbuf) = &anonsocket.readbuf else {
+        // FIXME: This should return EBADF, but there's no nice way to do that as there's no
+        // corresponding ErrorKind variant.
+        throw_unsup_format!("reading from the write end of a pipe")
+    };
+    let mut readbuf = readbuf.borrow_mut();
+
+    // Synchronize with all previous writes to this buffer.
+    // FIXME: this over-synchronizes; a more precise approach would be to
+    // only sync with the writes whose data we will read.
+    ecx.acquire_clock(&readbuf.clock);
+
+    // Do full read / partial read based on the space available.
+    // Conveniently, `read` exists on `VecDeque` and has exactly the desired behavior.
+    let actual_read_size = readbuf.buf.read(bytes).unwrap();
+
+    // Need to drop before others can access the readbuf again.
+    drop(readbuf);
+
+    // A notification should be provided for the peer file description even when it can
+    // only write 1 byte. This implementation is not compliant with the actual Linux kernel
+    // implementation. For optimization reasons, the kernel will only mark the file description
+    // as "writable" when it can write more than a certain number of bytes. Since we
+    // don't know what that *certain number* is, we will provide a notification every time
+    // a read is successful. This might result in our epoll emulation providing more
+    // notifications than the real system.
+    if let Some(peer_fd) = peer_fd {
         ecx.check_and_update_readiness(&peer_fd)?;
-
-        ecx.return_write_success(actual_write_size, dest)
     }
+
+    ecx.return_read_success(ptr, bytes, actual_read_size, dest)
 }
 
 impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {}