diff options
| author | gennyble <gen@nyble.dev> | 2025-10-28 15:38:56 -0500 |
|---|---|---|
| committer | gennyble <gen@nyble.dev> | 2025-10-28 15:38:56 -0500 |
| commit | fcac0d06767b036e6e51a13d339763bcc32991d4 (patch) | |
| tree | d559d316ed645b7e15b6848eedaf98667bd45b63 | |
| parent | 4699eea8aeae3430a9ecdbd90f01bdc61fb92b59 (diff) | |
| download | corgi-main.tar.gz corgi-main.zip | |
| -rw-r--r-- | Cargo.lock | 53 | ||||
| -rw-r--r-- | TODO.md | 4 | ||||
| -rw-r--r-- | corgi/Cargo.toml | 3 | ||||
| -rw-r--r-- | corgi/src/caller.rs | 312 | ||||
| -rw-r--r-- | corgi/src/main.rs | 30 |
5 files changed, 345 insertions, 57 deletions
diff --git a/Cargo.lock b/Cargo.lock index b8d94f6..742a5aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,6 +86,8 @@ version = "1.1.0" dependencies = [ "base64", "confindent", + "futures-lite", + "futures-util", "http-body-util", "hyper", "hyper-util", @@ -94,6 +96,7 @@ dependencies = [ "sha2", "time", "tokio", + "tokio-util", ] [[package]] @@ -155,6 +158,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" [[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + +[[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -182,6 +191,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] name = "futures-task" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -378,6 +412,12 @@ dependencies = [ ] [[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + +[[package]] name = "parrot" version = "0.1.0" @@ -571,6 +611,19 @@ dependencies = [ ] [[package]] +name = "tokio-util" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] name = "typenum" version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/TODO.md b/TODO.md index 198fddf..a39516c 100644 --- a/TODO.md +++ b/TODO.md @@ -28,6 +28,10 @@ mostly "cgit", so that could get dramatically reduced. https://github.com/phiresky/sqlite-zstd +(8) Add "first appeared" column for the agents table + It would be nice to be able to "DEFAULT CURRENT_TIMESTAMP" but that's + not in the SQLite cards, I guess. + DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) ======================================================================= diff --git a/corgi/Cargo.toml b/corgi/Cargo.toml index b2e56ba..725d0cf 100644 --- a/corgi/Cargo.toml +++ b/corgi/Cargo.toml @@ -11,12 +11,15 @@ edition = "2024" [dependencies] base64 = "0.22.1" +futures-lite = "2.6.1" +futures-util = { version = "0.3.31", default-features = false } http-body-util = "0.1.3" hyper-util = { version = "0.1.10", features = ["tokio"] } regex-lite = "0.1.6" rusqlite = { version = "0.34.0", features = ["bundled", "time"] } sha2 = "0.10.8" time = { version = "0.3.41", features = ["formatting"] } +tokio-util = { version = "0.7.16", features = ["io"] } [dependencies.confindent] version = "2.2.1" diff --git a/corgi/src/caller.rs b/corgi/src/caller.rs index 29be5ca..385177f 100644 --- a/corgi/src/caller.rs +++ b/corgi/src/caller.rs @@ -1,9 +1,80 @@ -use std::{net::IpAddr, process::Stdio}; +use std::{ + net::IpAddr, + pin::Pin, + process::Stdio, + sync::{ + OnceLock, + mpsc::{Sender, TryRecvError}, + }, + task::Poll, + time::{Duration, Instant}, +}; -use tokio::{io::AsyncWriteExt, process::Command}; +use futures_util::stream::StreamExt; +use hyper::body::{Body, Bytes, Frame}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + process::{Child, ChildStdout, Command}, +}; +use tokio_util::io::ReaderStream; use crate::Script; +static GRAVEYARD: OnceLock<Sender<GraveyardEvent>> = OnceLock::new(); +const GRAVEYARD_WAIT_MS: u128 = 1000; + +enum GraveyardEvent { + Kill { child: Child }, +} + +pub fn start_graveyard() { + let (tx, rx) = std::sync::mpsc::channel(); + GRAVEYARD.set(tx).unwrap(); + + tokio::task::spawn(async move { + let mut children = vec![]; + + loop { + let start = Instant::now(); + + 'child_collector: loop { + match rx.try_recv() { + Ok(GraveyardEvent::Kill { child }) => children.push(child), + Err(TryRecvError::Empty) => { + break 'child_collector; + } + Err(TryRecvError::Disconnected) => { + eprintln!("graveyard channel closed"); + //return; + } + } + } + + let start_len = children.len(); + children.retain_mut(|child| match child.try_wait() { + Err(e) => { + eprintln!("[graveyard] error waiting on child. {e}"); + let _ = child.start_kill(); + false + } + Ok(None) => true, + Ok(Some(_code)) => false, + }); + let end_len = children.len(); + println!("[graveyard] reaped {} children", start_len - end_len); + + let elapsed = start.elapsed(); + + tokio::time::sleep(Duration::from_millis( + GRAVEYARD_WAIT_MS + .saturating_sub(elapsed.as_millis()) + .max(100) as u64, + )) + .await; + } + }); +} + pub struct HttpRequest { pub content_type: String, // gateway_interface = "CGI/1.1" @@ -64,7 +135,7 @@ pub async fn call_and_parse_cgi(script: Script, http: HttpRequest) -> CgiRespons }); let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); - let output = if let Some(bytes) = http.body { + let child = if let Some(bytes) = http.body { cmd.env("CONTENT_LENGTH", bytes.len().to_string()); let mut child = cmd.stdin(Stdio::piped()).spawn().unwrap(); @@ -78,76 +149,229 @@ pub async fn call_and_parse_cgi(script: Script, http: HttpRequest) -> CgiRespons cmd_stdin.flush().await.unwrap(); drop(cmd_stdin); - child.wait_with_output().await.unwrap() + child } else { - cmd.spawn().unwrap().wait_with_output().await.unwrap() + cmd.spawn().unwrap() }; - parse_cgi_response(&output.stdout) + parse_cgi_response(child).await } -fn parse_cgi_response(stdout: &[u8]) -> CgiResponse { - let mut response = CgiResponse { - // Default status code is 200 per RFC - status: 200, - headers: vec![], - body: None, - }; +async fn parse_cgi_response(mut child: Child) -> CgiResponse { + let mut status = 200; + let mut headers = vec![]; + + let mut stdout = child.stdout.take().unwrap(); + let mut weird = WeirdBuffer::new(); - let mut curr = stdout; loop { - // Find the newline to know where this header ends - let nl = curr.iter().position(|b| *b == b'\n').expect("no nl in header"); - let line = &curr[..nl]; + let newline_position = match weird.data().iter().position(|b| *b == b'\n') { + None => { + weird.reclaim(); + + let count = stdout.read(weird.free_mut()).await.unwrap(); + if count == 0 { + panic!("data malformed. body not found") + } + + weird.consume_free(count); + continue; + } + Some(nl) => nl, + }; + + // take_data consuming from the end of garbage is weird, but if we have + // the function also run consume_data for use, we need a mut reference + // which will block us from taking out immutable references later, like + // when forming the body + weird.consume_data(newline_position + 1); + let line = &weird.take_data(newline_position + 1)[..newline_position]; + + // zero-length line, we've hit the body! + if newline_position == 0 || (newline_position == 1 && line[0] == b'\r') { + let buffer = if weird.data_len() > 0 { + Some(weird.data().to_vec()) + } else { + None + }; - // Find the colon to separate the key from the value - let colon = line.iter().position(|b| *b == b':').expect("no colon in header"); - let key = &line[..colon]; - let mut value = &line[colon + 1..]; + let body = StreamedBody { + buffer, + stream: ReaderStream::new(stdout), + }; + + GRAVEYARD + .get() + .unwrap() + .send(GraveyardEvent::Kill { child }) + .unwrap(); + + return CgiResponse { + status, + headers, + body, + }; + } + let colon_position = match line.iter().position(|b| *b == b':') { + None => { + panic!("malformed header: no colon in header") + } + Some(cpos) => cpos, + }; + + let key = &line[..colon_position]; + let mut value = &line[colon_position + 1..]; if value[0] == b' ' { value = &value[1..]; } - if value[value.len().saturating_sub(1)] == b'\r' { - value = &value[..value.len().saturating_sub(1)]; + if value[value.len() - 1] == b'\r' { + value = &value[..value.len() - 1]; } - response.headers.push((key.to_vec(), value.to_vec())); + headers.push((key.to_vec(), value.to_vec())); - // Is this header a status line? + // Look for and extract status line let key_string = String::from_utf8_lossy(key); if key_string == "Status" { let value_string = String::from_utf8_lossy(value); if let Some((raw_code, _raw_msg)) = value_string.trim().split_once(' ') { let code: u16 = raw_code.parse().unwrap(); - response.status = code; + status = code; } } - - // Body next? - let next_nl = curr[nl + 1] == b'\n'; - let next_crlf = curr[nl + 1] == b'\r' && curr[nl + 2] == b'\n'; - if next_nl || next_crlf { - let offset = if next_nl { 2 } else { 3 }; - let body = &curr[nl + offset..]; - if body.len() > 0 { - response.body = Some(body.to_vec()); - } - - return response; - } - - // Move past the newline - curr = &curr[nl + 1..]; } } -#[derive(Debug)] pub struct CgiResponse { /// The Status header of the CGI response pub status: u16, /// Headers except "Status" pub headers: Vec<(Vec<u8>, Vec<u8>)>, /// CGI response body - pub body: Option<Vec<u8>>, + pub body: StreamedBody<ChildStdout>, +} + +pub type StreamedChild = StreamedBody<ChildStdout>; + +pub struct StreamedBody<R: AsyncRead + Unpin> { + buffer: Option<Vec<u8>>, + stream: ReaderStream<R>, +} + +impl<R: AsyncRead + Unpin> Body for StreamedBody<R> { + type Data = Bytes; + type Error = std::io::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> { + let this = self.get_mut(); + + if let Some(buffer) = this.buffer.take() { + Poll::Ready(Some(Ok(Frame::data(Bytes::from(buffer))))) + } else { + match this.stream.poll_next_unpin(cx) { + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(by))) => Poll::Ready(Some(Ok(Frame::data(by)))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + } +} + +struct WeirdBuffer { + buffer: [u8; 8096], + data_start: usize, + free_start: usize, +} + +impl WeirdBuffer { + pub fn new() -> Self { + Self { + buffer: [0; 8096], + data_start: 0, + free_start: 0, + } + } + + pub fn data_len(&self) -> usize { + self.free_start - self.data_start + } + + pub fn data(&self) -> &[u8] { + &self.buffer[self.data_start..self.free_start] + } + + pub fn data_mut(&mut self) -> &mut [u8] { + &mut self.buffer[self.data_start..self.free_start] + } + + pub fn take_data(&self, count: usize) -> &[u8] { + let data = &self.buffer[self.data_start - count..self.data_start]; + data + } + + pub fn consume_data(&mut self, count: usize) { + if self.data_start + count > self.free_start { + panic!( + "tried to consume more data than there was. tried to consume {} from data section of len {}", + count, + self.data_len() + ) + } + + self.data_start += count + } + + pub fn consume_free(&mut self, count: usize) { + if self.free_start + count > self.buffer.len() { + panic!( + "tried to consume more free space than there was. tried to consume {} from free section of {}", + count, + self.free_len() + ) + } + + self.free_start += count; + } + + pub fn garbage_len(&self) -> usize { + self.data_start + } + + pub fn garbage(&self) -> &[u8] { + &self.buffer[..self.data_start] + } + + pub fn reclaim(&mut self) { + let src = self.buffer[self.data_start..self.free_start].as_ptr(); + let len = self.data_len(); + let dst = self.buffer[..len].as_mut_ptr(); + + if self.data_start == 0 { + return; + } + + unsafe { + std::ptr::copy(src, dst, self.data_len()); + } + + self.data_start = 0; + self.free_start = len; + } + + pub fn free_len(&self) -> usize { + self.buffer.len() - self.free_start + } + + pub fn free(&self) -> &[u8] { + &self.buffer[self.free_start..] + } + + pub fn free_mut(&mut self) -> &mut [u8] { + &mut self.buffer[self.free_start..] + } } diff --git a/corgi/src/main.rs b/corgi/src/main.rs index 1192c4c..bb44f05 100644 --- a/corgi/src/main.rs +++ b/corgi/src/main.rs @@ -7,7 +7,7 @@ use std::{ }; use caller::HttpRequest; -use http_body_util::{BodyExt, Full}; +use http_body_util::{BodyExt, Either, Full}; use hyper::{ HeaderMap, Request, Response, StatusCode, body::{Bytes, Incoming}, @@ -20,6 +20,8 @@ use stats::Stats; use tokio::{net::TcpListener, runtime::Runtime}; use util::owned_header; +use crate::caller::StreamedChild; + mod caller; mod settings; mod stats; @@ -47,6 +49,9 @@ async fn run(settings: Settings, stats: Stats) { let mut last_clean = None; + caller::start_graveyard(); + println!("started graveyard!"); + loop { // Clean at the top so we do it once on boot, but keep out of the // flow of the request to keep it speedy. This will delay accepting @@ -86,7 +91,7 @@ struct Svc { } impl Service<Request<Incoming>> for Svc { - type Response = Response<Full<Bytes>>; + type Response = Response<Either<Full<Bytes>, StreamedChild>>; type Error = hyper::Error; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; @@ -105,7 +110,7 @@ impl Svc { stats: Arc<Stats>, caddr: SocketAddr, req: Request<Incoming>, - ) -> Response<Full<Bytes>> { + ) -> Response<Either<Full<Bytes>, StreamedChild>> { match Self::handle_fallible(settings, stats, caddr, req).await { Err(re) => re.into_response(), Ok(response) => response, @@ -117,7 +122,7 @@ impl Svc { stats: Arc<Stats>, caddr: SocketAddr, req: Request<Incoming>, - ) -> Result<Response<Full<Bytes>>, RuntimeError> { + ) -> Result<Response<Either<Full<Bytes>, StreamedChild>>, RuntimeError> { // Collect things we need from the request before we eat it's body let method = req.method().as_str().to_ascii_uppercase(); let version = req.version(); @@ -191,12 +196,8 @@ impl Svc { stats.log_request(db_req); - let response_body = cgi_response - .body - .map(|v| Bytes::from(v)) - .unwrap_or(Bytes::new()); - - Ok(response.body(Full::new(response_body)).unwrap()) + let response_body = cgi_response.body; + Ok(response.body(Either::Right(response_body)).unwrap()) } fn select_script<'s>(settings: &'s Settings, path: &str) -> Option<&'s Script> { @@ -246,7 +247,10 @@ impl Svc { } } -fn status_page<D: fmt::Display>(status: u16, msg: D) -> Response<Full<Bytes>> { +fn status_page<D: fmt::Display>( + status: u16, + msg: D, +) -> Response<Either<Full<Bytes>, StreamedChild>> { let body_str = format!( "<html>\n\ \t<head><title>{status}</title></head>\n\ @@ -261,7 +265,7 @@ fn status_page<D: fmt::Display>(status: u16, msg: D) -> Response<Full<Bytes>> { Response::builder() .status(status) .header("Content-Type", "text/html") - .body(Full::new(body_str.into())) + .body(Either::Left(Full::new(body_str.into()))) .unwrap() } @@ -271,7 +275,7 @@ enum RuntimeError { } impl RuntimeError { - pub fn into_response(&self) -> Response<Full<Bytes>> { + pub fn into_response(&self) -> Response<Either<Full<Bytes>, StreamedChild>> { match self { Self::MalformedRequest => status_page(400, "bad request"), Self::NoScript => status_page(404, "failed to route request"), |
