net: add split() to tcpsocket
This commit is contained in:
		
							parent
							
								
									240bef8c9f
								
							
						
					
					
						commit
						0b2f43c391
					
				| @ -2,7 +2,7 @@ use core::future::Future; | ||||
| use core::task::Poll; | ||||
| use futures::future::poll_fn; | ||||
| 
 | ||||
| use super::{Error, TcpSocket}; | ||||
| use super::{with_socket, Error, TcpReader, TcpSocket, TcpWriter}; | ||||
| 
 | ||||
| impl<'d> embedded_io::asynch::Read for TcpSocket<'d> { | ||||
|     type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||||
| @ -13,7 +13,7 @@ impl<'d> embedded_io::asynch::Read for TcpSocket<'d> { | ||||
|         poll_fn(move |cx| { | ||||
|             // CAUTION: smoltcp semantics around EOF are different to what you'd expect
 | ||||
|             // from posix-like IO, so we have to tweak things here.
 | ||||
|             self.with(|s, _| match s.recv_slice(buf) { | ||||
|             with_socket(self.handle, |s, _| match s.recv_slice(buf) { | ||||
|                 // No data ready
 | ||||
|                 Ok(0) => { | ||||
|                     s.register_recv_waker(cx.waker()); | ||||
| @ -39,7 +39,69 @@ impl<'d> embedded_io::asynch::Write for TcpSocket<'d> { | ||||
| 
 | ||||
|     fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||||
|         poll_fn(move |cx| { | ||||
|             self.with(|s, _| match s.send_slice(buf) { | ||||
|             with_socket(self.handle, |s, _| match s.send_slice(buf) { | ||||
|                 // Not ready to send (no space in the tx buffer)
 | ||||
|                 Ok(0) => { | ||||
|                     s.register_send_waker(cx.waker()); | ||||
|                     Poll::Pending | ||||
|                 } | ||||
|                 // Some data sent
 | ||||
|                 Ok(n) => Poll::Ready(Ok(n)), | ||||
|                 // Connection reset. TODO: this can also be timeouts etc, investigate.
 | ||||
|                 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)), | ||||
|                 // smoltcp returns no errors other than the above.
 | ||||
|                 Err(_) => unreachable!(), | ||||
|             }) | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>> | ||||
|     where | ||||
|         Self: 'a; | ||||
| 
 | ||||
|     fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> { | ||||
|         poll_fn(move |_| { | ||||
|             Poll::Ready(Ok(())) // TODO: Is there a better implementation for this?
 | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<'d> embedded_io::asynch::Read for TcpReader<'d> { | ||||
|     type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||||
|     where | ||||
|         Self: 'a; | ||||
| 
 | ||||
|     fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> { | ||||
|         poll_fn(move |cx| { | ||||
|             // CAUTION: smoltcp semantics around EOF are different to what you'd expect
 | ||||
|             // from posix-like IO, so we have to tweak things here.
 | ||||
|             with_socket(self.handle, |s, _| match s.recv_slice(buf) { | ||||
|                 // No data ready
 | ||||
|                 Ok(0) => { | ||||
|                     s.register_recv_waker(cx.waker()); | ||||
|                     Poll::Pending | ||||
|                 } | ||||
|                 // Data ready!
 | ||||
|                 Ok(n) => Poll::Ready(Ok(n)), | ||||
|                 // EOF
 | ||||
|                 Err(smoltcp::Error::Finished) => Poll::Ready(Ok(0)), | ||||
|                 // Connection reset. TODO: this can also be timeouts etc, investigate.
 | ||||
|                 Err(smoltcp::Error::Illegal) => Poll::Ready(Err(Error::ConnectionReset)), | ||||
|                 // smoltcp returns no errors other than the above.
 | ||||
|                 Err(_) => unreachable!(), | ||||
|             }) | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<'d> embedded_io::asynch::Write for TcpWriter<'d> { | ||||
|     type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>> | ||||
|     where | ||||
|         Self: 'a; | ||||
| 
 | ||||
|     fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> { | ||||
|         poll_fn(move |cx| { | ||||
|             with_socket(self.handle, |s, _| match s.send_slice(buf) { | ||||
|                 // Not ready to send (no space in the tx buffer)
 | ||||
|                 Ok(0) => { | ||||
|                     s.register_send_waker(cx.waker()); | ||||
|  | ||||
| @ -49,6 +49,20 @@ pub struct TcpSocket<'a> { | ||||
| 
 | ||||
| impl<'a> Unpin for TcpSocket<'a> {} | ||||
| 
 | ||||
| pub struct TcpReader<'a> { | ||||
|     handle: SocketHandle, | ||||
|     ghost: PhantomData<&'a mut [u8]>, | ||||
| } | ||||
| 
 | ||||
| impl<'a> Unpin for TcpReader<'a> {} | ||||
| 
 | ||||
| pub struct TcpWriter<'a> { | ||||
|     handle: SocketHandle, | ||||
|     ghost: PhantomData<&'a mut [u8]>, | ||||
| } | ||||
| 
 | ||||
| impl<'a> Unpin for TcpWriter<'a> {} | ||||
| 
 | ||||
| impl<'a> TcpSocket<'a> { | ||||
|     pub fn new(rx_buffer: &'a mut [u8], tx_buffer: &'a mut [u8]) -> Self { | ||||
|         let handle = Stack::with(|stack| { | ||||
| @ -66,12 +80,27 @@ impl<'a> TcpSocket<'a> { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub fn split(&mut self) -> (TcpReader<'_>, TcpWriter<'_>) { | ||||
|         ( | ||||
|             TcpReader { | ||||
|                 handle: self.handle, | ||||
|                 ghost: PhantomData, | ||||
|             }, | ||||
|             TcpWriter { | ||||
|                 handle: self.handle, | ||||
|                 ghost: PhantomData, | ||||
|             }, | ||||
|         ) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn connect<T>(&mut self, remote_endpoint: T) -> Result<(), ConnectError> | ||||
|     where | ||||
|         T: Into<IpEndpoint>, | ||||
|     { | ||||
|         let local_port = Stack::with(|stack| stack.get_local_port()); | ||||
|         match self.with(|s, cx| s.connect(cx, remote_endpoint, local_port)) { | ||||
|         match with_socket(self.handle, |s, cx| { | ||||
|             s.connect(cx, remote_endpoint, local_port) | ||||
|         }) { | ||||
|             Ok(()) => {} | ||||
|             Err(smoltcp::Error::Illegal) => return Err(ConnectError::InvalidState), | ||||
|             Err(smoltcp::Error::Unaddressable) => return Err(ConnectError::NoRoute), | ||||
| @ -80,7 +109,7 @@ impl<'a> TcpSocket<'a> { | ||||
|         } | ||||
| 
 | ||||
|         futures::future::poll_fn(|cx| { | ||||
|             self.with(|s, _| match s.state() { | ||||
|             with_socket(self.handle, |s, _| match s.state() { | ||||
|                 TcpState::Closed | TcpState::TimeWait => { | ||||
|                     Poll::Ready(Err(ConnectError::ConnectionReset)) | ||||
|                 } | ||||
| @ -99,7 +128,7 @@ impl<'a> TcpSocket<'a> { | ||||
|     where | ||||
|         T: Into<IpEndpoint>, | ||||
|     { | ||||
|         match self.with(|s, _| s.listen(local_endpoint)) { | ||||
|         match with_socket(self.handle, |s, _| s.listen(local_endpoint)) { | ||||
|             Ok(()) => {} | ||||
|             Err(smoltcp::Error::Illegal) => return Err(AcceptError::InvalidState), | ||||
|             Err(smoltcp::Error::Unaddressable) => return Err(AcceptError::InvalidPort), | ||||
| @ -108,7 +137,7 @@ impl<'a> TcpSocket<'a> { | ||||
|         } | ||||
| 
 | ||||
|         futures::future::poll_fn(|cx| { | ||||
|             self.with(|s, _| match s.state() { | ||||
|             with_socket(self.handle, |s, _| match s.state() { | ||||
|                 TcpState::Listen | TcpState::SynSent | TcpState::SynReceived => { | ||||
|                     s.register_send_waker(cx.waker()); | ||||
|                     Poll::Pending | ||||
| @ -120,57 +149,58 @@ impl<'a> TcpSocket<'a> { | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_timeout(&mut self, duration: Option<Duration>) { | ||||
|         self.with(|s, _| s.set_timeout(duration)) | ||||
|         with_socket(self.handle, |s, _| s.set_timeout(duration)) | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_keep_alive(&mut self, interval: Option<Duration>) { | ||||
|         self.with(|s, _| s.set_keep_alive(interval)) | ||||
|         with_socket(self.handle, |s, _| s.set_keep_alive(interval)) | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_hop_limit(&mut self, hop_limit: Option<u8>) { | ||||
|         self.with(|s, _| s.set_hop_limit(hop_limit)) | ||||
|         with_socket(self.handle, |s, _| s.set_hop_limit(hop_limit)) | ||||
|     } | ||||
| 
 | ||||
|     pub fn local_endpoint(&self) -> IpEndpoint { | ||||
|         self.with(|s, _| s.local_endpoint()) | ||||
|         with_socket(self.handle, |s, _| s.local_endpoint()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn remote_endpoint(&self) -> IpEndpoint { | ||||
|         self.with(|s, _| s.remote_endpoint()) | ||||
|         with_socket(self.handle, |s, _| s.remote_endpoint()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn state(&self) -> TcpState { | ||||
|         self.with(|s, _| s.state()) | ||||
|         with_socket(self.handle, |s, _| s.state()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn close(&mut self) { | ||||
|         self.with(|s, _| s.close()) | ||||
|         with_socket(self.handle, |s, _| s.close()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn abort(&mut self) { | ||||
|         self.with(|s, _| s.abort()) | ||||
|         with_socket(self.handle, |s, _| s.abort()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn may_send(&self) -> bool { | ||||
|         self.with(|s, _| s.may_send()) | ||||
|         with_socket(self.handle, |s, _| s.may_send()) | ||||
|     } | ||||
| 
 | ||||
|     pub fn may_recv(&self) -> bool { | ||||
|         self.with(|s, _| s.may_recv()) | ||||
|         with_socket(self.handle, |s, _| s.may_recv()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|     fn with<R>(&self, f: impl FnOnce(&mut SyncTcpSocket, &mut SmolContext) -> R) -> R { | ||||
|         Stack::with(|stack| { | ||||
|             let res = { | ||||
|                 let (s, cx) = stack | ||||
|                     .iface | ||||
|                     .get_socket_and_context::<SyncTcpSocket>(self.handle); | ||||
|                 f(s, cx) | ||||
|             }; | ||||
|             stack.wake(); | ||||
|             res | ||||
|         }) | ||||
|     } | ||||
| fn with_socket<R>( | ||||
|     handle: SocketHandle, | ||||
|     f: impl FnOnce(&mut SyncTcpSocket, &mut SmolContext) -> R, | ||||
| ) -> R { | ||||
|     Stack::with(|stack| { | ||||
|         let res = { | ||||
|             let (s, cx) = stack.iface.get_socket_and_context::<SyncTcpSocket>(handle); | ||||
|             f(s, cx) | ||||
|         }; | ||||
|         stack.wake(); | ||||
|         res | ||||
|     }) | ||||
| } | ||||
| 
 | ||||
| impl<'a> Drop for TcpSocket<'a> { | ||||
| @ -190,3 +220,11 @@ impl embedded_io::Error for Error { | ||||
| impl<'d> embedded_io::Io for TcpSocket<'d> { | ||||
|     type Error = Error; | ||||
| } | ||||
| 
 | ||||
| impl<'d> embedded_io::Io for TcpReader<'d> { | ||||
|     type Error = Error; | ||||
| } | ||||
| 
 | ||||
| impl<'d> embedded_io::Io for TcpWriter<'d> { | ||||
|     type Error = Error; | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user