diff options
Diffstat (limited to 'corgi/src')
-rw-r--r-- | corgi/src/main.rs | 29 | ||||
-rw-r--r-- | corgi/src/stats.rs | 134 |
2 files changed, 159 insertions, 4 deletions
diff --git a/corgi/src/main.rs b/corgi/src/main.rs index 1772d68..1192c4c 100644 --- a/corgi/src/main.rs +++ b/corgi/src/main.rs @@ -1,5 +1,10 @@ use core::fmt; -use std::{net::SocketAddr, pin::Pin, sync::Arc}; +use std::{ + net::SocketAddr, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; use caller::HttpRequest; use http_body_util::{BodyExt, Full}; @@ -40,11 +45,33 @@ async fn run(settings: Settings, stats: Stats) { client_addr: addr, }; + let mut last_clean = None; + loop { + // Clean at the top so we do it once on boot, but keep out of the + // flow of the request to keep it speedy. This will delay accepting + // a new connection when the clean actually runs, but that is fine. + match last_clean { + None => { + let count = svc.stats.cleanup_ephemeral_requests(); + println!("cleaned {count} requests from the ephemeral table"); + last_clean = Some(Instant::now()); + } + Some(inst) if inst.elapsed() >= Duration::from_secs(60 * 60) => { + let count = svc.stats.cleanup_ephemeral_requests(); + println!("cleaned {count} requests from the ephemeral table"); + last_clean = Some(Instant::now()); + } + _ => (), + } + + // Now we accept the connection and spawn a handler let (stream, caddr) = listen.accept().await.unwrap(); let io = TokioIo::new(stream); + let mut svc_clone = svc.clone(); svc_clone.client_addr = caddr; + tokio::task::spawn( async move { http1::Builder::new().serve_connection(io, svc_clone).await }, ); diff --git a/corgi/src/stats.rs b/corgi/src/stats.rs index 23e85a0..c407cd9 100644 --- a/corgi/src/stats.rs +++ b/corgi/src/stats.rs @@ -3,6 +3,7 @@ use std::{net::IpAddr, path::Path, sync::Mutex}; use base64::{Engine, prelude::BASE64_STANDARD_NO_PAD}; use rusqlite::{Connection, OptionalExtension, params}; use sha2::{Digest, Sha256}; +use time::{Duration, OffsetDateTime, PrimitiveDateTime}; #[derive(Debug)] pub struct Stats { @@ -21,8 +22,30 @@ impl Stats { Self::set_wal(&conn); - conn.execute(CREATE_TABLE_AGENT, ()).unwrap(); - conn.execute(CREATE_TABLE_REQUESTS, ()).unwrap(); + // "agents" exists and trigger does not; we need to alter and prime + if Self::table_exists(&conn, "agents") && !Self::trigger_exists(&conn, "agent_count") { + println!("agents table exists, but needs request_count column. Altering and priming"); + conn.execute(MIGRATE_AGENTS_ADD_REQUEST_COUNT, ()).unwrap(); + + Self::prime_agents_request_count(&conn); + } else { + conn.execute(CREATE_TABLE_AGENT, ()).unwrap(); + conn.execute(CREATE_TABLE_REQUESTS, ()).unwrap(); + } + + conn.execute(CREATE_TRIGGER_COUNT_AGENT, ()).unwrap(); + + // Instead of just an IF NOT EXISTS here, we're checking it exists + // so we can copy an initial amount of requests from the main table + // to the ephemeral table. + if !Self::table_exists(&conn, "ephemeral_requests") { + println!("ephemeral_requests does not exist. Creating and priming"); + conn.execute(CREATE_TRIGGER_EPHEMERAL, ()).unwrap(); + conn.execute(CREATE_TABLE_EPHEMERAL_REQUESTS, ()).unwrap(); + + let count = Self::prime_ephemeral_table(&conn); + println!("Primed with {count} rows"); + } } fn set_wal(conn: &Connection) { @@ -38,6 +61,32 @@ impl Stats { } } + fn table_exists(conn: &Connection, name: &str) -> bool { + let exist: Option<String> = conn + .query_row( + "SELECT name FROM sqlite_schema WHERE type='table' AND name=?1;", + params![name], + |r| r.get(0), + ) + .optional() + .unwrap(); + + exist.is_some() + } + + fn trigger_exists(conn: &Connection, name: &str) -> bool { + let exist: Option<String> = conn + .query_row( + "SELECT name FROM sqlite_schema WHERE type='trigger' AND name=?1;", + params![name], + |r| r.get(0), + ) + .optional() + .unwrap(); + + exist.is_some() + } + pub fn log_request(&self, request: Request) { let Request { agent, @@ -83,6 +132,61 @@ impl Stats { ) .unwrap(); } + + /// Small, single line function to return the lower-bound date of ephemeral + /// requests. + fn ephemeral_lifetime() -> OffsetDateTime { + OffsetDateTime::now_utc() - Duration::days(1) + } + + pub fn cleanup_ephemeral_requests(&self) -> usize { + let lower = Self::ephemeral_lifetime(); + + let sql = "DELETE FROM ephemeral_requests WHERE timestamp < ?1;"; + + let conn = self.conn.lock().unwrap(); + + match conn.execute(sql, params![lower]) { + Err(e) => { + eprintln!("ERROR failed to run ephemeral clean: {e}"); + panic!(); + } + Ok(count) => count, + } + } + + fn prime_ephemeral_table(conn: &Connection) -> usize { + let lower = Self::ephemeral_lifetime(); + + let sql = "INSERT INTO ephemeral_requests SELECT id, timestamp FROM requests WHERE timestamp > ?1;"; + match conn.execute(sql, params![lower]) { + Err(e) => { + eprintln!("ERROR failed to prime ephemeral: {e}"); + panic!(); + } + Ok(count) => count, + } + } + + fn prime_agents_request_count(conn: &Connection) { + let sql = "SELECT agent_id, count(id) as count FROM requests GROUP BY agent_id"; + let mut prepared = conn.prepare(sql).unwrap(); + + let counts: Vec<(i64, i64)> = prepared + .query_map((), |row| Ok((row.get(0)?, row.get(1)?))) + .optional() + .unwrap() + .map(|iter| iter.map(|e| e.unwrap()).collect()) + .unwrap(); + + for (agent, count) in counts { + conn.execute( + "UPDATE agents SET request_count = ?1 WHERE id = ?2;", + params![count, agent], + ) + .unwrap(); + } + } } pub struct Request<'r> { @@ -96,9 +200,13 @@ const CREATE_TABLE_AGENT: &'static str = "\ CREATE TABLE IF NOT EXISTS agents( id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT NOT NULL, - agent TEXT NOT NULL + agent TEXT NOT NULL, + request_count INTEGER NOT NULL DEFAULT 0 );"; +const MIGRATE_AGENTS_ADD_REQUEST_COUNT: &'static str = + "ALTER TABLE agents ADD COLUMN request_count INTEGER NOT NULL DEFAULT 0"; + const CREATE_TABLE_REQUESTS: &'static str = "\ CREATE TABLE IF NOT EXISTS requests( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -110,3 +218,23 @@ const CREATE_TABLE_REQUESTS: &'static str = "\ FOREIGN KEY (agent_id) REFERENCES agents(id) );"; + +const CREATE_TRIGGER_EPHEMERAL: &'static str = "\ + CREATE TRIGGER IF NOT EXISTS requests_copy_ephemeral AFTER INSERT ON requests + BEGIN + INSERT INTO ephemeral_requests(request_id, timestamp) VALUES(new.id, new.timestamp); + END;"; + +const CREATE_TRIGGER_COUNT_AGENT: &'static str = "\ + CREATE TRIGGER IF NOT EXISTS agent_count AFTER INSERT ON requests + BEGIN + UPDATE agents SET request_count = request_count + 1 WHERE agents.id = new.agent_id; + END;"; + +const CREATE_TABLE_EPHEMERAL_REQUESTS: &'static str = "\ + CREATE TABLE IF NOT EXISTS ephemeral_requests( + request_id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (request_id) + REFERENCES requests(id) + );"; |