diff options
| author | Patrick Walton <pcwalton@mimiga.net> | 2013-05-03 14:20:34 -0700 |
|---|---|---|
| committer | Patrick Walton <pcwalton@mimiga.net> | 2013-05-08 17:03:59 -0700 |
| commit | 6a44482b175a5486039fd5f2fd32f1051ce80e50 (patch) | |
| tree | 818881a061a2ece4bc5d1fe86952b0fbc5a981f9 | |
| parent | 803a4f45fa5b581155e638143afb97195cfa9f2e (diff) | |
| download | rust-6a44482b175a5486039fd5f2fd32f1051ce80e50.tar.gz rust-6a44482b175a5486039fd5f2fd32f1051ce80e50.zip | |
libcore: Remove mutable fields from pipes
| -rw-r--r-- | src/libcore/comm.rs | 94 | ||||
| -rw-r--r-- | src/libcore/pipes.rs | 317 | ||||
| -rw-r--r-- | src/libstd/comm.rs | 2 | ||||
| -rw-r--r-- | src/libstd/timer.rs | 35 |
4 files changed, 246 insertions, 202 deletions
diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index d075ff08bb7..56c301cd1c7 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -12,6 +12,7 @@ Message passing */ +use cast::transmute; use cast; use either::{Either, Left, Right}; use kinds::Owned; @@ -192,9 +193,9 @@ impl<T: Owned> Peekable<T> for Port<T> { fn peek(&self) -> bool { let mut endp = None; endp <-> self.endp; - let peek = match &endp { - &Some(ref endp) => peek(endp), - &None => fail!(~"peeking empty stream") + let peek = match endp { + Some(ref mut endp) => peek(endp), + None => fail!(~"peeking empty stream") }; self.endp <-> endp; peek @@ -202,10 +203,10 @@ impl<T: Owned> Peekable<T> for Port<T> { } impl<T: Owned> Selectable for Port<T> { - fn header(&self) -> *PacketHeader { + fn header(&mut self) -> *mut PacketHeader { unsafe { match self.endp { - Some(ref endp) => endp.header(), + Some(ref mut endp) => endp.header(), None => fail!(~"peeking empty stream") } } @@ -327,23 +328,20 @@ impl<T: Owned> ::clone::Clone for SharedChan<T> { #[allow(non_camel_case_types)] pub mod oneshot { priv use core::kinds::Owned; - use ptr::to_unsafe_ptr; + use ptr::to_mut_unsafe_ptr; pub fn init<T: Owned>() -> (client::Oneshot<T>, server::Oneshot<T>) { pub use core::pipes::HasBuffer; - let buffer = - ~::core::pipes::Buffer{ + let mut buffer = ~::core::pipes::Buffer { header: ::core::pipes::BufferHeader(), - data: __Buffer{ + data: __Buffer { Oneshot: ::core::pipes::mk_packet::<Oneshot<T>>() }, }; do ::core::pipes::entangle_buffer(buffer) |buffer, data| { - { - data.Oneshot.set_buffer(buffer); - to_unsafe_ptr(&data.Oneshot) - } + data.Oneshot.set_buffer(buffer); + to_mut_unsafe_ptr(&mut data.Oneshot) } } #[allow(non_camel_case_types)] @@ -497,48 +495,66 @@ pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T) -> bool { /// Returns the index of an endpoint that is ready to receive. -pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint { +pub fn selecti<T: Selectable>(endpoints: &mut [T]) -> uint { wait_many(endpoints) } /// Returns 0 or 1 depending on which endpoint is ready to receive -pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) -> - Either<(), ()> { - match wait_many([a.header(), b.header()]) { - 0 => Left(()), - 1 => Right(()), - _ => fail!(~"wait returned unexpected index") +pub fn select2i<A:Selectable, B:Selectable>(a: &mut A, b: &mut B) + -> Either<(), ()> { + let mut endpoints = [ a.header(), b.header() ]; + match wait_many(endpoints) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index"), } } /// Receive a message from one of two endpoints. pub trait Select2<T: Owned, U: Owned> { /// Receive a message or return `None` if a connection closes. - fn try_select(&self) -> Either<Option<T>, Option<U>>; + fn try_select(&mut self) -> Either<Option<T>, Option<U>>; /// Receive a message or fail if a connection closes. - fn select(&self) -> Either<T, U>; + fn select(&mut self) -> Either<T, U>; } -impl<T: Owned, U: Owned, - Left: Selectable + GenericPort<T>, - Right: Selectable + GenericPort<U>> - Select2<T, U> for (Left, Right) { - - fn select(&self) -> Either<T, U> { - match *self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.recv()), - Right(()) => Right(rp.recv()) - } +impl<T:Owned, + U:Owned, + Left:Selectable + GenericPort<T>, + Right:Selectable + GenericPort<U>> + Select2<T, U> + for (Left, Right) { + fn select(&mut self) -> Either<T, U> { + // XXX: Bad borrow check workaround. + unsafe { + let this: &(Left, Right) = transmute(self); + match *this { + (ref lp, ref rp) => { + let lp: &mut Left = transmute(lp); + let rp: &mut Right = transmute(rp); + match select2i(lp, rp) { + Left(()) => Left(lp.recv()), + Right(()) => Right(rp.recv()), + } + } + } } } - fn try_select(&self) -> Either<Option<T>, Option<U>> { - match *self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.try_recv()), - Right(()) => Right(rp.try_recv()) - } + fn try_select(&mut self) -> Either<Option<T>, Option<U>> { + // XXX: Bad borrow check workaround. + unsafe { + let this: &(Left, Right) = transmute(self); + match *this { + (ref lp, ref rp) => { + let lp: &mut Left = transmute(lp); + let rp: &mut Right = transmute(rp); + match select2i(lp, rp) { + Left(()) => Left (lp.try_recv()), + Right(()) => Right(rp.try_recv()), + } + } + } } } } diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 19674900f90..1fda5a97a37 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -111,7 +111,7 @@ enum State { pub struct BufferHeader { // Tracks whether this buffer needs to be freed. We can probably // get away with restricting it to 0 or 1, if we're careful. - mut ref_count: int, + ref_count: int, // We may want a drop, and to be careful about stringing this // thing along. @@ -130,12 +130,12 @@ pub struct Buffer<T> { } pub struct PacketHeader { - mut state: State, - mut blocked_task: *rust_task, + state: State, + blocked_task: *rust_task, // This is a transmute_copy of a ~buffer, that can also be cast // to a buffer_header if need be. - mut buffer: *libc::c_void, + buffer: *libc::c_void, } pub fn PacketHeader() -> PacketHeader { @@ -148,14 +148,14 @@ pub fn PacketHeader() -> PacketHeader { pub impl PacketHeader { // Returns the old state. - unsafe fn mark_blocked(&self, this: *rust_task) -> State { + unsafe fn mark_blocked(&mut self, this: *rust_task) -> State { rustrt::rust_task_ref(this); let old_task = swap_task(&mut self.blocked_task, this); assert!(old_task.is_null()); swap_state_acq(&mut self.state, Blocked) } - unsafe fn unblock(&self) { + unsafe fn unblock(&mut self) { let old_task = swap_task(&mut self.blocked_task, ptr::null()); if !old_task.is_null() { rustrt::rust_task_deref(old_task) @@ -169,13 +169,13 @@ pub impl PacketHeader { // unsafe because this can do weird things to the space/time // continuum. It ends making multiple unique pointers to the same - // thing. You'll proobably want to forget them when you're done. - unsafe fn buf_header(&self) -> ~BufferHeader { + // thing. You'll probably want to forget them when you're done. + unsafe fn buf_header(&mut self) -> ~BufferHeader { assert!(self.buffer.is_not_null()); transmute_copy(&self.buffer) } - fn set_buffer<T:Owned>(&self, b: ~Buffer<T>) { + fn set_buffer<T:Owned>(&mut self, b: ~Buffer<T>) { unsafe { self.buffer = transmute_copy(&b); } @@ -184,15 +184,15 @@ pub impl PacketHeader { pub struct Packet<T> { header: PacketHeader, - mut payload: Option<T>, + payload: Option<T>, } pub trait HasBuffer { - fn set_buffer(&self, b: *libc::c_void); + fn set_buffer(&mut self, b: *libc::c_void); } impl<T:Owned> HasBuffer for Packet<T> { - fn set_buffer(&self, b: *libc::c_void) { + fn set_buffer(&mut self, b: *libc::c_void) { self.header.buffer = b; } } @@ -204,7 +204,7 @@ pub fn mk_packet<T:Owned>() -> Packet<T> { } } fn unibuffer<T>() -> ~Buffer<Packet<T>> { - let b = ~Buffer { + let mut b = ~Buffer { header: BufferHeader(), data: Packet { header: PacketHeader(), @@ -218,22 +218,25 @@ fn unibuffer<T>() -> ~Buffer<Packet<T>> { b } -pub fn packet<T>() -> *Packet<T> { - let b = unibuffer(); - let p = ptr::to_unsafe_ptr(&(b.data)); +pub fn packet<T>() -> *mut Packet<T> { + let mut b = unibuffer(); + let p = ptr::to_mut_unsafe_ptr(&mut b.data); // We'll take over memory management from here. - unsafe { forget(b) } + unsafe { + forget(b); + } p } pub fn entangle_buffer<T:Owned,Tstart:Owned>( - buffer: ~Buffer<T>, - init: &fn(*libc::c_void, x: &T) -> *Packet<Tstart>) - -> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>) -{ - let p = init(unsafe { transmute_copy(&buffer) }, &buffer.data); - unsafe { forget(buffer) } - (SendPacketBuffered(p), RecvPacketBuffered(p)) + mut buffer: ~Buffer<T>, + init: &fn(*libc::c_void, x: &mut T) -> *mut Packet<Tstart>) + -> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>) { + unsafe { + let p = init(transmute_copy(&buffer), &mut buffer.data); + forget(buffer); + (SendPacketBuffered(p), RecvPacketBuffered(p)) + } } pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task { @@ -292,7 +295,7 @@ fn swap_state_rel(dst: &mut State, src: State) -> State { } } -pub unsafe fn get_buffer<T>(p: *PacketHeader) -> ~Buffer<T> { +pub unsafe fn get_buffer<T>(p: *mut PacketHeader) -> ~Buffer<T> { transmute((*p).buf_header()) } @@ -306,10 +309,14 @@ struct BufferResource<T> { impl<T> Drop for BufferResource<T> { fn finalize(&self) { unsafe { - let b = move_it!(self.buffer); + let this: &mut BufferResource<T> = transmute(self); + + let mut b = move_it!(this.buffer); //let p = ptr::to_unsafe_ptr(*b); //error!("drop %?", p); - let old_count = intrinsics::atomic_xsub_rel(&mut b.header.ref_count, 1); + let old_count = intrinsics::atomic_xsub_rel( + &mut b.header.ref_count, + 1); //let old_count = atomic_xchng_rel(b.header.ref_count, 0); if old_count == 1 { // The new count is 0. @@ -323,10 +330,12 @@ impl<T> Drop for BufferResource<T> { } } -fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> { +fn BufferResource<T>(mut b: ~Buffer<T>) -> BufferResource<T> { //let p = ptr::to_unsafe_ptr(*b); //error!("take %?", p); - unsafe { intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1) }; + unsafe { + intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1); + } BufferResource { // tjc: ???? @@ -334,10 +343,12 @@ fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> { } } -pub fn send<T,Tbuffer>(p: SendPacketBuffered<T,Tbuffer>, payload: T) -> bool { +pub fn send<T,Tbuffer>(mut p: SendPacketBuffered<T,Tbuffer>, + payload: T) + -> bool { let header = p.header(); - let p_ = p.unwrap(); - let p = unsafe { &*p_ }; + let mut p_ = p.unwrap(); + let p = unsafe { &mut *p_ }; assert!(ptr::to_unsafe_ptr(&(p.header)) == header); assert!(p.payload.is_none()); p.payload = Some(payload); @@ -391,11 +402,12 @@ Returns `None` if the sender has closed the connection without sending a message, or `Some(T)` if a message was received. */ -pub fn try_recv<T:Owned,Tbuffer:Owned>(p: RecvPacketBuffered<T, Tbuffer>) - -> Option<T> -{ - let p_ = p.unwrap(); - let p = unsafe { &*p_ }; +pub fn try_recv<T:Owned,Tbuffer:Owned>(mut p: RecvPacketBuffered<T, Tbuffer>) + -> Option<T> { + let mut p_ = p.unwrap(); + let mut p = unsafe { + &mut *p_ + }; do (|| { try_recv_(p) @@ -412,7 +424,7 @@ pub fn try_recv<T:Owned,Tbuffer:Owned>(p: RecvPacketBuffered<T, Tbuffer>) } } -fn try_recv_<T:Owned>(p: &Packet<T>) -> Option<T> { +fn try_recv_<T:Owned>(p: &mut Packet<T>) -> Option<T> { // optimistic path match p.header.state { Full => { @@ -498,16 +510,20 @@ fn try_recv_<T:Owned>(p: &Packet<T>) -> Option<T> { } /// Returns true if messages are available. -pub fn peek<T:Owned,Tb:Owned>(p: &RecvPacketBuffered<T, Tb>) -> bool { - match unsafe {(*p.header()).state} { - Empty | Terminated => false, - Blocked => fail!(~"peeking on blocked packet"), - Full => true +pub fn peek<T:Owned,Tb:Owned>(p: &mut RecvPacketBuffered<T, Tb>) -> bool { + unsafe { + match (*p.header()).state { + Empty | Terminated => false, + Blocked => fail!(~"peeking on blocked packet"), + Full => true + } } } -fn sender_terminate<T:Owned>(p: *Packet<T>) { - let p = unsafe { &*p }; +fn sender_terminate<T:Owned>(p: *mut Packet<T>) { + let p = unsafe { + &mut *p + }; match swap_state_rel(&mut p.header.state, Terminated) { Empty => { // The receiver will eventually clean up. @@ -536,8 +552,10 @@ fn sender_terminate<T:Owned>(p: *Packet<T>) { } } -fn receiver_terminate<T:Owned>(p: *Packet<T>) { - let p = unsafe { &*p }; +fn receiver_terminate<T:Owned>(p: *mut Packet<T>) { + let p = unsafe { + &mut *p + }; match swap_state_rel(&mut p.header.state, Terminated) { Empty => { assert!(p.header.blocked_task.is_null()); @@ -569,8 +587,10 @@ that vector. The index points to an endpoint that has either been closed by the sender or has a message waiting to be received. */ -pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint { - let this = unsafe { rustrt::rust_get_task() }; +pub fn wait_many<T: Selectable>(pkts: &mut [T]) -> uint { + let this = unsafe { + rustrt::rust_get_task() + }; unsafe { rustrt::task_clear_event_reject(this); @@ -578,19 +598,19 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint { let mut data_avail = false; let mut ready_packet = pkts.len(); - for pkts.eachi |i, p| { + for vec::eachi_mut(pkts) |i, p| { unsafe { - let p = &*p.header(); + let p = &mut *p.header(); let old = p.mark_blocked(this); match old { - Full | Terminated => { - data_avail = true; - ready_packet = i; - (*p).state = old; - break; - } - Blocked => fail!(~"blocking on blocked packet"), - Empty => () + Full | Terminated => { + data_avail = true; + ready_packet = i; + (*p).state = old; + break; + } + Blocked => fail!(~"blocking on blocked packet"), + Empty => () } } } @@ -598,7 +618,14 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint { while !data_avail { debug!("sleeping on %? packets", pkts.len()); let event = wait_event(this) as *PacketHeader; - let pos = vec::position(pkts, |p| p.header() == event); + + let mut pos = None; + for vec::eachi_mut(pkts) |i, p| { + if p.header() == event { + pos = Some(i); + break; + } + }; match pos { Some(i) => { @@ -609,11 +636,15 @@ pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint { } } - debug!("%?", pkts[ready_packet]); + debug!("%?", &mut pkts[ready_packet]); - for pkts.each |p| { unsafe{ (*p.header()).unblock()} } + for vec::each_mut(pkts) |p| { + unsafe { + (*p.header()).unblock() + } + } - debug!("%?, %?", ready_packet, pkts[ready_packet]); + debug!("%?, %?", ready_packet, &mut pkts[ready_packet]); unsafe { assert!((*pkts[ready_packet].header()).state == Full @@ -629,65 +660,58 @@ message. */ pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>; -pub fn SendPacket<T>(p: *Packet<T>) -> SendPacket<T> { +pub fn SendPacket<T>(p: *mut Packet<T>) -> SendPacket<T> { SendPacketBuffered(p) } pub struct SendPacketBuffered<T, Tbuffer> { - mut p: Option<*Packet<T>>, - mut buffer: Option<BufferResource<Tbuffer>>, + p: Option<*mut Packet<T>>, + buffer: Option<BufferResource<Tbuffer>>, } #[unsafe_destructor] impl<T:Owned,Tbuffer:Owned> Drop for SendPacketBuffered<T,Tbuffer> { fn finalize(&self) { - //if self.p != none { - // debug!("drop send %?", option::get(self.p)); - //} - if self.p != None { - let mut p = None; - p <-> self.p; - sender_terminate(p.unwrap()) + unsafe { + let this: &mut SendPacketBuffered<T,Tbuffer> = transmute(self); + if this.p != None { + let mut p = None; + p <-> this.p; + sender_terminate(p.unwrap()) + } } - //unsafe { error!("send_drop: %?", - // if self.buffer == none { - // "none" - // } else { "some" }); } } } -pub fn SendPacketBuffered<T,Tbuffer>(p: *Packet<T>) - -> SendPacketBuffered<T, Tbuffer> { - //debug!("take send %?", p); +pub fn SendPacketBuffered<T,Tbuffer>(p: *mut Packet<T>) + -> SendPacketBuffered<T,Tbuffer> { SendPacketBuffered { p: Some(p), buffer: unsafe { - Some(BufferResource( - get_buffer(ptr::to_unsafe_ptr(&((*p).header))))) + Some(BufferResource(get_buffer(&mut (*p).header))) } } } pub impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> { - fn unwrap(&self) -> *Packet<T> { + fn unwrap(&mut self) -> *mut Packet<T> { let mut p = None; p <-> self.p; p.unwrap() } - fn header(&self) -> *PacketHeader { + fn header(&mut self) -> *mut PacketHeader { match self.p { - Some(packet) => unsafe { - let packet = &*packet; - let header = ptr::to_unsafe_ptr(&(packet.header)); - //forget(packet); - header - }, - None => fail!(~"packet already consumed") + Some(packet) => unsafe { + let packet = &mut *packet; + let header = ptr::to_mut_unsafe_ptr(&mut packet.header); + header + }, + None => fail!(~"packet already consumed") } } - fn reuse_buffer(&self) -> BufferResource<Tbuffer> { + fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> { //error!("send reuse_buffer"); let mut tmp = None; tmp <-> self.buffer; @@ -699,41 +723,37 @@ pub impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> { /// message. pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>; -pub fn RecvPacket<T>(p: *Packet<T>) -> RecvPacket<T> { +pub fn RecvPacket<T>(p: *mut Packet<T>) -> RecvPacket<T> { RecvPacketBuffered(p) } + pub struct RecvPacketBuffered<T, Tbuffer> { - mut p: Option<*Packet<T>>, - mut buffer: Option<BufferResource<Tbuffer>>, + p: Option<*mut Packet<T>>, + buffer: Option<BufferResource<Tbuffer>>, } #[unsafe_destructor] impl<T:Owned,Tbuffer:Owned> Drop for RecvPacketBuffered<T,Tbuffer> { fn finalize(&self) { - //if self.p != none { - // debug!("drop recv %?", option::get(self.p)); - //} - if self.p != None { - let mut p = None; - p <-> self.p; - receiver_terminate(p.unwrap()) + unsafe { + let this: &mut RecvPacketBuffered<T,Tbuffer> = transmute(self); + if this.p != None { + let mut p = None; + p <-> this.p; + receiver_terminate(p.unwrap()) + } } - //unsafe { error!("recv_drop: %?", - // if self.buffer == none { - // "none" - // } else { "some" }); } } } pub impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> { - fn unwrap(&self) -> *Packet<T> { + fn unwrap(&mut self) -> *mut Packet<T> { let mut p = None; p <-> self.p; p.unwrap() } - fn reuse_buffer(&self) -> BufferResource<Tbuffer> { - //error!("recv reuse_buffer"); + fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> { let mut tmp = None; tmp <-> self.buffer; tmp.unwrap() @@ -741,27 +761,24 @@ pub impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> { } impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> { - fn header(&self) -> *PacketHeader { + fn header(&mut self) -> *mut PacketHeader { match self.p { - Some(packet) => unsafe { - let packet = &*packet; - let header = ptr::to_unsafe_ptr(&(packet.header)); - //forget(packet); - header - }, - None => fail!(~"packet already consumed") + Some(packet) => unsafe { + let packet = &mut *packet; + let header = ptr::to_mut_unsafe_ptr(&mut packet.header); + header + }, + None => fail!(~"packet already consumed") } } } -pub fn RecvPacketBuffered<T,Tbuffer>(p: *Packet<T>) - -> RecvPacketBuffered<T,Tbuffer> { - //debug!("take recv %?", p); +pub fn RecvPacketBuffered<T,Tbuffer>(p: *mut Packet<T>) + -> RecvPacketBuffered<T,Tbuffer> { RecvPacketBuffered { p: Some(p), buffer: unsafe { - Some(BufferResource( - get_buffer(ptr::to_unsafe_ptr(&((*p).header))))) + Some(BufferResource(get_buffer(&mut (*p).header))) } } } @@ -800,51 +817,55 @@ this case, `select2` may return either `left` or `right`. */ pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>( - a: RecvPacketBuffered<A, Ab>, - b: RecvPacketBuffered<B, Bb>) + mut a: RecvPacketBuffered<A, Ab>, + mut b: RecvPacketBuffered<B, Bb>) -> Either<(Option<A>, RecvPacketBuffered<B, Bb>), - (RecvPacketBuffered<A, Ab>, Option<B>)> -{ - let i = wait_many([a.header(), b.header()]); - + (RecvPacketBuffered<A, Ab>, Option<B>)> { + let mut endpoints = [ a.header(), b.header() ]; + let i = wait_many(endpoints); match i { - 0 => Left((try_recv(a), b)), - 1 => Right((a, try_recv(b))), - _ => fail!(~"select2 return an invalid packet") + 0 => Left((try_recv(a), b)), + 1 => Right((a, try_recv(b))), + _ => fail!(~"select2 return an invalid packet") } } pub trait Selectable { - fn header(&self) -> *PacketHeader; + fn header(&mut self) -> *mut PacketHeader; } -impl Selectable for *PacketHeader { - fn header(&self) -> *PacketHeader { *self } +impl Selectable for *mut PacketHeader { + fn header(&mut self) -> *mut PacketHeader { *self } } /// Returns the index of an endpoint that is ready to receive. -pub fn selecti<T:Selectable>(endpoints: &[T]) -> uint { +pub fn selecti<T:Selectable>(endpoints: &mut [T]) -> uint { wait_many(endpoints) } /// Returns 0 or 1 depending on which endpoint is ready to receive -pub fn select2i<A:Selectable,B:Selectable>(a: &A, b: &B) -> - Either<(), ()> { - match wait_many([a.header(), b.header()]) { - 0 => Left(()), - 1 => Right(()), - _ => fail!(~"wait returned unexpected index") +pub fn select2i<A:Selectable,B:Selectable>(a: &mut A, b: &mut B) + -> Either<(), ()> { + let mut endpoints = [ a.header(), b.header() ]; + match wait_many(endpoints) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index") } } -/** Waits on a set of endpoints. Returns a message, its index, and a - list of the remaining endpoints. +/// Waits on a set of endpoints. Returns a message, its index, and a +/// list of the remaining endpoints. +pub fn select<T:Owned,Tb:Owned>(mut endpoints: ~[RecvPacketBuffered<T, Tb>]) + -> (uint, + Option<T>, + ~[RecvPacketBuffered<T, Tb>]) { + let mut endpoint_headers = ~[]; + for vec::each_mut(endpoints) |endpoint| { + endpoint_headers.push(endpoint.header()); + } -*/ -pub fn select<T:Owned,Tb:Owned>(endpoints: ~[RecvPacketBuffered<T, Tb>]) - -> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>]) -{ - let ready = wait_many(endpoints.map(|p| p.header())); + let ready = wait_many(endpoint_headers); let mut remaining = endpoints; let port = remaining.swap_remove(ready); let result = try_recv(port); diff --git a/src/libstd/comm.rs b/src/libstd/comm.rs index d866ee6cedb..20ab2d61ecc 100644 --- a/src/libstd/comm.rs +++ b/src/libstd/comm.rs @@ -72,7 +72,7 @@ impl<T:Owned,U:Owned> Peekable<U> for DuplexStream<T, U> { } impl<T:Owned,U:Owned> Selectable for DuplexStream<T, U> { - fn header(&self) -> *pipes::PacketHeader { + fn header(&mut self) -> *mut pipes::PacketHeader { self.port.header() } } diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index b19b2f2889e..76aa4d615e1 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -14,10 +14,11 @@ use uv; use uv::iotask; use uv::iotask::IoTask; -use core::libc; -use core::libc::c_void; use core::cast::transmute; +use core::cast; use core::comm::{stream, Chan, SharedChan, Port, select2i}; +use core::libc::c_void; +use core::libc; /** * Wait for timeout period then send provided value over a channel @@ -120,22 +121,28 @@ pub fn sleep(iotask: &IoTask, msecs: uint) { pub fn recv_timeout<T:Copy + Owned>(iotask: &IoTask, msecs: uint, wait_po: &Port<T>) - -> Option<T> { - let (timeout_po, timeout_ch) = stream::<()>(); + -> Option<T> { + let mut (timeout_po, timeout_ch) = stream::<()>(); delayed_send(iotask, msecs, &timeout_ch, ()); - // FIXME: This could be written clearer (#2618) - either::either( - |_| { - None - }, |_| { - Some(wait_po.recv()) - }, &select2i(&timeout_po, wait_po) - ) + + // XXX: Workaround due to ports and channels not being &mut. They should + // be. + unsafe { + let wait_po = cast::transmute_mut(wait_po); + + // FIXME: This could be written clearer (#2618) + either::either( + |_| { + None + }, |_| { + Some(wait_po.recv()) + }, &select2i(&mut timeout_po, wait_po) + ) + } } // INTERNAL API -extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, - status: libc::c_int) { +extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t, status: libc::c_int) { unsafe { debug!( "delayed_send_cb handle %? status %?", handle, status); |
