diff options
Diffstat (limited to 'skim/src')
-rw-r--r-- | skim/src/count.rs | 71 | ||||
-rw-r--r-- | skim/src/ethertype.rs | 20 | ||||
-rw-r--r-- | skim/src/interface.rs | 18 | ||||
-rw-r--r-- | skim/src/ippacket.rs | 99 | ||||
-rw-r--r-- | skim/src/layer3.rs | 65 | ||||
-rw-r--r-- | skim/src/main.rs | 215 |
6 files changed, 488 insertions, 0 deletions
diff --git a/skim/src/count.rs b/skim/src/count.rs new file mode 100644 index 0000000..8f133bc --- /dev/null +++ b/skim/src/count.rs @@ -0,0 +1,71 @@ +const U16_MAX: usize = u16::MAX as usize; + +pub struct Count { + tcp_tx: Box<[usize]>, + tcp_rx: Box<[usize]>, + udp_tx: Box<[usize]>, + udp_rx: Box<[usize]>, +} + +impl Count { + pub fn new() -> Self { + Self { + tcp_tx: Box::new([0; U16_MAX]), + tcp_rx: Box::new([0; U16_MAX]), + udp_tx: Box::new([0; U16_MAX]), + udp_rx: Box::new([0; U16_MAX]), + } + } + + pub fn push_tcp(&self, tx_flag: bool, src: u16, dst: u16, len: usize) { + if tx_flag { + let bad_mut = unsafe { &mut *(self.tcp_tx.as_ptr() as *mut [usize; U16_MAX]) }; + bad_mut[src as usize] += len; + } else { + let bad_mut = unsafe { &mut *(self.tcp_rx.as_ptr() as *mut [usize; U16_MAX]) }; + bad_mut[dst as usize] += len; + } + } + + pub fn push_udp(&self, tx_flag: bool, src: u16, dst: u16, len: usize) { + if tx_flag { + let bad_mut = unsafe { &mut *(self.udp_tx.as_ptr() as *mut [usize; U16_MAX]) }; + bad_mut[src as usize] += len; + } else { + let bad_mut = unsafe { &mut *(self.udp_rx.as_ptr() as *mut [usize; U16_MAX]) }; + bad_mut[dst as usize] += len; + } + } + + pub fn tcp_tx_slice(&self) -> &[usize] { + &self.tcp_tx + } + + pub fn tcp_rx_slice(&self) -> &[usize] { + &self.tcp_rx + } + + pub fn udp_tx_slice(&self) -> &[usize] { + &self.udp_tx + } + + pub fn udp_rx_slice(&self) -> &[usize] { + &self.udp_rx + } + + pub fn many_tcp_tx(&self, ports: &[u16]) -> usize { + ports.iter().fold(0, |acc, port| acc + self.tcp_tx[*port as usize]) + } + + pub fn many_tcp_rx(&self, ports: &[u16]) -> usize { + ports.iter().fold(0, |acc, port| acc + self.tcp_rx[*port as usize]) + } + + pub fn many_udp_tx(&self, ports: &[u16]) -> usize { + ports.iter().fold(0, |acc, port| acc + self.udp_tx[*port as usize]) + } + + pub fn many_udp_rx(&self, ports: &[u16]) -> usize { + ports.iter().fold(0, |acc, port| acc + self.udp_rx[*port as usize]) + } +} diff --git a/skim/src/ethertype.rs b/skim/src/ethertype.rs new file mode 100644 index 0000000..0d728f3 --- /dev/null +++ b/skim/src/ethertype.rs @@ -0,0 +1,20 @@ +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum EtherType { + /// The frame type is IEEE 802.3 and this is it's length + Length(u16), + IPv4, + IPv6, + Unknown(u16), +} + +impl EtherType { + //TODO: check ethertype is correct + pub fn new(n: u16) -> Self { + match n { + n if n <= 1500 => Self::Length(n), + 0x0800 => Self::IPv4, + 0x86DD => Self::IPv6, + n => Self::Unknown(n), + } + } +} diff --git a/skim/src/interface.rs b/skim/src/interface.rs new file mode 100644 index 0000000..514896e --- /dev/null +++ b/skim/src/interface.rs @@ -0,0 +1,18 @@ +#[derive(facet::Facet)] +pub struct Query { + pub tcp: Option<Vec<u16>>, + pub udp: Option<Vec<u16>>, +} + +#[derive(facet::Facet)] +pub struct Response { + pub tcp: Vec<PortNetworkPair>, + pub udp: Vec<PortNetworkPair>, +} + +#[derive(facet::Facet)] +pub struct PortNetworkPair { + pub port: u16, + pub tx: usize, + pub rx: usize, +} diff --git a/skim/src/ippacket.rs b/skim/src/ippacket.rs new file mode 100644 index 0000000..60858d2 --- /dev/null +++ b/skim/src/ippacket.rs @@ -0,0 +1,99 @@ +use std::net::Ipv4Addr; + +pub struct Ipv4Packet<'p> { + /// The computed size of the IP packet + header_length: usize, + /// The total length of the IP packet when all fragments are combined + total_length: u16, + more_fragments: bool, + fragment_offset: usize, + next_header: IpNextHeader, + pub src: Ipv4Addr, + pub dst: Ipv4Addr, + payload: &'p [u8], +} + +impl<'p> Ipv4Packet<'p> { + pub fn new(buffer: &'p [u8]) -> Self { + let ihl = buffer[0].to_be() & 0b0000_1111; + let header_length = ihl as usize * 4; + + let total_length = u16::from_be_bytes([buffer[2], buffer[3]]); + + // Fragmentation + let more_fragments = (buffer[6] & 0b0010_0000) > 0; + let fragment_offset = u16::from_be_bytes([buffer[6] & 0b0001_1111, buffer[7]]); + // Fragments are in units of 8 bytes + let true_frag_offset = fragment_offset as usize * 8; + + let next_header = IpNextHeader::new(buffer[9]); + + let src = Ipv4Addr::new(buffer[12], buffer[13], buffer[14], buffer[15]); + let dst = Ipv4Addr::new(buffer[16], buffer[17], buffer[18], buffer[19]); + + Self { + header_length, + total_length, + more_fragments, + fragment_offset: true_frag_offset, + next_header, + src, + dst, + payload: &buffer[header_length..], + } + } + + pub fn get_source(&self) -> Ipv4Addr { + self.src + } + + pub fn get_destination(&self) -> Ipv4Addr { + self.dst + } + + pub fn has_more_fragments(&self) -> bool { + self.more_fragments + } + + pub fn get_fragment_offset(&self) -> usize { + self.fragment_offset + } + + pub fn get_next_header(&self) -> IpNextHeader { + self.next_header + } + + pub fn get_payload(&self) -> &[u8] { + &self.payload + } + + pub fn get_packet_len(&self) -> usize { + self.total_length as usize + } + + pub fn get_payload_len(&self) -> usize { + // An IPv4 header is always 20 bytes long + self.total_length as usize - 20 + } +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum IpNextHeader { + Icmp, + Igmp, + Tcp, + Udp, + Unknown(u8), +} + +impl IpNextHeader { + pub fn new(n: u8) -> Self { + match n { + 1 => Self::Icmp, + 2 => Self::Igmp, + 6 => Self::Tcp, + 17 => Self::Udp, + n => Self::Unknown(n), + } + } +} diff --git a/skim/src/layer3.rs b/skim/src/layer3.rs new file mode 100644 index 0000000..7521c0d --- /dev/null +++ b/skim/src/layer3.rs @@ -0,0 +1,65 @@ +pub struct Tcp<'p> { + src: u16, + dst: u16, + payload: &'p [u8], +} + +impl<'p> Tcp<'p> { + pub fn new(buffer: &'p [u8]) -> Self { + let src = u16::from_be_bytes([buffer[0], buffer[1]]); + let dst = u16::from_be_bytes([buffer[2], buffer[3]]); + let data_offset = (buffer[12].to_be() & 0b1111_0000) >> 4; + // Offset is number of 32-bit words + let true_offset = data_offset as usize * 4; + + Self { + src, + dst, + payload: &buffer[true_offset..], + } + } + + pub fn source_port(&self) -> u16 { + self.src + } + + pub fn destination_port(&self) -> u16 { + self.dst + } + + pub fn payload(&self) -> &[u8] { + self.payload + } +} + +pub struct Udp<'p> { + src: u16, + dst: u16, + payload: &'p [u8], +} + +impl<'p> Udp<'p> { + pub fn new(buffer: &'p [u8]) -> Self { + let src = u16::from_be_bytes([buffer[0], buffer[1]]); + let dst = u16::from_be_bytes([buffer[2], buffer[3]]); + let _len = u16::from_be_bytes([buffer[4], buffer[5]]); + + Self { + src, + dst, + payload: &buffer[8..], + } + } + + pub fn source_port(&self) -> u16 { + self.src + } + + pub fn destination_port(&self) -> u16 { + self.dst + } + + pub fn payload(&self) -> &[u8] { + self.payload + } +} diff --git a/skim/src/main.rs b/skim/src/main.rs new file mode 100644 index 0000000..4a82106 --- /dev/null +++ b/skim/src/main.rs @@ -0,0 +1,215 @@ +use std::{cell::UnsafeCell, error::Error, net::Ipv4Addr, rc::Rc, thread::JoinHandle}; + +use count::Count; +use ethertype::EtherType; +use interface::{PortNetworkPair, Query, Response}; +use ippacket::{IpNextHeader, Ipv4Packet}; +use layer3::{Tcp, Udp}; +use pnet_datalink::{Channel, Config}; +use scurvy::{Argument, Scurvy}; +use smol::{ + LocalExecutor, + channel::{self, Receiver, Sender}, + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, +}; + +mod count; +mod ethertype; +mod interface; +mod ippacket; +mod layer3; + +fn main() -> Result<(), Box<dyn Error>> { + let args = vec![ + Argument::new(&["interface", "iface"]).arg("dev"), + Argument::new("ip").arg("addr"), + ]; + let scurvy = Scurvy::make(args); + + let interface_name = scurvy.get("interface").unwrap(); + let interface = match pnet_datalink::interfaces().iter().find(|i| i.name == interface_name) { + None => { + eprintln!("No interface found named '{interface_name}'"); + return Ok(()); + } + Some(i) => i.clone(), + }; + + let ip_want: Ipv4Addr = scurvy.get("ip").unwrap().parse()?; + + let mut channel = match pnet_datalink::channel(&interface, Config::default())? { + Channel::Ethernet(_tx, rx) => rx, + _ => unimplemented!(), + }; + + let stat = stat_thread(); + + loop { + let pkt = channel.next()?; + + let ethertype = EtherType::new(u16::from_be_bytes([pkt[12], pkt[13]])); + let eth_payload = &pkt[14..]; + + match ethertype { + EtherType::IPv4 => { + let ip = Ipv4Packet::new(eth_payload); + let ip_payload = ip.get_payload(); + + let ip_tx = if ip.src == ip_want { + true + } else if ip.dst == ip_want { + false + } else { + continue; + }; + + // 6 byte per MAC (x2), 2 byte ethertype, 2 byte crc + let total_l2_len = ip.get_packet_len() + 18; + + match ip.get_next_header() { + IpNextHeader::Tcp => { + let tcp = Tcp::new(ip_payload); + + stat.tx.send_blocking(Meow::Tcp { + tx: ip_tx, + src: tcp.source_port(), + dst: tcp.destination_port(), + len: total_l2_len, + })?; + } + IpNextHeader::Udp => { + let udp = Udp::new(ip_payload); + + stat.tx.send_blocking(Meow::Udp { + tx: ip_tx, + src: udp.source_port(), + dst: udp.destination_port(), + len: total_l2_len, + })?; + } + _ => (), + } + } + _ => (), + } + } +} + +#[allow(dead_code)] +struct StatHandle { + hwnd: JoinHandle<()>, + tx: Sender<Meow>, +} + +enum Meow { + Tcp { + tx: bool, + src: u16, + dst: u16, + len: usize, + }, + Udp { + tx: bool, + src: u16, + dst: u16, + len: usize, + }, +} + +fn stat_thread() -> StatHandle { + let (tx, rx) = channel::unbounded(); + + let hwnd = std::thread::spawn(|| stat(rx)); + + StatHandle { hwnd, tx } +} + +fn stat(rx: Receiver<Meow>) { + println!("started stat thread"); + let cat = Rc::new(Count::new()); + let ex = Rc::new(LocalExecutor::new()); + + let mut adder_cat = Rc::clone(&cat); + ex.spawn(async move { + println!("ready to receive events"); + loop { + match rx.recv().await { + Err(_e) => { + eprintln!("error receiving! breaking from loop"); + break; + } + Ok(Meow::Tcp { tx, src, dst, len }) => { + adder_cat.push_tcp(tx, src, dst, len); + } + Ok(Meow::Udp { tx, src, dst, len }) => { + adder_cat.push_udp(tx, src, dst, len); + } + } + } + }) + .detach(); + + let run_cat = Rc::clone(&cat); + let run_ex = Rc::clone(&ex); + let run = ex.run(async move { + let socket = TcpListener::bind("127.0.0.1:6666").await.unwrap(); + + println!("listening on 127.0.0.1:6666"); + loop { + let (mut stream, _caddr) = socket.accept().await.unwrap(); + let per_spawn_cat = Rc::clone(&run_cat); + + run_ex + .spawn(async move { + let mut size_bytes = [0u8; 4]; + stream.read_exact(&mut size_bytes).await.unwrap(); + let size = u32::from_be_bytes(size_bytes); + + let mut json_raw = vec![0u8; size as usize]; + stream.read_exact(&mut json_raw).await.unwrap(); + + let json_str = String::from_utf8(json_raw).unwrap(); + let json: Query = facet_json::from_str(&json_str).unwrap(); + + let mut resp = Response { + tcp: vec![], + udp: vec![], + }; + + if let Some(tcp_ports) = json.tcp { + let tcp_tx = per_spawn_cat.tcp_tx_slice(); + let tcp_rx = per_spawn_cat.tcp_rx_slice(); + + tcp_ports.into_iter().for_each(|p| { + resp.tcp.push(PortNetworkPair { + port: p, + tx: tcp_tx[p as usize], + rx: tcp_rx[p as usize], + }); + }); + } + + if let Some(udp_ports) = json.udp { + let udp_tx = per_spawn_cat.udp_tx_slice(); + let udp_rx = per_spawn_cat.udp_rx_slice(); + + udp_ports.into_iter().for_each(|p| { + resp.tcp.push(PortNetworkPair { + port: p, + tx: udp_tx[p as usize], + rx: udp_rx[p as usize], + }); + }); + } + + let response_json = facet_json::to_string(&resp); + stream.write_all(&(response_json.len() as u32).to_be_bytes()).await.unwrap(); + stream.write_all(response_json.as_bytes()).await.unwrap(); + }) + .detach(); + } + }); + + smol::block_on(run); +} |