diff options
| author | tiif <pekyuan@gmail.com> | 2024-08-19 15:13:12 +0800 |
|---|---|---|
| committer | tiif <pekyuan@gmail.com> | 2024-08-25 01:14:49 +0800 |
| commit | f71cdbbc467ee0f13703dabe025fdd1f49f287f2 (patch) | |
| tree | c2771b5540bc8a333eb123fcd338fa08b7abc3d1 /src | |
| parent | dbfd066ed828c38f5de4fbc6ee0d0e6153606133 (diff) | |
| download | rust-f71cdbbc467ee0f13703dabe025fdd1f49f287f2.tar.gz rust-f71cdbbc467ee0f13703dabe025fdd1f49f287f2.zip | |
Support blocking for epoll
Diffstat (limited to 'src')
| -rw-r--r-- | src/tools/miri/src/concurrency/thread.rs | 2 | ||||
| -rw-r--r-- | src/tools/miri/src/shims/unix/fd.rs | 8 | ||||
| -rw-r--r-- | src/tools/miri/src/shims/unix/linux/epoll.rs | 151 | ||||
| -rw-r--r-- | src/tools/miri/src/shims/unix/linux/foreign_items.rs | 3 | ||||
| -rw-r--r-- | src/tools/miri/tests/fail-dep/tokio/sleep.stderr | 15 | ||||
| -rw-r--r-- | src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs | 98 | ||||
| -rw-r--r-- | src/tools/miri/tests/pass-dep/libc/libc-epoll-no-blocking.rs (renamed from src/tools/miri/tests/pass-dep/libc/libc-epoll.rs) | 0 | ||||
| -rw-r--r-- | src/tools/miri/tests/pass-dep/tokio/sleep.rs (renamed from src/tools/miri/tests/fail-dep/tokio/sleep.rs) | 2 |
8 files changed, 239 insertions, 40 deletions
diff --git a/src/tools/miri/src/concurrency/thread.rs b/src/tools/miri/src/concurrency/thread.rs index f72591f0c4b..1b119ae7192 100644 --- a/src/tools/miri/src/concurrency/thread.rs +++ b/src/tools/miri/src/concurrency/thread.rs @@ -172,6 +172,8 @@ pub enum BlockReason { Futex { addr: u64 }, /// Blocked on an InitOnce. InitOnce(InitOnceId), + /// Blocked on epoll + Epoll, } /// The state of a thread. diff --git a/src/tools/miri/src/shims/unix/fd.rs b/src/tools/miri/src/shims/unix/fd.rs index e3b9835e360..3ca5f6bb2df 100644 --- a/src/tools/miri/src/shims/unix/fd.rs +++ b/src/tools/miri/src/shims/unix/fd.rs @@ -278,6 +278,14 @@ impl WeakFileDescriptionRef { } } +impl VisitProvenance for WeakFileDescriptionRef { + fn visit_provenance(&self, _visit: &mut VisitWith<'_>) { + // A weak reference can never be the only reference to some pointer or place. + // Since the actual file description is tracked by strong ref somewhere, + // it is ok to make this a NOP operation. + } +} + /// A unique id for file descriptions. While we could use the address, considering that /// is definitely unique, the address would expose interpreter internal state when used /// for sorting things. So instead we generate a unique id per file description that stays diff --git a/src/tools/miri/src/shims/unix/linux/epoll.rs b/src/tools/miri/src/shims/unix/linux/epoll.rs index 53f8b06ca6a..a0baa781dea 100644 --- a/src/tools/miri/src/shims/unix/linux/epoll.rs +++ b/src/tools/miri/src/shims/unix/linux/epoll.rs @@ -2,8 +2,9 @@ use std::cell::RefCell; use std::collections::BTreeMap; use std::io; use std::rc::{Rc, Weak}; +use std::time::Duration; -use crate::shims::unix::fd::{FdId, FileDescriptionRef}; +use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef}; use crate::shims::unix::*; use crate::*; @@ -19,6 +20,8 @@ struct Epoll { // This is an Rc because EpollInterest need to hold a reference to update // it. ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>, + /// A list of thread ids blocked on this epoll instance. + thread_id: RefCell<Vec<ThreadId>>, } /// EpollEventInstance contains information that will be returned by epoll_wait. @@ -58,6 +61,8 @@ pub struct EpollEventInterest { data: u64, /// Ready list of the epoll instance under which this EpollEventInterest is registered. ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>, + /// The file descriptor value that this EpollEventInterest is registered under. + epfd: i32, } /// EpollReadyEvents reflects the readiness of a file description. @@ -338,6 +343,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { events, data, ready_list: Rc::clone(ready_list), + epfd: epfd_value, })); if op == epoll_ctl_add { @@ -395,7 +401,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// The `timeout` argument specifies the number of milliseconds that /// `epoll_wait()` will block. Time is measured against the - /// CLOCK_MONOTONIC clock. + /// CLOCK_MONOTONIC clock. If the timeout is zero, the function will not block, + /// while if the timeout is -1, the function will block + /// until at least one event has been retrieved (or an error + /// occurred). /// A call to `epoll_wait()` will block until either: /// • a file descriptor delivers an event; @@ -421,35 +430,107 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { events_op: &OpTy<'tcx>, maxevents: &OpTy<'tcx>, timeout: &OpTy<'tcx>, - ) -> InterpResult<'tcx, Scalar> { + dest: MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let epfd = this.read_scalar(epfd)?.to_i32()?; + let epfd_value = this.read_scalar(epfd)?.to_i32()?; let events = this.read_immediate(events_op)?; let maxevents = this.read_scalar(maxevents)?.to_i32()?; let timeout = this.read_scalar(timeout)?.to_i32()?; - if epfd <= 0 || maxevents <= 0 { + if epfd_value <= 0 || maxevents <= 0 { let einval = this.eval_libc("EINVAL"); this.set_last_error(einval)?; - return Ok(Scalar::from_i32(-1)); + this.write_int(-1, &dest)?; + return Ok(()); } // This needs to come after the maxevents value check, or else maxevents.try_into().unwrap() // will fail. - let events = this.deref_pointer_as( + let event = this.deref_pointer_as( &events, this.libc_array_ty_layout("epoll_event", maxevents.try_into().unwrap()), )?; - // FIXME: Implement blocking support - if timeout != 0 { - throw_unsup_format!("epoll_wait: timeout value can only be 0"); + let Some(epfd) = this.machine.fds.get(epfd_value) else { + let result_value: i32 = this.fd_not_found()?; + this.write_int(result_value, &dest)?; + return Ok(()); + }; + // Create a weak ref of epfd and pass it to callback so we will make sure that epfd + // is not close after the thread unblocks. + let weak_epfd = epfd.downgrade(); + + // We just need to know if the ready list is empty and borrow the thread_ids out. + // The whole logic is wrapped inside a block so we don't need to manually drop epfd later. + let ready_list_empty; + let mut thread_ids; + { + let epoll_file_description = epfd + .downcast::<Epoll>() + .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?; + let binding = epoll_file_description.get_ready_list(); + ready_list_empty = binding.borrow_mut().is_empty(); + thread_ids = epoll_file_description.thread_id.borrow_mut(); + } + if timeout == 0 || !ready_list_empty { + // If the ready list is not empty, or the timeout is 0, we can return immediately. + this.blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event)?; + } else { + // Blocking + let timeout = match timeout { + 0.. => { + let duration = Duration::from_millis(timeout.try_into().unwrap()); + Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)) + } + -1 => None, + ..-1 => { + throw_unsup_format!( + "epoll_wait: Only timeout values greater than -1 are supported." + ); + } + }; + thread_ids.push(this.active_thread()); + this.block_thread( + BlockReason::Epoll, + timeout, + callback!( + @capture<'tcx> { + epfd_value: i32, + weak_epfd: WeakFileDescriptionRef, + dest: MPlaceTy<'tcx>, + event: MPlaceTy<'tcx>, + } + @unblock = |this| { + this.blocking_epoll_callback(epfd_value, weak_epfd, &dest, &event)?; + Ok(()) + } + @timeout = |this| { + // No notification after blocking timeout. + this.write_int(0, &dest)?; + Ok(()) + } + ), + ); } + Ok(()) + } - let Some(epfd) = this.machine.fds.get(epfd) else { - return Ok(Scalar::from_i32(this.fd_not_found()?)); + /// Callback function after epoll_wait unblocks + fn blocking_epoll_callback( + &mut self, + epfd_value: i32, + weak_epfd: WeakFileDescriptionRef, + dest: &MPlaceTy<'tcx>, + event: &MPlaceTy<'tcx>, + ) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + + let Some(epfd) = weak_epfd.upgrade() else { + throw_unsup_format!("epoll FD {epfd_value} is closed while blocking.") }; + let epoll_file_description = epfd .downcast::<Epoll>() .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?; @@ -457,7 +538,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { let ready_list = epoll_file_description.get_ready_list(); let mut ready_list = ready_list.borrow_mut(); let mut num_of_events: i32 = 0; - let mut array_iter = this.project_array_fields(&events)?; + let mut array_iter = this.project_array_fields(event)?; while let Some(des) = array_iter.next(this)? { if let Some(epoll_event_instance) = ready_list_next(this, &mut ready_list) { @@ -473,7 +554,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { break; } } - Ok(Scalar::from_i32(num_of_events)) + this.write_int(num_of_events, dest)?; + Ok(()) } /// For a specific file description, get its ready events and update the corresponding ready @@ -483,17 +565,42 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// /// This *will* report an event if anyone is subscribed to it, without any further filtering, so /// do not call this function when an FD didn't have anything happen to it! - fn check_and_update_readiness(&self, fd_ref: &FileDescriptionRef) -> InterpResult<'tcx, ()> { - let this = self.eval_context_ref(); + fn check_and_update_readiness( + &mut self, + fd_ref: &FileDescriptionRef, + ) -> InterpResult<'tcx, ()> { + let this = self.eval_context_mut(); let id = fd_ref.get_id(); + let mut waiter = Vec::new(); // Get a list of EpollEventInterest that is associated to a specific file description. if let Some(epoll_interests) = this.machine.epoll_interests.get_epoll_interest(id) { for weak_epoll_interest in epoll_interests { if let Some(epoll_interest) = weak_epoll_interest.upgrade() { - check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?; + let is_updated = check_and_update_one_event_interest(fd_ref, epoll_interest, id, this)?; + if is_updated { + // Edge-triggered notification only notify one thread even if there are + // multiple threads block on the same epfd. + let epfd = this.machine.fds.get(epoll_event_interest.epfd).unwrap(); + // FIXME: We can randomly pick a thread to unblock. + + // This unwrap can never fail because if the current epoll instance were + // closed and its epfd value reused, the upgrade of weak_epoll_interest + // above would fail. This guarantee holds because only the epoll instance + // holds a strong ref to epoll_interest. + if let Some(thread_id) = + epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop() + { + waiter.push(thread_id); + }; + } } } } + waiter.sort(); + waiter.dedup(); + for thread_id in waiter { + this.unblock_thread(thread_id, BlockReason::Epoll)?; + } Ok(()) } } @@ -517,14 +624,15 @@ fn ready_list_next( } /// This helper function checks whether an epoll notification should be triggered for a specific -/// epoll_interest and, if necessary, triggers the notification. Unlike check_and_update_readiness, -/// this function sends a notification to only one epoll instance. +/// epoll_interest and, if necessary, triggers the notification, and returns whether the +/// event interest was updated. Unlike check_and_update_readiness, this function sends a +/// notification to only one epoll instance. fn check_and_update_one_event_interest<'tcx>( fd_ref: &FileDescriptionRef, interest: Rc<RefCell<EpollEventInterest>>, id: FdId, ecx: &MiriInterpCx<'tcx>, -) -> InterpResult<'tcx> { +) -> InterpResult<'tcx, bool> { // Get the bitmask of ready events for a file description. let ready_events_bitmask = fd_ref.get_epoll_ready_events()?.get_event_bitmask(ecx); let epoll_event_interest = interest.borrow(); @@ -539,6 +647,7 @@ fn check_and_update_one_event_interest<'tcx>( let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data); // Triggers the notification by inserting it to the ready list. ready_list.insert(epoll_key, event_instance); + return Ok(true); } - Ok(()) + return Ok(false); } diff --git a/src/tools/miri/src/shims/unix/linux/foreign_items.rs b/src/tools/miri/src/shims/unix/linux/foreign_items.rs index 581f0db42e1..d21f0e8f3e6 100644 --- a/src/tools/miri/src/shims/unix/linux/foreign_items.rs +++ b/src/tools/miri/src/shims/unix/linux/foreign_items.rs @@ -62,8 +62,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { "epoll_wait" => { let [epfd, events, maxevents, timeout] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.epoll_wait(epfd, events, maxevents, timeout)?; - this.write_scalar(result, dest)?; + this.epoll_wait(epfd, events, maxevents, timeout, dest.clone())?; } "eventfd" => { let [val, flag] = diff --git a/src/tools/miri/tests/fail-dep/tokio/sleep.stderr b/src/tools/miri/tests/fail-dep/tokio/sleep.stderr deleted file mode 100644 index d5bf00fc175..00000000000 --- a/src/tools/miri/tests/fail-dep/tokio/sleep.stderr +++ /dev/null @@ -1,15 +0,0 @@ -error: unsupported operation: epoll_wait: timeout value can only be 0 - --> CARGO_REGISTRY/.../epoll.rs:LL:CC - | -LL | / syscall!(epoll_wait( -LL | | self.ep.as_raw_fd(), -LL | | events.as_mut_ptr(), -LL | | events.capacity() as i32, -LL | | timeout, -LL | | )) - | |__________^ epoll_wait: timeout value can only be 0 - | - = help: this is likely not a bug in the program; it indicates that the program performed an operation that Miri does not support - -error: aborting due to 1 previous error - diff --git a/src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs b/src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs new file mode 100644 index 00000000000..e1b4d3d85be --- /dev/null +++ b/src/tools/miri/tests/pass-dep/libc/libc-epoll-blocking.rs @@ -0,0 +1,98 @@ +//@only-target-linux + +use std::convert::TryInto; +use std::thread; +use std::thread::spawn; + +// This is a set of testcases for blocking epoll. + +fn main() { + test_epoll_block_without_notification(); + test_epoll_block_then_unblock(); +} + +// Using `as` cast since `EPOLLET` wraps around +const EPOLL_IN_OUT_ET: u32 = (libc::EPOLLIN | libc::EPOLLOUT | libc::EPOLLET) as _; + +#[track_caller] +fn check_epoll_wait<const N: usize>( + epfd: i32, + expected_notifications: &[(u32, u64)], + timeout: i32, +) { + let epoll_event = libc::epoll_event { events: 0, u64: 0 }; + let mut array: [libc::epoll_event; N] = [epoll_event; N]; + let maxsize = N; + let array_ptr = array.as_mut_ptr(); + let res = unsafe { libc::epoll_wait(epfd, array_ptr, maxsize.try_into().unwrap(), timeout) }; + if res < 0 { + panic!("epoll_wait failed: {}", std::io::Error::last_os_error()); + } + assert_eq!( + res, + expected_notifications.len().try_into().unwrap(), + "got wrong number of notifications" + ); + let slice = unsafe { std::slice::from_raw_parts(array_ptr, res.try_into().unwrap()) }; + for (return_event, expected_event) in slice.iter().zip(expected_notifications.iter()) { + let event = return_event.events; + let data = return_event.u64; + assert_eq!(event, expected_event.0, "got wrong events"); + assert_eq!(data, expected_event.1, "got wrong data"); + } +} +fn test_epoll_block_without_notification() { + // Create an epoll instance. + let epfd = unsafe { libc::epoll_create1(0) }; + assert_ne!(epfd, -1); + + // Create an eventfd instances. + let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC; + let fd = unsafe { libc::eventfd(0, flags) }; + + // Register eventfd with epoll. + let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 }; + let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) }; + assert_eq!(res, 0); + + // epoll_wait to clear notification. + let expected_event = u32::try_from(libc::EPOLLOUT).unwrap(); + let expected_value = fd as u64; + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0); + + // epoll_wait before triggering notification so it will block then unblock. + check_epoll_wait::<1>(epfd, &[], 5); +} + +fn test_epoll_block_then_unblock() { + // Create an epoll instance. + let epfd = unsafe { libc::epoll_create1(0) }; + assert_ne!(epfd, -1); + + // Create a socketpair instance. + let mut fds = [-1, -1]; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + // Register one side of the socketpair with epoll. + let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fds[0] as u64 }; + let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut ev) }; + assert_eq!(res, 0); + + // epoll_wait to clear notification. + let expected_event = u32::try_from(libc::EPOLLOUT).unwrap(); + let expected_value = fds[0] as u64; + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 0); + + // epoll_wait before triggering notification so it will block then get unblocked before timeout. + let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap(); + let expected_value = fds[0] as u64; + let thread1 = spawn(move || { + thread::yield_now(); + let data = "abcde".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); + }); + check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10); + thread1.join().unwrap(); +} diff --git a/src/tools/miri/tests/pass-dep/libc/libc-epoll.rs b/src/tools/miri/tests/pass-dep/libc/libc-epoll-no-blocking.rs index 647b5e60649..647b5e60649 100644 --- a/src/tools/miri/tests/pass-dep/libc/libc-epoll.rs +++ b/src/tools/miri/tests/pass-dep/libc/libc-epoll-no-blocking.rs diff --git a/src/tools/miri/tests/fail-dep/tokio/sleep.rs b/src/tools/miri/tests/pass-dep/tokio/sleep.rs index 0fa5080d484..00cc68eba3e 100644 --- a/src/tools/miri/tests/fail-dep/tokio/sleep.rs +++ b/src/tools/miri/tests/pass-dep/tokio/sleep.rs @@ -1,7 +1,5 @@ //@compile-flags: -Zmiri-permissive-provenance -Zmiri-backtrace=full //@only-target-x86_64-unknown-linux: support for tokio only on linux and x86 -//@error-in-other-file: timeout value can only be 0 -//@normalize-stderr-test: " += note:.*\n" -> "" use tokio::time::{sleep, Duration, Instant}; |
