about summary refs log tree commit diff
diff options
context:
space:
mode:
authorgennyble <gen@nyble.dev>2025-10-28 15:38:56 -0500
committergennyble <gen@nyble.dev>2025-10-28 15:38:56 -0500
commitfcac0d06767b036e6e51a13d339763bcc32991d4 (patch)
treed559d316ed645b7e15b6848eedaf98667bd45b63
parent4699eea8aeae3430a9ecdbd90f01bdc61fb92b59 (diff)
downloadcorgi-main.tar.gz
corgi-main.zip
Stream body from child to client and reap children HEAD main
-rw-r--r--Cargo.lock53
-rw-r--r--TODO.md4
-rw-r--r--corgi/Cargo.toml3
-rw-r--r--corgi/src/caller.rs312
-rw-r--r--corgi/src/main.rs30
5 files changed, 345 insertions, 57 deletions
diff --git a/Cargo.lock b/Cargo.lock
index b8d94f6..742a5aa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -86,6 +86,8 @@ version = "1.1.0"
 dependencies = [
  "base64",
  "confindent",
+ "futures-lite",
+ "futures-util",
  "http-body-util",
  "hyper",
  "hyper-util",
@@ -94,6 +96,7 @@ dependencies = [
  "sha2",
  "time",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
@@ -155,6 +158,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
 
 [[package]]
+name = "fastrand"
+version = "2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
+
+[[package]]
 name = "fnv"
 version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -182,6 +191,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
 
 [[package]]
+name = "futures-io"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
+[[package]]
+name = "futures-lite"
+version = "2.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
+dependencies = [
+ "fastrand",
+ "futures-core",
+ "futures-io",
+ "parking",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+
+[[package]]
 name = "futures-task"
 version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -378,6 +412,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "parking"
+version = "2.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
+
+[[package]]
 name = "parrot"
 version = "0.1.0"
 
@@ -571,6 +611,19 @@ dependencies = [
 ]
 
 [[package]]
+name = "tokio-util"
+version = "0.7.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
+[[package]]
 name = "typenum"
 version = "1.18.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/TODO.md b/TODO.md
index 198fddf..a39516c 100644
--- a/TODO.md
+++ b/TODO.md
@@ -28,6 +28,10 @@
 	mostly "cgit", so that could get dramatically reduced.
 	https://github.com/phiresky/sqlite-zstd
 
+(8) Add "first appeared" column for the agents table
+	It would be nice to be able to "DEFAULT CURRENT_TIMESTAMP" but that's
+	not in the SQLite cards, I guess.
+
 DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) DONE :) DONE :)
 =======================================================================
 
diff --git a/corgi/Cargo.toml b/corgi/Cargo.toml
index b2e56ba..725d0cf 100644
--- a/corgi/Cargo.toml
+++ b/corgi/Cargo.toml
@@ -11,12 +11,15 @@ edition = "2024"
 
 [dependencies]
 base64 = "0.22.1"
+futures-lite = "2.6.1"
+futures-util = { version = "0.3.31", default-features = false }
 http-body-util = "0.1.3"
 hyper-util = { version = "0.1.10", features = ["tokio"] }
 regex-lite = "0.1.6"
 rusqlite = { version = "0.34.0", features = ["bundled", "time"] }
 sha2 = "0.10.8"
 time = { version = "0.3.41", features = ["formatting"] }
+tokio-util = { version = "0.7.16", features = ["io"] }
 
 [dependencies.confindent]
 version = "2.2.1"
diff --git a/corgi/src/caller.rs b/corgi/src/caller.rs
index 29be5ca..385177f 100644
--- a/corgi/src/caller.rs
+++ b/corgi/src/caller.rs
@@ -1,9 +1,80 @@
-use std::{net::IpAddr, process::Stdio};
+use std::{
+	net::IpAddr,
+	pin::Pin,
+	process::Stdio,
+	sync::{
+		OnceLock,
+		mpsc::{Sender, TryRecvError},
+	},
+	task::Poll,
+	time::{Duration, Instant},
+};
 
-use tokio::{io::AsyncWriteExt, process::Command};
+use futures_util::stream::StreamExt;
+use hyper::body::{Body, Bytes, Frame};
+use tokio::{
+	io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
+	process::{Child, ChildStdout, Command},
+};
+use tokio_util::io::ReaderStream;
 
 use crate::Script;
 
+static GRAVEYARD: OnceLock<Sender<GraveyardEvent>> = OnceLock::new();
+const GRAVEYARD_WAIT_MS: u128 = 1000;
+
+enum GraveyardEvent {
+	Kill { child: Child },
+}
+
+pub fn start_graveyard() {
+	let (tx, rx) = std::sync::mpsc::channel();
+	GRAVEYARD.set(tx).unwrap();
+
+	tokio::task::spawn(async move {
+		let mut children = vec![];
+
+		loop {
+			let start = Instant::now();
+
+			'child_collector: loop {
+				match rx.try_recv() {
+					Ok(GraveyardEvent::Kill { child }) => children.push(child),
+					Err(TryRecvError::Empty) => {
+						break 'child_collector;
+					}
+					Err(TryRecvError::Disconnected) => {
+						eprintln!("graveyard channel closed");
+						//return;
+					}
+				}
+			}
+
+			let start_len = children.len();
+			children.retain_mut(|child| match child.try_wait() {
+				Err(e) => {
+					eprintln!("[graveyard] error waiting on child. {e}");
+					let _ = child.start_kill();
+					false
+				}
+				Ok(None) => true,
+				Ok(Some(_code)) => false,
+			});
+			let end_len = children.len();
+			println!("[graveyard] reaped {} children", start_len - end_len);
+
+			let elapsed = start.elapsed();
+
+			tokio::time::sleep(Duration::from_millis(
+				GRAVEYARD_WAIT_MS
+					.saturating_sub(elapsed.as_millis())
+					.max(100) as u64,
+			))
+			.await;
+		}
+	});
+}
+
 pub struct HttpRequest {
 	pub content_type: String,
 	// gateway_interface = "CGI/1.1"
@@ -64,7 +135,7 @@ pub async fn call_and_parse_cgi(script: Script, http: HttpRequest) -> CgiRespons
 		});
 
 	let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
-	let output = if let Some(bytes) = http.body {
+	let child = if let Some(bytes) = http.body {
 		cmd.env("CONTENT_LENGTH", bytes.len().to_string());
 		let mut child = cmd.stdin(Stdio::piped()).spawn().unwrap();
 
@@ -78,76 +149,229 @@ pub async fn call_and_parse_cgi(script: Script, http: HttpRequest) -> CgiRespons
 		cmd_stdin.flush().await.unwrap();
 		drop(cmd_stdin);
 
-		child.wait_with_output().await.unwrap()
+		child
 	} else {
-		cmd.spawn().unwrap().wait_with_output().await.unwrap()
+		cmd.spawn().unwrap()
 	};
 
-	parse_cgi_response(&output.stdout)
+	parse_cgi_response(child).await
 }
 
-fn parse_cgi_response(stdout: &[u8]) -> CgiResponse {
-	let mut response = CgiResponse {
-		// Default status code is 200 per RFC
-		status: 200,
-		headers: vec![],
-		body: None,
-	};
+async fn parse_cgi_response(mut child: Child) -> CgiResponse {
+	let mut status = 200;
+	let mut headers = vec![];
+
+	let mut stdout = child.stdout.take().unwrap();
+	let mut weird = WeirdBuffer::new();
 
-	let mut curr = stdout;
 	loop {
-		// Find the newline to know where this header ends
-		let nl = curr.iter().position(|b| *b == b'\n').expect("no nl in header");
-		let line = &curr[..nl];
+		let newline_position = match weird.data().iter().position(|b| *b == b'\n') {
+			None => {
+				weird.reclaim();
+
+				let count = stdout.read(weird.free_mut()).await.unwrap();
+				if count == 0 {
+					panic!("data malformed. body not found")
+				}
+
+				weird.consume_free(count);
+				continue;
+			}
+			Some(nl) => nl,
+		};
+
+		// take_data consuming from the end of garbage is weird, but if we have
+		// the function also run consume_data for use, we need a mut reference
+		// which will block us from taking out immutable references later, like
+		// when forming the body
+		weird.consume_data(newline_position + 1);
+		let line = &weird.take_data(newline_position + 1)[..newline_position];
+
+		// zero-length line, we've hit the body!
+		if newline_position == 0 || (newline_position == 1 && line[0] == b'\r') {
+			let buffer = if weird.data_len() > 0 {
+				Some(weird.data().to_vec())
+			} else {
+				None
+			};
 
-		// Find the colon to separate the key from the value
-		let colon = line.iter().position(|b| *b == b':').expect("no colon in header");
-		let key = &line[..colon];
-		let mut value = &line[colon + 1..];
+			let body = StreamedBody {
+				buffer,
+				stream: ReaderStream::new(stdout),
+			};
+
+			GRAVEYARD
+				.get()
+				.unwrap()
+				.send(GraveyardEvent::Kill { child })
+				.unwrap();
+
+			return CgiResponse {
+				status,
+				headers,
+				body,
+			};
+		}
 
+		let colon_position = match line.iter().position(|b| *b == b':') {
+			None => {
+				panic!("malformed header: no colon in header")
+			}
+			Some(cpos) => cpos,
+		};
+
+		let key = &line[..colon_position];
+		let mut value = &line[colon_position + 1..];
 		if value[0] == b' ' {
 			value = &value[1..];
 		}
-		if value[value.len().saturating_sub(1)] == b'\r' {
-			value = &value[..value.len().saturating_sub(1)];
+		if value[value.len() - 1] == b'\r' {
+			value = &value[..value.len() - 1];
 		}
 
-		response.headers.push((key.to_vec(), value.to_vec()));
+		headers.push((key.to_vec(), value.to_vec()));
 
-		// Is this header a status line?
+		// Look for and extract status line
 		let key_string = String::from_utf8_lossy(key);
 		if key_string == "Status" {
 			let value_string = String::from_utf8_lossy(value);
 			if let Some((raw_code, _raw_msg)) = value_string.trim().split_once(' ') {
 				let code: u16 = raw_code.parse().unwrap();
-				response.status = code;
+				status = code;
 			}
 		}
-
-		// Body next?
-		let next_nl = curr[nl + 1] == b'\n';
-		let next_crlf = curr[nl + 1] == b'\r' && curr[nl + 2] == b'\n';
-		if next_nl || next_crlf {
-			let offset = if next_nl { 2 } else { 3 };
-			let body = &curr[nl + offset..];
-			if body.len() > 0 {
-				response.body = Some(body.to_vec());
-			}
-
-			return response;
-		}
-
-		// Move past the newline
-		curr = &curr[nl + 1..];
 	}
 }
 
-#[derive(Debug)]
 pub struct CgiResponse {
 	/// The Status header of the CGI response
 	pub status: u16,
 	/// Headers except "Status"
 	pub headers: Vec<(Vec<u8>, Vec<u8>)>,
 	/// CGI response body
-	pub body: Option<Vec<u8>>,
+	pub body: StreamedBody<ChildStdout>,
+}
+
+pub type StreamedChild = StreamedBody<ChildStdout>;
+
+pub struct StreamedBody<R: AsyncRead + Unpin> {
+	buffer: Option<Vec<u8>>,
+	stream: ReaderStream<R>,
+}
+
+impl<R: AsyncRead + Unpin> Body for StreamedBody<R> {
+	type Data = Bytes;
+	type Error = std::io::Error;
+
+	fn poll_frame(
+		self: Pin<&mut Self>,
+		cx: &mut std::task::Context<'_>,
+	) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
+		let this = self.get_mut();
+
+		if let Some(buffer) = this.buffer.take() {
+			Poll::Ready(Some(Ok(Frame::data(Bytes::from(buffer)))))
+		} else {
+			match this.stream.poll_next_unpin(cx) {
+				Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+				Poll::Ready(Some(Ok(by))) => Poll::Ready(Some(Ok(Frame::data(by)))),
+				Poll::Ready(None) => Poll::Ready(None),
+				Poll::Pending => Poll::Pending,
+			}
+		}
+	}
+}
+
+struct WeirdBuffer {
+	buffer: [u8; 8096],
+	data_start: usize,
+	free_start: usize,
+}
+
+impl WeirdBuffer {
+	pub fn new() -> Self {
+		Self {
+			buffer: [0; 8096],
+			data_start: 0,
+			free_start: 0,
+		}
+	}
+
+	pub fn data_len(&self) -> usize {
+		self.free_start - self.data_start
+	}
+
+	pub fn data(&self) -> &[u8] {
+		&self.buffer[self.data_start..self.free_start]
+	}
+
+	pub fn data_mut(&mut self) -> &mut [u8] {
+		&mut self.buffer[self.data_start..self.free_start]
+	}
+
+	pub fn take_data(&self, count: usize) -> &[u8] {
+		let data = &self.buffer[self.data_start - count..self.data_start];
+		data
+	}
+
+	pub fn consume_data(&mut self, count: usize) {
+		if self.data_start + count > self.free_start {
+			panic!(
+				"tried to consume more data than there was. tried to consume {} from data section of len {}",
+				count,
+				self.data_len()
+			)
+		}
+
+		self.data_start += count
+	}
+
+	pub fn consume_free(&mut self, count: usize) {
+		if self.free_start + count > self.buffer.len() {
+			panic!(
+				"tried to consume more free space than there was. tried to consume {} from free section of {}",
+				count,
+				self.free_len()
+			)
+		}
+
+		self.free_start += count;
+	}
+
+	pub fn garbage_len(&self) -> usize {
+		self.data_start
+	}
+
+	pub fn garbage(&self) -> &[u8] {
+		&self.buffer[..self.data_start]
+	}
+
+	pub fn reclaim(&mut self) {
+		let src = self.buffer[self.data_start..self.free_start].as_ptr();
+		let len = self.data_len();
+		let dst = self.buffer[..len].as_mut_ptr();
+
+		if self.data_start == 0 {
+			return;
+		}
+
+		unsafe {
+			std::ptr::copy(src, dst, self.data_len());
+		}
+
+		self.data_start = 0;
+		self.free_start = len;
+	}
+
+	pub fn free_len(&self) -> usize {
+		self.buffer.len() - self.free_start
+	}
+
+	pub fn free(&self) -> &[u8] {
+		&self.buffer[self.free_start..]
+	}
+
+	pub fn free_mut(&mut self) -> &mut [u8] {
+		&mut self.buffer[self.free_start..]
+	}
 }
diff --git a/corgi/src/main.rs b/corgi/src/main.rs
index 1192c4c..bb44f05 100644
--- a/corgi/src/main.rs
+++ b/corgi/src/main.rs
@@ -7,7 +7,7 @@ use std::{
 };
 
 use caller::HttpRequest;
-use http_body_util::{BodyExt, Full};
+use http_body_util::{BodyExt, Either, Full};
 use hyper::{
 	HeaderMap, Request, Response, StatusCode,
 	body::{Bytes, Incoming},
@@ -20,6 +20,8 @@ use stats::Stats;
 use tokio::{net::TcpListener, runtime::Runtime};
 use util::owned_header;
 
+use crate::caller::StreamedChild;
+
 mod caller;
 mod settings;
 mod stats;
@@ -47,6 +49,9 @@ async fn run(settings: Settings, stats: Stats) {
 
 	let mut last_clean = None;
 
+	caller::start_graveyard();
+	println!("started graveyard!");
+
 	loop {
 		// Clean at the top so we do it once on boot, but keep out of the
 		// flow of the request to keep it speedy. This will delay accepting
@@ -86,7 +91,7 @@ struct Svc {
 }
 
 impl Service<Request<Incoming>> for Svc {
-	type Response = Response<Full<Bytes>>;
+	type Response = Response<Either<Full<Bytes>, StreamedChild>>;
 	type Error = hyper::Error;
 	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
 
@@ -105,7 +110,7 @@ impl Svc {
 		stats: Arc<Stats>,
 		caddr: SocketAddr,
 		req: Request<Incoming>,
-	) -> Response<Full<Bytes>> {
+	) -> Response<Either<Full<Bytes>, StreamedChild>> {
 		match Self::handle_fallible(settings, stats, caddr, req).await {
 			Err(re) => re.into_response(),
 			Ok(response) => response,
@@ -117,7 +122,7 @@ impl Svc {
 		stats: Arc<Stats>,
 		caddr: SocketAddr,
 		req: Request<Incoming>,
-	) -> Result<Response<Full<Bytes>>, RuntimeError> {
+	) -> Result<Response<Either<Full<Bytes>, StreamedChild>>, RuntimeError> {
 		// Collect things we need from the request before we eat it's body
 		let method = req.method().as_str().to_ascii_uppercase();
 		let version = req.version();
@@ -191,12 +196,8 @@ impl Svc {
 
 		stats.log_request(db_req);
 
-		let response_body = cgi_response
-			.body
-			.map(|v| Bytes::from(v))
-			.unwrap_or(Bytes::new());
-
-		Ok(response.body(Full::new(response_body)).unwrap())
+		let response_body = cgi_response.body;
+		Ok(response.body(Either::Right(response_body)).unwrap())
 	}
 
 	fn select_script<'s>(settings: &'s Settings, path: &str) -> Option<&'s Script> {
@@ -246,7 +247,10 @@ impl Svc {
 	}
 }
 
-fn status_page<D: fmt::Display>(status: u16, msg: D) -> Response<Full<Bytes>> {
+fn status_page<D: fmt::Display>(
+	status: u16,
+	msg: D,
+) -> Response<Either<Full<Bytes>, StreamedChild>> {
 	let body_str = format!(
 		"<html>\n\
 			\t<head><title>{status}</title></head>\n\
@@ -261,7 +265,7 @@ fn status_page<D: fmt::Display>(status: u16, msg: D) -> Response<Full<Bytes>> {
 	Response::builder()
 		.status(status)
 		.header("Content-Type", "text/html")
-		.body(Full::new(body_str.into()))
+		.body(Either::Left(Full::new(body_str.into())))
 		.unwrap()
 }
 
@@ -271,7 +275,7 @@ enum RuntimeError {
 }
 
 impl RuntimeError {
-	pub fn into_response(&self) -> Response<Full<Bytes>> {
+	pub fn into_response(&self) -> Response<Either<Full<Bytes>, StreamedChild>> {
 		match self {
 			Self::MalformedRequest => status_page(400, "bad request"),
 			Self::NoScript => status_page(404, "failed to route request"),