about summary refs log tree commit diff
path: root/skim/src
diff options
context:
space:
mode:
Diffstat (limited to 'skim/src')
-rw-r--r--skim/src/count.rs71
-rw-r--r--skim/src/ethertype.rs20
-rw-r--r--skim/src/interface.rs18
-rw-r--r--skim/src/ippacket.rs99
-rw-r--r--skim/src/layer3.rs65
-rw-r--r--skim/src/main.rs215
6 files changed, 488 insertions, 0 deletions
diff --git a/skim/src/count.rs b/skim/src/count.rs
new file mode 100644
index 0000000..8f133bc
--- /dev/null
+++ b/skim/src/count.rs
@@ -0,0 +1,71 @@
+const U16_MAX: usize = u16::MAX as usize;
+
+pub struct Count {
+	tcp_tx: Box<[usize]>,
+	tcp_rx: Box<[usize]>,
+	udp_tx: Box<[usize]>,
+	udp_rx: Box<[usize]>,
+}
+
+impl Count {
+	pub fn new() -> Self {
+		Self {
+			tcp_tx: Box::new([0; U16_MAX]),
+			tcp_rx: Box::new([0; U16_MAX]),
+			udp_tx: Box::new([0; U16_MAX]),
+			udp_rx: Box::new([0; U16_MAX]),
+		}
+	}
+
+	pub fn push_tcp(&self, tx_flag: bool, src: u16, dst: u16, len: usize) {
+		if tx_flag {
+			let bad_mut = unsafe { &mut *(self.tcp_tx.as_ptr() as *mut [usize; U16_MAX]) };
+			bad_mut[src as usize] += len;
+		} else {
+			let bad_mut = unsafe { &mut *(self.tcp_rx.as_ptr() as *mut [usize; U16_MAX]) };
+			bad_mut[dst as usize] += len;
+		}
+	}
+
+	pub fn push_udp(&self, tx_flag: bool, src: u16, dst: u16, len: usize) {
+		if tx_flag {
+			let bad_mut = unsafe { &mut *(self.udp_tx.as_ptr() as *mut [usize; U16_MAX]) };
+			bad_mut[src as usize] += len;
+		} else {
+			let bad_mut = unsafe { &mut *(self.udp_rx.as_ptr() as *mut [usize; U16_MAX]) };
+			bad_mut[dst as usize] += len;
+		}
+	}
+
+	pub fn tcp_tx_slice(&self) -> &[usize] {
+		&self.tcp_tx
+	}
+
+	pub fn tcp_rx_slice(&self) -> &[usize] {
+		&self.tcp_rx
+	}
+
+	pub fn udp_tx_slice(&self) -> &[usize] {
+		&self.udp_tx
+	}
+
+	pub fn udp_rx_slice(&self) -> &[usize] {
+		&self.udp_rx
+	}
+
+	pub fn many_tcp_tx(&self, ports: &[u16]) -> usize {
+		ports.iter().fold(0, |acc, port| acc + self.tcp_tx[*port as usize])
+	}
+
+	pub fn many_tcp_rx(&self, ports: &[u16]) -> usize {
+		ports.iter().fold(0, |acc, port| acc + self.tcp_rx[*port as usize])
+	}
+
+	pub fn many_udp_tx(&self, ports: &[u16]) -> usize {
+		ports.iter().fold(0, |acc, port| acc + self.udp_tx[*port as usize])
+	}
+
+	pub fn many_udp_rx(&self, ports: &[u16]) -> usize {
+		ports.iter().fold(0, |acc, port| acc + self.udp_rx[*port as usize])
+	}
+}
diff --git a/skim/src/ethertype.rs b/skim/src/ethertype.rs
new file mode 100644
index 0000000..0d728f3
--- /dev/null
+++ b/skim/src/ethertype.rs
@@ -0,0 +1,20 @@
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum EtherType {
+	/// The frame type is IEEE 802.3 and this is it's length
+	Length(u16),
+	IPv4,
+	IPv6,
+	Unknown(u16),
+}
+
+impl EtherType {
+	//TODO: check ethertype is correct
+	pub fn new(n: u16) -> Self {
+		match n {
+			n if n <= 1500 => Self::Length(n),
+			0x0800 => Self::IPv4,
+			0x86DD => Self::IPv6,
+			n => Self::Unknown(n),
+		}
+	}
+}
diff --git a/skim/src/interface.rs b/skim/src/interface.rs
new file mode 100644
index 0000000..514896e
--- /dev/null
+++ b/skim/src/interface.rs
@@ -0,0 +1,18 @@
+#[derive(facet::Facet)]
+pub struct Query {
+	pub tcp: Option<Vec<u16>>,
+	pub udp: Option<Vec<u16>>,
+}
+
+#[derive(facet::Facet)]
+pub struct Response {
+	pub tcp: Vec<PortNetworkPair>,
+	pub udp: Vec<PortNetworkPair>,
+}
+
+#[derive(facet::Facet)]
+pub struct PortNetworkPair {
+	pub port: u16,
+	pub tx: usize,
+	pub rx: usize,
+}
diff --git a/skim/src/ippacket.rs b/skim/src/ippacket.rs
new file mode 100644
index 0000000..60858d2
--- /dev/null
+++ b/skim/src/ippacket.rs
@@ -0,0 +1,99 @@
+use std::net::Ipv4Addr;
+
+pub struct Ipv4Packet<'p> {
+	/// The computed size of the IP packet
+	header_length: usize,
+	/// The total length of the IP packet when all fragments are combined
+	total_length: u16,
+	more_fragments: bool,
+	fragment_offset: usize,
+	next_header: IpNextHeader,
+	pub src: Ipv4Addr,
+	pub dst: Ipv4Addr,
+	payload: &'p [u8],
+}
+
+impl<'p> Ipv4Packet<'p> {
+	pub fn new(buffer: &'p [u8]) -> Self {
+		let ihl = buffer[0].to_be() & 0b0000_1111;
+		let header_length = ihl as usize * 4;
+
+		let total_length = u16::from_be_bytes([buffer[2], buffer[3]]);
+
+		// Fragmentation
+		let more_fragments = (buffer[6] & 0b0010_0000) > 0;
+		let fragment_offset = u16::from_be_bytes([buffer[6] & 0b0001_1111, buffer[7]]);
+		// Fragments are in units of 8 bytes
+		let true_frag_offset = fragment_offset as usize * 8;
+
+		let next_header = IpNextHeader::new(buffer[9]);
+
+		let src = Ipv4Addr::new(buffer[12], buffer[13], buffer[14], buffer[15]);
+		let dst = Ipv4Addr::new(buffer[16], buffer[17], buffer[18], buffer[19]);
+
+		Self {
+			header_length,
+			total_length,
+			more_fragments,
+			fragment_offset: true_frag_offset,
+			next_header,
+			src,
+			dst,
+			payload: &buffer[header_length..],
+		}
+	}
+
+	pub fn get_source(&self) -> Ipv4Addr {
+		self.src
+	}
+
+	pub fn get_destination(&self) -> Ipv4Addr {
+		self.dst
+	}
+
+	pub fn has_more_fragments(&self) -> bool {
+		self.more_fragments
+	}
+
+	pub fn get_fragment_offset(&self) -> usize {
+		self.fragment_offset
+	}
+
+	pub fn get_next_header(&self) -> IpNextHeader {
+		self.next_header
+	}
+
+	pub fn get_payload(&self) -> &[u8] {
+		&self.payload
+	}
+
+	pub fn get_packet_len(&self) -> usize {
+		self.total_length as usize
+	}
+
+	pub fn get_payload_len(&self) -> usize {
+		// An IPv4 header is always 20 bytes long
+		self.total_length as usize - 20
+	}
+}
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum IpNextHeader {
+	Icmp,
+	Igmp,
+	Tcp,
+	Udp,
+	Unknown(u8),
+}
+
+impl IpNextHeader {
+	pub fn new(n: u8) -> Self {
+		match n {
+			1 => Self::Icmp,
+			2 => Self::Igmp,
+			6 => Self::Tcp,
+			17 => Self::Udp,
+			n => Self::Unknown(n),
+		}
+	}
+}
diff --git a/skim/src/layer3.rs b/skim/src/layer3.rs
new file mode 100644
index 0000000..7521c0d
--- /dev/null
+++ b/skim/src/layer3.rs
@@ -0,0 +1,65 @@
+pub struct Tcp<'p> {
+	src: u16,
+	dst: u16,
+	payload: &'p [u8],
+}
+
+impl<'p> Tcp<'p> {
+	pub fn new(buffer: &'p [u8]) -> Self {
+		let src = u16::from_be_bytes([buffer[0], buffer[1]]);
+		let dst = u16::from_be_bytes([buffer[2], buffer[3]]);
+		let data_offset = (buffer[12].to_be() & 0b1111_0000) >> 4;
+		// Offset is number of 32-bit words
+		let true_offset = data_offset as usize * 4;
+
+		Self {
+			src,
+			dst,
+			payload: &buffer[true_offset..],
+		}
+	}
+
+	pub fn source_port(&self) -> u16 {
+		self.src
+	}
+
+	pub fn destination_port(&self) -> u16 {
+		self.dst
+	}
+
+	pub fn payload(&self) -> &[u8] {
+		self.payload
+	}
+}
+
+pub struct Udp<'p> {
+	src: u16,
+	dst: u16,
+	payload: &'p [u8],
+}
+
+impl<'p> Udp<'p> {
+	pub fn new(buffer: &'p [u8]) -> Self {
+		let src = u16::from_be_bytes([buffer[0], buffer[1]]);
+		let dst = u16::from_be_bytes([buffer[2], buffer[3]]);
+		let _len = u16::from_be_bytes([buffer[4], buffer[5]]);
+
+		Self {
+			src,
+			dst,
+			payload: &buffer[8..],
+		}
+	}
+
+	pub fn source_port(&self) -> u16 {
+		self.src
+	}
+
+	pub fn destination_port(&self) -> u16 {
+		self.dst
+	}
+
+	pub fn payload(&self) -> &[u8] {
+		self.payload
+	}
+}
diff --git a/skim/src/main.rs b/skim/src/main.rs
new file mode 100644
index 0000000..4a82106
--- /dev/null
+++ b/skim/src/main.rs
@@ -0,0 +1,215 @@
+use std::{cell::UnsafeCell, error::Error, net::Ipv4Addr, rc::Rc, thread::JoinHandle};
+
+use count::Count;
+use ethertype::EtherType;
+use interface::{PortNetworkPair, Query, Response};
+use ippacket::{IpNextHeader, Ipv4Packet};
+use layer3::{Tcp, Udp};
+use pnet_datalink::{Channel, Config};
+use scurvy::{Argument, Scurvy};
+use smol::{
+	LocalExecutor,
+	channel::{self, Receiver, Sender},
+	io::{AsyncReadExt, AsyncWriteExt},
+	net::TcpListener,
+};
+
+mod count;
+mod ethertype;
+mod interface;
+mod ippacket;
+mod layer3;
+
+fn main() -> Result<(), Box<dyn Error>> {
+	let args = vec![
+		Argument::new(&["interface", "iface"]).arg("dev"),
+		Argument::new("ip").arg("addr"),
+	];
+	let scurvy = Scurvy::make(args);
+
+	let interface_name = scurvy.get("interface").unwrap();
+	let interface = match pnet_datalink::interfaces().iter().find(|i| i.name == interface_name) {
+		None => {
+			eprintln!("No interface found named '{interface_name}'");
+			return Ok(());
+		}
+		Some(i) => i.clone(),
+	};
+
+	let ip_want: Ipv4Addr = scurvy.get("ip").unwrap().parse()?;
+
+	let mut channel = match pnet_datalink::channel(&interface, Config::default())? {
+		Channel::Ethernet(_tx, rx) => rx,
+		_ => unimplemented!(),
+	};
+
+	let stat = stat_thread();
+
+	loop {
+		let pkt = channel.next()?;
+
+		let ethertype = EtherType::new(u16::from_be_bytes([pkt[12], pkt[13]]));
+		let eth_payload = &pkt[14..];
+
+		match ethertype {
+			EtherType::IPv4 => {
+				let ip = Ipv4Packet::new(eth_payload);
+				let ip_payload = ip.get_payload();
+
+				let ip_tx = if ip.src == ip_want {
+					true
+				} else if ip.dst == ip_want {
+					false
+				} else {
+					continue;
+				};
+
+				// 6 byte per MAC (x2), 2 byte ethertype, 2 byte crc
+				let total_l2_len = ip.get_packet_len() + 18;
+
+				match ip.get_next_header() {
+					IpNextHeader::Tcp => {
+						let tcp = Tcp::new(ip_payload);
+
+						stat.tx.send_blocking(Meow::Tcp {
+							tx: ip_tx,
+							src: tcp.source_port(),
+							dst: tcp.destination_port(),
+							len: total_l2_len,
+						})?;
+					}
+					IpNextHeader::Udp => {
+						let udp = Udp::new(ip_payload);
+
+						stat.tx.send_blocking(Meow::Udp {
+							tx: ip_tx,
+							src: udp.source_port(),
+							dst: udp.destination_port(),
+							len: total_l2_len,
+						})?;
+					}
+					_ => (),
+				}
+			}
+			_ => (),
+		}
+	}
+}
+
+#[allow(dead_code)]
+struct StatHandle {
+	hwnd: JoinHandle<()>,
+	tx: Sender<Meow>,
+}
+
+enum Meow {
+	Tcp {
+		tx: bool,
+		src: u16,
+		dst: u16,
+		len: usize,
+	},
+	Udp {
+		tx: bool,
+		src: u16,
+		dst: u16,
+		len: usize,
+	},
+}
+
+fn stat_thread() -> StatHandle {
+	let (tx, rx) = channel::unbounded();
+
+	let hwnd = std::thread::spawn(|| stat(rx));
+
+	StatHandle { hwnd, tx }
+}
+
+fn stat(rx: Receiver<Meow>) {
+	println!("started stat thread");
+	let cat = Rc::new(Count::new());
+	let ex = Rc::new(LocalExecutor::new());
+
+	let mut adder_cat = Rc::clone(&cat);
+	ex.spawn(async move {
+		println!("ready to receive events");
+		loop {
+			match rx.recv().await {
+				Err(_e) => {
+					eprintln!("error receiving! breaking from loop");
+					break;
+				}
+				Ok(Meow::Tcp { tx, src, dst, len }) => {
+					adder_cat.push_tcp(tx, src, dst, len);
+				}
+				Ok(Meow::Udp { tx, src, dst, len }) => {
+					adder_cat.push_udp(tx, src, dst, len);
+				}
+			}
+		}
+	})
+	.detach();
+
+	let run_cat = Rc::clone(&cat);
+	let run_ex = Rc::clone(&ex);
+	let run = ex.run(async move {
+		let socket = TcpListener::bind("127.0.0.1:6666").await.unwrap();
+
+		println!("listening on 127.0.0.1:6666");
+		loop {
+			let (mut stream, _caddr) = socket.accept().await.unwrap();
+			let per_spawn_cat = Rc::clone(&run_cat);
+
+			run_ex
+				.spawn(async move {
+					let mut size_bytes = [0u8; 4];
+					stream.read_exact(&mut size_bytes).await.unwrap();
+					let size = u32::from_be_bytes(size_bytes);
+
+					let mut json_raw = vec![0u8; size as usize];
+					stream.read_exact(&mut json_raw).await.unwrap();
+
+					let json_str = String::from_utf8(json_raw).unwrap();
+					let json: Query = facet_json::from_str(&json_str).unwrap();
+
+					let mut resp = Response {
+						tcp: vec![],
+						udp: vec![],
+					};
+
+					if let Some(tcp_ports) = json.tcp {
+						let tcp_tx = per_spawn_cat.tcp_tx_slice();
+						let tcp_rx = per_spawn_cat.tcp_rx_slice();
+
+						tcp_ports.into_iter().for_each(|p| {
+							resp.tcp.push(PortNetworkPair {
+								port: p,
+								tx: tcp_tx[p as usize],
+								rx: tcp_rx[p as usize],
+							});
+						});
+					}
+
+					if let Some(udp_ports) = json.udp {
+						let udp_tx = per_spawn_cat.udp_tx_slice();
+						let udp_rx = per_spawn_cat.udp_rx_slice();
+
+						udp_ports.into_iter().for_each(|p| {
+							resp.tcp.push(PortNetworkPair {
+								port: p,
+								tx: udp_tx[p as usize],
+								rx: udp_rx[p as usize],
+							});
+						});
+					}
+
+					let response_json = facet_json::to_string(&resp);
+					stream.write_all(&(response_json.len() as u32).to_be_bytes()).await.unwrap();
+					stream.write_all(response_json.as_bytes()).await.unwrap();
+				})
+				.detach();
+		}
+	});
+
+	smol::block_on(run);
+}