feat(net): 实现tcp backlog功能 (#714)

* feat:实现tcp的backlog功能
This commit is contained in:
Saga1718 2024-04-14 23:51:47 +08:00 committed by GitHub
parent 9621ab16ef
commit c719ddc631
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 256 additions and 59 deletions

View File

@ -584,7 +584,6 @@ impl IndexNode for LockedRamFSInode {
fn special_node(&self) -> Option<super::vfs::SpecialNodeData> {
return self.0.lock().special_node.clone();
}
/// # 用于重命名内存中的文件或目录
fn rename(&self, _old_name: &str, _new_name: &str) -> Result<(), SystemError> {
let old_inode: Arc<dyn IndexNode> = self.find(_old_name)?;

View File

@ -67,6 +67,7 @@ pub fn vfs_init() -> Result<(), SystemError> {
root_inode
.create("sys", FileType::Dir, ModeType::from_bits_truncate(0o755))
.expect("Failed to create /sys");
kdebug!("dir in root:{:?}", root_inode.list());
procfs_init().expect("Failed to initialize procfs");
@ -131,6 +132,7 @@ fn migrate_virtual_filesystem(new_fs: Arc<dyn FileSystem>) -> Result<(), SystemE
do_migrate(new_root_inode.clone(), "proc", proc)?;
do_migrate(new_root_inode.clone(), "dev", dev)?;
do_migrate(new_root_inode.clone(), "sys", sys)?;
unsafe {
// drop旧的Root inode
let old_root_inode = __ROOT_INODE.take().unwrap();

View File

@ -160,7 +160,6 @@ pub fn poll_ifaces_try_lock(times: u16) -> Result<(), SystemError> {
send_event(&sockets)?;
return Ok(());
}
// 尝试次数用完,返回错误
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}

View File

@ -7,6 +7,7 @@ use smoltcp::{
use system_error::SystemError;
use crate::{
arch::rand::rand,
driver::net::NetDriver,
kerror, kwarn,
libs::rwlock::RwLock,
@ -281,7 +282,7 @@ impl UdpSocket {
ip.port = PORT_MANAGER.get_ephemeral_port(self.metadata.socket_type)?;
}
// 检测端口是否已被占用
PORT_MANAGER.bind_port(self.metadata.socket_type, ip.port, self.handle.clone())?;
PORT_MANAGER.bind_port(self.metadata.socket_type, ip.port, self.clone())?;
let bind_res = if ip.addr.is_unspecified() {
socket.bind(ip.port)
@ -457,7 +458,7 @@ impl Socket for UdpSocket {
/// https://man7.org/linux/man-pages/man7/tcp.7.html
#[derive(Debug, Clone)]
pub struct TcpSocket {
handle: Arc<GlobalSocketHandle>,
handles: Vec<Arc<GlobalSocketHandle>>,
local_endpoint: Option<wire::IpEndpoint>, // save local endpoint for bind()
is_listening: bool,
metadata: SocketMetadata,
@ -481,13 +482,10 @@ impl TcpSocket {
///
/// @return 返回创建的tcp的socket
pub fn new(options: SocketOptions) -> Self {
let rx_buffer = tcp::SocketBuffer::new(vec![0; Self::DEFAULT_RX_BUF_SIZE]);
let tx_buffer = tcp::SocketBuffer::new(vec![0; Self::DEFAULT_TX_BUF_SIZE]);
let socket = tcp::Socket::new(rx_buffer, tx_buffer);
// 把socket添加到socket集合中并得到socket的句柄
let handle: Arc<GlobalSocketHandle> =
GlobalSocketHandle::new(SOCKET_SET.lock_irqsave().add(socket));
// 创建handles数组并把socket添加到socket集合中并得到socket的句柄
let handles: Vec<Arc<GlobalSocketHandle>> = vec![GlobalSocketHandle::new(
SOCKET_SET.lock_irqsave().add(Self::create_new_socket()),
)];
let metadata = SocketMetadata::new(
SocketType::Tcp,
@ -496,9 +494,10 @@ impl TcpSocket {
Self::DEFAULT_METADATA_BUF_SIZE,
options,
);
// kdebug!("when there's a new tcp socket,its'len: {}",handles.len());
return Self {
handle,
handles,
local_endpoint: None,
is_listening: false,
metadata,
@ -517,7 +516,6 @@ impl TcpSocket {
// kdebug!("Tcp Socket Listen on {local_endpoint}");
socket.listen(local_endpoint)
};
// TODO: 增加端口占用检查
return match listen_result {
Ok(()) => {
// kdebug!(
@ -531,6 +529,16 @@ impl TcpSocket {
Err(_) => Err(SystemError::EINVAL),
};
}
/// # create_new_socket - 创建新的TCP套接字
///
/// 该函数用于创建一个新的TCP套接字并返回该套接字的引用。
fn create_new_socket() -> tcp::Socket<'static> {
// 初始化tcp的buffer
let rx_buffer = tcp::SocketBuffer::new(vec![0; Self::DEFAULT_RX_BUF_SIZE]);
let tx_buffer = tcp::SocketBuffer::new(vec![0; Self::DEFAULT_TX_BUF_SIZE]);
tcp::Socket::new(rx_buffer, tx_buffer)
}
}
impl Socket for TcpSocket {
@ -545,11 +553,12 @@ impl Socket for TcpSocket {
return (Err(SystemError::ENOTCONN), Endpoint::Ip(None));
}
// kdebug!("tcp socket: read, buf len={}", buf.len());
// kdebug!("tcp socket:read, socket'len={}",self.handle.len());
loop {
poll_ifaces();
let mut socket_set_guard = SOCKET_SET.lock_irqsave();
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handle.0);
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handles.get(0).unwrap().0);
// 如果socket已经关闭返回错误
if !socket.is_active() {
@ -613,8 +622,11 @@ impl Socket for TcpSocket {
{
return Err(SystemError::ENOTCONN);
}
// kdebug!("tcp socket:write, socket'len={}",self.handle.len());
let mut socket_set_guard = SOCKET_SET.lock_irqsave();
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handle.0);
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handles.get(0).unwrap().0);
if socket.is_open() {
if socket.can_send() {
@ -639,8 +651,9 @@ impl Socket for TcpSocket {
fn poll(&self) -> EPollEventType {
let mut socket_set_guard = SOCKET_SET.lock_irqsave();
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handle.0);
// kdebug!("tcp socket:poll, socket'len={}",self.handle.len());
let socket = socket_set_guard.get_mut::<tcp::Socket>(self.handles.get(0).unwrap().0);
return SocketPollMethod::tcp_poll(
socket,
HANDLE_MAP
@ -653,12 +666,14 @@ impl Socket for TcpSocket {
fn connect(&mut self, endpoint: Endpoint) -> Result<(), SystemError> {
let mut sockets = SOCKET_SET.lock_irqsave();
let socket = sockets.get_mut::<tcp::Socket>(self.handle.0);
// kdebug!("tcp socket:connect, socket'len={}",self.handle.len());
let socket = sockets.get_mut::<tcp::Socket>(self.handles.get(0).unwrap().0);
if let Endpoint::Ip(Some(ip)) = endpoint {
let temp_port = PORT_MANAGER.get_ephemeral_port(self.metadata.socket_type)?;
// 检测端口是否被占用
PORT_MANAGER.bind_port(self.metadata.socket_type, temp_port, self.handle.clone())?;
PORT_MANAGER.bind_port(self.metadata.socket_type, temp_port, self.clone())?;
// kdebug!("temp_port: {}", temp_port);
let iface: Arc<dyn NetDriver> = NET_DRIVERS.write_irqsave().get(&0).unwrap().clone();
@ -674,7 +689,7 @@ impl Socket for TcpSocket {
loop {
poll_ifaces();
let mut sockets = SOCKET_SET.lock_irqsave();
let socket = sockets.get_mut::<tcp::Socket>(self.handle.0);
let socket = sockets.get_mut::<tcp::Socket>(self.handles.get(0).unwrap().0);
match socket.state() {
tcp::State::Established => {
@ -710,21 +725,43 @@ impl Socket for TcpSocket {
/// @brief tcp socket 监听 local_endpoint 端口
///
/// @param backlog 未处理的连接队列的最大长度. 由于smoltcp不支持backlog所以这个参数目前无效
fn listen(&mut self, _backlog: usize) -> Result<(), SystemError> {
fn listen(&mut self, backlog: usize) -> Result<(), SystemError> {
if self.is_listening {
return Ok(());
}
let local_endpoint = self.local_endpoint.ok_or(SystemError::EINVAL)?;
let mut sockets = SOCKET_SET.lock_irqsave();
let socket = sockets.get_mut::<tcp::Socket>(self.handle.0);
// 获取handle的数量
let handlen = self.handles.len();
let backlog = handlen.max(backlog);
if socket.is_listening() {
// kdebug!("Tcp Socket is already listening on {local_endpoint}");
return Ok(());
// 添加剩余需要构建的socket
// kdebug!("tcp socket:before listen, socket'len={}",self.handle.len());
let mut handle_guard = HANDLE_MAP.write_irqsave();
self.handles.extend((handlen..backlog).map(|_| {
let socket = Self::create_new_socket();
let handle = GlobalSocketHandle::new(sockets.add(socket));
let handle_item = SocketHandleItem::new();
handle_guard.insert(handle.0, handle_item);
handle
}));
// kdebug!("tcp socket:listen, socket'len={}",self.handle.len());
// kdebug!("tcp socket:listen, backlog={backlog}");
// 监听所有的socket
for i in 0..backlog {
let handle = self.handles.get(i).unwrap();
let socket = sockets.get_mut::<tcp::Socket>(handle.0);
if !socket.is_listening() {
// kdebug!("Tcp Socket is already listening on {local_endpoint}");
self.do_listen(socket, local_endpoint)?;
}
// kdebug!("Tcp Socket before listen, open={}", socket.is_open());
}
// kdebug!("Tcp Socket before listen, open={}", socket.is_open());
return self.do_listen(socket, local_endpoint);
return Ok(());
}
fn bind(&mut self, endpoint: Endpoint) -> Result<(), SystemError> {
@ -734,7 +771,8 @@ impl Socket for TcpSocket {
}
// 检测端口是否已被占用
PORT_MANAGER.bind_port(self.metadata.socket_type, ip.port, self.handle.clone())?;
PORT_MANAGER.bind_port(self.metadata.socket_type, ip.port, self.clone())?;
// kdebug!("tcp socket:bind, socket'len={}",self.handle.len());
self.local_endpoint = Some(ip);
self.is_listening = false;
@ -758,21 +796,22 @@ impl Socket for TcpSocket {
loop {
// kdebug!("tcp accept: poll_ifaces()");
poll_ifaces();
// kdebug!("tcp socket:accept, socket'len={}",self.handle.len());
let mut sockets = SOCKET_SET.lock_irqsave();
let socket = sockets.get_mut::<tcp::Socket>(self.handle.0);
// 随机获取访问的socket的handle
let index: usize = rand() % self.handles.len();
let handle = self.handles.get(index).unwrap();
let socket = sockets.get_mut::<tcp::Socket>(handle.0);
if socket.is_active() {
// kdebug!("tcp accept: socket.is_active()");
let remote_ep = socket.remote_endpoint().ok_or(SystemError::ENOTCONN)?;
let new_socket = {
// Initialize the TCP socket's buffers.
let rx_buffer = tcp::SocketBuffer::new(vec![0; Self::DEFAULT_RX_BUF_SIZE]);
let tx_buffer = tcp::SocketBuffer::new(vec![0; Self::DEFAULT_TX_BUF_SIZE]);
// The new TCP socket used for sending and receiving data.
let mut tcp_socket = tcp::Socket::new(rx_buffer, tx_buffer);
let mut tcp_socket = Self::create_new_socket();
self.do_listen(&mut tcp_socket, endpoint)
.expect("do_listen failed");
@ -781,17 +820,10 @@ impl Socket for TcpSocket {
// 之所以把old_handle存入new_socket, 是因为当前时刻smoltcp已经把old_handle对应的socket与远程的endpoint关联起来了
// 因此需要再为当前的socket分配一个新的handle
let new_handle = GlobalSocketHandle::new(sockets.add(tcp_socket));
let old_handle = ::core::mem::replace(&mut self.handle, new_handle.clone());
// 更新端口与 handle 的绑定
if let Some(Endpoint::Ip(Some(ip))) = self.endpoint() {
PORT_MANAGER.unbind_port(self.metadata.socket_type, ip.port)?;
PORT_MANAGER.bind_port(
self.metadata.socket_type,
ip.port,
new_handle.clone(),
)?;
}
let old_handle = ::core::mem::replace(
&mut *self.handles.get_mut(index).unwrap(),
new_handle.clone(),
);
let metadata = SocketMetadata::new(
SocketType::Tcp,
@ -802,19 +834,33 @@ impl Socket for TcpSocket {
);
let new_socket = Box::new(TcpSocket {
handle: old_handle.clone(),
handles: vec![old_handle.clone()],
local_endpoint: self.local_endpoint,
is_listening: false,
metadata,
});
// kdebug!("tcp socket:after accept, socket'len={}",new_socket.handle.len());
// 更新端口与 socket 的绑定
if let Some(Endpoint::Ip(Some(ip))) = self.endpoint() {
PORT_MANAGER.unbind_port(self.metadata.socket_type, ip.port)?;
PORT_MANAGER.bind_port(
self.metadata.socket_type,
ip.port,
*new_socket.clone(),
)?;
}
// 更新handle表
let mut handle_guard = HANDLE_MAP.write_irqsave();
// 先删除原来的
let item = handle_guard.remove(&old_handle.0).unwrap();
// 按照smoltcp行为将新的handle绑定到原来的item
handle_guard.insert(new_handle.0, item);
let new_item = SocketHandleItem::new();
// 插入新的item
handle_guard.insert(old_handle.0, new_item);
@ -826,13 +872,11 @@ impl Socket for TcpSocket {
return Ok((new_socket, Endpoint::Ip(Some(remote_ep))));
}
drop(sockets);
// kdebug!("tcp socket:before sleep, handle_guard'len={}",HANDLE_MAP.write_irqsave().len());
SocketHandleItem::sleep(
self.socket_handle(),
Self::CAN_ACCPET,
HANDLE_MAP.read_irqsave(),
);
drop(sockets);
SocketHandleItem::sleep(handle.0, Self::CAN_ACCPET, HANDLE_MAP.read_irqsave());
// kdebug!("tcp socket:after sleep, handle_guard'len={}",HANDLE_MAP.write_irqsave().len());
}
}
@ -841,7 +885,9 @@ impl Socket for TcpSocket {
if result.is_none() {
let sockets = SOCKET_SET.lock_irqsave();
let socket = sockets.get::<tcp::Socket>(self.handle.0);
// kdebug!("tcp socket:endpoint, socket'len={}",self.handle.len());
let socket = sockets.get::<tcp::Socket>(self.handles.get(0).unwrap().0);
if let Some(ep) = socket.local_endpoint() {
result = Some(Endpoint::Ip(Some(ep)));
}
@ -851,7 +897,9 @@ impl Socket for TcpSocket {
fn peer_endpoint(&self) -> Option<Endpoint> {
let sockets = SOCKET_SET.lock_irqsave();
let socket = sockets.get::<tcp::Socket>(self.handle.0);
// kdebug!("tcp socket:peer_endpoint, socket'len={}",self.handle.len());
let socket = sockets.get::<tcp::Socket>(self.handles.get(0).unwrap().0);
return socket.remote_endpoint().map(|x| Endpoint::Ip(Some(x)));
}
@ -864,7 +912,9 @@ impl Socket for TcpSocket {
}
fn socket_handle(&self) -> SocketHandle {
self.handle.0
// kdebug!("tcp socket:socket_handle, socket'len={}",self.handle.len());
self.handles.get(0).unwrap().0
}
fn as_any_ref(&self) -> &dyn core::any::Any {

View File

@ -330,6 +330,7 @@ impl IndexNode for SocketInode {
.remove(&socket.socket_handle())
.unwrap();
}
Ok(())
}
@ -455,9 +456,9 @@ impl SocketHandleItem {
/// 如果 TCP/UDP 的 socket 绑定了某个端口,它会在对应的表中记录,以检测端口冲突。
pub struct PortManager {
// TCP 端口记录表
tcp_port_table: SpinLock<HashMap<u16, Arc<GlobalSocketHandle>>>,
tcp_port_table: SpinLock<HashMap<u16, Arc<dyn Socket>>>,
// UDP 端口记录表
udp_port_table: SpinLock<HashMap<u16, Arc<GlobalSocketHandle>>>,
udp_port_table: SpinLock<HashMap<u16, Arc<dyn Socket>>>,
}
impl PortManager {
@ -513,7 +514,7 @@ impl PortManager {
&self,
socket_type: SocketType,
port: u16,
handle: Arc<GlobalSocketHandle>,
socket: impl Socket,
) -> Result<(), SystemError> {
if port > 0 {
let mut listen_table_guard = match socket_type {
@ -523,7 +524,7 @@ impl PortManager {
};
match listen_table_guard.get(&port) {
Some(_) => return Err(SystemError::EADDRINUSE),
None => listen_table_guard.insert(port, handle),
None => listen_table_guard.insert(port, Arc::new(socket)),
};
drop(listen_table_guard);
}

View File

@ -1041,6 +1041,11 @@ impl Syscall {
let name = args[0] as *mut PosixOldUtsName;
Self::uname(name)
}
SYS_PRCTL => {
// todo: 这个系统调用还没有实现
Err(SystemError::EINVAL)
}
SYS_SHMGET => {
let key = ShmKey::new(args[0]);

View File

@ -11,7 +11,7 @@
#define MAX_REQUEST_SIZE 1500
#define MAX_RESPONSE_SIZE 1500
// 网页根目录
#define WEB_ROOT "/wwwroot/First-WebPage-On-DragonOS"
#define WEB_ROOT "/var/www/html/"
#define EXIT_CODE 1
#define min(a, b) ((a) < (b) ? (a) : (b))

View File

@ -0,0 +1,2 @@
[build]
target = "x86_64-unknown-linux-musl"

3
user/apps/test-backlog/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
Cargo.lock
/install/

View File

@ -0,0 +1,12 @@
[package]
name = "test-backlog"
version = "0.1.0"
edition = "2021"
description = "test the tcp backlog"
authors = [ "saga" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web={ version = "3.0.0",default-features = false,features=["rust-tls"] }

View File

@ -0,0 +1,56 @@
TOOLCHAIN="+nightly-2023-08-15-x86_64-unknown-linux-gnu"
RUSTFLAGS+=""
ifdef DADK_CURRENT_BUILD_DIR
# 如果是在dadk中编译那么安装到dadk的安装目录中
INSTALL_DIR = $(DADK_CURRENT_BUILD_DIR)
else
# 如果是在本地编译那么安装到当前目录下的install目录中
INSTALL_DIR = ./install
endif
ifeq ($(ARCH), x86_64)
export RUST_TARGET=x86_64-unknown-linux-musl
else ifeq ($(ARCH), riscv64)
export RUST_TARGET=riscv64gc-unknown-linux-gnu
else
# 默认为x86_86用于本地编译
export RUST_TARGET=x86_64-unknown-linux-musl
endif
run:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) run --target $(RUST_TARGET) --features"rust-tls"
build:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) build --target $(RUST_TARGET) --features"rust-tls"
clean:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) clean --target $(RUST_TARGET)
test:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) test --target $(RUST_TARGET)
doc:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) doc --target $(RUST_TARGET)
fmt:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) fmt
fmt-check:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) fmt --check
run-release:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) run --target $(RUST_TARGET) --release
build-release:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) build --target $(RUST_TARGET) --release
clean-release:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) clean --target $(RUST_TARGET) --release
test-release:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) test --target $(RUST_TARGET) --release
.PHONY: install
install:
RUSTFLAGS=$(RUSTFLAGS) cargo $(TOOLCHAIN) install --target $(RUST_TARGET) --path . --no-track --root $(INSTALL_DIR) --force

View File

@ -0,0 +1,9 @@
## 程序说明
用于测试tcp的backlog功能的测试程序
本程序绑定到0.0.0.0:12580端口并在外部手动多线程请求这个端口来测试backlog功能
## 使用方法
1. 打开系统的/bin目录
2. 输入指令exec test-backlog即可开始测试
3. 可以在外部使用apifox进行多次请求当请求数目超过backlog时会有几个请求失败

View File

@ -0,0 +1,33 @@
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use std::io;
async fn index(req: HttpRequest) -> HttpResponse {
// 获取请求方法
let method = req.method().to_string();
// 获取请求路径
let path = req.path().to_string();
// 获取请求头部信息
let headers = req.headers().clone();
// 获取查询参数
let query_params = req.query_string().to_string();
// 打印请求信息
println!("Received {} request to {}", method, path);
println!("Headers: {:?}", headers);
println!("Query params: {}", query_params);
// 返回响应
HttpResponse::Ok().body("Hello, World!")
}
#[actix_web::main]
async fn main() -> io::Result<()> {
// 设置 TCP backlog 大小为 5
let backlog_size = 5;
HttpServer::new(|| App::new().route("/", web::get().to(index)))
.backlog(backlog_size) // 设置 TCP backlog 大小
.bind("0.0.0.0:12580")?
.run()
.await
}

View File

@ -0,0 +1,26 @@
{
"name": "test-backlog",
"version": "0.1.0",
"description": "test the tcp backlog",
"rust_target": null,
"task_type": {
"BuildFromSource": {
"Local": {
"path": "apps/test-backlog"
}
}
},
"depends": [],
"build": {
"build_command": "make install"
},
"install": {
"in_dragonos_path": "/"
},
"clean": {
"clean_command": "make clean"
},
"envs": [],
"build_once": false,
"install_once": false
}