Support set status flag for socket

This commit is contained in:
Jianfeng Jiang
2023-06-13 17:49:44 +08:00
committed by Tate, Hongliang Tian
parent c7dda21e96
commit db489f09a8
5 changed files with 121 additions and 7 deletions

View File

@ -1,3 +1,6 @@
use core::sync::atomic::{AtomicBool, Ordering};
use crate::fs::utils::StatusFlags;
use crate::net::iface::IpEndpoint; use crate::net::iface::IpEndpoint;
use crate::{ use crate::{
@ -20,6 +23,7 @@ use super::always_some::AlwaysSome;
use super::common::{bind_socket, get_ephemeral_endpoint}; use super::common::{bind_socket, get_ephemeral_endpoint};
pub struct DatagramSocket { pub struct DatagramSocket {
nonblocking: AtomicBool,
inner: RwLock<Inner>, inner: RwLock<Inner>,
} }
@ -128,6 +132,7 @@ impl DatagramSocket {
let udp_socket = AnyUnboundSocket::new_udp(); let udp_socket = AnyUnboundSocket::new_udp();
Self { Self {
inner: RwLock::new(Inner::Unbound(AlwaysSome::new(udp_socket))), inner: RwLock::new(Inner::Unbound(AlwaysSome::new(udp_socket))),
nonblocking: AtomicBool::new(false),
} }
} }
@ -156,6 +161,14 @@ impl DatagramSocket {
"udp should provide remote addr", "udp should provide remote addr",
)) ))
} }
pub fn nonblocking(&self) -> bool {
self.nonblocking.load(Ordering::SeqCst)
}
pub fn set_nonblocking(&self, nonblocking: bool) {
self.nonblocking.store(nonblocking, Ordering::SeqCst);
}
} }
impl FileLike for DatagramSocket { impl FileLike for DatagramSocket {
@ -179,6 +192,23 @@ impl FileLike for DatagramSocket {
fn as_socket(&self) -> Option<&dyn Socket> { fn as_socket(&self) -> Option<&dyn Socket> {
Some(self) Some(self)
} }
fn status_flags(&self) -> StatusFlags {
if self.nonblocking() {
StatusFlags::O_NONBLOCK
} else {
StatusFlags::empty()
}
}
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
if new_flags.contains(StatusFlags::O_NONBLOCK) {
self.set_nonblocking(true);
} else {
self.set_nonblocking(false);
}
Ok(())
}
} }
impl Socket for DatagramSocket { impl Socket for DatagramSocket {

View File

@ -1,3 +1,5 @@
use core::sync::atomic::{AtomicBool, Ordering};
use crate::net::iface::IpEndpoint; use crate::net::iface::IpEndpoint;
use crate::{ use crate::{
fs::utils::{IoEvents, Poller}, fs::utils::{IoEvents, Poller},
@ -10,13 +12,19 @@ use crate::{
}; };
pub struct ConnectedStream { pub struct ConnectedStream {
nonblocking: AtomicBool,
bound_socket: Arc<AnyBoundSocket>, bound_socket: Arc<AnyBoundSocket>,
remote_endpoint: IpEndpoint, remote_endpoint: IpEndpoint,
} }
impl ConnectedStream { impl ConnectedStream {
pub fn new(bound_socket: Arc<AnyBoundSocket>, remote_endpoint: IpEndpoint) -> Self { pub fn new(
nonblocking: bool,
bound_socket: Arc<AnyBoundSocket>,
remote_endpoint: IpEndpoint,
) -> Self {
Self { Self {
nonblocking: AtomicBool::new(nonblocking),
bound_socket, bound_socket,
remote_endpoint, remote_endpoint,
} }
@ -92,4 +100,12 @@ impl ConnectedStream {
pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents {
self.bound_socket.poll(mask, poller) self.bound_socket.poll(mask, poller)
} }
pub fn nonblocking(&self) -> bool {
self.nonblocking.load(Ordering::SeqCst)
}
pub fn set_nonblocking(&self, nonblocking: bool) {
self.nonblocking.store(nonblocking, Ordering::SeqCst);
}
} }

View File

@ -1,3 +1,5 @@
use core::sync::atomic::{AtomicBool, Ordering};
use crate::fs::utils::{IoEvents, Poller}; use crate::fs::utils::{IoEvents, Poller};
use crate::net::iface::Iface; use crate::net::iface::Iface;
use crate::net::iface::IpEndpoint; use crate::net::iface::IpEndpoint;
@ -9,6 +11,8 @@ use crate::prelude::*;
pub struct InitStream { pub struct InitStream {
inner: RwLock<Inner>, inner: RwLock<Inner>,
// TODO: deal with nonblocking
nonblocking: AtomicBool,
} }
enum Inner { enum Inner {
@ -114,6 +118,7 @@ impl InitStream {
let socket = AnyUnboundSocket::new_tcp(); let socket = AnyUnboundSocket::new_tcp();
let inner = Inner::Unbound(AlwaysSome::new(socket)); let inner = Inner::Unbound(AlwaysSome::new(socket));
Self { Self {
nonblocking: AtomicBool::new(false),
inner: RwLock::new(inner), inner: RwLock::new(inner),
} }
} }
@ -172,4 +177,12 @@ impl InitStream {
pub fn bound_socket(&self) -> Option<Arc<AnyBoundSocket>> { pub fn bound_socket(&self) -> Option<Arc<AnyBoundSocket>> {
self.inner.read().bound_socket().map(Clone::clone) self.inner.read().bound_socket().map(Clone::clone)
} }
pub fn nonblocking(&self) -> bool {
self.nonblocking.load(Ordering::SeqCst)
}
pub fn set_nonblocking(&self, nonblocking: bool) {
self.nonblocking.store(nonblocking, Ordering::SeqCst);
}
} }

View File

@ -1,3 +1,5 @@
use core::sync::atomic::{AtomicBool, Ordering};
use crate::net::iface::{AnyUnboundSocket, BindPortConfig, IpEndpoint}; use crate::net::iface::{AnyUnboundSocket, BindPortConfig, IpEndpoint};
use crate::fs::utils::{IoEvents, Poller}; use crate::fs::utils::{IoEvents, Poller};
@ -7,16 +9,22 @@ use crate::{net::poll_ifaces, prelude::*};
use super::connected::ConnectedStream; use super::connected::ConnectedStream;
pub struct ListenStream { pub struct ListenStream {
nonblocking: AtomicBool,
backlog: usize, backlog: usize,
/// Sockets also listening at LocalEndPoint when called `listen` /// Sockets also listening at LocalEndPoint when called `listen`
backlog_sockets: RwLock<Vec<BacklogSocket>>, backlog_sockets: RwLock<Vec<BacklogSocket>>,
} }
impl ListenStream { impl ListenStream {
pub fn new(bound_socket: Arc<AnyBoundSocket>, backlog: usize) -> Result<Self> { pub fn new(
nonblocking: bool,
bound_socket: Arc<AnyBoundSocket>,
backlog: usize,
) -> Result<Self> {
debug_assert!(backlog >= 1); debug_assert!(backlog >= 1);
let backlog_socket = BacklogSocket::new(&bound_socket)?; let backlog_socket = BacklogSocket::new(&bound_socket)?;
let listen_stream = Self { let listen_stream = Self {
nonblocking: AtomicBool::new(nonblocking),
backlog, backlog,
backlog_sockets: RwLock::new(vec![backlog_socket]), backlog_sockets: RwLock::new(vec![backlog_socket]),
}; };
@ -43,7 +51,8 @@ impl ListenStream {
let BacklogSocket { let BacklogSocket {
bound_socket: backlog_socket, bound_socket: backlog_socket,
} = accepted_socket; } = accepted_socket;
ConnectedStream::new(backlog_socket, remote_endpoint) let nonblocking = self.nonblocking();
ConnectedStream::new(nonblocking, backlog_socket, remote_endpoint)
}; };
return Ok((connected_stream, remote_endpoint)); return Ok((connected_stream, remote_endpoint));
} }
@ -100,6 +109,14 @@ impl ListenStream {
fn bound_socket(&self) -> Arc<AnyBoundSocket> { fn bound_socket(&self) -> Arc<AnyBoundSocket> {
self.backlog_sockets.read()[0].bound_socket.clone() self.backlog_sockets.read()[0].bound_socket.clone()
} }
pub fn nonblocking(&self) -> bool {
self.nonblocking.load(Ordering::SeqCst)
}
pub fn set_nonblocking(&self, nonblocking: bool) {
self.nonblocking.store(nonblocking, Ordering::SeqCst);
}
} }
struct BacklogSocket { struct BacklogSocket {

View File

@ -1,6 +1,6 @@
use crate::fs::{ use crate::fs::{
file_handle::FileLike, file_handle::FileLike,
utils::{IoEvents, Poller}, utils::{IoEvents, Poller, StatusFlags},
}; };
use crate::net::socket::{ use crate::net::socket::{
util::{ util::{
@ -37,6 +37,22 @@ impl StreamSocket {
state: RwLock::new(state), state: RwLock::new(state),
} }
} }
fn nonblocking(&self) -> bool {
match &*self.state.read() {
State::Init(init) => init.nonblocking(),
State::Connected(connected) => connected.nonblocking(),
State::Listen(listen) => listen.nonblocking(),
}
}
fn set_nonblocking(&self, nonblocking: bool) {
match &*self.state.read() {
State::Init(init) => init.set_nonblocking(nonblocking),
State::Connected(connected) => connected.set_nonblocking(nonblocking),
State::Listen(listen) => listen.set_nonblocking(nonblocking),
}
}
} }
impl FileLike for StreamSocket { impl FileLike for StreamSocket {
@ -62,6 +78,23 @@ impl FileLike for StreamSocket {
} }
} }
fn status_flags(&self) -> StatusFlags {
if self.nonblocking() {
StatusFlags::O_NONBLOCK
} else {
StatusFlags::empty()
}
}
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
if new_flags.contains(StatusFlags::O_NONBLOCK) {
self.set_nonblocking(true);
} else {
self.set_nonblocking(false);
}
Ok(())
}
fn as_socket(&self) -> Option<&dyn Socket> { fn as_socket(&self) -> Option<&dyn Socket> {
Some(self) Some(self)
} }
@ -84,9 +117,13 @@ impl Socket for StreamSocket {
match &*state { match &*state {
State::Init(init_stream) => { State::Init(init_stream) => {
init_stream.connect(&remote_endpoint)?; init_stream.connect(&remote_endpoint)?;
let nonblocking = init_stream.nonblocking();
let bound_socket = init_stream.bound_socket().unwrap(); let bound_socket = init_stream.bound_socket().unwrap();
let connected_stream = let connected_stream = Arc::new(ConnectedStream::new(
Arc::new(ConnectedStream::new(bound_socket, remote_endpoint)); nonblocking,
bound_socket,
remote_endpoint,
));
*state = State::Connected(connected_stream); *state = State::Connected(connected_stream);
Ok(()) Ok(())
} }
@ -101,8 +138,9 @@ impl Socket for StreamSocket {
if !init_stream.is_bound() { if !init_stream.is_bound() {
return_errno_with_message!(Errno::EINVAL, "cannot listen without bound"); return_errno_with_message!(Errno::EINVAL, "cannot listen without bound");
} }
let nonblocking = init_stream.nonblocking();
let bound_socket = init_stream.bound_socket().unwrap(); let bound_socket = init_stream.bound_socket().unwrap();
let listener = Arc::new(ListenStream::new(bound_socket, backlog)?); let listener = Arc::new(ListenStream::new(nonblocking, bound_socket, backlog)?);
*state = State::Listen(listener); *state = State::Listen(listener);
Ok(()) Ok(())
} }