diff --git a/services/libs/jinux-std/src/fs/utils/dentry.rs b/services/libs/jinux-std/src/fs/utils/dentry.rs index f4823b030..89957a7a0 100644 --- a/services/libs/jinux-std/src/fs/utils/dentry.rs +++ b/services/libs/jinux-std/src/fs/utils/dentry.rs @@ -5,7 +5,7 @@ use alloc::string::String; use core::sync::atomic::{AtomicU32, Ordering}; use core::time::Duration; -use super::{FileSystem, Inode, InodeMode, InodeType, Metadata, Vnode, NAME_MAX}; +use super::{FileSystem, Inode, InodeMode, InodeType, Metadata, MountNode, Vnode, NAME_MAX}; lazy_static! { static ref DCACHE: Mutex>> = Mutex::new(BTreeMap::new()); @@ -162,6 +162,8 @@ impl Dentry { /// Get the mount node which the dentry belongs to. pub fn mount_node(&self) -> Arc { self.mount_node.upgrade().unwrap() + } + pub fn inode(&self) -> Weak { self.vnode.inode() } diff --git a/services/libs/jinux-std/src/net/socket/unix/addr.rs b/services/libs/jinux-std/src/net/socket/unix/addr.rs new file mode 100644 index 000000000..d3ebcbe72 --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/addr.rs @@ -0,0 +1,88 @@ +use crate::{ + fs::{ + fs_resolver::{split_path, FsPath}, + utils::{Dentry, InodeMode, InodeType}, + }, + net::socket::util::sockaddr::SocketAddr, + prelude::*, +}; + +#[derive(Clone)] +pub enum UnixSocketAddr { + Bound(Arc), + Unbound(String), +} + +impl TryFrom for UnixSocketAddr { + type Error = Error; + + fn try_from(value: SocketAddr) -> Result { + let SocketAddr::Unix(path) = value else { + return_errno_with_message!(Errno::EINVAL, "Invalid unix socket addr") + }; + Ok(Self::Unbound(path)) + } +} + +impl From for SocketAddr { + fn from(value: UnixSocketAddr) -> Self { + SocketAddr::Unix(value.path()) + } +} + +impl UnixSocketAddr { + pub fn create_file_and_bind(&mut self) -> Result<()> { + let Self::Unbound(path) = self else { + return_errno_with_message!(Errno::EINVAL, "the addr is already bound"); + }; + + let (parent_pathname, file_name) = split_path(path); + let parent = { + let current = current!(); + let fs = current.fs().read(); + let parent_path = FsPath::try_from(parent_pathname)?; + fs.lookup(&parent_path)? + }; + let dentry = parent.create( + file_name, + InodeType::Socket, + InodeMode::S_IRUSR | InodeMode::S_IWUSR, + )?; + *self = Self::Bound(dentry); + Ok(()) + } + + /// The dentry. If self is bound, return the bound dentry, otherwise lookup dentry in file system. + pub fn dentry(&self) -> Result> { + match self { + UnixSocketAddr::Bound(dentry) => Ok(dentry.clone()), + UnixSocketAddr::Unbound(path) => { + let dentry = { + let current = current!(); + let fs = current.fs().read(); + let fs_path = FsPath::try_from(path.as_str())?; + fs.lookup(&fs_path)? + }; + + if dentry.inode_type() != InodeType::Socket { + return_errno_with_message!(Errno::EACCES, "not a socket file") + } + + if !dentry.inode_mode().is_readable() || !dentry.inode_mode().is_writable() { + return_errno_with_message!( + Errno::EACCES, + "the socket cannot be read or written" + ) + } + return Ok(dentry); + } + } + } + + pub fn path(&self) -> String { + match self { + UnixSocketAddr::Bound(dentry) => dentry.abs_path(), + UnixSocketAddr::Unbound(path) => path.clone(), + } + } +} diff --git a/services/libs/jinux-std/src/net/socket/unix/mod.rs b/services/libs/jinux-std/src/net/socket/unix/mod.rs new file mode 100644 index 000000000..0d54d2de8 --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/mod.rs @@ -0,0 +1,5 @@ +mod addr; +mod stream; + +pub use addr::UnixSocketAddr; +pub use stream::UnixStreamSocket; diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/connected.rs b/services/libs/jinux-std/src/net/socket/unix/stream/connected.rs new file mode 100644 index 000000000..195dff259 --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/connected.rs @@ -0,0 +1,55 @@ +use crate::{ + net::socket::{unix::addr::UnixSocketAddr, SockShutdownCmd}, + prelude::*, +}; + +use super::endpoint::Endpoint; + +pub struct Connected { + local_endpoint: Arc, + // The peer addr is None if peer is unnamed. + // FIXME: can a socket be bound after the socket is connected? + peer_addr: Option, +} + +impl Connected { + pub fn new(local_endpoint: Arc) -> Self { + let peer_addr = local_endpoint.peer_addr(); + Connected { + local_endpoint, + peer_addr, + } + } + + pub fn addr(&self) -> Option { + self.local_endpoint.addr() + } + + pub fn peer_addr(&self) -> Option<&UnixSocketAddr> { + self.peer_addr.as_ref() + } + + pub fn is_bound(&self) -> bool { + self.addr().is_some() + } + + pub fn write(&self, buf: &[u8]) -> Result { + self.local_endpoint.write(buf) + } + + pub fn read(&self, buf: &mut [u8]) -> Result { + self.local_endpoint.read(buf) + } + + pub fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { + self.local_endpoint.shutdown(cmd) + } + + pub fn is_nonblocking(&self) -> bool { + self.local_endpoint.is_nonblocking() + } + + pub fn set_nonblocking(&self, is_nonblocking: bool) { + self.local_endpoint.set_nonblocking(is_nonblocking).unwrap(); + } +} diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/endpoint.rs b/services/libs/jinux-std/src/net/socket/unix/stream/endpoint.rs new file mode 100644 index 000000000..6955eba9a --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/endpoint.rs @@ -0,0 +1,123 @@ +use crate::{ + fs::utils::{Channel, Consumer, IoEvents, Poller, Producer, StatusFlags}, + net::socket::{unix::addr::UnixSocketAddr, SockShutdownCmd}, + prelude::*, +}; + +pub struct Endpoint(Inner); + +struct Inner { + addr: RwLock>, + reader: Consumer, + writer: Producer, + peer: Weak, +} + +impl Endpoint { + pub fn end_pair(is_nonblocking: bool) -> Result<(Arc, Arc)> { + let flags = if is_nonblocking { + StatusFlags::O_NONBLOCK + } else { + StatusFlags::empty() + }; + let (writer_a, reader_b) = + Channel::with_capacity_and_flags(DAFAULT_BUF_SIZE, flags)?.split(); + let (writer_b, reader_a) = + Channel::with_capacity_and_flags(DAFAULT_BUF_SIZE, flags)?.split(); + let mut endpoint_b = None; + let endpoint_a = Arc::new_cyclic(|endpoint_a_ref| { + let peer = Arc::new(Endpoint::new(reader_b, writer_b, endpoint_a_ref.clone())); + let endpoint_a = Endpoint::new(reader_a, writer_a, Arc::downgrade(&peer)); + endpoint_b = Some(peer); + endpoint_a + }); + Ok((endpoint_a, endpoint_b.unwrap())) + } + + fn new(reader: Consumer, writer: Producer, peer: Weak) -> Self { + Self(Inner { + addr: RwLock::new(None), + reader, + writer, + peer, + }) + } + + pub fn addr(&self) -> Option { + self.0.addr.read().clone() + } + + pub fn set_addr(&self, addr: UnixSocketAddr) { + *self.0.addr.write() = Some(addr); + } + + pub fn peer_addr(&self) -> Option { + self.0.peer.upgrade().map(|peer| peer.addr()).flatten() + } + + pub fn is_nonblocking(&self) -> bool { + let reader_status = self.0.reader.is_nonblocking(); + let writer_status = self.0.writer.is_nonblocking(); + debug_assert!(reader_status == writer_status); + reader_status + } + + pub fn set_nonblocking(&self, is_nonblocking: bool) -> Result<()> { + let reader_flags = self.0.reader.status_flags(); + self.0 + .reader + .set_status_flags(reader_flags | StatusFlags::O_NONBLOCK)?; + let writer_flags = self.0.writer.status_flags(); + self.0 + .writer + .set_status_flags(writer_flags | StatusFlags::O_NONBLOCK)?; + Ok(()) + } + + pub fn read(&self, buf: &mut [u8]) -> Result { + self.0.reader.read(buf) + } + + pub fn write(&self, buf: &[u8]) -> Result { + self.0.writer.write(buf) + } + + pub fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { + if !self.is_connected() { + return_errno_with_message!(Errno::ENOTCONN, "The socket is not connected."); + } + + if cmd.shut_read() { + self.0.reader.shutdown(); + } + + if cmd.shut_write() { + self.0.writer.shutdown(); + } + + Ok(()) + } + + pub fn is_connected(&self) -> bool { + self.0.peer.upgrade().is_some() + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + let mut events = IoEvents::empty(); + // FIXME: should reader and writer use the same mask? + let reader_events = self.0.reader.poll(mask, poller); + let writer_events = self.0.writer.poll(mask, poller); + + if reader_events.contains(IoEvents::HUP) || self.0.reader.is_shutdown() { + events |= IoEvents::RDHUP | IoEvents::IN; + if writer_events.contains(IoEvents::ERR) || self.0.writer.is_shutdown() { + events |= IoEvents::HUP | IoEvents::OUT; + } + } + + events |= (reader_events & IoEvents::IN) | (writer_events & IoEvents::OUT); + events + } +} + +const DAFAULT_BUF_SIZE: usize = 4096; diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/init.rs b/services/libs/jinux-std/src/net/socket/unix/stream/init.rs new file mode 100644 index 000000000..68ffca23c --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/init.rs @@ -0,0 +1,50 @@ +use core::sync::atomic::{AtomicBool, Ordering}; + +use crate::fs::utils::{IoEvents, Pollee, Poller}; +use crate::net::socket::unix::addr::UnixSocketAddr; +use crate::prelude::*; + +pub struct Init { + is_nonblocking: AtomicBool, + bind_addr: Option, + pollee: Pollee, +} + +impl Init { + pub fn new(is_nonblocking: bool) -> Self { + Self { + is_nonblocking: AtomicBool::new(is_nonblocking), + bind_addr: None, + pollee: Pollee::new(IoEvents::empty()), + } + } + + pub fn bind(&mut self, mut addr: UnixSocketAddr) -> Result<()> { + if self.bind_addr.is_some() { + return_errno_with_message!(Errno::EINVAL, "the socket is already bound"); + } + addr.create_file_and_bind()?; + self.bind_addr = Some(addr); + Ok(()) + } + + pub fn is_bound(&self) -> bool { + self.bind_addr.is_none() + } + + pub fn bound_addr(&self) -> Option<&UnixSocketAddr> { + self.bind_addr.as_ref() + } + + pub fn is_nonblocking(&self) -> bool { + self.is_nonblocking.load(Ordering::Acquire) + } + + pub fn set_nonblocking(&self, is_nonblocking: bool) { + self.is_nonblocking.store(is_nonblocking, Ordering::Release); + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + self.pollee.poll(mask, poller) + } +} diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/listen.rs b/services/libs/jinux-std/src/net/socket/unix/stream/listen.rs new file mode 100644 index 000000000..8f5f6644a --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/listen.rs @@ -0,0 +1,29 @@ +use core::sync::atomic::{AtomicBool, Ordering}; + +use crate::net::socket::unix::addr::UnixSocketAddr; + +pub struct Listen { + addr: UnixSocketAddr, + is_nonblocking: AtomicBool, +} + +impl Listen { + pub fn new(addr: UnixSocketAddr, nonblocking: bool) -> Self { + Self { + addr, + is_nonblocking: AtomicBool::new(nonblocking), + } + } + + pub fn addr(&self) -> &UnixSocketAddr { + &self.addr + } + + pub fn is_nonblocking(&self) -> bool { + self.is_nonblocking.load(Ordering::Acquire) + } + + pub fn set_nonblocking(&self, is_nonblocking: bool) { + self.is_nonblocking.store(is_nonblocking, Ordering::Release); + } +} diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs b/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs new file mode 100644 index 000000000..210ed21a0 --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/listener.rs @@ -0,0 +1,151 @@ +use keyable_arc::KeyableWeak; +use spin::RwLockReadGuard; + +use crate::{ + fs::utils::{Inode, IoEvents, Pollee, Poller}, + net::socket::unix::addr::UnixSocketAddr, + prelude::*, +}; + +use super::endpoint::Endpoint; + +pub static ACTIVE_LISTENERS: ActiveListeners = ActiveListeners::new(); + +pub struct ActiveListeners { + listeners: RwLock, Arc>>, + // TODO: For linux, there is also abstract socket domain that a socket addr is not bound to an inode. +} + +impl ActiveListeners { + pub const fn new() -> Self { + Self { + listeners: RwLock::new(BTreeMap::new()), + } + } + + pub(super) fn add_listener(&self, addr: &UnixSocketAddr, backlog: usize) -> Result<()> { + let inode = create_keyable_inode(addr)?; + let mut listeners = self.listeners.write(); + if listeners.contains_key(&inode) { + return_errno_with_message!(Errno::EADDRINUSE, "the addr is already used"); + } + let new_listener = Arc::new(Listener::new(backlog)); + listeners.insert(inode, new_listener); + Ok(()) + } + + pub(super) fn get_listener(&self, addr: &UnixSocketAddr) -> Result> { + let listeners = self.listeners.read(); + get_listener(&listeners, addr) + } + + pub(super) fn pop_incoming( + &self, + nonblocking: bool, + addr: &UnixSocketAddr, + ) -> Result> { + let poller = Poller::new(); + loop { + let listener = { + let listeners = self.listeners.read(); + get_listener(&listeners, addr)? + }; + if let Some(endpoint) = listener.pop_incoming() { + return Ok(endpoint); + } + if nonblocking { + return_errno_with_message!(Errno::EAGAIN, "no connection comes"); + } + let events = { + let mask = IoEvents::IN; + listener.poll(mask, Some(&poller)) + }; + if events.contains(IoEvents::ERR) | events.contains(IoEvents::HUP) { + return_errno_with_message!(Errno::EINVAL, "connection is refused"); + } + if events.is_empty() { + poller.wait(); + } + } + } + + pub(super) fn push_incoming( + &self, + addr: &UnixSocketAddr, + endpoint: Arc, + ) -> Result<()> { + let listeners = self.listeners.read(); + let listener = get_listener(&listeners, addr).map_err(|_| { + Error::with_message( + Errno::ECONNREFUSED, + "no socket is listened at the remote address", + ) + })?; + listener.push_incoming(endpoint) + } + + pub(super) fn remove_listener(&self, addr: &UnixSocketAddr) { + let Ok(inode) = create_keyable_inode(addr) else { + return; + }; + self.listeners.write().remove(&inode); + } +} + +fn get_listener( + listeners: &RwLockReadGuard, Arc>>, + addr: &UnixSocketAddr, +) -> Result> { + let dentry = create_keyable_inode(addr)?; + listeners + .get(&dentry) + .map(Arc::clone) + .ok_or_else(|| Error::with_message(Errno::EINVAL, "the socket is not listened")) +} + +pub(super) struct Listener { + pollee: Pollee, + backlog: usize, + incoming_endpoints: Mutex>>, +} + +impl Listener { + pub fn new(backlog: usize) -> Self { + Self { + pollee: Pollee::new(IoEvents::empty()), + backlog, + incoming_endpoints: Mutex::new(VecDeque::with_capacity(backlog)), + } + } + + pub fn push_incoming(&self, endpoint: Arc) -> Result<()> { + let mut endpoints = self.incoming_endpoints.lock(); + if endpoints.len() >= self.backlog { + return_errno_with_message!(Errno::ECONNREFUSED, "incoming_endpoints is full"); + } + endpoints.push_back(endpoint); + self.pollee.add_events(IoEvents::IN); + Ok(()) + } + + pub fn pop_incoming(&self) -> Option> { + let mut incoming_endpoints = self.incoming_endpoints.lock(); + let endpoint = incoming_endpoints.pop_front(); + if endpoint.is_none() { + self.pollee.del_events(IoEvents::IN); + } + endpoint + } + + pub fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + // Lock to avoid any events may change pollee state when we poll + let _lock = self.incoming_endpoints.lock(); + self.pollee.poll(mask, poller) + } +} + +fn create_keyable_inode(addr: &UnixSocketAddr) -> Result> { + let dentry = addr.dentry()?; + let inode = dentry.inode(); + Ok(KeyableWeak::from(inode)) +} diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/mod.rs b/services/libs/jinux-std/src/net/socket/unix/stream/mod.rs new file mode 100644 index 000000000..498b7c0eb --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/mod.rs @@ -0,0 +1,9 @@ +mod connected; +mod endpoint; +mod init; +mod listen; +mod listener; +pub mod stream; + +pub use listener::{ActiveListeners, ACTIVE_LISTENERS}; +pub use stream::UnixStreamSocket; diff --git a/services/libs/jinux-std/src/net/socket/unix/stream/stream.rs b/services/libs/jinux-std/src/net/socket/unix/stream/stream.rs new file mode 100644 index 000000000..755dc136e --- /dev/null +++ b/services/libs/jinux-std/src/net/socket/unix/stream/stream.rs @@ -0,0 +1,287 @@ +use crate::fs::file_handle::FileLike; +use crate::fs::utils::{IoEvents, Poller, StatusFlags}; +use crate::net::socket::unix::addr::UnixSocketAddr; +use crate::net::socket::util::send_recv_flags::SendRecvFlags; +use crate::net::socket::util::sockaddr::SocketAddr; +use crate::net::socket::{SockShutdownCmd, Socket}; +use crate::prelude::*; + +use super::connected::Connected; +use super::endpoint::Endpoint; +use super::init::Init; +use super::listen::Listen; +use super::ACTIVE_LISTENERS; + +pub struct UnixStreamSocket(RwLock); + +enum Status { + Init(Init), + Listen(Listen), + Connected(Connected), +} + +impl UnixStreamSocket { + pub fn new(nonblocking: bool) -> Self { + let status = Status::Init(Init::new(nonblocking)); + Self(RwLock::new(status)) + } + + pub fn new_pair(nonblocking: bool) -> Result<(Arc, Arc)> { + let (end_a, end_b) = Endpoint::end_pair(nonblocking)?; + let connected_a = UnixStreamSocket(RwLock::new(Status::Connected(Connected::new(end_a)))); + let connected_b = UnixStreamSocket(RwLock::new(Status::Connected(Connected::new(end_b)))); + Ok((Arc::new(connected_a), Arc::new(connected_b))) + } + + fn bound_addr(&self) -> Option { + let status = self.0.read(); + match &*status { + Status::Init(init) => init.bound_addr().map(Clone::clone), + Status::Listen(listen) => Some(listen.addr().clone()), + Status::Connected(connected) => connected.addr(), + } + } + + fn supported_flags(status_flags: &StatusFlags) -> StatusFlags { + const SUPPORTED_FLAGS: StatusFlags = StatusFlags::O_NONBLOCK; + const UNSUPPORTED_FLAGS: StatusFlags = SUPPORTED_FLAGS.complement(); + + if status_flags.intersects(UNSUPPORTED_FLAGS) { + warn!("ignore unsupported flags"); + } + + status_flags.intersection(SUPPORTED_FLAGS) + } +} + +impl FileLike for UnixStreamSocket { + fn as_socket(&self) -> Option<&dyn Socket> { + Some(self) + } + + fn read(&self, buf: &mut [u8]) -> Result { + self.recvfrom(buf, SendRecvFlags::empty()) + .map(|(read_size, _)| read_size) + } + + fn write(&self, buf: &[u8]) -> Result { + self.sendto(buf, None, SendRecvFlags::empty()) + } + + fn poll(&self, mask: IoEvents, poller: Option<&Poller>) -> IoEvents { + let inner = self.0.read(); + match &*inner { + Status::Init(init) => init.poll(mask, poller), + Status::Listen(listen) => { + let addr = listen.addr(); + let listener = ACTIVE_LISTENERS.get_listener(addr).unwrap(); + listener.poll(mask, poller) + } + Status::Connected(connet) => todo!(), + } + } + + fn status_flags(&self) -> StatusFlags { + let inner = self.0.read(); + let is_nonblocking = match &*inner { + Status::Init(init) => init.is_nonblocking(), + Status::Listen(listen) => listen.is_nonblocking(), + Status::Connected(connected) => connected.is_nonblocking(), + }; + + if is_nonblocking { + StatusFlags::O_NONBLOCK + } else { + StatusFlags::empty() + } + } + + fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> { + let is_nonblocking = { + let supported_flags = Self::supported_flags(&new_flags); + supported_flags.contains(StatusFlags::O_NONBLOCK) + }; + + let mut inner = self.0.write(); + match &mut *inner { + Status::Init(init) => init.set_nonblocking(is_nonblocking), + Status::Listen(listen) => listen.set_nonblocking(is_nonblocking), + Status::Connected(connected) => connected.set_nonblocking(is_nonblocking), + } + Ok(()) + } +} + +impl Socket for UnixStreamSocket { + fn bind(&self, sockaddr: SocketAddr) -> Result<()> { + let addr = UnixSocketAddr::try_from(sockaddr)?; + let mut inner = self.0.write(); + match &mut *inner { + Status::Init(init) => init.bind(addr), + Status::Listen(_) | Status::Connected(_) => { + return_errno_with_message!( + Errno::EINVAL, + "cannot bind a listening or connected socket" + ); + } // FIXME: Maybe binding a connected sockted should also be allowed? + } + } + + fn connect(&self, sockaddr: SocketAddr) -> Result<()> { + let mut inner = self.0.write(); + match &*inner { + Status::Init(init) => { + let remote_addr = UnixSocketAddr::try_from(sockaddr)?; + let addr = init.bound_addr(); + if let Some(addr) = addr { + if addr.path() == remote_addr.path() { + return_errno_with_message!( + Errno::EINVAL, + "try to connect to self is invalid" + ); + } + } + let (this_end, remote_end) = Endpoint::end_pair(init.is_nonblocking())?; + remote_end.set_addr(remote_addr.clone()); + if let Some(addr) = addr { + this_end.set_addr(addr.clone()); + }; + ACTIVE_LISTENERS.push_incoming(&remote_addr, remote_end)?; + *inner = Status::Connected(Connected::new(this_end)); + Ok(()) + } + Status::Listen(_) => { + return_errno_with_message!(Errno::EINVAL, "the socket is listened") + } + Status::Connected(_) => { + return_errno_with_message!(Errno::EISCONN, "the socket is connected") + } + } + } + + fn listen(&self, backlog: usize) -> Result<()> { + let mut inner = self.0.write(); + match &*inner { + Status::Init(init) => { + let addr = init.bound_addr().ok_or(Error::with_message( + Errno::EINVAL, + "the socket is not bound", + ))?; + ACTIVE_LISTENERS.add_listener(addr, backlog)?; + *inner = Status::Listen(Listen::new(addr.clone(), init.is_nonblocking())); + return Ok(()); + } + Status::Listen(_) => { + return_errno_with_message!(Errno::EINVAL, "the socket is already listened") + } + Status::Connected(_) => { + return_errno_with_message!(Errno::EINVAL, "the socket is already connected") + } + }; + } + + fn accept(&self) -> Result<(Arc, SocketAddr)> { + let inner = self.0.read(); + match &*inner { + Status::Listen(listen) => { + let is_nonblocking = listen.is_nonblocking(); + let addr = listen.addr().clone(); + drop(inner); + // Avoid lock when waiting + let connected = { + let local_endpoint = ACTIVE_LISTENERS.pop_incoming(is_nonblocking, &addr)?; + Connected::new(local_endpoint) + }; + + let peer_addr = match connected.peer_addr() { + None => SocketAddr::Unix(String::new()), + Some(addr) => SocketAddr::from(addr.clone()), + }; + + let socket = UnixStreamSocket(RwLock::new(Status::Connected(connected))); + return Ok((Arc::new(socket), peer_addr)); + } + Status::Connected(_) | Status::Init(_) => { + return_errno_with_message!(Errno::EINVAL, "the socket is not listened") + } + } + } + + fn shutdown(&self, cmd: SockShutdownCmd) -> Result<()> { + let inner = self.0.read(); + if let Status::Connected(connected) = &*inner { + connected.shutdown(cmd) + } else { + return_errno_with_message!(Errno::ENOTCONN, "the socked is not connected"); + } + } + + fn addr(&self) -> Result { + let inner = self.0.read(); + let addr = match &*inner { + Status::Init(init) => init.bound_addr().map(Clone::clone), + Status::Listen(listen) => Some(listen.addr().clone()), + Status::Connected(connected) => connected.addr(), + }; + addr.map(Into::::into) + .ok_or(Error::with_message( + Errno::EINVAL, + "the socket does not bind to addr", + )) + } + + fn peer_addr(&self) -> Result { + let inner = self.0.read(); + if let Status::Connected(connected) = &*inner { + match connected.peer_addr() { + None => return Ok(SocketAddr::Unix(String::new())), + Some(peer_addr) => { + return Ok(SocketAddr::from(peer_addr.clone())); + } + } + } + return_errno_with_message!(Errno::EINVAL, "the socket is not connected"); + } + + fn recvfrom(&self, buf: &mut [u8], flags: SendRecvFlags) -> Result<(usize, SocketAddr)> { + let inner = self.0.read(); + // TODO: deal with flags + match &*inner { + Status::Connected(connected) => { + let read_size = connected.read(buf)?; + let peer_addr = self.peer_addr()?; + Ok((read_size, peer_addr)) + } + Status::Init(_) | Status::Listen(_) => { + return_errno_with_message!(Errno::EINVAL, "the socket is not connected") + } + } + } + + fn sendto( + &self, + buf: &[u8], + remote: Option, + flags: SendRecvFlags, + ) -> Result { + debug_assert!(remote.is_none()); + // TODO: deal with flags + let inner = self.0.read(); + match &*inner { + Status::Connected(connected) => connected.write(buf), + Status::Init(_) | Status::Listen(_) => { + return_errno_with_message!(Errno::EINVAL, "the socket is not connected") + } + } + } +} + +impl Drop for UnixStreamSocket { + fn drop(&mut self) { + let Some(bound_addr) = self.bound_addr() else { + return; + }; + + ACTIVE_LISTENERS.remove_listener(&bound_addr); + } +} diff --git a/services/libs/jinux-std/src/util/net/addr.rs b/services/libs/jinux-std/src/util/net/addr.rs new file mode 100644 index 000000000..8228888f2 --- /dev/null +++ b/services/libs/jinux-std/src/util/net/addr.rs @@ -0,0 +1,285 @@ +use crate::net::iface::Ipv4Address; +use crate::net::socket::SocketAddr; +use crate::prelude::*; +use crate::util::{read_bytes_from_user, read_val_from_user, write_val_to_user}; + +pub fn read_socket_addr_from_user(addr: Vaddr, addr_len: usize) -> Result { + debug_assert!(addr_len >= core::mem::size_of::()); + let sockaddr: SockAddr = read_val_from_user(addr)?; + let socket_addr = match sockaddr.sa_family()? { + SaFamily::AF_UNSPEC => { + return_errno_with_message!(Errno::EINVAL, "the socket addr family is unspecified") + } + SaFamily::AF_UNIX => { + debug_assert!(addr_len >= core::mem::size_of::()); + let sa_family: u16 = read_val_from_user(addr)?; + debug_assert!(sa_family == SaFamily::AF_UNIX as u16); + + let bytes = { + let bytes_len = addr_len - core::mem::size_of::(); + let mut bytes = vec![0u8; bytes_len]; + read_bytes_from_user(addr + core::mem::size_of::(), &mut bytes)?; + bytes + }; + + let path = { + let cstr = CStr::from_bytes_until_nul(&bytes)?; + cstr.to_string_lossy().to_string() + }; + + SocketAddr::Unix(path) + } + SaFamily::AF_INET => { + debug_assert!(addr_len >= core::mem::size_of::()); + let sock_addr_in: SockAddrInet = read_val_from_user(addr)?; + SocketAddr::from(sock_addr_in) + } + SaFamily::AF_INET6 => { + debug_assert!(addr_len >= core::mem::size_of::()); + let sock_addr_in6: SockAddrInet6 = read_val_from_user(addr)?; + todo!() + } + _ => { + return_errno_with_message!(Errno::EAFNOSUPPORT, "cannot support address for the family") + } + }; + Ok(socket_addr) +} + +pub fn write_socket_addr_to_user( + socket_addr: &SocketAddr, + dest: Vaddr, + max_len: usize, +) -> Result { + match socket_addr { + SocketAddr::Unix(path) => { + let sock_addr_unix = SockAddrUnix::try_from(path.as_str())?; + let write_size = core::mem::size_of::(); + debug_assert!(max_len >= write_size); + write_val_to_user(dest, &sock_addr_unix)?; + Ok(write_size) + } + SocketAddr::IPv4(addr, port) => { + let in_addr = InetAddr::from(*addr); + let sock_addr_in = SockAddrInet::new(*port, in_addr); + let write_size = core::mem::size_of::(); + debug_assert!(max_len >= write_size); + write_val_to_user(dest, &sock_addr_in)?; + Ok(write_size) + } + SocketAddr::IPv6 => todo!(), + } +} + +#[derive(Debug, Clone, Copy, Pod)] +#[repr(C)] +/// PlaceHolder +pub struct SockAddr { + sa_family: u16, // SaFamily + sa_data: [u8; 14], +} + +impl SockAddr { + pub fn sa_family(&self) -> Result { + Ok(SaFamily::try_from(self.sa_family as i32)?) + } +} + +const SOCK_ADDR_UNIX_LEN: usize = 108; + +#[repr(C)] +#[derive(Debug, Clone, Copy, Pod)] +pub struct SockAddrUnix { + sun_family: u16, // Always SaFamily::AF_UNIX + sun_path: [u8; SOCK_ADDR_UNIX_LEN], +} + +#[repr(C)] +#[derive(Debug, Clone, Copy, Pod)] +/// IPv4 4-byte address +pub struct InetAddr { + s_addr: [u8; 4], +} + +impl InetAddr { + pub fn as_bytes(&self) -> &[u8] { + &self.s_addr + } + + pub fn from_bytes(bytes: &[u8]) -> Self { + debug_assert!(bytes.len() == 4); + let mut s_addr = [0u8; 4]; + s_addr.copy_from_slice(bytes); + Self { s_addr } + } +} + +#[derive(Debug, Clone, Copy, Pod)] +#[repr(C)] +pub struct PortNum { + port: [u8; 2], +} + +impl PortNum { + pub fn as_u16(&self) -> u16 { + u16::from_be_bytes(self.port) + } + + pub fn from_u16(value: u16) -> Self { + let bytes = value.to_be_bytes(); + Self { port: bytes } + } +} + +#[repr(C)] +#[derive(Debug, Clone, Copy, Pod)] +/// IPv4 socket address +pub struct SockAddrInet { + /// always SaFamily::AF_INET + sin_family: u16, + /// Port number + sin_port_t: PortNum, + /// IPv4 address + sin_addr: InetAddr, + /// Pad to size of 'SockAddr' structure (16 bytes) + _pad: [u8; 8], +} + +impl SockAddrInet { + pub fn new(port: u16, addr: InetAddr) -> Self { + let port = PortNum::from_u16(port); + Self { + sin_family: SaFamily::AF_INET as _, + sin_port_t: port, + sin_addr: addr, + _pad: [0u8; 8], + } + } +} + +#[repr(C)] +#[derive(Debug, Clone, Copy, Pod)] +/// IPv6 address +pub struct Inet6Addr { + s6_addr: [u8; 16], +} + +impl Inet6Addr { + pub fn as_bytes(&self) -> &[u8] { + &self.s6_addr + } +} + +/// IPv6 socket address +#[repr(C)] +#[derive(Debug, Clone, Copy, Pod)] +pub struct SockAddrInet6 { + /// always SaFamily::AF_INET6 + sin6_family: u16, + /// Port number + sin6_port: PortNum, + /// IPv6 flow information + sin6_flowinfo: u32, + /// IPv6 address + sin6_addr: Inet6Addr, + // Scope ID + sin6_scope_id: u32, +} + +/// Address family. The definition is from https://elixir.bootlin.com/linux/v6.0.9/source/include/linux/socket.h. +#[repr(i32)] +#[derive(Debug, Clone, Copy, TryFromInt, PartialEq, Eq)] +#[allow(non_camel_case_types)] +pub enum SaFamily { + AF_UNSPEC = 0, + AF_UNIX = 1, /* Unix domain sockets */ + //AF_LOCAL 1 /* POSIX name for AF_UNIX */ + AF_INET = 2, /* Internet IP Protocol */ + AF_AX25 = 3, /* Amateur Radio AX.25 */ + AF_IPX = 4, /* Novell IPX */ + AF_APPLETALK = 5, /* AppleTalk DDP */ + AF_NETROM = 6, /* Amateur Radio NET/ROM */ + AF_BRIDGE = 7, /* Multiprotocol bridge */ + AF_ATMPVC = 8, /* ATM PVCs */ + AF_X25 = 9, /* Reserved for X.25 project */ + AF_INET6 = 10, /* IP version 6 */ + AF_ROSE = 11, /* Amateur Radio X.25 PLP */ + AF_DECnet = 12, /* Reserved for DECnet project */ + AF_NETBEUI = 13, /* Reserved for 802.2LLC project*/ + AF_SECURITY = 14, /* Security callback pseudo AF */ + AF_KEY = 15, /* PF_KEY key management API */ + AF_NETLINK = 16, + //AF_ROUTE = AF_NETLINK /* Alias to emulate 4.4BSD */ + AF_PACKET = 17, /* Packet family */ + AF_ASH = 18, /* Ash */ + AF_ECONET = 19, /* Acorn Econet */ + AF_ATMSVC = 20, /* ATM SVCs */ + AF_RDS = 21, /* RDS sockets */ + AF_SNA = 22, /* Linux SNA Project (nutters!) */ + AF_IRDA = 23, /* IRDA sockets */ + AF_PPPOX = 24, /* PPPoX sockets */ + AF_WANPIPE = 25, /* Wanpipe API Sockets */ + AF_LLC = 26, /* Linux LLC */ + AF_IB = 27, /* Native InfiniBand address */ + AF_MPLS = 28, /* MPLS */ + AF_CAN = 29, /* Controller Area Network */ + AF_TIPC = 30, /* TIPC sockets */ + AF_BLUETOOTH = 31, /* Bluetooth sockets */ + AF_IUCV = 32, /* IUCV sockets */ + AF_RXRPC = 33, /* RxRPC sockets */ + AF_ISDN = 34, /* mISDN sockets */ + AF_PHONET = 35, /* Phonet sockets */ + AF_IEEE802154 = 36, /* IEEE802154 sockets */ + AF_CAIF = 37, /* CAIF sockets */ + AF_ALG = 38, /* Algorithm sockets */ + AF_NFC = 39, /* NFC sockets */ + AF_VSOCK = 40, /* vSockets */ + AF_KCM = 41, /* Kernel Connection Multiplexor*/ + AF_QIPCRTR = 42, /* Qualcomm IPC Router */ + AF_SMC = 43, /* smc sockets: reserve number for + * PF_SMC protocol family that + * reuses AF_INET address family + */ + AF_XDP = 44, /* XDP sockets */ + AF_MCTP = 45, /* Management component + * transport protocol + */ + AF_MAX = 46, /* For now.. */ +} + +impl From for Ipv4Address { + fn from(value: InetAddr) -> Self { + let addr = value.as_bytes(); + Ipv4Address::from_bytes(addr) + } +} + +impl From for InetAddr { + fn from(value: Ipv4Address) -> Self { + let bytes = value.as_bytes(); + InetAddr::from_bytes(bytes) + } +} + +impl From for SocketAddr { + fn from(value: SockAddrInet) -> Self { + let port = value.sin_port_t.as_u16(); + let addr = Ipv4Address::from(value.sin_addr); + SocketAddr::IPv4(addr, port) + } +} + +impl TryFrom<&str> for SockAddrUnix { + type Error = Error; + + fn try_from(value: &str) -> Result { + let mut sun_path = [0u8; SOCK_ADDR_UNIX_LEN]; + let bytes = value.as_bytes(); + let copy_len = bytes.len().min(SOCK_ADDR_UNIX_LEN - 1); + sun_path[..copy_len].copy_from_slice(&bytes[..copy_len]); + Ok(SockAddrUnix { + sun_family: SaFamily::AF_UNIX as u16, + sun_path, + }) + } +} diff --git a/services/libs/jinux-std/src/util/net/mod.rs b/services/libs/jinux-std/src/util/net/mod.rs new file mode 100644 index 000000000..3abaf11bd --- /dev/null +++ b/services/libs/jinux-std/src/util/net/mod.rs @@ -0,0 +1,22 @@ +mod addr; +mod socket; + +pub use addr::{ + read_socket_addr_from_user, write_socket_addr_to_user, InetAddr, SaFamily, SockAddr, + SockAddrInet, SockAddrInet6, SockAddrUnix, +}; +pub use socket::{Protocol, SockFlags, SockType, SOCK_TYPE_MASK}; + +#[macro_export] +macro_rules! get_socket_without_holding_filetable_lock { + ($name:tt, $current: expr, $sockfd: expr) => { + let file_like = { + let file_table = $current.file_table().lock(); + file_table.get_file($sockfd)?.clone() + // Drop filetable here to avoid locking + }; + let $name = file_like + .as_socket() + .ok_or_else(|| Error::with_message(Errno::ENOTSOCK, "the file is not socket"))?; + }; +} diff --git a/services/libs/jinux-std/src/util/net/socket.rs b/services/libs/jinux-std/src/util/net/socket.rs new file mode 100644 index 000000000..47f963be0 --- /dev/null +++ b/services/libs/jinux-std/src/util/net/socket.rs @@ -0,0 +1,68 @@ +use crate::prelude::*; + +#[repr(i32)] +#[derive(Debug, Clone, Copy, TryFromInt)] +#[allow(non_camel_case_types)] +/// Standard well-defined IP protocols. +/// From https://elixir.bootlin.com/linux/v6.0.9/source/include/uapi/linux/in.h. +pub enum Protocol { + IPPROTO_IP = 0, /* Dummy protocol for TCP */ + IPPROTO_ICMP = 1, /* Internet Control Message Protocol */ + IPPROTO_IGMP = 2, /* Internet Group Management Protocol */ + IPPROTO_TCP = 6, /* Transmission Control Protocol */ + IPPROTO_EGP = 8, /* Exterior Gateway Protocol */ + IPPROTO_PUP = 12, /* PUP protocol */ + IPPROTO_UDP = 17, /* User Datagram Protocol */ + IPPROTO_IDP = 22, /* XNS IDP protocol */ + IPPROTO_TP = 29, /* SO Transport Protocol Class 4 */ + IPPROTO_DCCP = 33, /* Datagram Congestion Control Protocol */ + IPPROTO_IPV6 = 41, /* IPv6-in-IPv4 tunnelling */ + IPPROTO_RSVP = 46, /* RSVP Protocol */ + IPPROTO_GRE = 47, /* Cisco GRE tunnels (rfc 1701,1702) */ + IPPROTO_ESP = 50, /* Encapsulation Security Payload protocol */ + IPPROTO_AH = 51, /* Authentication Header protocol */ + IPPROTO_MTP = 92, /* Multicast Transport Protocol */ + IPPROTO_BEETPH = 94, /* IP option pseudo header for BEET */ + IPPROTO_ENCAP = 98, /* Encapsulation Header */ + IPPROTO_PIM = 103, /* Protocol Independent Multicast */ + IPPROTO_COMP = 108, /* Compression Header Protocol */ + IPPROTO_SCTP = 132, /* Stream Control Transport Protocol */ + IPPROTO_UDPLITE = 136, /* UDP-Lite (RFC 3828) */ + IPPROTO_MPLS = 137, /* MPLS in IP (RFC 4023) */ + IPPROTO_ETHERNET = 143, /* Ethernet-within-IPv6 Encapsulation */ + IPPROTO_RAW = 255, /* Raw IP packets */ + IPPROTO_MPTCP = 262, /* Multipath TCP connection */ +} + +#[repr(i32)] +#[allow(non_camel_case_types)] +#[derive(Debug, Clone, Copy, TryFromInt)] +/// Socket types. +/// From https://elixir.bootlin.com/linux/v6.0.9/source/include/linux/net.h +pub enum SockType { + /// Stream socket + SOCK_STREAM = 1, + /// Datagram socket + SOCK_DGRAM = 2, + /// Raw socket + SOCK_RAW = 3, + /// Reliably-delivered message + SOCK_RDM = 4, + /// Sequential packet socket + SOCK_SEQPACKET = 5, + /// Datagram Congestion Control Protocol socket + SOCK_DCCP = 6, + /// Linux specific way of getting packets at the dev level + SOCK_PACKET = 10, +} + +pub const SOCK_TYPE_MASK: i32 = 0xf; + +bitflags! { + #[repr(C)] + #[derive(Pod)] + pub struct SockFlags: i32 { + const SOCK_NONBLOCK = 1 << 11; + const SOCK_CLOEXEC = 1 << 19; + } +}