about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrian Anderson <banderson@mozilla.com>2013-04-15 16:00:15 -0700
committerBrian Anderson <banderson@mozilla.com>2013-04-15 16:00:15 -0700
commitebefe07792caf17c03c6f90fb1979d4e6c935001 (patch)
treec0849200ec0237e39999e09a9d0eb2fbcbf84a43
parent473b4d19ad51529afb8f1391cf471a20053c781c (diff)
downloadrust-ebefe07792caf17c03c6f90fb1979d4e6c935001.tar.gz
rust-ebefe07792caf17c03c6f90fb1979d4e6c935001.zip
core::rt: Make Scheduler::unsafe_local return a fabricated region pointer
Instead of taking a closure. It's unsafe either way. Rename it to unsafe_local_borrow.
-rw-r--r--src/libcore/rt/sched/local.rs7
-rw-r--r--src/libcore/rt/sched/mod.rs97
-rw-r--r--src/libcore/rt/uvio.rs371
3 files changed, 228 insertions, 247 deletions
diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs
index 020d581546a..d8001011114 100644
--- a/src/libcore/rt/sched/local.rs
+++ b/src/libcore/rt/sched/local.rs
@@ -43,7 +43,7 @@ pub fn take() -> ~Scheduler {
 /// # Safety Note
 /// Because this leaves the Scheduler in thread-local storage it is possible
 /// For the Scheduler pointer to be aliased
-pub unsafe fn borrow(f: &fn(&mut Scheduler)) {
+pub unsafe fn borrow() -> &mut Scheduler {
     unsafe {
         let key = tls_key();
         let mut void_sched: *mut c_void = tls::get(key);
@@ -54,7 +54,7 @@ pub unsafe fn borrow(f: &fn(&mut Scheduler)) {
                 transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr)
             };
             let sched: &mut Scheduler = &mut **sched;
-            f(sched);
+            return sched;
         }
     }
 }
@@ -93,8 +93,7 @@ fn borrow_smoke_test() {
     let scheduler = ~UvEventLoop::new_scheduler();
     put(scheduler);
     unsafe {
-        do borrow |_sched| {
-        }
+        let _scheduler = borrow();
     }
     let _scheduler = take();
 }
diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs
index 82f0c3592c4..a2a440ba76e 100644
--- a/src/libcore/rt/sched/mod.rs
+++ b/src/libcore/rt/sched/mod.rs
@@ -95,20 +95,18 @@ pub impl Scheduler {
         // Give ownership of the scheduler (self) to the thread
         local::put(self);
 
-        do Scheduler::unsafe_local |scheduler| {
-            fn run_scheduler_once() {
-                do Scheduler::unsafe_local |scheduler| {
-                    if scheduler.resume_task_from_queue() {
-                        // Ok, a task ran. Nice! We'll do it again later
-                        scheduler.event_loop.callback(run_scheduler_once);
-                    }
-                }
+        let scheduler = Scheduler::unsafe_local_borrow();
+        fn run_scheduler_once() {
+            let scheduler = Scheduler::unsafe_local_borrow();
+            if scheduler.resume_task_from_queue() {
+                // Ok, a task ran. Nice! We'll do it again later
+                scheduler.event_loop.callback(run_scheduler_once);
             }
-
-            scheduler.event_loop.callback(run_scheduler_once);
-            scheduler.event_loop.run();
         }
 
+        scheduler.event_loop.callback(run_scheduler_once);
+        scheduler.event_loop.run();
+
         return local::take();
     }
 
@@ -116,8 +114,14 @@ pub impl Scheduler {
     /// # Safety Note
     /// This allows other mutable aliases to the scheduler, both in the current
     /// execution context and other execution contexts.
-    fn unsafe_local(f: &fn(&mut Scheduler)) {
-        unsafe { local::borrow(f) }
+    fn unsafe_local_borrow() -> &mut Scheduler {
+        unsafe { local::borrow() }
+    }
+
+    fn local_borrow(f: &fn(&mut Scheduler)) {
+        let mut sched = local::take();
+        f(sched);
+        local::put(sched);
     }
 
     // * Scheduler-context operations
@@ -208,9 +212,8 @@ pub impl Scheduler {
         }
 
         // We could be executing in a different thread now
-        do Scheduler::unsafe_local |sched| {
-            sched.run_cleanup_job();
-        }
+        let sched = Scheduler::unsafe_local_borrow();
+        sched.run_cleanup_job();
     }
 
     /// Switch directly to another task, without going through the scheduler.
@@ -232,9 +235,8 @@ pub impl Scheduler {
         }
 
         // We could be executing in a different thread now
-        do Scheduler::unsafe_local |sched| {
-            sched.run_cleanup_job();
-        }
+        let sched = Scheduler::unsafe_local_borrow();
+        sched.run_cleanup_job();
     }
 
     // * Other stuff
@@ -331,15 +333,13 @@ pub impl Task {
             // This is the first code to execute after the initial
             // context switch to the task. The previous context may
             // have asked us to do some cleanup.
-            do Scheduler::unsafe_local |sched| {
-                sched.run_cleanup_job();
-            }
+            let sched = Scheduler::unsafe_local_borrow();
+            sched.run_cleanup_job();
 
             start();
 
-            do Scheduler::unsafe_local |sched| {
-                sched.terminate_current_task();
-            }
+            let sched = Scheduler::unsafe_local_borrow();
+            sched.terminate_current_task();
         };
         return wrapper;
     }
@@ -398,13 +398,12 @@ fn test_swap_tasks() {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task1 = ~do Task::new(&mut sched.stack_pool) {
             unsafe { *count_ptr = *count_ptr + 1; }
-            do Scheduler::unsafe_local |sched| {
-                let task2 = ~do Task::new(&mut sched.stack_pool) {
-                    unsafe { *count_ptr = *count_ptr + 1; }
-                };
-                // Context switch directly to the new task
-                sched.resume_task_from_running_task_direct(task2);
-            }
+            let sched = Scheduler::unsafe_local_borrow();
+            let task2 = ~do Task::new(&mut sched.stack_pool) {
+                unsafe { *count_ptr = *count_ptr + 1; }
+            };
+            // Context switch directly to the new task
+            sched.resume_task_from_running_task_direct(task2);
             unsafe { *count_ptr = *count_ptr + 1; }
         };
         sched.task_queue.push_back(task1);
@@ -431,7 +430,7 @@ fn test_run_a_lot_of_tasks_queued() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            do Scheduler::unsafe_local |sched| {
+            do Scheduler::local_borrow |sched| {
                 let task = ~do Task::new(&mut sched.stack_pool) {
                     unsafe {
                         *count_ptr = *count_ptr + 1;
@@ -464,18 +463,17 @@ fn test_run_a_lot_of_tasks_direct() {
         assert!(count == MAX);
 
         fn run_task(count_ptr: *mut int) {
-            do Scheduler::unsafe_local |sched| {
-                let task = ~do Task::new(&mut sched.stack_pool) {
-                    unsafe {
-                        *count_ptr = *count_ptr + 1;
-                        if *count_ptr != MAX {
-                            run_task(count_ptr);
-                        }
+            let sched = Scheduler::unsafe_local_borrow();
+            let task = ~do Task::new(&mut sched.stack_pool) {
+                unsafe {
+                    *count_ptr = *count_ptr + 1;
+                    if *count_ptr != MAX {
+                        run_task(count_ptr);
                     }
-                };
-                // Context switch directly to the new task
-                sched.resume_task_from_running_task_direct(task);
-            }
+                }
+            };
+            // Context switch directly to the new task
+            sched.resume_task_from_running_task_direct(task);
         };
     }
 }
@@ -485,12 +483,11 @@ fn test_block_task() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                assert!(sched.in_task_context());
-                do sched.deschedule_running_task_and_then() |sched, task| {
-                    assert!(!sched.in_task_context());
-                    sched.task_queue.push_back(task);
-                }
+            let sched = Scheduler::unsafe_local_borrow();
+            assert!(sched.in_task_context());
+            do sched.deschedule_running_task_and_then() |sched, task| {
+                assert!(!sched.in_task_context());
+                sched.task_queue.push_back(task);
             }
         };
         sched.task_queue.push_back(task);
diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs
index 7dfc6fff4cd..a43ec07c2de 100644
--- a/src/libcore/rt/uvio.rs
+++ b/src/libcore/rt/uvio.rs
@@ -104,37 +104,35 @@ impl IoFactory for UvIoFactory {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
 
-        do Scheduler::unsafe_local |scheduler| {
-            assert!(scheduler.in_task_context());
-
-            // Block this task and take ownership, switch to scheduler context
-            do scheduler.deschedule_running_task_and_then |scheduler, task| {
-
-                rtdebug!("connect: entered scheduler context");
-                assert!(!scheduler.in_task_context());
-                let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
-                let task_cell = Cell(task);
-
-                // Wait for a connection
-                do tcp_watcher.connect(addr) |stream_watcher, status| {
-                    rtdebug!("connect: in connect callback");
-                    let maybe_stream = if status.is_none() {
-                        rtdebug!("status is none");
-                        Some(~UvStream(stream_watcher))
-                    } else {
-                        rtdebug!("status is some");
-                        stream_watcher.close(||());
-                        None
-                    };
-
-                    // Store the stream in the task's stack
-                    unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                    // Context switch
-                    do Scheduler::unsafe_local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+        let scheduler = Scheduler::unsafe_local_borrow();
+        assert!(scheduler.in_task_context());
+
+        // Block this task and take ownership, switch to scheduler context
+        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+
+            rtdebug!("connect: entered scheduler context");
+            assert!(!scheduler.in_task_context());
+            let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
+            let task_cell = Cell(task);
+
+            // Wait for a connection
+            do tcp_watcher.connect(addr) |stream_watcher, status| {
+                rtdebug!("connect: in connect callback");
+                let maybe_stream = if status.is_none() {
+                    rtdebug!("status is none");
+                    Some(~UvStream(stream_watcher))
+                } else {
+                    rtdebug!("status is some");
+                    stream_watcher.close(||());
+                    None
+                };
+
+                // Store the stream in the task's stack
+                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
+
+                // Context switch
+                let scheduler = Scheduler::unsafe_local_borrow();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -178,33 +176,31 @@ impl TcpListener for UvTcpListener {
 
         let server_tcp_watcher = self.watcher();
 
-        do Scheduler::unsafe_local |scheduler| {
-            assert!(scheduler.in_task_context());
-
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let task_cell = Cell(task);
-                let mut server_tcp_watcher = server_tcp_watcher;
-                do server_tcp_watcher.listen |server_stream_watcher, status| {
-                    let maybe_stream = if status.is_none() {
-                        let mut server_stream_watcher = server_stream_watcher;
-                        let mut loop_ = loop_from_watcher(&server_stream_watcher);
-                        let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
-                        let mut client_tcp_watcher = client_tcp_watcher.as_stream();
-                        // XXX: Need's to be surfaced in interface
-                        server_stream_watcher.accept(client_tcp_watcher);
-                        Some(~UvStream::new(client_tcp_watcher))
-                    } else {
-                        None
-                    };
-
-                    unsafe { (*result_cell_ptr).put_back(maybe_stream); }
-
-                    rtdebug!("resuming task from listen");
-                    // Context switch
-                    do Scheduler::unsafe_local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+        let scheduler = Scheduler::unsafe_local_borrow();
+        assert!(scheduler.in_task_context());
+
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let task_cell = Cell(task);
+            let mut server_tcp_watcher = server_tcp_watcher;
+            do server_tcp_watcher.listen |server_stream_watcher, status| {
+                let maybe_stream = if status.is_none() {
+                    let mut server_stream_watcher = server_stream_watcher;
+                    let mut loop_ = loop_from_watcher(&server_stream_watcher);
+                    let mut client_tcp_watcher = TcpWatcher::new(&mut loop_);
+                    let mut client_tcp_watcher = client_tcp_watcher.as_stream();
+                    // XXX: Need's to be surfaced in interface
+                    server_stream_watcher.accept(client_tcp_watcher);
+                    Some(~UvStream::new(client_tcp_watcher))
+                } else {
+                    None
+                };
+
+                unsafe { (*result_cell_ptr).put_back(maybe_stream); }
+
+                rtdebug!("resuming task from listen");
+                // Context switch
+                let scheduler = Scheduler::unsafe_local_borrow();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -243,42 +239,40 @@ impl Stream for UvStream {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
 
-        do Scheduler::unsafe_local |scheduler| {
-            assert!(scheduler.in_task_context());
-            let watcher = self.watcher();
-            let buf_ptr: *&mut [u8] = &buf;
-            do scheduler.deschedule_running_task_and_then |scheduler, task| {
-                rtdebug!("read: entered scheduler context");
-                assert!(!scheduler.in_task_context());
+        let scheduler = Scheduler::unsafe_local_borrow();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let buf_ptr: *&mut [u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |scheduler, task| {
+            rtdebug!("read: entered scheduler context");
+            assert!(!scheduler.in_task_context());
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            // XXX: We shouldn't reallocate these callbacks every
+            // call to read
+            let alloc: AllocCallback = |_| unsafe {
+                slice_to_uv_buf(*buf_ptr)
+            };
+            do watcher.read_start(alloc) |watcher, nread, _buf, status| {
+
+                // Stop reading so that no read callbacks are
+                // triggered before the user calls `read` again.
+                // XXX: Is there a performance impact to calling
+                // stop here?
                 let mut watcher = watcher;
-                let task_cell = Cell(task);
-                // XXX: We shouldn't reallocate these callbacks every
-                // call to read
-                let alloc: AllocCallback = |_| unsafe {
-                    slice_to_uv_buf(*buf_ptr)
+                watcher.read_stop();
+
+                let result = if status.is_none() {
+                    assert!(nread >= 0);
+                    Ok(nread as uint)
+                } else {
+                    Err(())
                 };
-                do watcher.read_start(alloc) |watcher, nread, _buf, status| {
-
-                    // Stop reading so that no read callbacks are
-                    // triggered before the user calls `read` again.
-                    // XXX: Is there a performance impact to calling
-                    // stop here?
-                    let mut watcher = watcher;
-                    watcher.read_stop();
-
-                    let result = if status.is_none() {
-                        assert!(nread >= 0);
-                        Ok(nread as uint)
-                    } else {
-                        Err(())
-                    };
-
-                    unsafe { (*result_cell_ptr).put_back(result); }
-
-                    do Scheduler::unsafe_local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Scheduler::unsafe_local_borrow();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -289,29 +283,27 @@ impl Stream for UvStream {
     fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
         let result_cell = empty_cell();
         let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
-        do Scheduler::unsafe_local |scheduler| {
-            assert!(scheduler.in_task_context());
-            let watcher = self.watcher();
-            let buf_ptr: *&[u8] = &buf;
-            do scheduler.deschedule_running_task_and_then |_, task| {
-                let mut watcher = watcher;
-                let task_cell = Cell(task);
-                let buf = unsafe { &*buf_ptr };
-                // XXX: OMGCOPIES
-                let buf = buf.to_vec();
-                do watcher.write(buf) |_watcher, status| {
-                    let result = if status.is_none() {
-                        Ok(())
-                    } else {
-                        Err(())
-                    };
-
-                    unsafe { (*result_cell_ptr).put_back(result); }
-
-                    do Scheduler::unsafe_local |scheduler| {
-                        scheduler.resume_task_immediately(task_cell.take());
-                    }
-                }
+        let scheduler = Scheduler::unsafe_local_borrow();
+        assert!(scheduler.in_task_context());
+        let watcher = self.watcher();
+        let buf_ptr: *&[u8] = &buf;
+        do scheduler.deschedule_running_task_and_then |_, task| {
+            let mut watcher = watcher;
+            let task_cell = Cell(task);
+            let buf = unsafe { &*buf_ptr };
+            // XXX: OMGCOPIES
+            let buf = buf.to_vec();
+            do watcher.write(buf) |_watcher, status| {
+                let result = if status.is_none() {
+                    Ok(())
+                } else {
+                    Err(())
+                };
+
+                unsafe { (*result_cell_ptr).put_back(result); }
+
+                let scheduler = Scheduler::unsafe_local_borrow();
+                scheduler.resume_task_immediately(task_cell.take());
             }
         }
 
@@ -326,12 +318,11 @@ fn test_simple_io_no_connect() {
     do run_in_bare_thread {
         let mut sched = ~UvEventLoop::new_scheduler();
         let task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let addr = Ipv4(127, 0, 0, 1, 2926);
-                let maybe_chan = io.connect(addr);
-                assert!(maybe_chan.is_none());
-            }
+            let sched = Scheduler::unsafe_local_borrow();
+            let io = sched.event_loop.io().unwrap();
+            let addr = Ipv4(127, 0, 0, 1, 2926);
+            let maybe_chan = io.connect(addr);
+            assert!(maybe_chan.is_none());
         };
         sched.task_queue.push_back(task);
         sched.run();
@@ -346,29 +337,27 @@ fn test_simple_tcp_server_and_client() {
         let addr = Ipv4(127, 0, 0, 1, 2929);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut stream = io.connect(addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.close();
-            }
+            let sched = Scheduler::unsafe_local_borrow();
+            let io = sched.event_loop.io().unwrap();
+            let mut stream = io.connect(addr).unwrap();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.close();
         };
 
         let server_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
-                let mut buf = [0, .. 2048];
-                let nread = stream.read(buf).unwrap();
-                assert!(nread == 8);
-                for uint::range(0, nread) |i| {
-                    rtdebug!("%u", buf[i] as uint);
-                    assert!(buf[i] == i as u8);
-                }
-                stream.close();
-                listener.close();
+            let sched = Scheduler::unsafe_local_borrow();
+            let io = sched.event_loop.io().unwrap();
+            let mut listener = io.bind(addr).unwrap();
+            let mut stream = listener.listen().unwrap();
+            let mut buf = [0, .. 2048];
+            let nread = stream.read(buf).unwrap();
+            assert!(nread == 8);
+            for uint::range(0, nread) |i| {
+                rtdebug!("%u", buf[i] as uint);
+                assert!(buf[i] == i as u8);
             }
+            stream.close();
+            listener.close();
         };
 
         // Start the server first so it listens before the client connects
@@ -385,53 +374,50 @@ fn test_read_and_block() {
         let addr = Ipv4(127, 0, 0, 1, 2930);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut stream = io.connect(addr).unwrap();
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
-                stream.close();
-            }
+            let sched = Scheduler::unsafe_local_borrow();
+            let io = sched.event_loop.io().unwrap();
+            let mut stream = io.connect(addr).unwrap();
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
+            stream.close();
         };
 
         let server_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut listener = io.bind(addr).unwrap();
-                let mut stream = listener.listen().unwrap();
-                let mut buf = [0, .. 2048];
-
-                let expected = 32;
-                let mut current = 0;
-                let mut reads = 0;
-
-                while current < expected {
-                    let nread = stream.read(buf).unwrap();
-                    for uint::range(0, nread) |i| {
-                        let val = buf[i] as uint;
-                        assert!(val == current % 8);
-                        current += 1;
-                    }
-                    reads += 1;
-
-                    do Scheduler::unsafe_local |scheduler| {
-                        // Yield to the other task in hopes that it
-                        // will trigger a read callback while we are
-                        // not ready for it
-                        do scheduler.deschedule_running_task_and_then |scheduler, task| {
-                            scheduler.task_queue.push_back(task);
-                        }
-                    }
-                }
+            let sched = Scheduler::unsafe_local_borrow();
+            let io = sched.event_loop.io().unwrap();
+            let mut listener = io.bind(addr).unwrap();
+            let mut stream = listener.listen().unwrap();
+            let mut buf = [0, .. 2048];
 
-                // Make sure we had multiple reads
-                assert!(reads > 1);
+            let expected = 32;
+            let mut current = 0;
+            let mut reads = 0;
 
-                stream.close();
-                listener.close();
+            while current < expected {
+                let nread = stream.read(buf).unwrap();
+                for uint::range(0, nread) |i| {
+                    let val = buf[i] as uint;
+                    assert!(val == current % 8);
+                    current += 1;
+                }
+                reads += 1;
+
+                let scheduler = Scheduler::unsafe_local_borrow();
+                // Yield to the other task in hopes that it
+                // will trigger a read callback while we are
+                // not ready for it
+                do scheduler.deschedule_running_task_and_then |scheduler, task| {
+                    scheduler.task_queue.push_back(task);
+                }
             }
+
+            // Make sure we had multiple reads
+            assert!(reads > 1);
+
+            stream.close();
+            listener.close();
         };
 
         // Start the server first so it listens before the client connects
@@ -448,19 +434,18 @@ fn test_read_read_read() {
         let addr = Ipv4(127, 0, 0, 1, 2931);
 
         let client_task = ~do Task::new(&mut sched.stack_pool) {
-            do Scheduler::unsafe_local |sched| {
-                let io = sched.event_loop.io().unwrap();
-                let mut stream = io.connect(addr).unwrap();
-                let mut buf = [0, .. 2048];
-                let mut total_bytes_read = 0;
-                while total_bytes_read < 500000000 {
-                    let nread = stream.read(buf).unwrap();
-                    rtdebug!("read %u bytes", nread as uint);
-                    total_bytes_read += nread;
-                }
-                rtdebug_!("read %u bytes total", total_bytes_read as uint);
-                stream.close();
+            let sched = Scheduler::unsafe_local_borrow();
+            let io = sched.event_loop.io().unwrap();
+            let mut stream = io.connect(addr).unwrap();
+            let mut buf = [0, .. 2048];
+            let mut total_bytes_read = 0;
+            while total_bytes_read < 500000000 {
+                let nread = stream.read(buf).unwrap();
+                rtdebug!("read %u bytes", nread as uint);
+                total_bytes_read += nread;
             }
+            rtdebug_!("read %u bytes total", total_bytes_read as uint);
+            stream.close();
         };
 
         sched.task_queue.push_back(client_task);