about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author tiif <pekyuan@gmail.com> 2024-08-19 15:13:12 +0800
committer tiif <pekyuan@gmail.com> 2024-08-25 01:14:49 +0800
commit f71cdbbc467ee0f13703dabe025fdd1f49f287f2 (patch)
tree c2771b5540bc8a333eb123fcd338fa08b7abc3d1 /src
parent dbfd066ed828c38f5de4fbc6ee0d0e6153606133 (diff)
download rust-f71cdbbc467ee0f13703dabe025fdd1f49f287f2.tar.gz
rust-f71cdbbc467ee0f13703dabe025fdd1f49f287f2.zip
Support blocking for epoll
Diffstat (limited to 'src')
-rw-r--r-- src/tools/miri/src/concurrency/thread.rs | 2
-rw-r--r-- src/tools/miri/src/shims/unix/fd.rs | 8
-rw-r--r-- src/tools/miri/src/shims/unix/linux/epoll.rs | 151
-rw-r--r-- src/tools/miri/src/shims/unix/linux/foreign_items.rs | 3
-rw-r--r-- src/tools/miri/tests/fail-dep/tokio/sleep.stderr | 15
-rw-r--r-- src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs | 98
-rw-r--r-- src/tools/miri/tests/pass-dep/libc/libc-epoll-no-blocking.rs (renamed from src/tools/miri/tests/pass-dep/libc/libc-epoll.rs) | 0
-rw-r--r-- src/tools/miri/tests/pass-dep/tokio/sleep.rs (renamed from src/tools/miri/tests/fail-dep/tokio/sleep.rs) | 2
8 files changed, 239 insertions, 40 deletions
diff --git a/src/tools/miri/src/concurrency/thread.rs b/src/tools/miri/src/concurrency/thread.rs
index f72591f0c4b..1b119ae7192 100644
--- a/src/tools/miri/src/concurrency/thread.rs
+++ b/src/tools/miri/src/concurrency/thread.rs
@@ -172,6 +172,8 @@ pub enum BlockReason {
     Futex { addr: u64 },
     /// Blocked on an InitOnce.
     InitOnce(InitOnceId),
+    /// Blocked on epoll
+    Epoll,
 }
 
 /// The state of a thread.
diff --git a/src/tools/miri/src/shims/unix/fd.rs b/src/tools/miri/src/shims/unix/fd.rs
index e3b9835e360..3ca5f6bb2df 100644
--- a/src/tools/miri/src/shims/unix/fd.rs
+++ b/src/tools/miri/src/shims/unix/fd.rs
@@ -278,6 +278,14 @@ impl WeakFileDescriptionRef {
     }
 }
 
+impl VisitProvenance for WeakFileDescriptionRef {
+    fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
+        // A weak reference can never be the only reference to some pointer or place.
+        // Since the actual file description is tracked by a strong ref somewhere,
+        // it is ok to make this a no-op.
+    }
+}
+
 /// A unique id for file descriptions. While we could use the address, considering that
 /// is definitely unique, the address would expose interpreter internal state when used
 /// for sorting things. So instead we generate a unique id per file description that stays
diff --git a/src/tools/miri/src/shims/unix/linux/epoll.rs b/src/tools/miri/src/shims/unix/linux/epoll.rs
index 53f8b06ca6a..a0baa781dea 100644
--- a/src/tools/miri/src/shims/unix/linux/epoll.rs
+++ b/src/tools/miri/src/shims/unix/linux/epoll.rs
@@ -2,8 +2,9 @@ use std::cell::RefCell;
 use std::collections::BTreeMap;
 use std::io;
 use std::rc::{Rc, Weak};
+use std::time::Duration;
 
-use crate::shims::unix::fd::{FdId, FileDescriptionRef};
+use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef};
 use crate::shims::unix::*;
 use crate::*;
 
@@ -19,6 +20,8 @@ struct Epoll {
     // This is an Rc because EpollInterest need to hold a reference to update
     // it.
     ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
+    /// A list of thread ids blocked on this epoll instance.
+    thread_id: RefCell<Vec<ThreadId>>,
 }
 
 /// EpollEventInstance contains information that will be returned by epoll_wait.
@@ -58,6 +61,8 @@ pub struct EpollEventInterest {
     data: u64,
     /// Ready list of the epoll instance under which this EpollEventInterest is registered.
     ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
+    /// The file descriptor value that this EpollEventInterest is registered under.
+    epfd: i32,
 }
 
 /// EpollReadyEvents reflects the readiness of a file description.
@@ -338,6 +343,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 events,
                 data,
                 ready_list: Rc::clone(ready_list),
+                epfd: epfd_value,
             }));
 
             if op == epoll_ctl_add {
@@ -395,7 +401,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
 
     /// The `timeout` argument specifies the number of milliseconds that
     /// `epoll_wait()` will block. Time is measured against the
-    /// CLOCK_MONOTONIC clock.
+    /// CLOCK_MONOTONIC clock. If the timeout is zero, the function will not block,
+    /// while if the timeout is -1, the function will block
+    /// until at least one event has been retrieved (or an error
+    /// occurred).
 
     /// A call to `epoll_wait()` will block until either:
     /// • a file descriptor delivers an event;
@@ -421,35 +430,107 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         events_op: &OpTy<'tcx>,
         maxevents: &OpTy<'tcx>,
         timeout: &OpTy<'tcx>,
-    ) -> InterpResult<'tcx, Scalar> {
+        dest: MPlaceTy<'tcx>,
+    ) -> InterpResult<'tcx> {
         let this = self.eval_context_mut();
 
-        let epfd = this.read_scalar(epfd)?.to_i32()?;
+        let epfd_value = this.read_scalar(epfd)?.to_i32()?;
         let events = this.read_immediate(events_op)?;
         let maxevents = this.read_scalar(maxevents)?.to_i32()?;
         let timeout = this.read_scalar(timeout)?.to_i32()?;
 
-        if epfd <= 0 || maxevents <= 0 {
+        if epfd_value <= 0 || maxevents <= 0 {
             let einval = this.eval_libc("EINVAL");
             this.set_last_error(einval)?;
-            return Ok(Scalar::from_i32(-1));
+            this.write_int(-1, &dest)?;
+            return Ok(());
         }
 
         // This needs to come after the maxevents value check, or else maxevents.try_into().unwrap()
         // will fail.
-        let events = this.deref_pointer_as(
+        let event = this.deref_pointer_as(
             &events,
             this.libc_array_ty_layout("epoll_event", maxevents.try_into().unwrap()),
         )?;
 
-        // FIXME: Implement blocking support
-        if timeout != 0 {
-            throw_unsup_format!("epoll_wait: timeout value can only be 0");
+        let Some(epfd) = this.machine.fds.get(epfd_value) else {
+            let result_value: i32 = this.fd_not_found()?;
+            this.write_int(result_value, &dest)?;
+            return Ok(());
+        };
+        // Create a weak ref of epfd and pass it to the callback so that, once the thread
+        // unblocks, the callback can detect whether epfd was closed in the meantime.
+        let weak_epfd = epfd.downgrade();
+
+        // We just need to know if the ready list is empty and borrow the thread_ids out.
+        // The whole logic is wrapped inside a block so we don't need to manually drop epfd later.
+        let ready_list_empty;
+        let mut thread_ids;
+        {
+            let epoll_file_description = epfd
+                .downcast::<Epoll>()
+                .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
+            let binding = epoll_file_description.get_ready_list();
+            ready_list_empty = binding.borrow_mut().is_empty();
+            thread_ids = epoll_file_description.thread_id.borrow_mut();
+        }
+        if timeout == 0 || !ready_list_empty {
+            // If the ready list is not empty, or the timeout is 0, we can return immediately.
+            this.blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event)?;
+        } else {
+            // Blocking
+            let timeout = match timeout {
+                0.. => {
+                    let duration = Duration::from_millis(timeout.try_into().unwrap());
+                    Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration))
+                }
+                -1 => None,
+                ..-1 => {
+                    throw_unsup_format!(
+                        "epoll_wait: Only timeout values greater than -1 are supported."
+                    );
+                }
+            };
+            thread_ids.push(this.active_thread());
+            this.block_thread(
+                BlockReason::Epoll,
+                timeout,
+                callback!(
+                    @capture<'tcx> {
+                        epfd_value: i32,
+                        weak_epfd: WeakFileDescriptionRef,
+                        dest: MPlaceTy<'tcx>,
+                        event: MPlaceTy<'tcx>,
+                    }
+                    @unblock = |this| {
+                        this.blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event)?;
+                        Ok(())
+                    }
+                    @timeout = |this| {
+                        // The timeout expired without any notification: report zero events.
+                        this.write_int(0, &dest)?;
+                        Ok(())
+                    }
+                ),
+            );
         }
+        Ok(())
+    }
 
-        let Some(epfd) = this.machine.fds.get(epfd) else {
-            return Ok(Scalar::from_i32(this.fd_not_found()?));
+    /// Callback function after epoll_wait unblocks
+    fn blocking_epoll_callback(
+        &mut self,
+        epfd_value: i32,
+        weak_epfd: WeakFileDescriptionRef,
+        dest: &MPlaceTy<'tcx>,
+        event: &MPlaceTy<'tcx>,
+    ) -> InterpResult<'tcx> {
+        let this = self.eval_context_mut();
+
+        let Some(epfd) = weak_epfd.upgrade() else {
+            throw_unsup_format!("epoll FD {epfd_value} is closed while blocking.")
         };
+
         let epoll_file_description = epfd
             .downcast::<Epoll>()
             .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
@@ -457,7 +538,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         let ready_list = epoll_file_description.get_ready_list();
         let mut ready_list = ready_list.borrow_mut();
         let mut num_of_events: i32 = 0;
-        let mut array_iter = this.project_array_fields(&events)?;
+        let mut array_iter = this.project_array_fields(event)?;
 
         while let Some(des) = array_iter.next(this)? {
             if let Some(epoll_event_instance) = ready_list_next(this, &mut ready_list) {
@@ -473,7 +554,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 break;
             }
         }
-        Ok(Scalar::from_i32(num_of_events))
+        this.write_int(num_of_events, dest)?;
+        Ok(())
     }
 
     /// For a specific file description, get its ready events and update the corresponding ready
@@ -483,17 +565,42 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
     ///
     /// This *will* report an event if anyone is subscribed to it, without any further filtering, so
     /// do not call this function when an FD didn't have anything happen to it!
-    fn check_and_update_readiness(&self, fd_ref: &FileDescriptionRef) -> InterpResult<'tcx, ()> {
-        let this = self.eval_context_ref();
+    fn check_and_update_readiness(
+        &mut self,
+        fd_ref: &FileDescriptionRef,
+    ) -> InterpResult<'tcx, ()> {
+        let this = self.eval_context_mut();
         let id = fd_ref.get_id();
+        let mut waiter = Vec::new();
         // Get a list of EpollEventInterest that is associated to a specific file description.
         if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) {
             for weak_epoll_interest in epoll_interests {
                 if let Some(epoll_interest) = weak_epoll_interest.upgrade() {
-                    check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?;
+                    let is_updated = check_and_update_one_event_interest(fd_ref, Rc::clone(&epoll_interest), id, this)?;
+                    if is_updated {
+                        // Edge-triggered notification only notifies one thread, even if multiple
+                        // threads are blocked on the same epfd, so pop at most one waiter here.
+                        let epfd = this.machine.fds.get(epoll_interest.borrow().epfd).unwrap();
+                        // FIXME: We can randomly pick a thread to unblock.
+
+                        // This unwrap can never fail because if the current epoll instance were
+                        // closed and its epfd value reused, the upgrade of weak_epoll_interest
+                        // above would fail. This guarantee holds because only the epoll instance
+                        // holds a strong ref to epoll_interest.
+                        if let Some(thread_id) =
+                            epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop()
+                        {
+                            waiter.push(thread_id);
+                        };
+                    }
                 }
             }
         }
+        waiter.sort();
+        waiter.dedup();
+        for thread_id in waiter {
+            this.unblock_thread(thread_id, BlockReason::Epoll)?;
+        }
         Ok(())
     }
 }
@@ -517,14 +624,15 @@ fn ready_list_next(
 }
 
 /// This helper function checks whether an epoll notification should be triggered for a specific
-/// epoll_interest and, if necessary, triggers the notification. Unlike check_and_update_readiness,
-/// this function sends a notification to only one epoll instance.
+/// epoll_interest and, if necessary, triggers the notification, and returns whether the
+/// event interest was updated. Unlike check_and_update_readiness, this function sends a
+/// notification to only one epoll instance.
 fn check_and_update_one_event_interest<'tcx>(
     fd_ref: &FileDescriptionRef,
     interest: Rc<RefCell<EpollEventInterest>>,
     id: FdId,
     ecx: &MiriInterpCx<'tcx>,
-) -> InterpResult<'tcx> {
+) -> InterpResult<'tcx, bool> {
     // Get the bitmask of ready events for a file description.
     let ready_events_bitmask = fd_ref.get_epoll_ready_events()?.get_event_bitmask(ecx);
     let epoll_event_interest = interest.borrow();
@@ -539,6 +647,7 @@ fn check_and_update_one_event_interest<'tcx>(
         let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
         // Triggers the notification by inserting it to the ready list.
         ready_list.insert(epoll_key, event_instance);
+        return Ok(true);
     }
-    Ok(())
+    return Ok(false);
 }
diff --git a/src/tools/miri/src/shims/unix/linux/foreign_items.rs b/src/tools/miri/src/shims/unix/linux/foreign_items.rs
index 581f0db42e1..d21f0e8f3e6 100644
--- a/src/tools/miri/src/shims/unix/linux/foreign_items.rs
+++ b/src/tools/miri/src/shims/unix/linux/foreign_items.rs
@@ -62,8 +62,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
             "epoll_wait" => {
                 let [epfd, events, maxevents, timeout] =
                     this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
-                let result = this.epoll_wait(epfd, events, maxevents, timeout)?;
-                this.write_scalar(result, dest)?;
+                this.epoll_wait(epfd, events, maxevents, timeout, dest.clone())?;
             }
             "eventfd" => {
                 let [val, flag] =
diff --git a/src/tools/miri/tests/fail-dep/tokio/sleep.stderr b/src/tools/miri/tests/fail-dep/tokio/sleep.stderr
deleted file mode 100644
index d5bf00fc175..00000000000
--- a/src/tools/miri/tests/fail-dep/tokio/sleep.stderr
+++ /dev/null
@@ -1,15 +0,0 @@
-error: unsupported operation: epoll_wait: timeout value can only be 0
-  --> CARGO_REGISTRY/.../epoll.rs:LL:CC
-   |
-LL | /         syscall!(epoll_wait(
-LL | |             self.ep.as_raw_fd(),
-LL | |             events.as_mut_ptr(),
-LL | |             events.capacity() as i32,
-LL | |             timeout,
-LL | |         ))
-   | |__________^ epoll_wait: timeout value can only be 0
-   |
-   = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support
-
-error: aborting due to 1 previous error
-
diff --git a/src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs b/src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs
new file mode 100644
index 00000000000..e1b4d3d85be
--- /dev/null
+++ b/src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs
@@ -0,0 +1,98 @@
+//@only-target-linux
+
+use std::convert::TryInto;
+use std::thread;
+use std::thread::spawn;
+
+// This is a set of testcases for blocking epoll.
+
+fn main() {
+    test_epoll_block_without_notification();
+    test_epoll_block_then_unblock();
+}
+
+// Using `as` cast since `EPOLLET` wraps around
+const EPOLL_IN_OUT_ET: u32 = (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as _;
+
+#[track_caller]
+fn check_epoll_wait<const N: usize>(
+    epfd: i32,
+    expected_notifications: &[(u32, u64)],
+    timeout: i32,
+) {
+    let epoll_event = libc::epoll_event { events: 0, u64: 0 };
+    let mut array: [libc::epoll_event; N] = [epoll_event; N];
+    let maxsize = N;
+    let array_ptr = array.as_mut_ptr();
+    let res = unsafe { libc::epoll_wait(epfd, array_ptr, maxsize.try_into().unwrap(), timeout) };
+    if res < 0 {
+        panic!("epoll_wait failed: {}", std::io::Error::last_os_error());
+    }
+    assert_eq!(
+        res,
+        expected_notifications.len().try_into().unwrap(),
+        "got wrong number of notifications"
+    );
+    let slice = unsafe { std::slice::from_raw_parts(array_ptr, res.try_into().unwrap()) };
+    for (return_event, expected_event) in slice.iter().zip(expected_notifications.iter()) {
+        let event = return_event.events;
+        let data = return_event.u64;
+        assert_eq!(event, expected_event.0, "got wrong events");
+        assert_eq!(data, expected_event.1, "got wrong data");
+    }
+}
+fn test_epoll_block_without_notification() {
+    // Create an epoll instance.
+    let epfd = unsafe { libc::epoll_create1(0) };
+    assert_ne!(epfd, -1);
+
+    // Create an eventfd instance.
+    let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
+    let fd = unsafe { libc::eventfd(0, flags) };
+
+    // Register eventfd with epoll.
+    let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 };
+    let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) };
+    assert_eq!(res, 0);
+
+    // epoll_wait with a zero timeout to clear the initial EPOLLOUT notification.
+    let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
+    let expected_value = fd as u64;
+    check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0);
+
+    // Nothing triggers a notification, so this blocks until the timeout and returns no events.
+    check_epoll_wait::<1>(epfd, &[], 5);
+}
+
+fn test_epoll_block_then_unblock() {
+    // Create an epoll instance.
+    let epfd = unsafe { libc::epoll_create1(0) };
+    assert_ne!(epfd, -1);
+
+    // Create a socketpair instance.
+    let mut fds = [-1, -1];
+    let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
+    assert_eq!(res, 0);
+
+    // Register one side of the socketpair with epoll.
+    let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fds[0] as u64 };
+    let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) };
+    assert_eq!(res, 0);
+
+    // epoll_wait with a zero timeout to clear the initial EPOLLOUT notification.
+    let expected_event = u32::try_from(libc::EPOLLOUT).unwrap();
+    let expected_value = fds[0] as u64;
+    check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0);
+
+    // epoll_wait before triggering notification so it will block and then get unblocked before the timeout.
+    let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
+    let expected_value = fds[0] as u64;
+    let thread1 = spawn(move || {
+        thread::yield_now();
+        let data = "abcde".as_bytes().as_ptr();
+        let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) };
+        assert_eq!(res, 5);
+    });
+    check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10);
+    thread1.join().unwrap();
+}
diff --git a/src/tools/miri/tests/pass-dep/libc/libc-epoll.rs b/src/tools/miri/tests/pass-dep/libc/libc-epoll-no-blocking.rs
index 647b5e60649..647b5e60649 100644
--- a/src/tools/miri/tests/pass-dep/libc/libc-epoll.rs
+++ b/src/tools/miri/tests/pass-dep/libc/libc-epoll-no-blocking.rs
diff --git a/src/tools/miri/tests/fail-dep/tokio/sleep.rs b/src/tools/miri/tests/pass-dep/tokio/sleep.rs
index 0fa5080d484..00cc68eba3e 100644
--- a/src/tools/miri/tests/fail-dep/tokio/sleep.rs
+++ b/src/tools/miri/tests/pass-dep/tokio/sleep.rs
@@ -1,7 +1,5 @@
 //@compile-flags: -Zmiri-permissive-provenance -Zmiri-backtrace=full
 //@only-target-x86_64-unknown-linux: support for tokio only on linux and x86
-//@error-in-other-file: timeout value can only be 0
-//@normalize-stderr-test: " += note:.*\n" -> ""
 
 use tokio::time::{sleep, Duration, Instant};