Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions examples/tun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("#{number2} UDP closed, session count {c}");
});
}

IpStackStream::UdpEdp(mut endpoint) => {
let c = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
let number2 = number;
log::info!("#{number2} UDP Packet Endpoint starting, session count {c}");

Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example adds a UdpEdp match arm, but the example never enables IpStackConfig::udp_packet_mode(true), so this branch will never be hit with the default config. Either enable packet mode in the example configuration or remove/guard this branch so the example remains accurate.

Copilot uses AI. Check for mistakes.
tokio::spawn(async move {
loop {
tokio::select! {
res = endpoint.recv() => {
match res {
Some((_src_addr, _dst_addr, _payload)) => {


}
None => {
log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭");
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log line includes non-English text ("底层通道已关闭"), while the rest of the repository examples/logs are in English. Consider replacing it with an English message so the example output is consistent and understandable for all users.

Suggested change
log::info!("#{number2} UDP Packet Endpoint 底层通道已关闭");
log::info!("#{number2} UDP Packet Endpoint underlying channel has been closed");

Copilot uses AI. Check for mistakes.
break;
}
}
}
// res = app.readpacket() => {
// match res {
// Ok(Some((remote_player_addr, my_local_addr, payload))) => {
// log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len());

//
// if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) {
// log::warn!("#{number2} faild to send packet: {}", e);
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling/grammar in the (commented) log line: "faild" should be "failed" (and "shut down" is typically two words). Even though it’s commented out, it’s part of the example users will copy/paste.

Suggested change
// log::warn!("#{number2} faild to send packet: {}", e);
// log::warn!("#{number2} failed to send packet: {}", e);

Copilot uses AI. Check for mistakes.
// }
// }
// Ok(None) | Err(_) => {
//
// break;
// }
// }
// }

}
}
let c = count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed) - 1;
log::info!("#{number2} UDP Packet Endpoint closed, session count {c}");
});
}
IpStackStream::UnknownTransport(u) => {
let n = number;
if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP {
Expand Down
37 changes: 37 additions & 0 deletions examples/tun_wintun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,43 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("==== end UDP connection ====");
});
}
IpStackStream::UdpEdp(mut endpoint) => {
tokio::spawn(async move {
loop {
tokio::select! {
res = endpoint.recv() => {
match res {
Some((_src_addr, _dst_addr, _payload)) => {
//your logic to process the packet
}
None => {
log::info!(" UDP Packet Endpoint the channel have been shutdown");
Comment thread
Uxx0 marked this conversation as resolved.
Outdated
break;
}
}
}
// res = app.readpacket() => {
// match res {
// Ok(Some((remote_player_addr, my_local_addr, payload))) => {
// log::trace!("#{number2} [down] {} -> {} ({} bytes)", remote_player_addr, my_local_addr, payload.len());

//
// if let Err(e) = endpoint.send(remote_player_addr, my_local_addr, payload) {
// log::warn!("#{number2} faild to send packet: {}", e);
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling in the (commented) log line: "faild" should be "failed". Even though commented out, it’s part of the example text users may reuse.

Suggested change
// log::warn!("#{number2} faild to send packet: {}", e);
// log::warn!("#{number2} failed to send packet: {}", e);

Copilot uses AI. Check for mistakes.
// }
// }
// Ok(None) | Err(_) => {
//
// break;
// }
// }
// }

}
}
});
}

Comment thread
Uxx0 marked this conversation as resolved.
Outdated
IpStackStream::UnknownTransport(u) => {
if u.src_addr().is_ipv4() && u.ip_protocol() == IpNumber::ICMP {
let (icmp_header, req_payload) = Icmpv4Header::from_slice(u.payload())?;
Expand Down
103 changes: 101 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use ahash::AHashMap;
use packet::{NetworkPacket, NetworkTuple, TransportHeader};
use std::net::SocketAddr;
use std::{sync::Arc, time::Duration};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
Expand All @@ -10,6 +11,9 @@ use tokio::{
task::JoinHandle,
};

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

pub(crate) type PacketSender = UnboundedSender<NetworkPacket>;
pub(crate) type PacketReceiver = UnboundedReceiver<NetworkPacket>;
pub(crate) type SessionCollection = AHashMap<NetworkTuple, PacketSender>;
Expand All @@ -19,7 +23,7 @@ mod packet;
mod stream;

pub use self::error::{IpStackError, Result};
pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream, IpStackUnknownTransport};
pub use self::stream::{IpStackStream, IpStackTcpStream, IpStackUdpPacketEndpoint, IpStackUdpStream, IpStackUnknownTransport};
pub use self::stream::{TcpConfig, TcpOptions};
pub use etherparse::IpNumber;

Expand Down Expand Up @@ -75,6 +79,8 @@ pub struct IpStackConfig {
/// Timeout for UDP connections.
/// Default is 30 seconds.
pub udp_timeout: Duration,

Comment thread
Uxx0 marked this conversation as resolved.
Outdated
pub udp_packet_mode: bool,
}

impl Default for IpStackConfig {
Expand All @@ -84,6 +90,7 @@ impl Default for IpStackConfig {
packet_information: false,
tcp_config: Arc::new(TcpConfig::default()),
udp_timeout: Duration::from_secs(30),
udp_packet_mode: false,
}
}
}
Expand Down Expand Up @@ -177,6 +184,12 @@ impl IpStackConfig {
self.packet_information = packet_information;
self
}

/// Enable or disable UDP packet mode.
pub fn udp_packet_mode(&mut self, udp_packet_mode: bool) -> &mut Self {
self.udp_packet_mode = udp_packet_mode;
self
}
}

/// The main IP stack instance.
Expand Down Expand Up @@ -310,7 +323,12 @@ fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
accept_sender: UnboundedSender<IpStackStream>,
) -> JoinHandle<Result<()>> {
let mut sessions: SessionCollection = AHashMap::new();
//UDPendpoints
let mut packet_endpoints: AHashMap<SocketAddr, (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec<u8>)>, Arc<AtomicU64>)> =
AHashMap::new();
let (session_remove_tx, mut session_remove_rx) = mpsc::unbounded_channel::<NetworkTuple>();
//udp endpoints rm channel
let (edp_remove_tx, mut edp_remove_rx) = mpsc::unbounded_channel::<SocketAddr>();
let pi = config.packet_information;
let offset = if pi && cfg!(unix) { 4 } else { 0 };
let mut buffer = vec![0_u8; config.mtu as usize + offset];
Expand All @@ -320,7 +338,7 @@ fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
loop {
select! {
Ok(n) = device.read(&mut buffer) => {
if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx, &up_pkt_sender, &config, &accept_sender).await {
if let Err(e) = process_device_read(&buffer[offset..n], &mut sessions, &session_remove_tx,&edp_remove_tx, &up_pkt_sender, &config, &accept_sender, &mut packet_endpoints).await {
let io_err: std::io::Error = e.into();
if io_err.kind() == std::io::ErrorKind::ConnectionRefused {
log::trace!("Received junk data: {io_err}");
Expand All @@ -329,6 +347,11 @@ fn run<Device: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
}
}
}
//udp endpoint remove
Some(src_addr) = edp_remove_rx.recv() => {
packet_endpoints.remove(&src_addr);
log::debug!("Packet endpoint destroyed and removed: {}", src_addr);
}
Some(network_tuple) = session_remove_rx.recv() => {
sessions.remove(&network_tuple);
log::debug!("session destroyed: {network_tuple}");
Expand All @@ -345,9 +368,11 @@ async fn process_device_read(
data: &[u8],
sessions: &mut SessionCollection,
session_remove_tx: &UnboundedSender<NetworkTuple>,
edp_remove_tx: &tokio::sync::mpsc::UnboundedSender<SocketAddr>,
up_pkt_sender: &PacketSender,
config: &IpStackConfig,
accept_sender: &UnboundedSender<IpStackStream>,
packet_endpoints: &mut AHashMap<SocketAddr, (mpsc::UnboundedSender<(SocketAddr, SocketAddr, Vec<u8>)>, Arc<AtomicU64>)>,
) -> Result<()> {
let Ok(packet) = NetworkPacket::parse(data) else {
let stream = IpStackStream::UnknownNetwork(data.to_owned());
Expand All @@ -368,6 +393,75 @@ async fn process_device_read(
return Ok(());
}

//UDP packet
if let TransportHeader::Udp(_udp_header) = packet.transport_header() {
if config.udp_packet_mode {
let src_addr = packet.src_addr();
let dst_addr = packet.dst_addr();
let payload = packet.payload.unwrap_or_default();

match packet_endpoints.entry(src_addr) {
std::collections::hash_map::Entry::Occupied(entry) => {
let (tx, last_activity) = entry.get();
last_activity.store(now_secs(), Ordering::Relaxed);

if let Err(e) = tx.send((src_addr, dst_addr, payload)) {
log::warn!("Failed to send to packet endpoint: {}", e);
Comment thread
Uxx0 marked this conversation as resolved.
Outdated
}
}

std::collections::hash_map::Entry::Vacant(entry) => {
//announce to destroy the channel when timeout or application layer take out
let (destroy_tx, mut destroy_rx) = tokio::sync::oneshot::channel::<()>();

let last_activity = Arc::new(AtomicU64::new(now_secs()));
let last_activity_clone = last_activity.clone();

let timeout_secs = config.udp_timeout.as_secs();

Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout_secs = config.udp_timeout.as_secs() truncates sub-second durations to 0, making packet endpoints time out immediately for e.g. udp_timeout(500ms). Use the full Duration for timeout calculations (and store/load timestamps in a compatible unit) to match how IpStackUdpStream handles timeouts.

Copilot uses AI. Check for mistakes.
let edp_remove_tx_clone = edp_remove_tx.clone();
let src_addr_clone = src_addr;

tokio::spawn(async move {
loop {
let elapsed = now_secs() - last_activity_clone.load(Ordering::Relaxed);
if elapsed >= timeout_secs {
log::info!("remove the channel of {} because not data in {} ", src_addr_clone, elapsed);
Comment thread
Uxx0 marked this conversation as resolved.
Outdated
break;
}
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Endpoint timeout tracking uses SystemTime (wall-clock) and computes elapsed via unsigned subtraction. If the system clock jumps backwards, this can underflow and cause immediate endpoint removal. Use a monotonic clock (tokio::time::Instant / std::time::Instant) for elapsed-time tracking, or otherwise guard against clock rollback/underflow.

Copilot uses AI. Check for mistakes.

let sleep_duration = std::time::Duration::from_secs(timeout_secs - elapsed);

tokio::select! {

//sleep until timeout
_ = tokio::time::sleep(sleep_duration) => {}

// application layer take out
_ = &mut destroy_rx => {
log::debug!("application layer Endpoint:{} removed the channel", src_addr_clone);
break;
}
}
}

let _ = edp_remove_tx_clone.send(src_addr_clone);
});
//ipstack to application layer channel
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();

let endpoint = IpStackUdpPacketEndpoint::new(rx, up_pkt_sender.clone(), src_addr, config.mtu, destroy_tx);

accept_sender.send(IpStackStream::UdpEdp(endpoint)).unwrap();

entry.insert((tx.clone(), last_activity));
tx.send((src_addr, dst_addr, payload)).unwrap();
Comment thread
Uxx0 marked this conversation as resolved.
Outdated
}
}
return Ok(());
}
}

let network_tuple = packet.network_tuple();
match sessions.entry(network_tuple) {
std::collections::hash_map::Entry::Occupied(entry) => {
Expand Down Expand Up @@ -439,3 +533,8 @@ async fn process_upstream_recv<Device: AsyncWrite + Unpin + 'static>(

Ok(())
}

//time
fn now_secs() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
Comment thread
Uxx0 marked this conversation as resolved.
Outdated
}
6 changes: 5 additions & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};

pub use self::tcp::IpStackTcpStream;
pub use self::tcp::{TcpConfig, TcpOptions};
pub use self::udp::IpStackUdpPacketEndpoint;
pub use self::udp::IpStackUdpStream;
pub use self::unknown::IpStackUnknownTransport;

mod seqnum;
mod tcb;
mod tcp;
Expand All @@ -27,6 +27,8 @@ pub enum IpStackStream {
Tcp(IpStackTcpStream),
/// A UDP stream.
Udp(IpStackUdpStream),
/// UDP PACKET.
UdpEdp(IpStackUdpPacketEndpoint),
/// A stream for unknown transport protocols.
Comment on lines 32 to 36
Copy link

Copilot AI Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IpStackStream::UdpEdp is an unclear/abbreviated variant name and the doc comment "UDP PACKET." is not descriptive. Consider renaming the variant to something aligned with the type name (e.g., UdpPacketEndpoint) and updating the enum docs accordingly to keep the public API self-explanatory.

Copilot uses AI. Check for mistakes.
UnknownTransport(IpStackUnknownTransport),
/// Raw network packets that couldn't be parsed.
Expand All @@ -52,6 +54,7 @@ impl IpStackStream {
match self {
IpStackStream::Tcp(tcp) => tcp.local_addr(),
IpStackStream::Udp(udp) => udp.local_addr(),
IpStackStream::UdpEdp(udp_edp) => udp_edp.local_addr(),
IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
Comment thread
Uxx0 marked this conversation as resolved.
IpStackStream::UnknownTransport(unknown) => match unknown.src_addr() {
IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)),
Expand All @@ -78,6 +81,7 @@ impl IpStackStream {
match self {
IpStackStream::Tcp(tcp) => tcp.peer_addr(),
IpStackStream::Udp(udp) => udp.peer_addr(),
IpStackStream::UdpEdp(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
IpStackStream::UnknownNetwork(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)),
IpStackStream::UnknownTransport(unknown) => match unknown.dst_addr() {
IpAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr, 0)),
Expand Down
Loading
Loading