mirror of
https://github.com/asterinas/asterinas.git
synced 2025-06-20 04:56:32 +00:00
Unpack states from Arc
in UNIX sockets
This commit is contained in:
committed by
Tate, Hongliang Tian
parent
8df51ab001
commit
e83e1fc01b
@ -25,26 +25,18 @@ impl Connected {
|
|||||||
self.local_endpoint.peer_addr()
|
self.local_endpoint.peer_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn write(&self, buf: &[u8]) -> Result<usize> {
|
pub(super) fn try_write(&self, buf: &[u8]) -> Result<usize> {
|
||||||
self.local_endpoint.write(buf)
|
self.local_endpoint.try_write(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn read(&self, buf: &mut [u8]) -> Result<usize> {
|
pub(super) fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
|
||||||
self.local_endpoint.read(buf)
|
self.local_endpoint.try_read(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
|
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
|
||||||
self.local_endpoint.shutdown(cmd)
|
self.local_endpoint.shutdown(cmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn is_nonblocking(&self) -> bool {
|
|
||||||
self.local_endpoint.is_nonblocking()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn set_nonblocking(&self, is_nonblocking: bool) {
|
|
||||||
self.local_endpoint.set_nonblocking(is_nonblocking).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
||||||
self.local_endpoint.poll(mask, poller)
|
self.local_endpoint.poll(mask, poller)
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,11 @@
|
|||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
use core::sync::atomic::AtomicBool;
|
|
||||||
|
|
||||||
use atomic::Ordering;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
events::IoEvents,
|
events::IoEvents,
|
||||||
fs::utils::{Channel, Consumer, Producer},
|
fs::utils::{Channel, Consumer, Producer},
|
||||||
net::socket::{unix::addr::UnixSocketAddrBound, SockShutdownCmd},
|
net::socket::{unix::addr::UnixSocketAddrBound, SockShutdownCmd},
|
||||||
prelude::*,
|
prelude::*,
|
||||||
process::signal::{Pollable, Poller},
|
process::signal::Poller,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(super) struct Endpoint {
|
pub(super) struct Endpoint {
|
||||||
@ -17,14 +13,12 @@ pub(super) struct Endpoint {
|
|||||||
peer_addr: Option<UnixSocketAddrBound>,
|
peer_addr: Option<UnixSocketAddrBound>,
|
||||||
reader: Consumer<u8>,
|
reader: Consumer<u8>,
|
||||||
writer: Producer<u8>,
|
writer: Producer<u8>,
|
||||||
is_nonblocking: AtomicBool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Endpoint {
|
impl Endpoint {
|
||||||
pub(super) fn new_pair(
|
pub(super) fn new_pair(
|
||||||
addr: Option<UnixSocketAddrBound>,
|
addr: Option<UnixSocketAddrBound>,
|
||||||
peer_addr: Option<UnixSocketAddrBound>,
|
peer_addr: Option<UnixSocketAddrBound>,
|
||||||
is_nonblocking: bool,
|
|
||||||
) -> (Endpoint, Endpoint) {
|
) -> (Endpoint, Endpoint) {
|
||||||
let (writer_this, reader_peer) = Channel::new(DAFAULT_BUF_SIZE).split();
|
let (writer_this, reader_peer) = Channel::new(DAFAULT_BUF_SIZE).split();
|
||||||
let (writer_peer, reader_this) = Channel::new(DAFAULT_BUF_SIZE).split();
|
let (writer_peer, reader_this) = Channel::new(DAFAULT_BUF_SIZE).split();
|
||||||
@ -34,14 +28,12 @@ impl Endpoint {
|
|||||||
peer_addr: peer_addr.clone(),
|
peer_addr: peer_addr.clone(),
|
||||||
reader: reader_this,
|
reader: reader_this,
|
||||||
writer: writer_this,
|
writer: writer_this,
|
||||||
is_nonblocking: AtomicBool::new(is_nonblocking),
|
|
||||||
};
|
};
|
||||||
let peer = Endpoint {
|
let peer = Endpoint {
|
||||||
addr: peer_addr,
|
addr: peer_addr,
|
||||||
peer_addr: addr,
|
peer_addr: addr,
|
||||||
reader: reader_peer,
|
reader: reader_peer,
|
||||||
writer: writer_peer,
|
writer: writer_peer,
|
||||||
is_nonblocking: AtomicBool::new(is_nonblocking),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
(this, peer)
|
(this, peer)
|
||||||
@ -55,29 +47,12 @@ impl Endpoint {
|
|||||||
self.peer_addr.as_ref()
|
self.peer_addr.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn is_nonblocking(&self) -> bool {
|
pub(super) fn try_read(&self, buf: &mut [u8]) -> Result<usize> {
|
||||||
self.is_nonblocking.load(Ordering::Relaxed)
|
self.reader.try_read(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn set_nonblocking(&self, is_nonblocking: bool) -> Result<()> {
|
pub(super) fn try_write(&self, buf: &[u8]) -> Result<usize> {
|
||||||
self.is_nonblocking.store(is_nonblocking, Ordering::Relaxed);
|
self.writer.try_write(buf)
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn read(&self, buf: &mut [u8]) -> Result<usize> {
|
|
||||||
if self.is_nonblocking() {
|
|
||||||
self.reader.try_read(buf)
|
|
||||||
} else {
|
|
||||||
self.wait_events(IoEvents::IN, || self.reader.try_read(buf))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn write(&self, buf: &[u8]) -> Result<usize> {
|
|
||||||
if self.is_nonblocking() {
|
|
||||||
self.writer.try_write(buf)
|
|
||||||
} else {
|
|
||||||
self.wait_events(IoEvents::OUT, || self.writer.try_write(buf))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
|
pub(super) fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
|
||||||
@ -115,10 +90,4 @@ impl Endpoint {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Pollable for Endpoint {
|
|
||||||
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
|
||||||
self.poll(mask, poller)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const DAFAULT_BUF_SIZE: usize = 4096;
|
const DAFAULT_BUF_SIZE: usize = 4096;
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
use core::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
|
|
||||||
use super::{connected::Connected, endpoint::Endpoint, listener::push_incoming};
|
use super::{connected::Connected, endpoint::Endpoint, listener::push_incoming};
|
||||||
use crate::{
|
use crate::{
|
||||||
events::IoEvents,
|
events::IoEvents,
|
||||||
@ -18,15 +16,13 @@ use crate::{
|
|||||||
pub(super) struct Init {
|
pub(super) struct Init {
|
||||||
addr: Mutex<Option<UnixSocketAddrBound>>,
|
addr: Mutex<Option<UnixSocketAddrBound>>,
|
||||||
pollee: Pollee,
|
pollee: Pollee,
|
||||||
is_nonblocking: AtomicBool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Init {
|
impl Init {
|
||||||
pub(super) fn new(is_nonblocking: bool) -> Self {
|
pub(super) fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
addr: Mutex::new(None),
|
addr: Mutex::new(None),
|
||||||
pollee: Pollee::new(IoEvents::empty()),
|
pollee: Pollee::new(IoEvents::empty()),
|
||||||
is_nonblocking: AtomicBool::new(is_nonblocking),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,8 +53,7 @@ impl Init {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (this_end, remote_end) =
|
let (this_end, remote_end) = Endpoint::new_pair(addr, Some(remote_addr.clone()));
|
||||||
Endpoint::new_pair(addr, Some(remote_addr.clone()), self.is_nonblocking());
|
|
||||||
|
|
||||||
push_incoming(remote_addr, remote_end)?;
|
push_incoming(remote_addr, remote_end)?;
|
||||||
Ok(Connected::new(this_end))
|
Ok(Connected::new(this_end))
|
||||||
@ -68,14 +63,6 @@ impl Init {
|
|||||||
self.addr.lock().clone()
|
self.addr.lock().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn is_nonblocking(&self) -> bool {
|
|
||||||
self.is_nonblocking.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn set_nonblocking(&self, is_nonblocking: bool) {
|
|
||||||
self.is_nonblocking.store(is_nonblocking, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
pub(super) fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
||||||
self.pollee.poll(mask, poller)
|
self.pollee.poll(mask, poller)
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
use core::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
|
|
||||||
use keyable_arc::KeyableWeak;
|
use keyable_arc::KeyableWeak;
|
||||||
|
|
||||||
use super::{connected::Connected, endpoint::Endpoint, UnixStreamSocket};
|
use super::{connected::Connected, endpoint::Endpoint, UnixStreamSocket};
|
||||||
@ -18,40 +16,23 @@ use crate::{
|
|||||||
|
|
||||||
pub(super) struct Listener {
|
pub(super) struct Listener {
|
||||||
addr: UnixSocketAddrBound,
|
addr: UnixSocketAddrBound,
|
||||||
is_nonblocking: AtomicBool,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Listener {
|
impl Listener {
|
||||||
pub(super) fn new(
|
pub(super) fn new(addr: UnixSocketAddrBound, backlog: usize) -> Result<Self> {
|
||||||
addr: UnixSocketAddrBound,
|
|
||||||
backlog: usize,
|
|
||||||
nonblocking: bool,
|
|
||||||
) -> Result<Self> {
|
|
||||||
BACKLOG_TABLE.add_backlog(&addr, backlog)?;
|
BACKLOG_TABLE.add_backlog(&addr, backlog)?;
|
||||||
Ok(Self {
|
Ok(Self { addr })
|
||||||
addr,
|
|
||||||
is_nonblocking: AtomicBool::new(nonblocking),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn addr(&self) -> &UnixSocketAddrBound {
|
pub(super) fn addr(&self) -> &UnixSocketAddrBound {
|
||||||
&self.addr
|
&self.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn is_nonblocking(&self) -> bool {
|
pub(super) fn try_accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
|
||||||
self.is_nonblocking.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn set_nonblocking(&self, is_nonblocking: bool) {
|
|
||||||
self.is_nonblocking.store(is_nonblocking, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
|
|
||||||
let addr = self.addr().clone();
|
let addr = self.addr().clone();
|
||||||
let is_nonblocking = self.is_nonblocking();
|
|
||||||
|
|
||||||
let connected = {
|
let connected = {
|
||||||
let local_endpoint = BACKLOG_TABLE.pop_incoming(is_nonblocking, &addr)?;
|
let local_endpoint = BACKLOG_TABLE.pop_incoming(&addr)?;
|
||||||
Connected::new(local_endpoint)
|
Connected::new(local_endpoint)
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -60,7 +41,7 @@ impl Listener {
|
|||||||
Some(addr) => SocketAddr::from(addr.clone()),
|
Some(addr) => SocketAddr::from(addr.clone()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let socket = Arc::new(UnixStreamSocket::new_connected(connected));
|
let socket = UnixStreamSocket::new_connected(connected, false);
|
||||||
|
|
||||||
Ok((socket, peer_addr))
|
Ok((socket, peer_addr))
|
||||||
}
|
}
|
||||||
@ -118,32 +99,13 @@ impl BacklogTable {
|
|||||||
.ok_or_else(|| Error::with_message(Errno::EINVAL, "the socket is not listened"))
|
.ok_or_else(|| Error::with_message(Errno::EINVAL, "the socket is not listened"))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pop_incoming(&self, nonblocking: bool, addr: &UnixSocketAddrBound) -> Result<Endpoint> {
|
fn pop_incoming(&self, addr: &UnixSocketAddrBound) -> Result<Endpoint> {
|
||||||
let mut poller = Poller::new();
|
let backlog = self.get_backlog(addr)?;
|
||||||
loop {
|
|
||||||
let backlog = self.get_backlog(addr)?;
|
|
||||||
|
|
||||||
if let Some(endpoint) = backlog.pop_incoming() {
|
if let Some(endpoint) = backlog.pop_incoming() {
|
||||||
return Ok(endpoint);
|
Ok(endpoint)
|
||||||
}
|
} else {
|
||||||
|
return_errno_with_message!(Errno::EAGAIN, "no pending connection is available")
|
||||||
if nonblocking {
|
|
||||||
return_errno_with_message!(Errno::EAGAIN, "no connection comes");
|
|
||||||
}
|
|
||||||
|
|
||||||
let events = {
|
|
||||||
let mask = IoEvents::IN;
|
|
||||||
backlog.poll(mask, Some(&mut poller))
|
|
||||||
};
|
|
||||||
|
|
||||||
if events.contains(IoEvents::ERR) | events.contains(IoEvents::HUP) {
|
|
||||||
return_errno_with_message!(Errno::ECONNABORTED, "connection is aborted");
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: deal with accept timeout
|
|
||||||
if events.is_empty() {
|
|
||||||
poller.wait()?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use core::sync::atomic::AtomicBool;
|
||||||
|
|
||||||
|
use atomic::Ordering;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
connected::Connected,
|
connected::Connected,
|
||||||
endpoint::Endpoint,
|
endpoint::Endpoint,
|
||||||
@ -27,87 +31,104 @@ use crate::{
|
|||||||
util::IoVec,
|
util::IoVec,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct UnixStreamSocket(RwLock<State>);
|
pub struct UnixStreamSocket {
|
||||||
|
state: RwLock<State>,
|
||||||
|
is_nonblocking: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
impl UnixStreamSocket {
|
impl UnixStreamSocket {
|
||||||
pub(super) fn new_init(init: Init) -> Self {
|
pub(super) fn new_init(init: Init, is_nonblocking: bool) -> Arc<Self> {
|
||||||
Self(RwLock::new(State::Init(Arc::new(init))))
|
Arc::new(Self {
|
||||||
|
state: RwLock::new(State::Init(init)),
|
||||||
|
is_nonblocking: AtomicBool::new(is_nonblocking),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn new_connected(connected: Connected) -> Self {
|
pub(super) fn new_connected(connected: Connected, is_nonblocking: bool) -> Arc<Self> {
|
||||||
Self(RwLock::new(State::Connected(Arc::new(connected))))
|
Arc::new(Self {
|
||||||
|
state: RwLock::new(State::Connected(connected)),
|
||||||
|
is_nonblocking: AtomicBool::new(is_nonblocking),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State {
|
enum State {
|
||||||
Init(Arc<Init>),
|
Init(Init),
|
||||||
Listen(Arc<Listener>),
|
Listen(Listener),
|
||||||
Connected(Arc<Connected>),
|
Connected(Connected),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UnixStreamSocket {
|
impl UnixStreamSocket {
|
||||||
pub fn new(nonblocking: bool) -> Self {
|
pub fn new(is_nonblocking: bool) -> Arc<Self> {
|
||||||
let init = Init::new(nonblocking);
|
Self::new_init(Init::new(), is_nonblocking)
|
||||||
Self::new_init(init)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_pair(nonblocking: bool) -> (Arc<Self>, Arc<Self>) {
|
pub fn new_pair(is_nonblocking: bool) -> (Arc<Self>, Arc<Self>) {
|
||||||
let (end_a, end_b) = Endpoint::new_pair(None, None, nonblocking);
|
let (end_a, end_b) = Endpoint::new_pair(None, None);
|
||||||
|
(
|
||||||
let connected_a = {
|
Self::new_connected(Connected::new(end_a), is_nonblocking),
|
||||||
let connected = Connected::new(end_a);
|
Self::new_connected(Connected::new(end_b), is_nonblocking),
|
||||||
Self::new_connected(connected)
|
)
|
||||||
};
|
|
||||||
let connected_b = {
|
|
||||||
let connected = Connected::new(end_b);
|
|
||||||
Self::new_connected(connected)
|
|
||||||
};
|
|
||||||
|
|
||||||
(Arc::new(connected_a), Arc::new(connected_b))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bound_addr(&self) -> Option<UnixSocketAddrBound> {
|
fn bound_addr(&self) -> Option<UnixSocketAddrBound> {
|
||||||
let status = self.0.read();
|
let state = self.state.read();
|
||||||
match &*status {
|
match &*state {
|
||||||
State::Init(init) => init.addr(),
|
State::Init(init) => init.addr(),
|
||||||
State::Listen(listen) => Some(listen.addr().clone()),
|
State::Listen(listen) => Some(listen.addr().clone()),
|
||||||
State::Connected(connected) => connected.addr().cloned(),
|
State::Connected(connected) => connected.addr().cloned(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mask_flags(status_flags: &StatusFlags) -> StatusFlags {
|
fn send(&self, buf: &[u8], flags: SendRecvFlags) -> Result<usize> {
|
||||||
const SUPPORTED_FLAGS: StatusFlags = StatusFlags::O_NONBLOCK;
|
if self.is_nonblocking() {
|
||||||
const UNSUPPORTED_FLAGS: StatusFlags = SUPPORTED_FLAGS.complement();
|
self.try_send(buf, flags)
|
||||||
|
} else {
|
||||||
if status_flags.intersects(UNSUPPORTED_FLAGS) {
|
self.wait_events(IoEvents::OUT, || self.try_send(buf, flags))
|
||||||
warn!("ignore unsupported flags");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
status_flags.intersection(SUPPORTED_FLAGS)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send(&self, buf: &[u8], _flags: SendRecvFlags) -> Result<usize> {
|
fn try_send(&self, buf: &[u8], _flags: SendRecvFlags) -> Result<usize> {
|
||||||
let connected = match &*self.0.read() {
|
match &*self.state.read() {
|
||||||
State::Connected(connected) => connected.clone(),
|
State::Connected(connected) => connected.try_write(buf),
|
||||||
_ => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
|
_ => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
|
||||||
};
|
}
|
||||||
|
|
||||||
connected.write(buf)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv(&self, buf: &mut [u8], _flags: SendRecvFlags) -> Result<usize> {
|
fn recv(&self, buf: &mut [u8], flags: SendRecvFlags) -> Result<usize> {
|
||||||
let connected = match &*self.0.read() {
|
if self.is_nonblocking() {
|
||||||
State::Connected(connected) => connected.clone(),
|
self.try_recv(buf, flags)
|
||||||
_ => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
|
} else {
|
||||||
};
|
self.wait_events(IoEvents::IN, || self.try_recv(buf, flags))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
connected.read(buf)
|
fn try_recv(&self, buf: &mut [u8], _flags: SendRecvFlags) -> Result<usize> {
|
||||||
|
match &*self.state.read() {
|
||||||
|
State::Connected(connected) => connected.try_read(buf),
|
||||||
|
_ => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
|
||||||
|
match &*self.state.read() {
|
||||||
|
State::Listen(listen) => listen.try_accept() as _,
|
||||||
|
_ => return_errno_with_message!(Errno::EINVAL, "the socket is not listening"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_nonblocking(&self) -> bool {
|
||||||
|
self.is_nonblocking.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_nonblocking(&self, nonblocking: bool) {
|
||||||
|
self.is_nonblocking.store(nonblocking, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Pollable for UnixStreamSocket {
|
impl Pollable for UnixStreamSocket {
|
||||||
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
fn poll(&self, mask: IoEvents, poller: Option<&mut Poller>) -> IoEvents {
|
||||||
let inner = self.0.read();
|
let inner = self.state.read();
|
||||||
match &*inner {
|
match &*inner {
|
||||||
State::Init(init) => init.poll(mask, poller),
|
State::Init(init) => init.poll(mask, poller),
|
||||||
State::Listen(listen) => listen.poll(mask, poller),
|
State::Listen(listen) => listen.poll(mask, poller),
|
||||||
@ -134,15 +155,7 @@ impl FileLike for UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn status_flags(&self) -> StatusFlags {
|
fn status_flags(&self) -> StatusFlags {
|
||||||
let inner = self.0.read();
|
if self.is_nonblocking() {
|
||||||
let is_nonblocking = match &*inner {
|
|
||||||
State::Init(init) => init.is_nonblocking(),
|
|
||||||
State::Listen(listen) => listen.is_nonblocking(),
|
|
||||||
State::Connected(connected) => connected.is_nonblocking(),
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: when we fully support O_ASYNC, return the flag
|
|
||||||
if is_nonblocking {
|
|
||||||
StatusFlags::O_NONBLOCK
|
StatusFlags::O_NONBLOCK
|
||||||
} else {
|
} else {
|
||||||
StatusFlags::empty()
|
StatusFlags::empty()
|
||||||
@ -150,17 +163,7 @@ impl FileLike for UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
|
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
|
||||||
let is_nonblocking = {
|
self.set_nonblocking(new_flags.contains(StatusFlags::O_NONBLOCK));
|
||||||
let supported_flags = Self::mask_flags(&new_flags);
|
|
||||||
supported_flags.contains(StatusFlags::O_NONBLOCK)
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut inner = self.0.write();
|
|
||||||
match &mut *inner {
|
|
||||||
State::Init(init) => init.set_nonblocking(is_nonblocking),
|
|
||||||
State::Listen(listen) => listen.set_nonblocking(is_nonblocking),
|
|
||||||
State::Connected(connected) => connected.set_nonblocking(is_nonblocking),
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -169,16 +172,14 @@ impl Socket for UnixStreamSocket {
|
|||||||
fn bind(&self, socket_addr: SocketAddr) -> Result<()> {
|
fn bind(&self, socket_addr: SocketAddr) -> Result<()> {
|
||||||
let addr = UnixSocketAddr::try_from(socket_addr)?;
|
let addr = UnixSocketAddr::try_from(socket_addr)?;
|
||||||
|
|
||||||
let init = match &*self.0.read() {
|
match &*self.state.read() {
|
||||||
State::Init(init) => init.clone(),
|
State::Init(init) => init.bind(&addr),
|
||||||
_ => return_errno_with_message!(
|
_ => return_errno_with_message!(
|
||||||
Errno::EINVAL,
|
Errno::EINVAL,
|
||||||
"cannot bind a listening or connected socket"
|
"cannot bind a listening or connected socket"
|
||||||
),
|
),
|
||||||
// FIXME: Maybe binding a connected socket should also be allowed?
|
// FIXME: Maybe binding a connected socket should also be allowed?
|
||||||
};
|
}
|
||||||
|
|
||||||
init.bind(&addr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connect(&self, socket_addr: SocketAddr) -> Result<()> {
|
fn connect(&self, socket_addr: SocketAddr) -> Result<()> {
|
||||||
@ -195,23 +196,27 @@ impl Socket for UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let init = match &*self.0.read() {
|
let connected = match &*self.state.read() {
|
||||||
State::Init(init) => init.clone(),
|
State::Init(init) => init.connect(&remote_addr)?,
|
||||||
State::Listen(_) => return_errno_with_message!(Errno::EINVAL, "the socket is listened"),
|
State::Listen(_) => return_errno_with_message!(Errno::EINVAL, "the socket is listened"),
|
||||||
State::Connected(_) => {
|
State::Connected(_) => {
|
||||||
return_errno_with_message!(Errno::EISCONN, "the socket is connected")
|
return_errno_with_message!(Errno::EISCONN, "the socket is connected")
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let connected = init.connect(&remote_addr)?;
|
*self.state.write() = State::Connected(connected);
|
||||||
|
|
||||||
*self.0.write() = State::Connected(Arc::new(connected));
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn listen(&self, backlog: usize) -> Result<()> {
|
fn listen(&self, backlog: usize) -> Result<()> {
|
||||||
let init = match &*self.0.read() {
|
let addr = match &*self.state.read() {
|
||||||
State::Init(init) => init.clone(),
|
State::Init(init) => init
|
||||||
|
.addr()
|
||||||
|
.ok_or(Error::with_message(
|
||||||
|
Errno::EINVAL,
|
||||||
|
"the socket is not bound",
|
||||||
|
))?
|
||||||
|
.clone(),
|
||||||
State::Listen(_) => {
|
State::Listen(_) => {
|
||||||
return_errno_with_message!(Errno::EINVAL, "the socket is already listening")
|
return_errno_with_message!(Errno::EINVAL, "the socket is already listening")
|
||||||
}
|
}
|
||||||
@ -220,36 +225,28 @@ impl Socket for UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let addr = init.addr().ok_or(Error::with_message(
|
let listener = Listener::new(addr, backlog)?;
|
||||||
Errno::EINVAL,
|
*self.state.write() = State::Listen(listener);
|
||||||
"the socket is not bound",
|
|
||||||
))?;
|
|
||||||
|
|
||||||
let listener = Listener::new(addr.clone(), backlog, init.is_nonblocking())?;
|
|
||||||
*self.0.write() = State::Listen(Arc::new(listener));
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
|
fn accept(&self) -> Result<(Arc<dyn FileLike>, SocketAddr)> {
|
||||||
let listen = match &*self.0.read() {
|
if self.is_nonblocking() {
|
||||||
State::Listen(listen) => listen.clone(),
|
self.try_accept()
|
||||||
_ => return_errno_with_message!(Errno::EINVAL, "the socket is not listening"),
|
} else {
|
||||||
};
|
self.wait_events(IoEvents::IN, || self.try_accept())
|
||||||
|
}
|
||||||
listen.accept()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
|
fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> {
|
||||||
let connected = match &*self.0.read() {
|
match &*self.state.read() {
|
||||||
State::Connected(connected) => connected.clone(),
|
State::Connected(connected) => connected.shutdown(cmd),
|
||||||
_ => return_errno_with_message!(Errno::ENOTCONN, "the socked is not connected"),
|
_ => return_errno_with_message!(Errno::ENOTCONN, "the socked is not connected"),
|
||||||
};
|
}
|
||||||
|
|
||||||
connected.shutdown(cmd)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn addr(&self) -> Result<SocketAddr> {
|
fn addr(&self) -> Result<SocketAddr> {
|
||||||
let addr = match &*self.0.read() {
|
let addr = match &*self.state.read() {
|
||||||
State::Init(init) => init.addr(),
|
State::Init(init) => init.addr(),
|
||||||
State::Listen(listen) => Some(listen.addr().clone()),
|
State::Listen(listen) => Some(listen.addr().clone()),
|
||||||
State::Connected(connected) => connected.addr().cloned(),
|
State::Connected(connected) => connected.addr().cloned(),
|
||||||
@ -263,14 +260,14 @@ impl Socket for UnixStreamSocket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn peer_addr(&self) -> Result<SocketAddr> {
|
fn peer_addr(&self) -> Result<SocketAddr> {
|
||||||
let connected = match &*self.0.read() {
|
let peer_addr = match &*self.state.read() {
|
||||||
State::Connected(connected) => connected.clone(),
|
State::Connected(connected) => connected.peer_addr().cloned(),
|
||||||
_ => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
|
_ => return_errno_with_message!(Errno::ENOTCONN, "the socket is not connected"),
|
||||||
};
|
};
|
||||||
|
|
||||||
match connected.peer_addr() {
|
match peer_addr {
|
||||||
None => Ok(SocketAddr::Unix(UnixSocketAddr::Path(String::new()))),
|
None => Ok(SocketAddr::Unix(UnixSocketAddr::Path(String::new()))),
|
||||||
Some(peer_addr) => Ok(SocketAddr::from(peer_addr.clone())),
|
Some(peer_addr) => Ok(SocketAddr::from(peer_addr)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -323,7 +320,7 @@ impl Drop for UnixStreamSocket {
|
|||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
if let State::Listen(_) = &*self.0.read() {
|
if let State::Listen(_) = &*self.state.read() {
|
||||||
unregister_backlog(&bound_addr);
|
unregister_backlog(&bound_addr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ pub fn sys_socket(domain: i32, type_: i32, protocol: i32) -> Result<SyscallRetur
|
|||||||
let nonblocking = sock_flags.contains(SockFlags::SOCK_NONBLOCK);
|
let nonblocking = sock_flags.contains(SockFlags::SOCK_NONBLOCK);
|
||||||
let file_like = match (domain, sock_type, protocol) {
|
let file_like = match (domain, sock_type, protocol) {
|
||||||
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_STREAM, _) => {
|
(CSocketAddrFamily::AF_UNIX, SockType::SOCK_STREAM, _) => {
|
||||||
Arc::new(UnixStreamSocket::new(nonblocking)) as Arc<dyn FileLike>
|
UnixStreamSocket::new(nonblocking) as Arc<dyn FileLike>
|
||||||
}
|
}
|
||||||
(
|
(
|
||||||
CSocketAddrFamily::AF_INET,
|
CSocketAddrFamily::AF_INET,
|
||||||
|
Reference in New Issue
Block a user