-
Notifications
You must be signed in to change notification settings - Fork 21
Add UDP Packet Mode endpoint #82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
1644b6d
6a9a2a0
c13c127
51328fa
3d05550
77f0261
6f59da9
e2b5208
ea826cf
9cc591b
f58e082
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -159,6 +159,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { | |||||
| log::info!("#{number2} UDP closed, session count {c}"); | ||||||
| }); | ||||||
| } | ||||||
|
|
||||||
| IpStackStream::UdpEdp(mut endpoint) => { | ||||||
| let c = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; | ||||||
| let number2 = number; | ||||||
| log::info!("#{number2} UDP Packet Endpoint starting, session count {c}"); | ||||||
|
|
||||||
| tokio::spawn(async move { | ||||||
| loop { | ||||||
| tokio::select! { | ||||||
| res = endpoint.recv() => { | ||||||
| match res { | ||||||
| Some((_src_addr, _dst_addr, _payload)) => { | ||||||
|
|
||||||
|
|
||||||
| } | ||||||
| None => { | ||||||
| log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭"); | ||||||
|
||||||
| log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭"); | |
| log::info!("#{number2} UDP Packet Endpoint underlying channel has been closed"); |
Copilot
AI
Mar 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spelling/grammar in the (commented) log line: "faild" should be "failed" (and "shut down" is typically two words). Even though it’s commented out, it’s part of the example users will copy/paste.
| // log::warn!("#{number2} faild to send packet: {}", e); | |
| // log::warn!("#{number2} failed to send packet: {}", e); |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -85,6 +85,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { | |||||
| println!("==== end UDP connection ===="); | ||||||
| }); | ||||||
| } | ||||||
| IpStackStream::UdpEdp(mut endpoint) => { | ||||||
| tokio::spawn(async move { | ||||||
| loop { | ||||||
| tokio::select! { | ||||||
| res = endpoint.recv() => { | ||||||
| match res { | ||||||
| Some((_src_addr, _dst_addr, _payload)) => { | ||||||
| //your logic to process the packet | ||||||
| } | ||||||
| None => { | ||||||
| log::info!(" UDP Packet Endpoint the channel have been shutdown"); | ||||||
|
Uxx0 marked this conversation as resolved.
Outdated
|
||||||
| break; | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| // res = app.readpacket() => { | ||||||
| // match res { | ||||||
| // Ok(Some((remote_player_addr, my_local_addr, payload))) => { | ||||||
| // log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len()); | ||||||
|
|
||||||
| // | ||||||
| // if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) { | ||||||
| // log::warn!("#{number2} faild to send packet: {}", e); | ||||||
|
||||||
| // log::warn!("#{number2} faild to send packet: {}", e); | |
| // log::warn!("#{number2} failed to send packet: {}", e); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| use ahash::AHashMap; | ||
| use packet::{NetworkPacket, NetworkTuple, TransportHeader}; | ||
| use std::net::SocketAddr; | ||
| use std::{sync::Arc, time::Duration}; | ||
| use tokio::{ | ||
| io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, | ||
|
|
@@ -10,6 +11,9 @@ use tokio::{ | |
| task::JoinHandle, | ||
| }; | ||
|
|
||
| use std::sync::atomic::{AtomicU64, Ordering}; | ||
| use std::time::{SystemTime, UNIX_EPOCH}; | ||
|
|
||
| pub(crate) type PacketSender = UnboundedSender<NetworkPacket>; | ||
| pub(crate) type PacketReceiver = UnboundedReceiver<NetworkPacket>; | ||
| pub(crate) type SessionCollection = AHashMap<NetworkTuple, PacketSender>; | ||
|
|
@@ -19,7 +23,7 @@ mod packet; | |
| mod stream; | ||
|
|
||
| pub use self::error::{IpStackError, Result}; | ||
| pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport}; | ||
| pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpPacketEndpoint, IpStackUdpStream, IpStackUnknownTransport}; | ||
| pub use self::stream::{TcpConfig, TcpOptions}; | ||
| pub use etherparse::IpNumber; | ||
|
|
||
|
|
@@ -75,6 +79,8 @@ pub struct IpStackConfig { | |
| /// Timeout for UDP connections. | ||
| /// Default is 30 seconds. | ||
| pub udp_timeout: Duration, | ||
|
|
||
|
Uxx0 marked this conversation as resolved.
Outdated
|
||
| pub udp_packet_mode: bool, | ||
| } | ||
|
|
||
| impl Default for IpStackConfig { | ||
|
|
@@ -84,6 +90,7 @@ impl Default for IpStackConfig { | |
| packet_information: false, | ||
| tcp_config: Arc::new(TcpConfig::default()), | ||
| udp_timeout: Duration::from_secs(30), | ||
| udp_packet_mode: false, | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -177,6 +184,12 @@ impl IpStackConfig { | |
| self.packet_information = packet_information; | ||
| self | ||
| } | ||
|
|
||
| /// Enable or disable UDP packet mode. | ||
| pub fn udp_packet_mode(&mut self, udp_packet_mode: bool) -> &mut Self { | ||
| self.udp_packet_mode = udp_packet_mode; | ||
| self | ||
| } | ||
| } | ||
|
|
||
| /// The main IP stack instance. | ||
|
|
@@ -310,7 +323,12 @@ fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>( | |
| accept_sender: UnboundedSender<IpStackStream>, | ||
| ) -> JoinHandle<Result<()>> { | ||
| let mut sessions: SessionCollection = AHashMap::new(); | ||
| //UDPendpoints | ||
| let mut packet_endpoints: AHashMap<SocketAddr, (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec<u8>)>, Arc<AtomicU64>)> = | ||
| AHashMap::new(); | ||
| let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::<NetworkTuple>(); | ||
| //udp endpoints rm channel | ||
| let (edp_remove_tx, mut edp_remove_rx) = mpsc::unbounded_channel::<SocketAddr>(); | ||
| let pi = config.packet_information; | ||
| let offset = if pi && cfg!(unix) { 4 } else { 0 }; | ||
| let mut buffer = vec![0_u8; config.mtu as usize + offset]; | ||
|
|
@@ -320,7 +338,7 @@ fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>( | |
| loop { | ||
| select! { | ||
| Ok(n) = device.read(&mut buffer) => { | ||
| if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await { | ||
| if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx,&edp_remove_tx, &up_pkt_sender, &config, &accept_sender, &mut packet_endpoints).await { | ||
| let io_err: std::io::Error = e.into(); | ||
| if io_err.kind() == std::io::ErrorKind::ConnectionRefused { | ||
| log::trace!("Received junk data: {io_err}"); | ||
|
|
@@ -329,6 +347,11 @@ fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>( | |
| } | ||
| } | ||
| } | ||
| //udp endpoint remove | ||
| Some(src_addr) = edp_remove_rx.recv() => { | ||
| packet_endpoints.remove(&src_addr); | ||
| log::debug!("Packet endpoint destroyed and removed: {}", src_addr); | ||
| } | ||
| Some(network_tuple) = session_remove_rx.recv() => { | ||
| sessions.remove(&network_tuple); | ||
| log::debug!("session destroyed: {network_tuple}"); | ||
|
|
@@ -345,9 +368,11 @@ async fn process_device_read( | |
| data: &[u8], | ||
| sessions: &mut SessionCollection, | ||
| session_remove_tx: &UnboundedSender<NetworkTuple>, | ||
| edp_remove_tx: &tokio::sync::mpsc::UnboundedSender<SocketAddr>, | ||
| up_pkt_sender: &PacketSender, | ||
| config: &IpStackConfig, | ||
| accept_sender: &UnboundedSender<IpStackStream>, | ||
| packet_endpoints: &mut AHashMap<SocketAddr, (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec<u8>)>, Arc<AtomicU64>)>, | ||
| ) -> Result<()> { | ||
| let Ok(packet) = NetworkPacket::parse(data) else { | ||
| let stream = IpStackStream::UnknownNetwork(data.to_owned()); | ||
|
|
@@ -368,6 +393,75 @@ async fn process_device_read( | |
| return Ok(()); | ||
| } | ||
|
|
||
| //UDP packet | ||
| if let TransportHeader::Udp(_udp_header) = packet.transport_header() { | ||
| if config.udp_packet_mode { | ||
| let src_addr = packet.src_addr(); | ||
| let dst_addr = packet.dst_addr(); | ||
| let payload = packet.payload.unwrap_or_default(); | ||
|
|
||
| match packet_endpoints.entry(src_addr) { | ||
| std::collections::hash_map::Entry::Occupied(entry) => { | ||
| let (tx, last_activity) = entry.get(); | ||
| last_activity.store(now_secs(), Ordering::Relaxed); | ||
|
|
||
| if let Err(e) = tx.send((src_addr, dst_addr, payload)) { | ||
| log::warn!("Failed to send to packet endpoint: {}", e); | ||
|
Uxx0 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
|
|
||
| std::collections::hash_map::Entry::Vacant(entry) => { | ||
| //announce to destroy the channel when timeout or application layer take out | ||
| let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>(); | ||
|
|
||
| let last_activity = Arc::new(AtomicU64::new(now_secs())); | ||
| let last_activity_clone = last_activity.clone(); | ||
|
|
||
| let timeout_secs = config.udp_timeout.as_secs(); | ||
|
|
||
|
||
| let edp_remove_tx_clone = edp_remove_tx.clone(); | ||
| let src_addr_clone = src_addr; | ||
|
|
||
| tokio::spawn(async move { | ||
| loop { | ||
| let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed); | ||
| if elapsed >= timeout_secs { | ||
| log::info!("remove the channel of {} because not data in {} ", src_addr_clone, elapsed); | ||
|
Uxx0 marked this conversation as resolved.
Outdated
|
||
| break; | ||
| } | ||
|
||
|
|
||
| let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed); | ||
|
|
||
| tokio::select! { | ||
|
|
||
| //sleep until timeout | ||
| _ = tokio::time::sleep(sleep_duration) => {} | ||
|
|
||
| // application layer take out | ||
| _ = &mut destroy_rx => { | ||
| log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let _ = edp_remove_tx_clone.send(src_addr_clone); | ||
| }); | ||
| //ipstack to application layer channel | ||
| let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); | ||
|
|
||
| let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx); | ||
|
|
||
| accept_sender.send(IpStackStream::UdpEdp(endpoint)).unwrap(); | ||
|
|
||
| entry.insert((tx.clone(), last_activity)); | ||
| tx.send((src_addr, dst_addr, payload)).unwrap(); | ||
|
Uxx0 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
| return Ok(()); | ||
| } | ||
| } | ||
|
|
||
| let network_tuple = packet.network_tuple(); | ||
| match sessions.entry(network_tuple) { | ||
| std::collections::hash_map::Entry::Occupied(entry) => { | ||
|
|
@@ -439,3 +533,8 @@ async fn process_upstream_recv<Device: AsyncWrite + Unpin + 'static>( | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| //time | ||
| fn now_secs() -> u64 { | ||
| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() | ||
|
Uxx0 marked this conversation as resolved.
Outdated
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,9 +2,9 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; | |
|
|
||
| pub use self::tcp::IpStackTcpStream; | ||
| pub use self::tcp::{TcpConfig, TcpOptions}; | ||
| pub use self::udp::IpStackUdpPacketEndpoint; | ||
| pub use self::udp::IpStackUdpStream; | ||
| pub use self::unknown::IpStackUnknownTransport; | ||
|
|
||
| mod seqnum; | ||
| mod tcb; | ||
| mod tcp; | ||
|
|
@@ -27,6 +27,8 @@ pub enum IpStackStream { | |
| Tcp(IpStackTcpStream), | ||
| /// A UDP stream. | ||
| Udp(IpStackUdpStream), | ||
| /// UDP PACKET. | ||
| UdpEdp(IpStackUdpPacketEndpoint), | ||
| /// A stream for unknown transport protocols. | ||
|
Comment on lines
32
to
36
|
||
| UnknownTransport(IpStackUnknownTransport), | ||
| /// Raw network packets that couldn't be parsed. | ||
|
|
@@ -52,6 +54,7 @@ impl IpStackStream { | |
| match self { | ||
| IpStackStream::Tcp(tcp) => tcp.local_addr(), | ||
| IpStackStream::Udp(udp) => udp.local_addr(), | ||
| IpStackStream::UdpEdp(udp_edp) => udp_edp.local_addr(), | ||
| IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), | ||
|
Uxx0 marked this conversation as resolved.
|
||
| IpStackStream::UnknownTransport(unknown) => match unknown.src_addr() { | ||
| IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)), | ||
|
|
@@ -78,6 +81,7 @@ impl IpStackStream { | |
| match self { | ||
| IpStackStream::Tcp(tcp) => tcp.peer_addr(), | ||
| IpStackStream::Udp(udp) => udp.peer_addr(), | ||
| IpStackStream::UdpEdp(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), | ||
| IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), | ||
| IpStackStream::UnknownTransport(unknown) => match unknown.dst_addr() { | ||
| IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This example adds a
UdpEdpmatch arm, but the example never enablesIpStackConfig::udp_packet_mode(true), so this branch will never be hit with the default config. Either enable packet mode in the example configuration or remove/guard this branch so the example remains accurate.