diff options
| author | Aleksey Kladov <aleksey.kladov@gmail.com> | 2019-02-14 20:43:45 +0300 |
|---|---|---|
| committer | Aleksey Kladov <aleksey.kladov@gmail.com> | 2019-02-14 21:11:07 +0300 |
| commit | bf352cd2511775a331d77dee261b64bd8359dacb (patch) | |
| tree | ba7d988ebef437d5a9d7beac048b5ac0dbd2fe9c /crates/thread_worker/src | |
| parent | 10bf61b83b2600ed3cb7e7825f1cd0ee83e9b7e7 (diff) | |
| download | rust-bf352cd2511775a331d77dee261b64bd8359dacb.tar.gz rust-bf352cd2511775a331d77dee261b64bd8359dacb.zip | |
automatically wait for worker threads
closes #817
Diffstat (limited to 'crates/thread_worker/src')
| -rw-r--r-- | crates/thread_worker/src/lib.rs | 120 |
1 files changed, 63 insertions, 57 deletions
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index a522a0843d6..d67e44e3800 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs @@ -2,74 +2,80 @@ use std::thread; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; -use drop_bomb::DropBomb; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; -pub struct Worker<I, O> { - pub inp: Sender<I>, - pub out: Receiver<O>, +/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically. +pub struct ScopedThread { + // Option for drop + inner: Option<thread::JoinHandle<()>>, } -pub struct WorkerHandle { - name: &'static str, - thread: thread::JoinHandle<()>, - bomb: DropBomb, -} +impl Drop for ScopedThread { + fn drop(&mut self) { + let inner = self.inner.take().unwrap(); + let name = inner.thread().name().unwrap().to_string(); + log::info!("waiting for {} to finish...", name); + let res = inner.join(); + log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" }); -pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) -where - F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, - I: Send + 'static, - O: Send + 'static, -{ - let (worker, inp_r, out_s) = worker_chan(buf); - let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); - (worker, watcher) -} - -impl<I, O> Worker<I, O> { - /// Stops the worker. Returns the message receiver to fetch results which - /// have become ready before the worker is stopped. - pub fn shutdown(self) -> Receiver<O> { - self.out + // escalate panic, but avoid aborting the process + match res { + Err(e) => { + if !thread::panicking() { + panic!(e) + } + } + _ => (), + } } +} - pub fn send(&self, item: I) -> Result<(), SendError<I>> { - self.inp.send(item) - } - pub fn recv(&self) -> Result<O, RecvError> { - self.out.recv() +impl ScopedThread { + pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread { + let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap(); + ScopedThread { inner: Some(inner) } } } -impl WorkerHandle { - fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { - let thread = thread::spawn(f); - WorkerHandle { - name, - thread, - bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), - } - } +/// A wrapper around event-processing thread with automatic shutdown semantics. +pub struct Worker<I, O> { + // XXX: field order is significant here. + // + // In Rust, fields are dropped in the declaration order, and we rely on this + // here. We must close input first, so that the `thread` (who holds the + // opposite side of the channel) noticed shutdown. Then, we must join the + // thread, but we must keep out alive so that the thread does not panic. + // + // Note that a potential problem here is that we might drop some messages + // from receiver on the floor. This is ok for rust-analyzer: we have only a + // single client, so, if we are shutting down, nobody is interested in the + // unfinished work anyway! + sender: Sender<I>, + _thread: ScopedThread, + receiver: Receiver<O>, +} - pub fn shutdown(mut self) -> thread::Result<()> { - log::info!("waiting for {} to finish ...", self.name); - let name = self.name; - self.bomb.defuse(); - let res = self.thread.join(); - match &res { - Ok(()) => log::info!("... {} terminated with ok", name), - Err(_) => log::error!("... {} terminated with err", name), - } - res +impl<I, O> Worker<I, O> { + pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> Worker<I, O> + where + F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + // Set up worker channels in a deadlock-avoiding way. If one sets both input + // and output buffers to a fixed size, a worker might get stuck. + let (sender, input_receiver) = bounded::<I>(buf); + let (output_sender, receiver) = unbounded::<O>(); + let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender)); + Worker { sender, _thread, receiver } } } -/// Sets up worker channels in a deadlock-avoiding way. -/// If one sets both input and output buffers to a fixed size, -/// a worker might get stuck. -fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) { - let (input_sender, input_receiver) = bounded::<I>(buf); - let (output_sender, output_receiver) = unbounded::<O>(); - (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) +impl<I, O> Worker<I, O> { + pub fn sender(&self) -> &Sender<I> { + &self.sender + } + pub fn receiver(&self) -> &Receiver<O> { + &self.receiver + } } |
