diff options
| author | Matthias Krüger <matthias.krueger@famsik.de> | 2024-01-22 16:13:26 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-22 16:13:26 +0100 |
| commit | e9c2e1bfbed1b7c1f48bcbd545620a54af2c40cf (patch) | |
| tree | 63bf2a44e3432a3bd9b00751fa50a1def50dfebb | |
| parent | c5984caa443cbffdfe9223df75f6e9ab88409f1f (diff) | |
| parent | 50e4fede2427f54cbdb9e4d556aa9ef4c68332fb (diff) | |
| download | rust-e9c2e1bfbed1b7c1f48bcbd545620a54af2c40cf.tar.gz rust-e9c2e1bfbed1b7c1f48bcbd545620a54af2c40cf.zip | |
Rollup merge of #119408 - betrusted-io:xous-fixes-add-network, r=Mark-Simulacrum
xous: misc fixes + add network support
This patchset makes several fixes to Xous support. Additionally, this patch adds networking support.
Many of these fixes are the result of the recent patch to get `unwinding` support merged. As a result of this patch, we can now run rust tests. As a result of these tests, we now have 729 tests passing:
```
failures:
env::tests::test
env::tests::test_self_exe_path
env::tests::vars_debug
env::tests::vars_os_debug
os::raw::tests::same
path::tests::test_push
path::tests::test_set_file_name
time::tests::since_epoch
test result: FAILED. 729 passed; 8 failed; 1 ignored; 0 measured; 0 filtered out; finished in 214.54s
```
In the course of fixing several tests and getting the test sequence to reliably run, several issues were found. This patchset fixes those issues.
20 files changed, 1770 insertions, 176 deletions
diff --git a/library/std/src/os/xous/ffi.rs b/library/std/src/os/xous/ffi.rs index 8be7fbb102f..7fe84db515c 100644 --- a/library/std/src/os/xous/ffi.rs +++ b/library/std/src/os/xous/ffi.rs @@ -88,29 +88,31 @@ fn lend_impl( let a3 = opcode; let a4 = data.as_ptr() as usize; let a5 = data.len(); - let mut a6 = arg1; - let mut a7 = arg2; + let a6 = arg1; + let a7 = arg2; + let mut ret1; + let mut ret2; unsafe { core::arch::asm!( "ecall", inlateout("a0") a0, - inlateout("a1") a1 => _, - inlateout("a2") a2 => _, + inlateout("a1") a1 => ret1, + inlateout("a2") a2 => ret2, inlateout("a3") a3 => _, inlateout("a4") a4 => _, inlateout("a5") a5 => _, - inlateout("a6") a6, - inlateout("a7") a7, + inlateout("a6") a6 => _, + inlateout("a7") a7 => _, ) }; let result = a0; if result == SyscallResult::MemoryReturned as usize { - Ok((a6, a7)) + Ok((ret1, ret2)) } else if result == SyscallResult::Error as usize { - Err(a1.into()) + Err(ret1.into()) } else { Err(Error::InternalError) } @@ -405,7 +407,7 @@ pub(crate) unsafe fn map_memory<T>( pub(crate) unsafe fn unmap_memory<T>(range: *mut [T]) -> Result<(), Error> { let mut a0 = Syscall::UnmapMemory as usize; let mut a1 = range.as_mut_ptr() as usize; - let a2 = range.len(); + let a2 = range.len() * core::mem::size_of::<T>(); let a3 = 0; let a4 = 0; let a5 = 0; @@ -450,7 +452,7 @@ pub(crate) unsafe fn update_memory_flags<T>( ) -> Result<(), Error> { let mut a0 = Syscall::UpdateMemoryFlags as usize; let mut a1 = range.as_mut_ptr() as usize; - let a2 = range.len(); + let a2 = range.len() * core::mem::size_of::<T>(); let a3 = new_flags.bits(); let a4 = 0; // Process ID is currently None let a5 = 0; diff --git a/library/std/src/os/xous/services.rs b/library/std/src/os/xous/services.rs index 5c219f1fbb9..a75be1b8570 100644 --- a/library/std/src/os/xous/services.rs +++ b/library/std/src/os/xous/services.rs @@ -1,9 +1,15 @@ use crate::os::xous::ffi::Connection; use core::sync::atomic::{AtomicU32, Ordering}; +mod dns; +pub(crate) use dns::*; + mod log; pub(crate) use log::*; +mod net; +pub(crate) use net::*; + mod systime; pub(crate) use systime::*; diff --git a/library/std/src/os/xous/services/dns.rs b/library/std/src/os/xous/services/dns.rs new file mode 100644 index 00000000000..a7d88f4892c --- /dev/null +++ b/library/std/src/os/xous/services/dns.rs @@ -0,0 +1,28 @@ +use crate::os::xous::ffi::Connection; +use crate::os::xous::services::connect; +use core::sync::atomic::{AtomicU32, Ordering}; + +#[repr(usize)] +pub(crate) enum DnsLendMut { + RawLookup = 6, +} + +impl Into<usize> for DnsLendMut { + fn into(self) -> usize { + self as usize + } +} + +/// Return a `Connection` to the DNS lookup server. This server is used for +/// querying domain name values. +pub(crate) fn dns_server() -> Connection { + static DNS_CONNECTION: AtomicU32 = AtomicU32::new(0); + let cid = DNS_CONNECTION.load(Ordering::Relaxed); + if cid != 0 { + return cid.into(); + } + + let cid = connect("_DNS Resolver Middleware_").unwrap(); + DNS_CONNECTION.store(cid.into(), Ordering::Relaxed); + cid +} diff --git a/library/std/src/os/xous/services/log.rs b/library/std/src/os/xous/services/log.rs index e6bae929eac..55a501dc7d0 100644 --- a/library/std/src/os/xous/services/log.rs +++ b/library/std/src/os/xous/services/log.rs @@ -45,6 +45,17 @@ impl<'a> Into<[usize; 5]> for LogScalar<'a> { } } +pub(crate) enum LogLend { + StandardOutput = 1, + StandardError = 2, +} + +impl Into<usize> for LogLend { + fn into(self) -> usize { + self as usize + } +} + /// Return a `Connection` to the log server, which is used for printing messages to /// the console and reporting panics. If the log server has not yet started, this /// will block until the server is running. It is safe to call this multiple times, diff --git a/library/std/src/os/xous/services/net.rs b/library/std/src/os/xous/services/net.rs new file mode 100644 index 00000000000..26d337dcef1 --- /dev/null +++ b/library/std/src/os/xous/services/net.rs @@ -0,0 +1,95 @@ +use crate::os::xous::ffi::Connection; +use crate::os::xous::services::connect; +use core::sync::atomic::{AtomicU32, Ordering}; + +pub(crate) enum NetBlockingScalar { + StdGetTtlUdp(u16 /* fd */), /* 36 */ + StdSetTtlUdp(u16 /* fd */, u32 /* ttl */), /* 37 */ + StdGetTtlTcp(u16 /* fd */), /* 36 */ + StdSetTtlTcp(u16 /* fd */, u32 /* ttl */), /* 37 */ + StdGetNodelay(u16 /* fd */), /* 38 */ + StdSetNodelay(u16 /* fd */, bool), /* 39 */ + StdTcpClose(u16 /* fd */), /* 34 */ + StdUdpClose(u16 /* fd */), /* 41 */ + StdTcpStreamShutdown(u16 /* fd */, crate::net::Shutdown /* how */), /* 46 */ +} + +pub(crate) enum NetLendMut { + StdTcpConnect, /* 30 */ + StdTcpTx(u16 /* fd */), /* 31 */ + StdTcpPeek(u16 /* fd */, bool /* nonblocking */), /* 32 */ + StdTcpRx(u16 /* fd */, bool /* nonblocking */), /* 33 */ + StdGetAddress(u16 /* fd */), /* 35 */ + StdUdpBind, /* 40 */ + StdUdpRx(u16 /* fd */), /* 42 */ + StdUdpTx(u16 /* fd */), /* 43 */ + StdTcpListen, /* 44 */ + StdTcpAccept(u16 /* fd */), /* 45 */ +} + +impl Into<usize> for NetLendMut { + fn into(self) -> usize { + match self { + NetLendMut::StdTcpConnect => 30, + NetLendMut::StdTcpTx(fd) => 31 | ((fd as usize) << 16), + NetLendMut::StdTcpPeek(fd, blocking) => { + 32 | ((fd as usize) << 16) | if blocking { 0x8000 } else { 0 } + } + NetLendMut::StdTcpRx(fd, blocking) => { + 33 | ((fd as usize) << 16) | if blocking { 0x8000 } else { 0 } + } + NetLendMut::StdGetAddress(fd) => 35 | ((fd as usize) << 16), + NetLendMut::StdUdpBind => 40, + NetLendMut::StdUdpRx(fd) => 42 | ((fd as usize) << 16), + NetLendMut::StdUdpTx(fd) => 43 | ((fd as usize) << 16), + NetLendMut::StdTcpListen => 44, + NetLendMut::StdTcpAccept(fd) => 45 | ((fd as usize) << 16), + } + } +} + +impl<'a> Into<[usize; 5]> for NetBlockingScalar { + fn into(self) -> [usize; 5] { + match self { + NetBlockingScalar::StdGetTtlTcp(fd) => [36 | ((fd as usize) << 16), 0, 0, 0, 0], + NetBlockingScalar::StdGetTtlUdp(fd) => [36 | ((fd as usize) << 16), 0, 0, 0, 1], + NetBlockingScalar::StdSetTtlTcp(fd, ttl) => { + [37 | ((fd as usize) << 16), ttl as _, 0, 0, 0] + } + NetBlockingScalar::StdSetTtlUdp(fd, ttl) => { + [37 | ((fd as usize) << 16), ttl as _, 0, 0, 1] + } + NetBlockingScalar::StdGetNodelay(fd) => [38 | ((fd as usize) << 16), 0, 0, 0, 0], + NetBlockingScalar::StdSetNodelay(fd, enabled) => { + [39 | ((fd as usize) << 16), if enabled { 1 } else { 0 }, 0, 0, 1] + } + NetBlockingScalar::StdTcpClose(fd) => [34 | ((fd as usize) << 16), 0, 0, 0, 0], + NetBlockingScalar::StdUdpClose(fd) => [41 | ((fd as usize) << 16), 0, 0, 0, 0], + NetBlockingScalar::StdTcpStreamShutdown(fd, how) => [ + 46 | ((fd as usize) << 16), + match how { + crate::net::Shutdown::Read => 1, + crate::net::Shutdown::Write => 2, + crate::net::Shutdown::Both => 3, + }, + 0, + 0, + 0, + ], + } + } +} + +/// Return a `Connection` to the Network server. This server provides all +/// OS-level networking functions. +pub(crate) fn net_server() -> Connection { + static NET_CONNECTION: AtomicU32 = AtomicU32::new(0); + let cid = NET_CONNECTION.load(Ordering::Relaxed); + if cid != 0 { + return cid.into(); + } + + let cid = connect("_Middleware Network Server_").unwrap(); + NET_CONNECTION.store(cid.into(), Ordering::Relaxed); + cid +} diff --git a/library/std/src/sys/pal/xous/alloc.rs b/library/std/src/sys/pal/xous/alloc.rs index b3a3e691e0d..0d540e95520 100644 --- a/library/std/src/sys/pal/xous/alloc.rs +++ b/library/std/src/sys/pal/xous/alloc.rs @@ -1,7 +1,15 @@ use crate::alloc::{GlobalAlloc, Layout, System}; +#[cfg(not(test))] +#[export_name = "_ZN16__rust_internals3std3sys4xous5alloc8DLMALLOCE"] static mut DLMALLOC: dlmalloc::Dlmalloc = dlmalloc::Dlmalloc::new(); +#[cfg(test)] +extern "Rust" { + #[link_name = "_ZN16__rust_internals3std3sys4xous5alloc8DLMALLOCE"] + static mut DLMALLOC: dlmalloc::Dlmalloc; +} + #[stable(feature = "alloc_system_type", since = "1.28.0")] unsafe impl GlobalAlloc for System { #[inline] diff --git a/library/std/src/sys/pal/xous/locks/condvar.rs b/library/std/src/sys/pal/xous/locks/condvar.rs index 1bb38dfa341..510235046e1 100644 --- a/library/std/src/sys/pal/xous/locks/condvar.rs +++ b/library/std/src/sys/pal/xous/locks/condvar.rs @@ -1,14 +1,17 @@ use super::mutex::Mutex; use crate::os::xous::ffi::{blocking_scalar, scalar}; -use crate::os::xous::services::ticktimer_server; -use crate::sync::Mutex as StdMutex; +use crate::os::xous::services::{ticktimer_server, TicktimerScalar}; use crate::time::Duration; +use core::sync::atomic::{AtomicUsize, Ordering}; // The implementation is inspired by Andrew D. Birrell's paper // "Implementing Condition Variables with Semaphores" +const NOTIFY_TRIES: usize = 3; + pub struct Condvar { - counter: StdMutex<usize>, + counter: AtomicUsize, + timed_out: AtomicUsize, } unsafe impl Send for Condvar {} @@ -18,94 +21,128 @@ impl Condvar { #[inline] #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] pub const fn new() -> Condvar { - Condvar { counter: StdMutex::new(0) } + Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) } } - pub fn notify_one(&self) { - let mut counter = self.counter.lock().unwrap(); - if *counter <= 0 { + fn notify_some(&self, to_notify: usize) { + // Assumption: The Mutex protecting this condvar is locked throughout the + // entirety of this call, preventing calls to `wait` and `wait_timeout`. + + // Logic check: Ensure that there aren't any missing waiters. Remove any that + // timed-out, ensuring the counter doesn't underflow. + assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed)); + self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed); + + // Figure out how many threads to notify. Note that it is impossible for `counter` + // to increase during this operation because Mutex is locked. However, it is + // possible for `counter` to decrease due to a condvar timing out, in which + // case the corresponding `timed_out` will increase accordingly. + let Ok(waiter_count) = + self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| { + if counter == 0 { + return None; + } else { + Some(counter - counter.min(to_notify)) + } + }) + else { + // No threads are waiting on this condvar return; - } else { - *counter -= 1; - } - let result = blocking_scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), 1).into(), - ); - drop(counter); - result.expect("failure to send NotifyCondition command"); - } + }; - pub fn notify_all(&self) { - let mut counter = self.counter.lock().unwrap(); - if *counter <= 0 { + let mut remaining_to_wake = waiter_count.min(to_notify); + if remaining_to_wake == 0 { return; } - let result = blocking_scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), *counter) - .into(), - ); - *counter = 0; - drop(counter); + for _wake_tries in 0..NOTIFY_TRIES { + let result = blocking_scalar( + ticktimer_server(), + TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(), + ) + .expect("failure to send NotifyCondition command"); + + // Remove the list of waiters that were notified + remaining_to_wake -= result[0]; + + // Also remove the number of waiters that timed out. Clamp it to 0 in order to + // ensure we don't wait forever in case the waiter woke up between the time + // we counted the remaining waiters and now. + remaining_to_wake = + remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed)); + if remaining_to_wake == 0 { + return; + } + crate::thread::yield_now(); + } + } - result.expect("failure to send NotifyCondition command"); + pub fn notify_one(&self) { + self.notify_some(1) + } + + pub fn notify_all(&self) { + self.notify_some(self.counter.load(Ordering::Relaxed)) } fn index(&self) -> usize { - self as *const Condvar as usize + core::ptr::from_ref(self).addr() } - pub unsafe fn wait(&self, mutex: &Mutex) { - let mut counter = self.counter.lock().unwrap(); - *counter += 1; + /// Unlock the given Mutex and wait for the notification. Wait at most + /// `ms` milliseconds, or pass `0` to wait forever. + /// + /// Returns `true` if the condition was received, `false` if it timed out + fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed); unsafe { mutex.unlock() }; - drop(counter); + // Threading concern: There is a chance that the `notify` thread wakes up here before + // we have a chance to wait for the condition. This is fine because we've recorded + // the fact that we're waiting by incrementing the counter. let result = blocking_scalar( ticktimer_server(), - crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), 0).into(), + TicktimerScalar::WaitForCondition(self.index(), ms).into(), ); + let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0; + + // If we awoke due to a timeout, increment the `timed_out` counter so that the + // main loop of `notify` knows there's a timeout. + // + // This is done with the Mutex still unlocked, because the Mutex might still + // be locked by the `notify` process above. + if !awoken { + self.timed_out.fetch_add(1, Ordering::Relaxed); + } + unsafe { mutex.lock() }; + awoken + } - result.expect("Ticktimer: failure to send WaitForCondition command"); + pub unsafe fn wait(&self, mutex: &Mutex) { + // Wait for 0 ms, which is a special case to "wait forever" + self.wait_ms(mutex, 0); } pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - let mut counter = self.counter.lock().unwrap(); - *counter += 1; - unsafe { mutex.unlock() }; - drop(counter); - let mut millis = dur.as_millis() as usize; + // Ensure we don't wait for 0 ms, which would cause us to wait forever if millis == 0 { millis = 1; } - - let result = blocking_scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), millis) - .into(), - ); - unsafe { mutex.lock() }; - - let result = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0; - - // If we awoke due to a timeout, decrement the wake count, as that would not have - // been done in the `notify()` call. - if !result { - *self.counter.lock().unwrap() -= 1; - } - result + self.wait_ms(mutex, millis) } } impl Drop for Condvar { fn drop(&mut self) { - scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::FreeCondition(self.index()).into(), - ) - .ok(); + let remaining_count = self.counter.load(Ordering::Relaxed); + let timed_out = self.timed_out.load(Ordering::Relaxed); + assert!( + remaining_count - timed_out == 0, + "counter was {} and timed_out was {} not 0", + remaining_count, + timed_out + ); + scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok(); } } diff --git a/library/std/src/sys/pal/xous/locks/mutex.rs b/library/std/src/sys/pal/xous/locks/mutex.rs index ea51776d54e..a8c9518ff0b 100644 --- a/library/std/src/sys/pal/xous/locks/mutex.rs +++ b/library/std/src/sys/pal/xous/locks/mutex.rs @@ -1,5 +1,5 @@ -use crate::os::xous::ffi::{blocking_scalar, do_yield, scalar}; -use crate::os::xous::services::ticktimer_server; +use crate::os::xous::ffi::{blocking_scalar, do_yield}; +use crate::os::xous::services::{ticktimer_server, TicktimerScalar}; use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed, Ordering::SeqCst}; pub struct Mutex { @@ -29,7 +29,7 @@ impl Mutex { } fn index(&self) -> usize { - self as *const Mutex as usize + core::ptr::from_ref(self).addr() } #[inline] @@ -83,11 +83,8 @@ impl Mutex { } // Unblock one thread that is waiting on this message. - scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::UnlockMutex(self.index()).into(), - ) - .expect("failure to send UnlockMutex command"); + blocking_scalar(ticktimer_server(), TicktimerScalar::UnlockMutex(self.index()).into()) + .expect("failure to send UnlockMutex command"); } #[inline] @@ -106,11 +103,8 @@ impl Drop for Mutex { // If there was Mutex contention, then we involved the ticktimer. Free // the resources associated with this Mutex as it is deallocated. if self.contended.load(Relaxed) { - scalar( - ticktimer_server(), - crate::os::xous::services::TicktimerScalar::FreeMutex(self.index()).into(), - ) - .ok(); + blocking_scalar(ticktimer_server(), TicktimerScalar::FreeMutex(self.index()).into()) + .ok(); } } } diff --git a/library/std/src/sys/pal/xous/locks/rwlock.rs b/library/std/src/sys/pal/xous/locks/rwlock.rs index 618da758adf..ab45b33e1f6 100644 --- a/library/std/src/sys/pal/xous/locks/rwlock.rs +++ b/library/std/src/sys/pal/xous/locks/rwlock.rs @@ -1,5 +1,5 @@ -use crate::os::xous::ffi::do_yield; -use crate::sync::atomic::{AtomicIsize, Ordering::SeqCst}; +use crate::sync::atomic::{AtomicIsize, Ordering::Acquire}; +use crate::thread::yield_now; pub struct RwLock { /// The "mode" value indicates how many threads are waiting on this @@ -14,6 +14,9 @@ pub struct RwLock { mode: AtomicIsize, } +const RWLOCK_WRITING: isize = -1; +const RWLOCK_FREE: isize = 0; + unsafe impl Send for RwLock {} unsafe impl Sync for RwLock {} @@ -21,52 +24,51 @@ impl RwLock { #[inline] #[rustc_const_stable(feature = "const_locks", since = "1.63.0")] pub const fn new() -> RwLock { - RwLock { mode: AtomicIsize::new(0) } + RwLock { mode: AtomicIsize::new(RWLOCK_FREE) } } #[inline] pub unsafe fn read(&self) { while !unsafe { self.try_read() } { - do_yield(); + yield_now(); } } #[inline] pub unsafe fn try_read(&self) -> bool { - // Non-atomically determine the current value. - let current = self.mode.load(SeqCst); - - // If it's currently locked for writing, then we cannot read. - if current < 0 { - return false; - } - - // Attempt to lock. If the `current` value has changed, then this - // operation will fail and we will not obtain the lock even if we - // could potentially keep it. - let new = current + 1; - self.mode.compare_exchange(current, new, SeqCst, SeqCst).is_ok() + self.mode + .fetch_update( + Acquire, + Acquire, + |v| if v == RWLOCK_WRITING { None } else { Some(v + 1) }, + ) + .is_ok() } #[inline] pub unsafe fn write(&self) { while !unsafe { self.try_write() } { - do_yield(); + yield_now(); } } #[inline] pub unsafe fn try_write(&self) -> bool { - self.mode.compare_exchange(0, -1, SeqCst, SeqCst).is_ok() + self.mode.compare_exchange(RWLOCK_FREE, RWLOCK_WRITING, Acquire, Acquire).is_ok() } #[inline] pub unsafe fn read_unlock(&self) { - self.mode.fetch_sub(1, SeqCst); + let previous = self.mode.fetch_sub(1, Acquire); + assert!(previous != RWLOCK_FREE); + assert!(previous != RWLOCK_WRITING); } #[inline] pub unsafe fn write_unlock(&self) { - assert_eq!(self.mode.compare_exchange(-1, 0, SeqCst, SeqCst), Ok(-1)); + assert_eq!( + self.mode.compare_exchange(RWLOCK_WRITING, RWLOCK_FREE, Acquire, Acquire), + Ok(RWLOCK_WRITING) + ); } } diff --git a/library/std/src/sys/pal/xous/mod.rs b/library/std/src/sys/pal/xous/mod.rs index 230067907c8..516d0a68720 100644 --- a/library/std/src/sys/pal/xous/mod.rs +++ b/library/std/src/sys/pal/xous/mod.rs @@ -12,10 +12,7 @@ pub mod fs; #[path = "../unsupported/io.rs"] pub mod io; pub mod locks; -#[path = "../unsupported/net.rs"] pub mod net; -#[path = "../unsupported/once.rs"] -pub mod once; pub mod os; #[path = "../unix/path.rs"] pub mod path; diff --git a/library/std/src/sys/pal/xous/net/dns.rs b/library/std/src/sys/pal/xous/net/dns.rs new file mode 100644 index 00000000000..63056324bfb --- /dev/null +++ b/library/std/src/sys/pal/xous/net/dns.rs @@ -0,0 +1,127 @@ +use crate::io; +use crate::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use crate::os::xous::ffi::lend_mut; +use crate::os::xous::services::{dns_server, DnsLendMut}; +use core::convert::{TryFrom, TryInto}; + +pub struct DnsError { + pub code: u8, +} + +#[repr(C, align(4096))] +struct LookupHostQuery([u8; 4096]); + +pub struct LookupHost { + data: LookupHostQuery, + port: u16, + offset: usize, + count: usize, +} + +impl LookupHost { + pub fn port(&self) -> u16 { + self.port + } +} + +impl Iterator for LookupHost { + type Item = SocketAddr; + fn next(&mut self) -> Option<SocketAddr> { + if self.offset >= self.data.0.len() { + return None; + } + match self.data.0.get(self.offset) { + Some(&4) => { + self.offset += 1; + if self.offset + 4 > self.data.0.len() { + return None; + } + let result = Some(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new( + self.data.0[self.offset], + self.data.0[self.offset + 1], + self.data.0[self.offset + 2], + self.data.0[self.offset + 3], + ), + self.port, + ))); + self.offset += 4; + result + } + Some(&6) => { + self.offset += 1; + if self.offset + 16 > self.data.0.len() { + return None; + } + let mut new_addr = [0u8; 16]; + for (src, octet) in self.data.0[(self.offset + 1)..(self.offset + 16 + 1)] + .iter() + .zip(new_addr.iter_mut()) + { + *octet = *src; + } + let result = + Some(SocketAddr::V6(SocketAddrV6::new(new_addr.into(), self.port, 0, 0))); + self.offset += 16; + result + } + _ => None, + } + } +} + +pub fn lookup(query: &str, port: u16) -> Result<LookupHost, DnsError> { + let mut result = LookupHost { data: LookupHostQuery([0u8; 4096]), offset: 0, count: 0, port }; + + // Copy the query into the message that gets sent to the DNS server + for (query_byte, result_byte) in query.as_bytes().iter().zip(result.data.0.iter_mut()) { + *result_byte = *query_byte; + } + + lend_mut( + dns_server(), + DnsLendMut::RawLookup.into(), + &mut result.data.0, + 0, + query.as_bytes().len(), + ) + .unwrap(); + if result.data.0[0] != 0 { + return Err(DnsError { code: result.data.0[1] }); + } + assert_eq!(result.offset, 0); + result.count = result.data.0[1] as usize; + + // Advance the offset to the first record + result.offset = 2; + Ok(result) +} + +impl TryFrom<&str> for LookupHost { + type Error = io::Error; + + fn try_from(s: &str) -> io::Result<LookupHost> { + macro_rules! try_opt { + ($e:expr, $msg:expr) => { + match $e { + Some(r) => r, + None => return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &$msg)), + } + }; + } + + // split the string by ':' and convert the second part to u16 + let (host, port_str) = try_opt!(s.rsplit_once(':'), "invalid socket address"); + let port: u16 = try_opt!(port_str.parse().ok(), "invalid port value"); + (host, port).try_into() + } +} + +impl TryFrom<(&str, u16)> for LookupHost { + type Error = io::Error; + + fn try_from(v: (&str, u16)) -> io::Result<LookupHost> { + lookup(v.0, v.1) + .map_err(|_e| io::const_io_error!(io::ErrorKind::InvalidInput, &"DNS failure")) + } +} diff --git a/library/std/src/sys/pal/xous/net/mod.rs b/library/std/src/sys/pal/xous/net/mod.rs new file mode 100644 index 00000000000..b5a3da136a6 --- /dev/null +++ b/library/std/src/sys/pal/xous/net/mod.rs @@ -0,0 +1,84 @@ +mod dns; + +mod tcpstream; +pub use tcpstream::*; + +mod tcplistener; +pub use tcplistener::*; + +mod udp; +pub use udp::*; + +// this structure needs to be synchronized with what's in net/src/api.rs +#[repr(C)] +#[derive(Debug)] +enum NetError { + // Ok = 0, + Unaddressable = 1, + SocketInUse = 2, + // AccessDenied = 3, + Invalid = 4, + // Finished = 5, + LibraryError = 6, + // AlreadyUsed = 7, + TimedOut = 8, + WouldBlock = 9, +} + +#[repr(C, align(4096))] +struct ConnectRequest { + raw: [u8; 4096], +} + +#[repr(C, align(4096))] +struct SendData { + raw: [u8; 4096], +} + +#[repr(C, align(4096))] +pub struct ReceiveData { + raw: [u8; 4096], +} + +#[repr(C, align(4096))] +pub struct GetAddress { + raw: [u8; 4096], +} + +pub use dns::LookupHost; + +#[allow(nonstandard_style)] +pub mod netc { + pub const AF_INET: u8 = 0; + pub const AF_INET6: u8 = 1; + pub type sa_family_t = u8; + + #[derive(Copy, Clone)] + pub struct in_addr { + pub s_addr: u32, + } + + #[derive(Copy, Clone)] + pub struct sockaddr_in { + pub sin_family: sa_family_t, + pub sin_port: u16, + pub sin_addr: in_addr, + } + + #[derive(Copy, Clone)] + pub struct in6_addr { + pub s6_addr: [u8; 16], + } + + #[derive(Copy, Clone)] + pub struct sockaddr_in6 { + pub sin6_family: sa_family_t, + pub sin6_port: u16, + pub sin6_addr: in6_addr, + pub sin6_flowinfo: u32, + pub sin6_scope_id: u32, + } + + #[derive(Copy, Clone)] + pub struct sockaddr {} +} diff --git a/library/std/src/sys/pal/xous/net/tcplistener.rs b/library/std/src/sys/pal/xous/net/tcplistener.rs new file mode 100644 index 00000000000..47305013083 --- /dev/null +++ b/library/std/src/sys/pal/xous/net/tcplistener.rs @@ -0,0 +1,247 @@ +use super::*; +use crate::fmt; +use crate::io; +use crate::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use crate::os::xous::services; +use crate::sync::Arc; +use core::convert::TryInto; +use core::sync::atomic::{AtomicBool, AtomicU16, AtomicUsize, Ordering}; + +macro_rules! unimpl { + () => { + return Err(io::const_io_error!( + io::ErrorKind::Unsupported, + &"This function is not yet implemented", + )); + }; +} + +#[derive(Clone)] +pub struct TcpListener { + fd: Arc<AtomicU16>, + local: SocketAddr, + handle_count: Arc<AtomicUsize>, + nonblocking: Arc<AtomicBool>, +} + +impl TcpListener { + pub fn bind(socketaddr: io::Result<&SocketAddr>) -> io::Result<TcpListener> { + let mut addr = *socketaddr?; + + let fd = TcpListener::bind_inner(&mut addr)?; + return Ok(TcpListener { + fd: Arc::new(AtomicU16::new(fd)), + local: addr, + handle_count: Arc::new(AtomicUsize::new(1)), + nonblocking: Arc::new(AtomicBool::new(false)), + }); + } + + /// This returns the raw fd of a Listener, so that it can also be used by the + /// accept routine to replenish the Listener object after its handle has been converted into + /// a TcpStream object. + fn bind_inner(addr: &mut SocketAddr) -> io::Result<u16> { + // Construct the request + let mut connect_request = ConnectRequest { raw: [0u8; 4096] }; + + // Serialize the StdUdpBind structure. This is done "manually" because we don't want to + // make an auto-serdes (like bincode or rkyv) crate a dependency of Xous. + let port_bytes = addr.port().to_le_bytes(); + connect_request.raw[0] = port_bytes[0]; + connect_request.raw[1] = port_bytes[1]; + match addr.ip() { + IpAddr::V4(addr) => { + connect_request.raw[2] = 4; + for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + IpAddr::V6(addr) => { + connect_request.raw[2] = 6; + for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + } + + let Ok((_, valid)) = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdTcpListen.into(), + &mut connect_request.raw, + 0, + 4096, + ) else { + return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Invalid response")); + }; + + // The first four bytes should be zero upon success, and will be nonzero + // for an error. + let response = connect_request.raw; + if response[0] != 0 || valid == 0 { + let errcode = response[1]; + if errcode == NetError::SocketInUse as u8 { + return Err(io::const_io_error!(io::ErrorKind::ResourceBusy, &"Socket in use")); + } else if errcode == NetError::Invalid as u8 { + return Err(io::const_io_error!( + io::ErrorKind::AddrNotAvailable, + &"Invalid address" + )); + } else if errcode == NetError::LibraryError as u8 { + return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error")); + } else { + return Err(io::const_io_error!( + io::ErrorKind::Other, + &"Unable to connect or internal error" + )); + } + } + let fd = response[1] as usize; + if addr.port() == 0 { + // oddly enough, this is a valid port and it means "give me something valid, up to you what that is" + let assigned_port = u16::from_le_bytes(response[2..4].try_into().unwrap()); + addr.set_port(assigned_port); + } + // println!("TcpListening with file handle of {}\r\n", fd); + Ok(fd.try_into().unwrap()) + } + + pub fn socket_addr(&self) -> io::Result<SocketAddr> { + Ok(self.local) + } + + pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + let mut receive_request = ReceiveData { raw: [0u8; 4096] }; + + if self.nonblocking.load(Ordering::Relaxed) { + // nonblocking + receive_request.raw[0] = 0; + } else { + // blocking + receive_request.raw[0] = 1; + } + + if let Ok((_offset, _valid)) = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdTcpAccept(self.fd.load(Ordering::Relaxed)).into(), + &mut receive_request.raw, + 0, + 0, + ) { + if receive_request.raw[0] != 0 { + // error case + if receive_request.raw[1] == NetError::TimedOut as u8 { + return Err(io::const_io_error!(io::ErrorKind::TimedOut, &"accept timed out",)); + } else if receive_request.raw[1] == NetError::WouldBlock as u8 { + return Err(io::const_io_error!( + io::ErrorKind::WouldBlock, + &"accept would block", + )); + } else if receive_request.raw[1] == NetError::LibraryError as u8 { + return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error")); + } else { + return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",)); + } + } else { + // accept successful + let rr = &receive_request.raw; + let stream_fd = u16::from_le_bytes(rr[1..3].try_into().unwrap()); + let port = u16::from_le_bytes(rr[20..22].try_into().unwrap()); + let addr = if rr[3] == 4 { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(rr[4], rr[5], rr[6], rr[7])), port) + } else if rr[3] == 6 { + SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + u16::from_be_bytes(rr[4..6].try_into().unwrap()), + u16::from_be_bytes(rr[6..8].try_into().unwrap()), + u16::from_be_bytes(rr[8..10].try_into().unwrap()), + u16::from_be_bytes(rr[10..12].try_into().unwrap()), + u16::from_be_bytes(rr[12..14].try_into().unwrap()), + u16::from_be_bytes(rr[14..16].try_into().unwrap()), + u16::from_be_bytes(rr[16..18].try_into().unwrap()), + u16::from_be_bytes(rr[18..20].try_into().unwrap()), + )), + port, + ) + } else { + return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",)); + }; + + // replenish the listener + let mut local_copy = self.local.clone(); // port is non-0 by this time, but the method signature needs a mut + let new_fd = TcpListener::bind_inner(&mut local_copy)?; + self.fd.store(new_fd, Ordering::Relaxed); + + // now return a stream converted from the old stream's fd + Ok((TcpStream::from_listener(stream_fd, self.local.port(), port, addr), addr)) + } + } else { + Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unable to accept")) + } + } + + pub fn duplicate(&self) -> io::Result<TcpListener> { + self.handle_count.fetch_add(1, Ordering::Relaxed); + Ok(self.clone()) + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + if ttl > 255 { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "TTL must be less than 256")); + } + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdSetTtlTcp(self.fd.load(Ordering::Relaxed), ttl).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|_| ()) + } + + pub fn ttl(&self) -> io::Result<u32> { + Ok(crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdGetTtlTcp(self.fd.load(Ordering::Relaxed)).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|res| res[0] as _)?) + } + + pub fn set_only_v6(&self, _: bool) -> io::Result<()> { + unimpl!(); + } + + pub fn only_v6(&self) -> io::Result<bool> { + unimpl!(); + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + // this call doesn't have a meaning on our platform, but we can at least not panic if it's used. + Ok(None) + } + + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.nonblocking.store(nonblocking, Ordering::Relaxed); + Ok(()) + } +} + +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TCP listening on {:?}", self.local) + } +} + +impl Drop for TcpListener { + fn drop(&mut self) { + if self.handle_count.fetch_sub(1, Ordering::Relaxed) == 1 { + // only drop if we're the last clone + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + crate::os::xous::services::NetBlockingScalar::StdTcpClose( + self.fd.load(Ordering::Relaxed), + ) + .into(), + ) + .unwrap(); + } + } +} diff --git a/library/std/src/sys/pal/xous/net/tcpstream.rs b/library/std/src/sys/pal/xous/net/tcpstream.rs new file mode 100644 index 00000000000..7149678118a --- /dev/null +++ b/library/std/src/sys/pal/xous/net/tcpstream.rs @@ -0,0 +1,435 @@ +use super::*; +use crate::fmt; +use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut}; +use crate::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, SocketAddrV4, SocketAddrV6}; +use crate::os::xous::services; +use crate::sync::Arc; +use crate::time::Duration; +use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; + +macro_rules! unimpl { + () => { + return Err(io::const_io_error!( + io::ErrorKind::Unsupported, + &"This function is not yet implemented", + )); + }; +} + +enum ReadOrPeek { + Read, + Peek, +} + +#[derive(Clone)] +pub struct TcpStream { + fd: u16, + local_port: u16, + remote_port: u16, + peer_addr: SocketAddr, + // milliseconds + read_timeout: Arc<AtomicU32>, + // milliseconds + write_timeout: Arc<AtomicU32>, + handle_count: Arc<AtomicUsize>, + nonblocking: Arc<AtomicBool>, +} + +fn sockaddr_to_buf(duration: Duration, addr: &SocketAddr, buf: &mut [u8]) { + // Construct the request. + let port_bytes = addr.port().to_le_bytes(); + buf[0] = port_bytes[0]; + buf[1] = port_bytes[1]; + for (dest, src) in buf[2..].iter_mut().zip((duration.as_millis() as u64).to_le_bytes()) { + *dest = src; + } + match addr.ip() { + IpAddr::V4(addr) => { + buf[10] = 4; + for (dest, src) in buf[11..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + IpAddr::V6(addr) => { + buf[10] = 6; + for (dest, src) in buf[11..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + } +} + +impl TcpStream { + pub(crate) fn from_listener( + fd: u16, + local_port: u16, + remote_port: u16, + peer_addr: SocketAddr, + ) -> TcpStream { + TcpStream { + fd, + local_port, + remote_port, + peer_addr, + read_timeout: Arc::new(AtomicU32::new(0)), + write_timeout: Arc::new(AtomicU32::new(0)), + handle_count: Arc::new(AtomicUsize::new(1)), + nonblocking: Arc::new(AtomicBool::new(false)), + } + } + + pub fn connect(socketaddr: io::Result<&SocketAddr>) -> io::Result<TcpStream> { + Self::connect_timeout(socketaddr?, Duration::ZERO) + } + + pub fn connect_timeout(addr: &SocketAddr, duration: Duration) -> io::Result<TcpStream> { + let mut connect_request = ConnectRequest { raw: [0u8; 4096] }; + + // Construct the request. + sockaddr_to_buf(duration, &addr, &mut connect_request.raw); + + let Ok((_, valid)) = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdTcpConnect.into(), + &mut connect_request.raw, + 0, + 4096, + ) else { + return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Invalid response")); + }; + + // The first four bytes should be zero upon success, and will be nonzero + // for an error. + let response = connect_request.raw; + if response[0] != 0 || valid == 0 { + // errcode is a u8 but stuck in a u16 where the upper byte is invalid. Mask & decode accordingly. + let errcode = response[0]; + if errcode == NetError::SocketInUse as u8 { + return Err(io::const_io_error!(io::ErrorKind::ResourceBusy, &"Socket in use",)); + } else if errcode == NetError::Unaddressable as u8 { + return Err(io::const_io_error!( + io::ErrorKind::AddrNotAvailable, + &"Invalid address", + )); + } else { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Unable to connect or internal error", + )); + } + } + let fd = u16::from_le_bytes([response[2], response[3]]); + let local_port = u16::from_le_bytes([response[4], response[5]]); + let remote_port = u16::from_le_bytes([response[6], response[7]]); + // println!( + // "Connected with local port of {}, remote port of {}, file handle of {}", + // local_port, remote_port, fd + // ); + Ok(TcpStream { + fd, + local_port, + remote_port, + peer_addr: *addr, + read_timeout: Arc::new(AtomicU32::new(0)), + write_timeout: Arc::new(AtomicU32::new(0)), + handle_count: Arc::new(AtomicUsize::new(1)), + nonblocking: Arc::new(AtomicBool::new(false)), + }) + } + + pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> { + if let Some(to) = timeout { + if to.is_zero() { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Zero is an invalid timeout", + )); + } + } + self.read_timeout.store( + timeout.map(|t| t.as_millis().min(u32::MAX as u128) as u32).unwrap_or_default(), + Ordering::Relaxed, + ); + Ok(()) + } + + pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> { + if let Some(to) = timeout { + if to.is_zero() { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Zero is an invalid timeout", + )); + } + } + self.write_timeout.store( + timeout.map(|t| t.as_millis().min(u32::MAX as u128) as u32).unwrap_or_default(), + Ordering::Relaxed, + ); + Ok(()) + } + + pub fn read_timeout(&self) -> io::Result<Option<Duration>> { + match self.read_timeout.load(Ordering::Relaxed) { + 0 => Ok(None), + t => Ok(Some(Duration::from_millis(t as u64))), + } + } + + pub fn write_timeout(&self) -> io::Result<Option<Duration>> { + match self.write_timeout.load(Ordering::Relaxed) { + 0 => Ok(None), + t => Ok(Some(Duration::from_millis(t as u64))), + } + } + + fn read_or_peek(&self, buf: &mut [u8], op: ReadOrPeek) -> io::Result<usize> { + let mut receive_request = ReceiveData { raw: [0u8; 4096] }; + let data_to_read = buf.len().min(receive_request.raw.len()); + + let opcode = match op { + ReadOrPeek::Read => { + services::NetLendMut::StdTcpRx(self.fd, self.nonblocking.load(Ordering::Relaxed)) + } + ReadOrPeek::Peek => { + services::NetLendMut::StdTcpPeek(self.fd, self.nonblocking.load(Ordering::Relaxed)) + } + }; + + let Ok((offset, length)) = crate::os::xous::ffi::lend_mut( + services::net_server(), + opcode.into(), + &mut receive_request.raw, + // Reuse the `offset` as the read timeout + self.read_timeout.load(Ordering::Relaxed) as usize, + data_to_read, + ) else { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Library failure: wrong message type or messaging error" + )); + }; + + if offset != 0 { + for (dest, src) in buf.iter_mut().zip(receive_request.raw[..length].iter()) { + *dest = *src; + } + Ok(length) + } else { + let result = receive_request.raw; + if result[0] != 0 { + if result[1] == 8 { + // timed out + return Err(io::const_io_error!(io::ErrorKind::TimedOut, &"Timeout",)); + } + if result[1] == 9 { + // would block + return Err(io::const_io_error!(io::ErrorKind::WouldBlock, &"Would block",)); + } + } + Err(io::const_io_error!(io::ErrorKind::Other, &"recv_slice failure")) + } + } + + pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + self.read_or_peek(buf, ReadOrPeek::Peek) + } + + pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.read_or_peek(buf, ReadOrPeek::Read) + } + + pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + crate::io::default_read_vectored(|b| self.read(b), bufs) + } + + pub fn read_buf(&self, cursor: BorrowedCursor<'_>) -> io::Result<()> { + crate::io::default_read_buf(|buf| self.read(buf), cursor) + } + + pub fn is_read_vectored(&self) -> bool { + false + } + + pub fn write(&self, buf: &[u8]) -> io::Result<usize> { + let mut send_request = SendData { raw: [0u8; 4096] }; + for (dest, src) in send_request.raw.iter_mut().zip(buf) { + *dest = *src; + } + let buf_len = send_request.raw.len().min(buf.len()); + + let (_offset, _valid) = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdTcpTx(self.fd).into(), + &mut send_request.raw, + // Reuse the offset as the timeout + self.write_timeout.load(Ordering::Relaxed) as usize, + buf_len, + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Internal error")))?; + + if send_request.raw[0] != 0 { + if send_request.raw[4] == 8 { + // timed out + return Err(io::const_io_error!( + io::ErrorKind::BrokenPipe, + &"Timeout or connection closed", + )); + } else if send_request.raw[4] == 9 { + // would block + return Err(io::const_io_error!(io::ErrorKind::WouldBlock, &"Would block",)); + } else { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Error when sending", + )); + } + } + Ok(u32::from_le_bytes([ + send_request.raw[4], + send_request.raw[5], + send_request.raw[6], + send_request.raw[7], + ]) as usize) + } + + pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + crate::io::default_write_vectored(|b| self.write(b), bufs) + } + + pub fn is_write_vectored(&self) -> bool { + false + } + + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + Ok(self.peer_addr) + } + + pub fn socket_addr(&self) -> io::Result<SocketAddr> { + let mut get_addr = GetAddress { raw: [0u8; 4096] }; + + let Ok((_offset, _valid)) = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdGetAddress(self.fd).into(), + &mut get_addr.raw, + 0, + 0, + ) else { + return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Internal error")); + }; + let mut i = get_addr.raw.iter(); + match *i.next().unwrap() { + 4 => Ok(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new( + *i.next().unwrap(), + *i.next().unwrap(), + *i.next().unwrap(), + *i.next().unwrap(), + ), + self.local_port, + ))), + 6 => { + let mut new_addr = [0u8; 16]; + for (src, octet) in i.zip(new_addr.iter_mut()) { + *octet = *src; + } + Ok(SocketAddr::V6(SocketAddrV6::new(new_addr.into(), self.local_port, 0, 0))) + } + _ => Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Internal error")), + } + } + + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdTcpStreamShutdown(self.fd, how).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|_| ()) + } + + pub fn duplicate(&self) -> io::Result<TcpStream> { + self.handle_count.fetch_add(1, Ordering::Relaxed); + Ok(self.clone()) + } + + pub fn set_linger(&self, _: Option<Duration>) -> io::Result<()> { + unimpl!(); + } + + pub fn linger(&self) -> io::Result<Option<Duration>> { + unimpl!(); + } + + pub fn set_nodelay(&self, enabled: bool) -> io::Result<()> { + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdSetNodelay(self.fd, enabled).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|_| ()) + } + + pub fn nodelay(&self) -> io::Result<bool> { + Ok(crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdGetNodelay(self.fd).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|res| res[0] != 0)?) + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + if ttl > 255 { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "TTL must be less than 256")); + } + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdSetTtlTcp(self.fd, ttl).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|_| ()) + } + + pub fn ttl(&self) -> io::Result<u32> { + Ok(crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdGetTtlTcp(self.fd).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|res| res[0] as _)?) + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + // this call doesn't have a meaning on our platform, but we can at least not panic if it's used. + Ok(None) + } + + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.nonblocking.store(nonblocking, Ordering::SeqCst); + Ok(()) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "TCP connection to {:?} port {} to local port {}", + self.peer_addr, self.remote_port, self.local_port + ) + } +} + +impl Drop for TcpStream { + fn drop(&mut self) { + if self.handle_count.fetch_sub(1, Ordering::Relaxed) == 1 { + // only drop if we're the last clone + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdTcpClose(self.fd).into(), + ) + .unwrap(); + } + } +} diff --git a/library/std/src/sys/pal/xous/net/udp.rs b/library/std/src/sys/pal/xous/net/udp.rs new file mode 100644 index 00000000000..cafa5b3bde8 --- /dev/null +++ b/library/std/src/sys/pal/xous/net/udp.rs @@ -0,0 +1,471 @@ +use super::*; +use crate::cell::Cell; +use crate::fmt; +use crate::io; +use crate::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use crate::os::xous::services; +use crate::sync::Arc; +use crate::time::Duration; +use core::convert::TryInto; +use core::sync::atomic::{AtomicUsize, Ordering}; + +macro_rules! unimpl { + () => { + return Err(io::const_io_error!( + io::ErrorKind::Unsupported, + &"This function is not yet implemented", + )); + }; +} + +#[derive(Clone)] +pub struct UdpSocket { + fd: u16, + local: SocketAddr, + remote: Cell<Option<SocketAddr>>, + // in milliseconds. The setting applies only to `recv` calls after the timeout is set. + read_timeout: Cell<u64>, + // in milliseconds. The setting applies only to `send` calls after the timeout is set. + write_timeout: Cell<u64>, + handle_count: Arc<AtomicUsize>, + nonblocking: Cell<bool>, +} + +impl UdpSocket { + pub fn bind(socketaddr: io::Result<&SocketAddr>) -> io::Result<UdpSocket> { + let addr = socketaddr?; + // Construct the request + let mut connect_request = ConnectRequest { raw: [0u8; 4096] }; + + // Serialize the StdUdpBind structure. This is done "manually" because we don't want to + // make an auto-serdes (like bincode or rkyv) crate a dependency of Xous. + let port_bytes = addr.port().to_le_bytes(); + connect_request.raw[0] = port_bytes[0]; + connect_request.raw[1] = port_bytes[1]; + match addr.ip() { + IpAddr::V4(addr) => { + connect_request.raw[2] = 4; + for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + IpAddr::V6(addr) => { + connect_request.raw[2] = 6; + for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + } + + let response = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdUdpBind.into(), + &mut connect_request.raw, + 0, + 4096, + ); + + if let Ok((_, valid)) = response { + // The first four bytes should be zero upon success, and will be nonzero + // for an error. + let response = connect_request.raw; + if response[0] != 0 || valid == 0 { + let errcode = response[1]; + if errcode == NetError::SocketInUse as u8 { + return Err(io::const_io_error!(io::ErrorKind::ResourceBusy, &"Socket in use")); + } else if errcode == NetError::Invalid as u8 { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Port can't be 0 or invalid address" + )); + } else if errcode == NetError::LibraryError as u8 { + return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error")); + } else { + return Err(io::const_io_error!( + io::ErrorKind::Other, + &"Unable to connect or internal error" + )); + } + } + let fd = response[1] as u16; + return Ok(UdpSocket { + fd, + local: *addr, + remote: Cell::new(None), + read_timeout: Cell::new(0), + write_timeout: Cell::new(0), + handle_count: Arc::new(AtomicUsize::new(1)), + nonblocking: Cell::new(false), + }); + } + Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Invalid response")) + } + + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + match self.remote.get() { + Some(dest) => Ok(dest), + None => Err(io::const_io_error!(io::ErrorKind::NotConnected, &"No peer specified")), + } + } + + pub fn socket_addr(&self) -> io::Result<SocketAddr> { + Ok(self.local) + } + + fn recv_inner(&self, buf: &mut [u8], do_peek: bool) -> io::Result<(usize, SocketAddr)> { + let mut receive_request = ReceiveData { raw: [0u8; 4096] }; + + if self.nonblocking.get() { + // nonblocking + receive_request.raw[0] = 0; + } else { + // blocking + receive_request.raw[0] = 1; + for (&s, d) in self + .read_timeout + .get() + .to_le_bytes() + .iter() + .zip(receive_request.raw[1..9].iter_mut()) + { + *d = s; + } + } + if let Ok((_offset, _valid)) = crate::os::xous::ffi::lend_mut( + services::net_server(), + services::NetLendMut::StdUdpRx(self.fd).into(), + &mut receive_request.raw, + if do_peek { 1 } else { 0 }, + 0, + ) { + if receive_request.raw[0] != 0 { + // error case + if receive_request.raw[1] == NetError::TimedOut as u8 { + return Err(io::const_io_error!(io::ErrorKind::TimedOut, &"recv timed out",)); + } else if receive_request.raw[1] == NetError::WouldBlock as u8 { + return Err(io::const_io_error!( + io::ErrorKind::WouldBlock, + &"recv would block", + )); + } else if receive_request.raw[1] == NetError::LibraryError as u8 { + return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error")); + } else { + return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",)); + } + } else { + let rr = &receive_request.raw; + let rxlen = u16::from_le_bytes(rr[1..3].try_into().unwrap()); + let port = u16::from_le_bytes(rr[20..22].try_into().unwrap()); + let addr = if rr[3] == 4 { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(rr[4], rr[5], rr[6], rr[7])), port) + } else if rr[3] == 6 { + SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + u16::from_be_bytes(rr[4..6].try_into().unwrap()), + u16::from_be_bytes(rr[6..8].try_into().unwrap()), + u16::from_be_bytes(rr[8..10].try_into().unwrap()), + u16::from_be_bytes(rr[10..12].try_into().unwrap()), + u16::from_be_bytes(rr[12..14].try_into().unwrap()), + u16::from_be_bytes(rr[14..16].try_into().unwrap()), + u16::from_be_bytes(rr[16..18].try_into().unwrap()), + u16::from_be_bytes(rr[18..20].try_into().unwrap()), + )), + port, + ) + } else { + return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",)); + }; + for (&s, d) in rr[22..22 + rxlen as usize].iter().zip(buf.iter_mut()) { + *d = s; + } + Ok((rxlen as usize, addr)) + } + } else { + Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unable to recv")) + } + } + + pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.recv_inner(buf, false) + } + + pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.recv_from(buf).map(|(len, _addr)| len) + } + + pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.recv_inner(buf, true) + } + + pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { + self.peek_from(buf).map(|(len, _addr)| len) + } + + pub fn connect(&self, maybe_addr: io::Result<&SocketAddr>) -> io::Result<()> { + let addr = maybe_addr?; + self.remote.set(Some(*addr)); + Ok(()) + } + + pub fn send(&self, buf: &[u8]) -> io::Result<usize> { + if let Some(addr) = self.remote.get() { + self.send_to(buf, &addr) + } else { + Err(io::const_io_error!(io::ErrorKind::NotConnected, &"No remote specified")) + } + } + + pub fn send_to(&self, buf: &[u8], addr: &SocketAddr) -> io::Result<usize> { + let mut tx_req = SendData { raw: [0u8; 4096] }; + + // Construct the request. + let port_bytes = addr.port().to_le_bytes(); + tx_req.raw[0] = port_bytes[0]; + tx_req.raw[1] = port_bytes[1]; + match addr.ip() { + IpAddr::V4(addr) => { + tx_req.raw[2] = 4; + for (dest, src) in tx_req.raw[3..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + IpAddr::V6(addr) => { + tx_req.raw[2] = 6; + for (dest, src) in tx_req.raw[3..].iter_mut().zip(addr.octets()) { + *dest = src; + } + } + } + let len = buf.len() as u16; + let len_bytes = len.to_le_bytes(); + tx_req.raw[19] = len_bytes[0]; + tx_req.raw[20] = len_bytes[1]; + for (&s, d) in buf.iter().zip(tx_req.raw[21..].iter_mut()) { + *d = s; + } + + // let buf = unsafe { + // xous::MemoryRange::new( + // &mut tx_req as *mut SendData as usize, + // core::mem::size_of::<SendData>(), + // ) + // .unwrap() + // }; + + // write time-outs are implemented on the caller side. Basically, if the Net crate server + // is too busy to take the call immediately: retry, until the timeout is reached. + let now = crate::time::Instant::now(); + let write_timeout = if self.nonblocking.get() { + // nonblocking + core::time::Duration::ZERO + } else { + // blocking + if self.write_timeout.get() == 0 { + // forever + core::time::Duration::from_millis(u64::MAX) + } else { + // or this amount of time + core::time::Duration::from_millis(self.write_timeout.get()) + } + }; + loop { + let response = crate::os::xous::ffi::try_lend_mut( + services::net_server(), + services::NetLendMut::StdUdpTx(self.fd).into(), + &mut tx_req.raw, + 0, + 4096, + ); + match response { + Ok((_, valid)) => { + let response = &tx_req.raw; + if response[0] != 0 || valid == 0 { + let errcode = response[1]; + if errcode == NetError::SocketInUse as u8 { + return Err(io::const_io_error!( + io::ErrorKind::ResourceBusy, + &"Socket in use" + )); + } else if errcode == NetError::Invalid as u8 { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Socket not valid" + )); + } else if errcode == NetError::LibraryError as u8 { + return Err(io::const_io_error!( + io::ErrorKind::Other, + &"Library error" + )); + } else { + return Err(io::const_io_error!( + io::ErrorKind::Other, + &"Unable to connect" + )); + } + } else { + // no error + return Ok(len as usize); + } + } + Err(crate::os::xous::ffi::Error::ServerQueueFull) => { + if now.elapsed() >= write_timeout { + return Err(io::const_io_error!( + io::ErrorKind::WouldBlock, + &"Write timed out" + )); + } else { + // question: do we want to do something a bit more gentle than immediately retrying? + crate::thread::yield_now(); + } + } + _ => return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error")), + } + } + } + + pub fn duplicate(&self) -> io::Result<UdpSocket> { + self.handle_count.fetch_add(1, Ordering::Relaxed); + Ok(self.clone()) + } + + pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> { + if let Some(d) = timeout { + if d.is_zero() { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Zero duration is invalid" + )); + } + } + self.read_timeout + .set(timeout.map(|t| t.as_millis().min(u64::MAX as u128) as u64).unwrap_or_default()); + Ok(()) + } + + pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> { + if let Some(d) = timeout { + if d.is_zero() { + return Err(io::const_io_error!( + io::ErrorKind::InvalidInput, + &"Zero duration is invalid" + )); + } + } + self.write_timeout + .set(timeout.map(|t| t.as_millis().min(u64::MAX as u128) as u64).unwrap_or_default()); + Ok(()) + } + + pub fn read_timeout(&self) -> io::Result<Option<Duration>> { + match self.read_timeout.get() { + 0 => Ok(None), + t => Ok(Some(Duration::from_millis(t as u64))), + } + } + + pub fn write_timeout(&self) -> io::Result<Option<Duration>> { + match self.write_timeout.get() { + 0 => Ok(None), + t => Ok(Some(Duration::from_millis(t as u64))), + } + } + + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + if ttl > 255 { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "TTL must be less than 256")); + } + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdSetTtlUdp(self.fd, ttl).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|_| ()) + } + + pub fn ttl(&self) -> io::Result<u32> { + Ok(crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdGetTtlUdp(self.fd).into(), + ) + .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value"))) + .map(|res| res[0] as _)?) + } + + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + // this call doesn't have a meaning on our platform, but we can at least not panic if it's used. + Ok(None) + } + + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.nonblocking.set(nonblocking); + Ok(()) + } + + // ------------- smoltcp base stack does not have multicast or broadcast support --------------- + pub fn set_broadcast(&self, _: bool) -> io::Result<()> { + unimpl!(); + } + + pub fn broadcast(&self) -> io::Result<bool> { + unimpl!(); + } + + pub fn set_multicast_loop_v4(&self, _: bool) -> io::Result<()> { + unimpl!(); + } + + pub fn multicast_loop_v4(&self) -> io::Result<bool> { + unimpl!(); + } + + pub fn set_multicast_ttl_v4(&self, _: u32) -> io::Result<()> { + unimpl!(); + } + + pub fn multicast_ttl_v4(&self) -> io::Result<u32> { + unimpl!(); + } + + pub fn set_multicast_loop_v6(&self, _: bool) -> io::Result<()> { + unimpl!(); + } + + pub fn multicast_loop_v6(&self) -> io::Result<bool> { + unimpl!(); + } + + pub fn join_multicast_v4(&self, _: &Ipv4Addr, _: &Ipv4Addr) -> io::Result<()> { + unimpl!(); + } + + pub fn join_multicast_v6(&self, _: &Ipv6Addr, _: u32) -> io::Result<()> { + unimpl!(); + } + + pub fn leave_multicast_v4(&self, _: &Ipv4Addr, _: &Ipv4Addr) -> io::Result<()> { + unimpl!(); + } + + pub fn leave_multicast_v6(&self, _: &Ipv6Addr, _: u32) -> io::Result<()> { + unimpl!(); + } +} + +impl fmt::Debug for UdpSocket { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "UDP listening on {:?} to {:?}", self.local, self.remote.get(),) + } +} + +impl Drop for UdpSocket { + fn drop(&mut self) { + if self.handle_count.fetch_sub(1, Ordering::Relaxed) == 1 { + // only drop if we're the last clone + crate::os::xous::ffi::blocking_scalar( + services::net_server(), + services::NetBlockingScalar::StdUdpClose(self.fd).into(), + ) + .unwrap(); + } + } +} diff --git a/library/std/src/sys/pal/xous/stdio.rs b/library/std/src/sys/pal/xous/stdio.rs index 2ac694641ba..11608964b52 100644 --- a/library/std/src/sys/pal/xous/stdio.rs +++ b/library/std/src/sys/pal/xous/stdio.rs @@ -5,7 +5,7 @@ pub struct Stdout {} pub struct Stderr; use crate::os::xous::ffi::{lend, try_lend, try_scalar, Connection}; -use crate::os::xous::services::{log_server, try_connect, LogScalar}; +use crate::os::xous::services::{log_server, try_connect, LogLend, LogScalar}; impl Stdin { pub const fn new() -> Stdin { @@ -27,7 +27,7 @@ impl Stdout { impl io::Write for Stdout { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - #[repr(align(4096))] + #[repr(C, align(4096))] struct LendBuffer([u8; 4096]); let mut lend_buffer = LendBuffer([0u8; 4096]); let connection = log_server(); @@ -35,7 +35,8 @@ impl io::Write for Stdout { for (dest, src) in lend_buffer.0.iter_mut().zip(chunk) { *dest = *src; } - lend(connection, 1, &lend_buffer.0, 0, chunk.len()).unwrap(); + lend(connection, LogLend::StandardOutput.into(), &lend_buffer.0, 0, chunk.len()) + .unwrap(); } Ok(buf.len()) } @@ -53,7 +54,7 @@ impl Stderr { impl io::Write for Stderr { fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - #[repr(align(4096))] + #[repr(C, align(4096))] struct LendBuffer([u8; 4096]); let mut lend_buffer = LendBuffer([0u8; 4096]); let connection = log_server(); @@ -61,7 +62,8 @@ impl io::Write for Stderr { for (dest, src) in lend_buffer.0.iter_mut().zip(chunk) { *dest = *src; } - lend(connection, 1, &lend_buffer.0, 0, chunk.len()).unwrap(); + lend(connection, LogLend::StandardError.into(), &lend_buffer.0, 0, chunk.len()) + .unwrap(); } Ok(buf.len()) } diff --git a/library/std/src/sys/pal/xous/thread.rs b/library/std/src/sys/pal/xous/thread.rs index 78c68de7bf3..0f452e07a5c 100644 --- a/library/std/src/sys/pal/xous/thread.rs +++ b/library/std/src/sys/pal/xous/thread.rs @@ -68,14 +68,18 @@ impl Thread { ) .map_err(|code| io::Error::from_raw_os_error(code as i32))?; - extern "C" fn thread_start(main: *mut usize, guard_page_pre: usize, stack_size: usize) { + extern "C" fn thread_start( + main: *mut usize, + guard_page_pre: usize, + stack_size: usize, + ) -> ! { unsafe { - // Finally, let's run some code. + // Run the contents of the new thread. Box::from_raw(main as *mut Box<dyn FnOnce()>)(); } // Destroy TLS, which will free the TLS page and call the destructor for - // any thread local storage. + // any thread local storage (if any). unsafe { crate::sys::thread_local_key::destroy_tls(); } diff --git a/library/std/src/sys/pal/xous/thread_local_key.rs b/library/std/src/sys/pal/xous/thread_local_key.rs index 3771ea65700..59a668c3df6 100644 --- a/library/std/src/sys/pal/xous/thread_local_key.rs +++ b/library/std/src/sys/pal/xous/thread_local_key.rs @@ -23,10 +23,25 @@ pub type Dtor = unsafe extern "C" fn(*mut u8); const TLS_MEMORY_SIZE: usize = 4096; -/// TLS keys start at `1` to mimic POSIX. +/// TLS keys start at `1`. Index `0` is unused +#[cfg(not(test))] +#[export_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key13TLS_KEY_INDEXE"] static TLS_KEY_INDEX: AtomicUsize = AtomicUsize::new(1); -fn tls_ptr_addr() -> *mut usize { +#[cfg(not(test))] +#[export_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key9DTORSE"] +static DTORS: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut()); + +#[cfg(test)] +extern "Rust" { + #[link_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key13TLS_KEY_INDEXE"] + static TLS_KEY_INDEX: AtomicUsize; + + #[link_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key9DTORSE"] + static DTORS: AtomicPtr<Node>; +} + +fn tls_ptr_addr() -> *mut *mut u8 { let mut tp: usize; unsafe { asm!( @@ -34,50 +49,50 @@ fn tls_ptr_addr() -> *mut usize { out(reg) tp, ); } - core::ptr::from_exposed_addr_mut::<usize>(tp) + core::ptr::from_exposed_addr_mut::<*mut u8>(tp) } /// Create an area of memory that's unique per thread. This area will /// contain all thread local pointers. -fn tls_ptr() -> *mut usize { - let mut tp = tls_ptr_addr(); +fn tls_table() -> &'static mut [*mut u8] { + let tp = tls_ptr_addr(); + if !tp.is_null() { + return unsafe { + core::slice::from_raw_parts_mut(tp, TLS_MEMORY_SIZE / core::mem::size_of::<*mut u8>()) + }; + } // If the TP register is `0`, then this thread hasn't initialized // its TLS yet. Allocate a new page to store this memory. - if tp.is_null() { - tp = unsafe { - map_memory( - None, - None, - TLS_MEMORY_SIZE / core::mem::size_of::<usize>(), - MemoryFlags::R | MemoryFlags::W, - ) - } + let tp = unsafe { + map_memory( + None, + None, + TLS_MEMORY_SIZE / core::mem::size_of::<*mut u8>(), + MemoryFlags::R | MemoryFlags::W, + ) .expect("Unable to allocate memory for thread local storage") - .as_mut_ptr(); + }; - unsafe { - // Key #0 is currently unused. - (tp).write_volatile(0); + for val in tp.iter() { + assert!(*val as usize == 0); + } - // Set the thread's `$tp` register - asm!( - "mv tp, {}", - in(reg) tp as usize, - ); - } + unsafe { + // Set the thread's `$tp` register + asm!( + "mv tp, {}", + in(reg) tp.as_mut_ptr() as usize, + ); } tp } -/// Allocate a new TLS key. These keys are shared among all threads. -fn tls_alloc() -> usize { - TLS_KEY_INDEX.fetch_add(1, SeqCst) -} - #[inline] pub unsafe fn create(dtor: Option<Dtor>) -> Key { - let key = tls_alloc(); + // Allocate a new TLS key. These keys are shared among all threads. + #[allow(unused_unsafe)] + let key = unsafe { TLS_KEY_INDEX.fetch_add(1, SeqCst) }; if let Some(f) = dtor { unsafe { register_dtor(key, f) }; } @@ -87,18 +102,20 @@ pub unsafe fn create(dtor: Option<Dtor>) -> Key { #[inline] pub unsafe fn set(key: Key, value: *mut u8) { assert!((key < 1022) && (key >= 1)); - unsafe { tls_ptr().add(key).write_volatile(value as usize) }; + let table = tls_table(); + table[key] = value; } #[inline] pub unsafe fn get(key: Key) -> *mut u8 { assert!((key < 1022) && (key >= 1)); - core::ptr::from_exposed_addr_mut::<u8>(unsafe { tls_ptr().add(key).read_volatile() }) + tls_table()[key] } #[inline] pub unsafe fn destroy(_key: Key) { - panic!("can't destroy keys on Xous"); + // Just leak the key. Probably not great on long-running systems that create + // lots of TLS variables, but in practice that's not an issue. } // ------------------------------------------------------------------------- @@ -127,8 +144,6 @@ pub unsafe fn destroy(_key: Key) { // key but also a slot for the destructor queue on windows. An optimization for // another day! -static DTORS: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut()); - struct Node { dtor: Dtor, key: Key, @@ -138,10 +153,12 @@ struct Node { unsafe fn register_dtor(key: Key, dtor: Dtor) { let mut node = ManuallyDrop::new(Box::new(Node { key, dtor, next: ptr::null_mut() })); - let mut head = DTORS.load(SeqCst); + #[allow(unused_unsafe)] + let mut head = unsafe { DTORS.load(SeqCst) }; loop { node.next = head; - match DTORS.compare_exchange(head, &mut **node, SeqCst, SeqCst) { + #[allow(unused_unsafe)] + match unsafe { DTORS.compare_exchange(head, &mut **node, SeqCst, SeqCst) } { Ok(_) => return, // nothing to drop, we successfully added the node to the list Err(cur) => head = cur, } @@ -155,6 +172,7 @@ pub unsafe fn destroy_tls() { if tp.is_null() { return; } + unsafe { run_dtors() }; // Finally, free the TLS array @@ -169,12 +187,19 @@ pub unsafe fn destroy_tls() { unsafe fn run_dtors() { let mut any_run = true; + + // Run the destructor "some" number of times. This is 5x on Windows, + // so we copy it here. This allows TLS variables to create new + // TLS variables upon destruction that will also get destroyed. + // Keep going until we run out of tries or until we have nothing + // left to destroy. for _ in 0..5 { if !any_run { break; } any_run = false; - let mut cur = DTORS.load(SeqCst); + #[allow(unused_unsafe)] + let mut cur = unsafe { DTORS.load(SeqCst) }; while !cur.is_null() { let ptr = unsafe { get((*cur).key) }; diff --git a/library/std/src/sys/pal/xous/thread_parking.rs b/library/std/src/sys/pal/xous/thread_parking.rs index aa39c6d2718..0bd0462d77d 100644 --- a/library/std/src/sys/pal/xous/thread_parking.rs +++ b/library/std/src/sys/pal/xous/thread_parking.rs @@ -29,31 +29,40 @@ impl Parker { // Change NOTIFIED to EMPTY and EMPTY to PARKED. let state = self.state.fetch_sub(1, Acquire); if state == NOTIFIED { + // The state has gone from NOTIFIED (1) to EMPTY (0) return; } + // The state has gone from EMPTY (0) to PARKED (-1) + assert!(state == EMPTY); - // The state was set to PARKED. Wait until the `unpark` wakes us up. + // The state is now PARKED (-1). Wait until the `unpark` wakes us up. blocking_scalar( ticktimer_server(), TicktimerScalar::WaitForCondition(self.index(), 0).into(), ) .expect("failed to send WaitForCondition command"); - self.state.swap(EMPTY, Acquire); + let state = self.state.swap(EMPTY, Acquire); + assert!(state == NOTIFIED || state == PARKED); } pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) { // Change NOTIFIED to EMPTY and EMPTY to PARKED. let state = self.state.fetch_sub(1, Acquire); if state == NOTIFIED { + // The state has gone from NOTIFIED (1) to EMPTY (0) return; } + // The state has gone from EMPTY (0) to PARKED (-1) + assert!(state == EMPTY); // A value of zero indicates an indefinite wait. Clamp the number of // milliseconds to the allowed range. let millis = usize::max(timeout.as_millis().try_into().unwrap_or(usize::MAX), 1); - let was_timeout = blocking_scalar( + // The state is now PARKED (-1). Wait until the `unpark` wakes us up, + // or things time out. + let _was_timeout = blocking_scalar( ticktimer_server(), TicktimerScalar::WaitForCondition(self.index(), millis).into(), ) @@ -61,28 +70,37 @@ impl Parker { != 0; let state = self.state.swap(EMPTY, Acquire); - if was_timeout && state == NOTIFIED { - // The state was set to NOTIFIED after we returned from the wait - // but before we reset the state. Therefore, a wakeup is on its - // way, which we need to consume here. - // NOTICE: this is a priority hole. - blocking_scalar( - ticktimer_server(), - TicktimerScalar::WaitForCondition(self.index(), 0).into(), - ) - .expect("failed to send WaitForCondition command"); - } + assert!(state == PARKED || state == NOTIFIED); } pub fn unpark(self: Pin<&Self>) { - let state = self.state.swap(NOTIFIED, Release); - if state == PARKED { - // The thread is parked, wake it up. - blocking_scalar( - ticktimer_server(), - TicktimerScalar::NotifyCondition(self.index(), 1).into(), - ) - .expect("failed to send NotifyCondition command"); + // If the state is already `NOTIFIED`, then another thread has + // indicated it wants to wake up the target thread. + // + // If the state is `EMPTY` then there is nothing to wake up, and + // the target thread will immediately exit from `park()` the + // next time that function is called. + if self.state.swap(NOTIFIED, Release) != PARKED { + return; + } + + // The thread is parked, wake it up. Keep trying until we wake something up. + // This will happen when the `NotifyCondition` call returns the fact that + // 1 condition was notified. + // Alternately, keep going until the state is seen as `EMPTY`, indicating + // the thread woke up and kept going. This can happen when the Park + // times out before we can send the NotifyCondition message. + while blocking_scalar( + ticktimer_server(), + TicktimerScalar::NotifyCondition(self.index(), 1).into(), + ) + .expect("failed to send NotifyCondition command")[0] + == 0 + && self.state.load(Acquire) != EMPTY + { + // The target thread hasn't yet hit the `WaitForCondition` call. + // Yield to let the target thread run some more. + crate::thread::yield_now(); } } } diff --git a/library/std/src/sys_common/once/mod.rs b/library/std/src/sys_common/once/mod.rs index 359697d8313..ec57568c54c 100644 --- a/library/std/src/sys_common/once/mod.rs +++ b/library/std/src/sys_common/once/mod.rs @@ -25,6 +25,7 @@ cfg_if::cfg_if! { target_family = "unix", all(target_vendor = "fortanix", target_env = "sgx"), target_os = "solid_asp3", + target_os = "xous", ))] { mod queue; pub use queue::{Once, OnceState}; |
