about summary refs log tree commit diff
path: root/src/libstd
diff options
context:
space:
mode:
authorAlex Crichton <alex@alexcrichton.com>2014-11-24 11:16:40 -0800
committerAlex Crichton <alex@alexcrichton.com>2014-12-05 09:12:25 -0800
commitc3adbd34c4e637d20a184eb03f09b30c69de8b6e (patch)
tree7be3d3a9b5bf062fcffc8aa0b9e0de8267ab41c9 /src/libstd
parent71d4e77db8ad4b6d821da7e5d5300134ac95974e (diff)
downloadrust-c3adbd34c4e637d20a184eb03f09b30c69de8b6e.tar.gz
rust-c3adbd34c4e637d20a184eb03f09b30c69de8b6e.zip
Fall out of the std::sync rewrite
Diffstat (limited to 'src/libstd')
-rw-r--r--src/libstd/comm/mod.rs20
-rw-r--r--src/libstd/comm/shared.rs21
-rw-r--r--src/libstd/comm/stream.rs2
-rw-r--r--src/libstd/dynamic_lib.rs4
-rw-r--r--src/libstd/lib.rs2
-rw-r--r--src/libstd/os.rs10
-rw-r--r--src/libstd/rt/backtrace.rs16
-rw-r--r--src/libstd/sync/condvar.rs15
-rw-r--r--src/libstd/sync/mutex.rs2
-rw-r--r--src/libstd/sys/common/helper_thread.rs21
-rw-r--r--src/libstd/sys/common/net.rs12
-rw-r--r--src/libstd/sys/unix/mod.rs4
-rw-r--r--src/libstd/sys/unix/pipe.rs7
-rw-r--r--src/libstd/sys/windows/mod.rs4
-rw-r--r--src/libstd/sys/windows/mutex.rs6
-rw-r--r--src/libstd/sys/windows/pipe.rs7
16 files changed, 88 insertions, 65 deletions
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index 2b66e91c00d..d291ed72567 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -354,6 +354,8 @@ mod select;
 mod shared;
 mod stream;
 mod sync;
+mod mpsc_queue;
+mod spsc_queue;
 
 /// The receiving-half of Rust's channel type. This half can only be owned by
 /// one task
@@ -628,24 +630,26 @@ impl<T: Send> Sender<T> {
 #[unstable]
 impl<T: Send> Clone for Sender<T> {
     fn clone(&self) -> Sender<T> {
-        let (packet, sleeper) = match *unsafe { self.inner() } {
+        let (packet, sleeper, guard) = match *unsafe { self.inner() } {
             Oneshot(ref p) => {
                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
                 unsafe {
-                    (*a.get()).postinit_lock();
+                    let guard = (*a.get()).postinit_lock();
                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
-                        oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
-                        oneshot::UpWoke(task) => (a, Some(task))
+                        oneshot::UpSuccess |
+                        oneshot::UpDisconnected => (a, None, guard),
+                        oneshot::UpWoke(task) => (a, Some(task), guard)
                     }
                 }
             }
             Stream(ref p) => {
                 let a = Arc::new(UnsafeCell::new(shared::Packet::new()));
                 unsafe {
-                    (*a.get()).postinit_lock();
+                    let guard = (*a.get()).postinit_lock();
                     match (*p.get()).upgrade(Receiver::new(Shared(a.clone()))) {
-                        stream::UpSuccess | stream::UpDisconnected => (a, None),
-                        stream::UpWoke(task) => (a, Some(task)),
+                        stream::UpSuccess |
+                        stream::UpDisconnected => (a, None, guard),
+                        stream::UpWoke(task) => (a, Some(task), guard),
                     }
                 }
             }
@@ -657,7 +661,7 @@ impl<T: Send> Clone for Sender<T> {
         };
 
         unsafe {
-            (*packet.get()).inherit_blocker(sleeper);
+            (*packet.get()).inherit_blocker(sleeper, guard);
 
             let tmp = Sender::new(Shared(packet.clone()));
             mem::swap(self.inner_mut(), tmp.inner_mut());
diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs
index 6396edbdbd1..13b5e10fcd3 100644
--- a/src/libstd/comm/shared.rs
+++ b/src/libstd/comm/shared.rs
@@ -26,12 +26,11 @@ use alloc::boxed::Box;
 use core::cmp;
 use core::int;
 use rustrt::local::Local;
-use rustrt::mutex::NativeMutex;
 use rustrt::task::{Task, BlockedTask};
 use rustrt::thread::Thread;
 
-use sync::atomic;
-use sync::mpsc_queue as mpsc;
+use sync::{atomic, Mutex, MutexGuard};
+use comm::mpsc_queue as mpsc;
 
 const DISCONNECTED: int = int::MIN;
 const FUDGE: int = 1024;
@@ -56,7 +55,7 @@ pub struct Packet<T> {
 
     // this lock protects various portions of this implementation during
     // select()
-    select_lock: NativeMutex,
+    select_lock: Mutex<()>,
 }
 
 pub enum Failure {
@@ -76,7 +75,7 @@ impl<T: Send> Packet<T> {
             channels: atomic::AtomicInt::new(2),
             port_dropped: atomic::AtomicBool::new(false),
             sender_drain: atomic::AtomicInt::new(0),
-            select_lock: unsafe { NativeMutex::new() },
+            select_lock: Mutex::new(()),
         };
         return p;
     }
@@ -86,8 +85,8 @@ impl<T: Send> Packet<T> {
     // In other case mutex data will be duplicated while cloning
     // and that could cause problems on platforms where it is
     // represented by opaque data structure
-    pub fn postinit_lock(&mut self) {
-        unsafe { self.select_lock.lock_noguard() }
+    pub fn postinit_lock(&self) -> MutexGuard<()> {
+        self.select_lock.lock()
     }
 
     // This function is used at the creation of a shared packet to inherit a
@@ -95,7 +94,9 @@ impl<T: Send> Packet<T> {
     // tasks in select().
     //
     // This can only be called at channel-creation time
-    pub fn inherit_blocker(&mut self, task: Option<BlockedTask>) {
+    pub fn inherit_blocker(&mut self,
+                           task: Option<BlockedTask>,
+                           guard: MutexGuard<()>) {
         match task {
             Some(task) => {
                 assert_eq!(self.cnt.load(atomic::SeqCst), 0);
@@ -135,7 +136,7 @@ impl<T: Send> Packet<T> {
         // interfere with this method. After we unlock this lock, we're
         // signifying that we're done modifying self.cnt and self.to_wake and
         // the port is ready for the world to continue using it.
-        unsafe { self.select_lock.unlock_noguard() }
+        drop(guard);
     }
 
     pub fn send(&mut self, t: T) -> Result<(), T> {
@@ -441,7 +442,7 @@ impl<T: Send> Packet<T> {
         // done with. Without this bounce, we can race with inherit_blocker
         // about looking at and dealing with to_wake. Once we have acquired the
         // lock, we are guaranteed that inherit_blocker is done.
-        unsafe {
+        {
             let _guard = self.select_lock.lock();
         }
 
diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs
index 23d042960b1..06ab4f4427a 100644
--- a/src/libstd/comm/stream.rs
+++ b/src/libstd/comm/stream.rs
@@ -32,7 +32,7 @@ use rustrt::task::{Task, BlockedTask};
 use rustrt::thread::Thread;
 
 use sync::atomic;
-use sync::spsc_queue as spsc;
+use comm::spsc_queue as spsc;
 use comm::Receiver;
 
 const DISCONNECTED: int = int::MIN;
diff --git a/src/libstd/dynamic_lib.rs b/src/libstd/dynamic_lib.rs
index 3cd0c0eeaf2..160365dac36 100644
--- a/src/libstd/dynamic_lib.rs
+++ b/src/libstd/dynamic_lib.rs
@@ -225,8 +225,8 @@ pub mod dl {
     }
 
     pub fn check_for_errors_in<T>(f: || -> T) -> Result<T, String> {
-        use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
-        static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+        use sync::{StaticMutex, MUTEX_INIT};
+        static LOCK: StaticMutex = MUTEX_INIT;
         unsafe {
             // dlerror isn't thread safe, so we need to lock around this entire
             // sequence
diff --git a/src/libstd/lib.rs b/src/libstd/lib.rs
index f6b73f037f2..d4274d7e401 100644
--- a/src/libstd/lib.rs
+++ b/src/libstd/lib.rs
@@ -106,7 +106,7 @@
 #![allow(unknown_features)]
 #![feature(macro_rules, globs, linkage)]
 #![feature(default_type_params, phase, lang_items, unsafe_destructor)]
-#![feature(import_shadowing, slicing_syntax)]
+#![feature(import_shadowing, slicing_syntax, tuple_indexing)]
 
 // Don't link to std. We are std.
 #![no_std]
diff --git a/src/libstd/os.rs b/src/libstd/os.rs
index 0abd030a163..a8adfec34ed 100644
--- a/src/libstd/os.rs
+++ b/src/libstd/os.rs
@@ -209,14 +209,12 @@ Accessing environment variables is not generally threadsafe.
 Serialize access through a global lock.
 */
 fn with_env_lock<T>(f: || -> T) -> T {
-    use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+    use sync::{StaticMutex, MUTEX_INIT};
 
-    static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
+    static LOCK: StaticMutex = MUTEX_INIT;
 
-    unsafe {
-        let _guard = LOCK.lock();
-        f()
-    }
+    let _guard = LOCK.lock();
+    f()
 }
 
 /// Returns a vector of (variable, value) pairs, for all the environment
diff --git a/src/libstd/rt/backtrace.rs b/src/libstd/rt/backtrace.rs
index 0103fe670e7..159fc3080e8 100644
--- a/src/libstd/rt/backtrace.rs
+++ b/src/libstd/rt/backtrace.rs
@@ -238,7 +238,7 @@ mod imp {
     use mem;
     use option::{Some, None, Option};
     use result::{Ok, Err};
-    use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+    use sync::{StaticMutex, MUTEX_INIT};
 
     /// As always - iOS on arm uses SjLj exceptions and
     /// _Unwind_Backtrace is even not available there. Still,
@@ -264,8 +264,8 @@ mod imp {
         // while it doesn't requires lock for work as everything is
         // local, it still displays much nicer backtraces when a
         // couple of tasks panic simultaneously
-        static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
-        let _g = unsafe { LOCK.lock() };
+        static LOCK: StaticMutex = MUTEX_INIT;
+        let _g = LOCK.lock();
 
         try!(writeln!(w, "stack backtrace:"));
         // 100 lines should be enough
@@ -297,8 +297,8 @@ mod imp {
         // is semi-reasonable in terms of printing anyway, and we know that all
         // I/O done here is blocking I/O, not green I/O, so we don't have to
         // worry about this being a native vs green mutex.
-        static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
-        let _g = unsafe { LOCK.lock() };
+        static LOCK: StaticMutex = MUTEX_INIT;
+        let _g = LOCK.lock();
 
         try!(writeln!(w, "stack backtrace:"));
 
@@ -667,7 +667,7 @@ mod imp {
     use option::{Some, None};
     use path::Path;
     use result::{Ok, Err};
-    use rustrt::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
+    use sync::{StaticMutex, MUTEX_INIT};
     use slice::SlicePrelude;
     use str::StrPrelude;
     use dynamic_lib::DynamicLibrary;
@@ -928,8 +928,8 @@ mod imp {
     pub fn write(w: &mut Writer) -> IoResult<()> {
         // According to windows documentation, all dbghelp functions are
         // single-threaded.
-        static LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
-        let _g = unsafe { LOCK.lock() };
+        static LOCK: StaticMutex = MUTEX_INIT;
+        let _g = LOCK.lock();
 
         // Open up dbghelp.dll, we don't link to it explicitly because it can't
         // always be found. Additionally, it's nice having fewer dependencies.
diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs
index 581b6b4e412..0fdd57b2792 100644
--- a/src/libstd/sync/condvar.rs
+++ b/src/libstd/sync/condvar.rs
@@ -143,8 +143,14 @@ impl Condvar {
     ///
     /// Like `wait`, the lock specified will be re-acquired when this function
     /// returns, regardless of whether the timeout elapsed or not.
-    pub fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
-                                         dur: Duration) -> bool {
+    // Note that this method is *not* public, and this is quite intentional
+    // because we're not quite sure about the semantics of relative vs absolute
+    // durations or how the timing guarantees play into what the system APIs
+    // provide. There are also additional concerns about the unix-specific
+    // implementation which may need to be addressed.
+    #[allow(dead_code)]
+    fn wait_timeout<T: AsMutexGuard>(&self, mutex_guard: &T,
+                                     dur: Duration) -> bool {
         unsafe {
             let me: &'static Condvar = &*(self as *const _);
             me.inner.wait_timeout(mutex_guard, dur)
@@ -195,8 +201,9 @@ impl StaticCondvar {
     /// specified duration.
     ///
     /// See `Condvar::wait_timeout`.
-    pub fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
-                                         dur: Duration) -> bool {
+    #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
+    fn wait_timeout<T: AsMutexGuard>(&'static self, mutex_guard: &T,
+                                     dur: Duration) -> bool {
         unsafe {
             let lock = mutex_guard.as_mutex_guard();
             let sys = mutex::guard_lock(lock);
diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs
index 3d17f2bc64b..4e07d54c57e 100644
--- a/src/libstd/sync/mutex.rs
+++ b/src/libstd/sync/mutex.rs
@@ -45,7 +45,7 @@ use sys_common::mutex as sys;
 /// let data = Arc::new(Mutex::new(0));
 ///
 /// let (tx, rx) = channel();
-/// for _ in range(0, 10) {
+/// for _ in range(0u, 10) {
 ///     let (data, tx) = (data.clone(), tx.clone());
 ///     spawn(proc() {
 ///         // The shared static can only be accessed once the lock is held.
diff --git a/src/libstd/sys/common/helper_thread.rs b/src/libstd/sys/common/helper_thread.rs
index 9508d8d9232..c0018c5d970 100644
--- a/src/libstd/sys/common/helper_thread.rs
+++ b/src/libstd/sys/common/helper_thread.rs
@@ -20,13 +20,14 @@
 //! can be created in the future and there must be no active timers at that
 //! time.
 
+use prelude::*;
+
+use cell::UnsafeCell;
 use mem;
 use rustrt::bookkeeping;
-use rustrt::mutex::StaticNativeMutex;
 use rustrt;
-use cell::UnsafeCell;
+use sync::{StaticMutex, StaticCondvar};
 use sys::helper_signal;
-use prelude::*;
 
 use task;
 
@@ -39,7 +40,8 @@ use task;
 /// is for static initialization.
 pub struct Helper<M> {
     /// Internal lock which protects the remaining fields
-    pub lock: StaticNativeMutex,
+    pub lock: StaticMutex,
+    pub cond: StaticCondvar,
 
     // You'll notice that the remaining fields are UnsafeCell<T>, and this is
     // because all helper thread operations are done through &self, but we need
@@ -53,6 +55,9 @@ pub struct Helper<M> {
 
     /// Flag if this helper thread has booted and been initialized yet.
     pub initialized: UnsafeCell<bool>,
+
+    /// Flag if this helper thread has shut down
+    pub shutdown: UnsafeCell<bool>,
 }
 
 impl<M: Send> Helper<M> {
@@ -80,7 +85,9 @@ impl<M: Send> Helper<M> {
                 task::spawn(proc() {
                     bookkeeping::decrement();
                     helper(receive, rx, t);
-                    self.lock.lock().signal()
+                    let _g = self.lock.lock();
+                    *self.shutdown.get() = true;
+                    self.cond.notify_one()
                 });
 
                 rustrt::at_exit(proc() { self.shutdown() });
@@ -119,7 +126,9 @@ impl<M: Send> Helper<M> {
             helper_signal::signal(*self.signal.get() as helper_signal::signal);
 
             // Wait for the child to exit
-            guard.wait();
+            while !*self.shutdown.get() {
+                self.cond.wait(&guard);
+            }
             drop(guard);
 
             // Clean up after ourselves
diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs
index 029fc852742..ddc6dd021c3 100644
--- a/src/libstd/sys/common/net.rs
+++ b/src/libstd/sys/common/net.rs
@@ -16,13 +16,13 @@ use libc::{mod, c_char, c_int};
 use mem;
 use num::Int;
 use ptr::{mod, null, null_mut};
-use rustrt::mutex;
 use io::net::ip::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr};
 use io::net::addrinfo;
 use io::{IoResult, IoError};
 use sys::{mod, retry, c, sock_t, last_error, last_net_error, last_gai_error, close_sock,
           wrlen, msglen_t, os, wouldblock, set_nonblocking, timer, ms_to_timeval,
           decode_error_detailed};
+use sync::{Mutex, MutexGuard};
 use sys_common::{mod, keep_going, short_write, timeout};
 use prelude::*;
 use cmp;
@@ -557,12 +557,12 @@ struct Inner {
 
     // Unused on Linux, where this lock is not necessary.
     #[allow(dead_code)]
-    lock: mutex::NativeMutex
+    lock: Mutex<()>,
 }
 
 impl Inner {
     fn new(fd: sock_t) -> Inner {
-        Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
+        Inner { fd: fd, lock: Mutex::new(()) }
     }
 }
 
@@ -572,7 +572,7 @@ impl Drop for Inner {
 
 pub struct Guard<'a> {
     pub fd: sock_t,
-    pub guard: mutex::LockGuard<'a>,
+    pub guard: MutexGuard<'a, ()>,
 }
 
 #[unsafe_destructor]
@@ -666,7 +666,7 @@ impl TcpStream {
     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
         let ret = Guard {
             fd: self.fd(),
-            guard: unsafe { self.inner.lock.lock() },
+            guard: self.inner.lock.lock(),
         };
         assert!(set_nonblocking(self.fd(), true).is_ok());
         ret
@@ -805,7 +805,7 @@ impl UdpSocket {
     fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
         let ret = Guard {
             fd: self.fd(),
-            guard: unsafe { self.inner.lock.lock() },
+            guard: self.inner.lock.lock(),
         };
         assert!(set_nonblocking(self.fd(), true).is_ok());
         ret
diff --git a/src/libstd/sys/unix/mod.rs b/src/libstd/sys/unix/mod.rs
index 7b37fb3fb0f..4effedbe3ab 100644
--- a/src/libstd/sys/unix/mod.rs
+++ b/src/libstd/sys/unix/mod.rs
@@ -25,10 +25,12 @@ use sys_common::mkerr_libc;
 
 macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
     static $name: Helper<$m> = Helper {
-        lock: ::rustrt::mutex::NATIVE_MUTEX_INIT,
+        lock: ::sync::MUTEX_INIT,
+        cond: ::sync::CONDVAR_INIT,
         chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> },
         signal: ::cell::UnsafeCell { value: 0 },
         initialized: ::cell::UnsafeCell { value: false },
+        shutdown: ::cell::UnsafeCell { value: false },
     };
 ) )
 
diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs
index 3f70fb5c1a5..08e6f7059d8 100644
--- a/src/libstd/sys/unix/pipe.rs
+++ b/src/libstd/sys/unix/pipe.rs
@@ -12,8 +12,7 @@ use alloc::arc::Arc;
 use libc;
 use c_str::CString;
 use mem;
-use rustrt::mutex;
-use sync::atomic;
+use sync::{atomic, Mutex};
 use io::{mod, IoResult, IoError};
 use prelude::*;
 
@@ -60,12 +59,12 @@ struct Inner {
 
     // Unused on Linux, where this lock is not necessary.
     #[allow(dead_code)]
-    lock: mutex::NativeMutex
+    lock: Mutex<()>,
 }
 
 impl Inner {
     fn new(fd: fd_t) -> Inner {
-        Inner { fd: fd, lock: unsafe { mutex::NativeMutex::new() } }
+        Inner { fd: fd, lock: Mutex::new(()) }
     }
 }
 
diff --git a/src/libstd/sys/windows/mod.rs b/src/libstd/sys/windows/mod.rs
index e9243c5040c..9fce308cb94 100644
--- a/src/libstd/sys/windows/mod.rs
+++ b/src/libstd/sys/windows/mod.rs
@@ -26,10 +26,12 @@ use sync::{Once, ONCE_INIT};
 
 macro_rules! helper_init( (static $name:ident: Helper<$m:ty>) => (
     static $name: Helper<$m> = Helper {
-        lock: ::rustrt::mutex::NATIVE_MUTEX_INIT,
+        lock: ::sync::MUTEX_INIT,
+        cond: ::sync::CONDVAR_INIT,
         chan: ::cell::UnsafeCell { value: 0 as *mut Sender<$m> },
         signal: ::cell::UnsafeCell { value: 0 },
         initialized: ::cell::UnsafeCell { value: false },
+        shutdown: ::cell::UnsafeCell { value: false },
     };
 ) )
 
diff --git a/src/libstd/sys/windows/mutex.rs b/src/libstd/sys/windows/mutex.rs
index 10ebcf4bd09..ddd89070ed5 100644
--- a/src/libstd/sys/windows/mutex.rs
+++ b/src/libstd/sys/windows/mutex.rs
@@ -8,6 +8,8 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+use prelude::*;
+
 use sync::atomic;
 use alloc::{mod, heap};
 
@@ -21,8 +23,8 @@ pub struct Mutex { inner: atomic::AtomicUint }
 pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::INIT_ATOMIC_UINT };
 
 #[inline]
-pub unsafe fn raw(m: &super::Mutex) -> ffi::LPCRITICAL_SECTION {
-    m.0.get()
+pub unsafe fn raw(m: &Mutex) -> ffi::LPCRITICAL_SECTION {
+    m.get()
 }
 
 impl Mutex {
diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs
index ca7985aa35b..bf658d0efd0 100644
--- a/src/libstd/sys/windows/pipe.rs
+++ b/src/libstd/sys/windows/pipe.rs
@@ -89,8 +89,7 @@ use libc;
 use c_str::CString;
 use mem;
 use ptr;
-use sync::atomic;
-use rustrt::mutex;
+use sync::{atomic, Mutex};
 use io::{mod, IoError, IoResult};
 use prelude::*;
 
@@ -126,7 +125,7 @@ impl Drop for Event {
 
 struct Inner {
     handle: libc::HANDLE,
-    lock: mutex::NativeMutex,
+    lock: Mutex<()>,
     read_closed: atomic::AtomicBool,
     write_closed: atomic::AtomicBool,
 }
@@ -135,7 +134,7 @@ impl Inner {
     fn new(handle: libc::HANDLE) -> Inner {
         Inner {
             handle: handle,
-            lock: unsafe { mutex::NativeMutex::new() },
+            lock: Mutex::new(()),
             read_closed: atomic::AtomicBool::new(false),
             write_closed: atomic::AtomicBool::new(false),
         }