about summary refs log tree commit diff
path: root/corgi/src
diff options
context:
space:
mode:
Diffstat (limited to 'corgi/src')
-rw-r--r--corgi/src/main.rs29
-rw-r--r--corgi/src/stats.rs134
2 files changed, 159 insertions, 4 deletions
diff --git a/corgi/src/main.rs b/corgi/src/main.rs
index 1772d68..1192c4c 100644
--- a/corgi/src/main.rs
+++ b/corgi/src/main.rs
@@ -1,5 +1,10 @@
 use core::fmt;
-use std::{net::SocketAddr, pin::Pin, sync::Arc};
+use std::{
+	net::SocketAddr,
+	pin::Pin,
+	sync::Arc,
+	time::{Duration, Instant},
+};
 
 use caller::HttpRequest;
 use http_body_util::{BodyExt, Full};
@@ -40,11 +45,33 @@ async fn run(settings: Settings, stats: Stats) {
 		client_addr: addr,
 	};
 
+	let mut last_clean = None;
+
 	loop {
+		// Clean at the top so we do it once on boot, but keep out of the
+		// flow of the request to keep it speedy. This will delay accepting
+		// a new connection when the clean actually runs, but that is fine.
+		match last_clean {
+			None => {
+				let count = svc.stats.cleanup_ephemeral_requests();
+				println!("cleaned {count} requests from the ephemeral table");
+				last_clean = Some(Instant::now());
+			}
+			Some(inst) if inst.elapsed() >= Duration::from_secs(60 * 60) => {
+				let count = svc.stats.cleanup_ephemeral_requests();
+				println!("cleaned {count} requests from the ephemeral table");
+				last_clean = Some(Instant::now());
+			}
+			_ => (),
+		}
+
+		// Now we accept the connection and spawn a handler
 		let (stream, caddr) = listen.accept().await.unwrap();
 		let io = TokioIo::new(stream);
+
 		let mut svc_clone = svc.clone();
 		svc_clone.client_addr = caddr;
+
 		tokio::task::spawn(
 			async move { http1::Builder::new().serve_connection(io, svc_clone).await },
 		);
diff --git a/corgi/src/stats.rs b/corgi/src/stats.rs
index 23e85a0..c407cd9 100644
--- a/corgi/src/stats.rs
+++ b/corgi/src/stats.rs
@@ -3,6 +3,7 @@ use std::{net::IpAddr, path::Path, sync::Mutex};
 use base64::{Engine, prelude::BASE64_STANDARD_NO_PAD};
 use rusqlite::{Connection, OptionalExtension, params};
 use sha2::{Digest, Sha256};
+use time::{Duration, OffsetDateTime, PrimitiveDateTime};
 
 #[derive(Debug)]
 pub struct Stats {
@@ -21,8 +22,30 @@ impl Stats {
 
 		Self::set_wal(&conn);
 
-		conn.execute(CREATE_TABLE_AGENT, ()).unwrap();
-		conn.execute(CREATE_TABLE_REQUESTS, ()).unwrap();
+		// "agents" exists and trigger does not; we need to alter and prime
+		if Self::table_exists(&conn, "agents") && !Self::trigger_exists(&conn, "agent_count") {
+			println!("agents table exists, but needs request_count column. Altering and priming");
+			conn.execute(MIGRATE_AGENTS_ADD_REQUEST_COUNT, ()).unwrap();
+
+			Self::prime_agents_request_count(&conn);
+		} else {
+			conn.execute(CREATE_TABLE_AGENT, ()).unwrap();
+			conn.execute(CREATE_TABLE_REQUESTS, ()).unwrap();
+		}
+
+		conn.execute(CREATE_TRIGGER_COUNT_AGENT, ()).unwrap();
+
+		// Instead of just an IF NOT EXISTS here, we're checking it exists
+		// so we can copy an initial amount of requests from the main table
+		// to the ephemeral table.
+		if !Self::table_exists(&conn, "ephemeral_requests") {
+			println!("ephemeral_requests does not exist. Creating and priming");
+			conn.execute(CREATE_TRIGGER_EPHEMERAL, ()).unwrap();
+			conn.execute(CREATE_TABLE_EPHEMERAL_REQUESTS, ()).unwrap();
+
+			let count = Self::prime_ephemeral_table(&conn);
+			println!("Primed with {count} rows");
+		}
 	}
 
 	fn set_wal(conn: &Connection) {
@@ -38,6 +61,32 @@ impl Stats {
 		}
 	}
 
+	fn table_exists(conn: &Connection, name: &str) -> bool {
+		let exist: Option<String> = conn
+			.query_row(
+				"SELECT name FROM sqlite_schema WHERE type='table' AND name=?1;",
+				params![name],
+				|r| r.get(0),
+			)
+			.optional()
+			.unwrap();
+
+		exist.is_some()
+	}
+
+	fn trigger_exists(conn: &Connection, name: &str) -> bool {
+		let exist: Option<String> = conn
+			.query_row(
+				"SELECT name FROM sqlite_schema WHERE type='trigger' AND name=?1;",
+				params![name],
+				|r| r.get(0),
+			)
+			.optional()
+			.unwrap();
+
+		exist.is_some()
+	}
+
 	pub fn log_request(&self, request: Request) {
 		let Request {
 			agent,
@@ -83,6 +132,61 @@ impl Stats {
 		)
 		.unwrap();
 	}
+
+	/// Small, single line function to return the lower-bound date of ephemeral
+	/// requests.
+	fn ephemeral_lifetime() -> OffsetDateTime {
+		OffsetDateTime::now_utc() - Duration::days(1)
+	}
+
+	pub fn cleanup_ephemeral_requests(&self) -> usize {
+		let lower = Self::ephemeral_lifetime();
+
+		let sql = "DELETE FROM ephemeral_requests WHERE timestamp < ?1;";
+
+		let conn = self.conn.lock().unwrap();
+
+		match conn.execute(sql, params![lower]) {
+			Err(e) => {
+				eprintln!("ERROR failed to run ephemeral clean: {e}");
+				panic!();
+			}
+			Ok(count) => count,
+		}
+	}
+
+	fn prime_ephemeral_table(conn: &Connection) -> usize {
+		let lower = Self::ephemeral_lifetime();
+
+		let sql = "INSERT INTO ephemeral_requests SELECT id, timestamp FROM requests WHERE timestamp > ?1;";
+		match conn.execute(sql, params![lower]) {
+			Err(e) => {
+				eprintln!("ERROR failed to prime ephemeral: {e}");
+				panic!();
+			}
+			Ok(count) => count,
+		}
+	}
+
+	fn prime_agents_request_count(conn: &Connection) {
+		let sql = "SELECT  agent_id, count(id) as count FROM requests GROUP BY agent_id";
+		let mut prepared = conn.prepare(sql).unwrap();
+
+		let counts: Vec<(i64, i64)> = prepared
+			.query_map((), |row| Ok((row.get(0)?, row.get(1)?)))
+			.optional()
+			.unwrap()
+			.map(|iter| iter.map(|e| e.unwrap()).collect())
+			.unwrap();
+
+		for (agent, count) in counts {
+			conn.execute(
+				"UPDATE agents SET request_count = ?1 WHERE id = ?2;",
+				params![count, agent],
+			)
+			.unwrap();
+		}
+	}
 }
 
 pub struct Request<'r> {
@@ -96,9 +200,13 @@ const CREATE_TABLE_AGENT: &'static str = "\
 	CREATE TABLE IF NOT EXISTS agents(
 		id INTEGER PRIMARY KEY AUTOINCREMENT,
 		hash TEXT NOT NULL,
-		agent TEXT NOT NULL
+		agent TEXT NOT NULL,
+		request_count INTEGER NOT NULL DEFAULT 0
 	);";
 
+const MIGRATE_AGENTS_ADD_REQUEST_COUNT: &'static str =
+	"ALTER TABLE agents ADD COLUMN request_count INTEGER NOT NULL DEFAULT 0";
+
 const CREATE_TABLE_REQUESTS: &'static str = "\
 	CREATE TABLE IF NOT EXISTS requests(
 		id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -110,3 +218,23 @@ const CREATE_TABLE_REQUESTS: &'static str = "\
 		FOREIGN KEY (agent_id)
 			REFERENCES agents(id)
 	);";
+
+const CREATE_TRIGGER_EPHEMERAL: &'static str = "\
+	CREATE TRIGGER IF NOT EXISTS requests_copy_ephemeral AFTER INSERT ON requests
+	BEGIN
+		INSERT INTO ephemeral_requests(request_id, timestamp) VALUES(new.id, new.timestamp);
+	END;";
+
+const CREATE_TRIGGER_COUNT_AGENT: &'static str = "\
+	CREATE TRIGGER IF NOT EXISTS agent_count AFTER INSERT ON requests
+	BEGIN
+		UPDATE agents SET request_count = request_count + 1 WHERE agents.id = new.agent_id;
+	END;";
+
+const CREATE_TABLE_EPHEMERAL_REQUESTS: &'static str = "\
+	CREATE TABLE IF NOT EXISTS ephemeral_requests(
+		request_id INTEGER PRIMARY KEY AUTOINCREMENT,
+		timestamp TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
+		FOREIGN KEY (request_id)
+			REFERENCES requests(id)
+	);";