feat: add dynamic dispatch variants of pipe
This commit is contained in:
parent
92326f10b5
commit
c06862eeaf
@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## Unreleased
|
||||||
|
|
||||||
|
- Add dynamic dispatch variant of `Pipe`.
|
||||||
|
|
||||||
## 0.6.1 - 2024-11-22
|
## 0.6.1 - 2024-11-22
|
||||||
|
|
||||||
- Add `LazyLock` sync primitive.
|
- Add `LazyLock` sync primitive.
|
||||||
|
|||||||
@ -532,6 +532,250 @@ impl<M: RawMutex, const N: usize> embedded_io_async::Write for Writer<'_, M, N>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Type-erased variants
|
||||||
|
//
|
||||||
|
|
||||||
|
pub(crate) trait DynamicPipe {
|
||||||
|
fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a>;
|
||||||
|
fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a>;
|
||||||
|
|
||||||
|
fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError>;
|
||||||
|
fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError>;
|
||||||
|
|
||||||
|
fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError>;
|
||||||
|
fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError>;
|
||||||
|
|
||||||
|
fn consume(&self, amt: usize);
|
||||||
|
unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M, const N: usize> DynamicPipe for Pipe<M, N>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn consume(&self, amt: usize) {
|
||||||
|
Pipe::consume(self, amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> {
|
||||||
|
Pipe::try_fill_buf_with_context(self, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
|
||||||
|
Pipe::write(self, buf).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
|
||||||
|
Pipe::read(self, buf).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
Pipe::try_read(self, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
Pipe::try_write(self, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
Pipe::try_write_with_context(self, cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
Pipe::try_read_with_context(self, cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write-only access to a [`DynamicPipe`].
|
||||||
|
pub struct DynamicWriter<'p> {
|
||||||
|
pipe: &'p dyn DynamicPipe,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Clone for DynamicWriter<'p> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
*self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Copy for DynamicWriter<'p> {}
|
||||||
|
|
||||||
|
impl<'p> DynamicWriter<'p> {
|
||||||
|
/// Write some bytes to the pipe.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::write()`]
|
||||||
|
pub fn write<'a>(&'a self, buf: &'a [u8]) -> DynamicWriteFuture<'a> {
|
||||||
|
self.pipe.write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately write some bytes to the pipe.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::try_write()`]
|
||||||
|
pub fn try_write(&self, buf: &[u8]) -> Result<usize, TryWriteError> {
|
||||||
|
self.pipe.try_write(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> From<Writer<'p, M, N>> for DynamicWriter<'p>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(value: Writer<'p, M, N>) -> Self {
|
||||||
|
Self { pipe: value.pipe }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by [`DynamicWriter::write`].
|
||||||
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
|
pub struct DynamicWriteFuture<'p> {
|
||||||
|
pipe: &'p dyn DynamicPipe,
|
||||||
|
buf: &'p [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Future for DynamicWriteFuture<'p> {
|
||||||
|
type Output = usize;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.pipe.try_write_with_context(Some(cx), self.buf) {
|
||||||
|
Ok(n) => Poll::Ready(n),
|
||||||
|
Err(TryWriteError::Full) => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Unpin for DynamicWriteFuture<'p> {}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> From<WriteFuture<'p, M, N>> for DynamicWriteFuture<'p>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(value: WriteFuture<'p, M, N>) -> Self {
|
||||||
|
Self {
|
||||||
|
pipe: value.pipe,
|
||||||
|
buf: value.buf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read-only access to a [`DynamicPipe`].
|
||||||
|
pub struct DynamicReader<'p> {
|
||||||
|
pipe: &'p dyn DynamicPipe,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> DynamicReader<'p> {
|
||||||
|
/// Read some bytes from the pipe.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::read()`]
|
||||||
|
pub fn read<'a>(&'a self, buf: &'a mut [u8]) -> DynamicReadFuture<'a> {
|
||||||
|
self.pipe.read(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempt to immediately read some bytes from the pipe.
|
||||||
|
///
|
||||||
|
/// See [`Pipe::try_read()`]
|
||||||
|
pub fn try_read(&self, buf: &mut [u8]) -> Result<usize, TryReadError> {
|
||||||
|
self.pipe.try_read(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
|
||||||
|
///
|
||||||
|
/// If no bytes are currently available to read, this function waits until at least one byte is available.
|
||||||
|
///
|
||||||
|
/// If the reader is at end-of-file (EOF), an empty slice is returned.
|
||||||
|
pub fn fill_buf(&mut self) -> DynamicFillBufFuture<'_> {
|
||||||
|
DynamicFillBufFuture { pipe: Some(self.pipe) }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try returning contents of the internal buffer.
|
||||||
|
///
|
||||||
|
/// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`.
|
||||||
|
///
|
||||||
|
/// If the reader is at end-of-file (EOF), an empty slice is returned.
|
||||||
|
pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> {
|
||||||
|
unsafe { self.pipe.try_fill_buf_with_context(None) }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
|
||||||
|
pub fn consume(&mut self, amt: usize) {
|
||||||
|
self.pipe.consume(amt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> From<Reader<'p, M, N>> for DynamicReader<'p>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(value: Reader<'p, M, N>) -> Self {
|
||||||
|
Self { pipe: value.pipe }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by [`Pipe::read`] and [`Reader::read`].
|
||||||
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
|
pub struct DynamicReadFuture<'p> {
|
||||||
|
pipe: &'p dyn DynamicPipe,
|
||||||
|
buf: &'p mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Future for DynamicReadFuture<'p> {
|
||||||
|
type Output = usize;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.pipe.try_read_with_context(Some(cx), self.buf) {
|
||||||
|
Ok(n) => Poll::Ready(n),
|
||||||
|
Err(TryReadError::Empty) => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Unpin for DynamicReadFuture<'p> {}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> From<ReadFuture<'p, M, N>> for DynamicReadFuture<'p>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(value: ReadFuture<'p, M, N>) -> Self {
|
||||||
|
Self {
|
||||||
|
pipe: value.pipe,
|
||||||
|
buf: value.buf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by [`DynamicPipe::fill_buf`] and [`DynamicReader::fill_buf`].
|
||||||
|
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||||
|
pub struct DynamicFillBufFuture<'p> {
|
||||||
|
pipe: Option<&'p dyn DynamicPipe>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Future for DynamicFillBufFuture<'p> {
|
||||||
|
type Output = &'p [u8];
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let pipe = self.pipe.take().unwrap();
|
||||||
|
match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } {
|
||||||
|
Ok(buf) => Poll::Ready(buf),
|
||||||
|
Err(TryReadError::Empty) => {
|
||||||
|
self.pipe = Some(pipe);
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> Unpin for DynamicFillBufFuture<'p> {}
|
||||||
|
|
||||||
|
impl<'p, M, const N: usize> From<FillBufFuture<'p, M, N>> for DynamicFillBufFuture<'p>
|
||||||
|
where
|
||||||
|
M: RawMutex,
|
||||||
|
{
|
||||||
|
fn from(value: FillBufFuture<'p, M, N>) -> Self {
|
||||||
|
Self {
|
||||||
|
pipe: value.pipe.map(|p| p as &dyn DynamicPipe),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use futures_executor::ThreadPool;
|
use futures_executor::ThreadPool;
|
||||||
@ -619,6 +863,35 @@ mod tests {
|
|||||||
let _ = w.clone();
|
let _ = w.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dynamic_dispatch_pipe() {
|
||||||
|
let mut c = Pipe::<NoopRawMutex, 3>::new();
|
||||||
|
let (r, w) = c.split();
|
||||||
|
let (mut r, w): (DynamicReader<'_>, DynamicWriter<'_>) = (r.into(), w.into());
|
||||||
|
|
||||||
|
assert!(w.try_write(&[42, 43]).is_ok());
|
||||||
|
let buf = r.try_fill_buf().unwrap();
|
||||||
|
assert_eq!(buf, &[42, 43]);
|
||||||
|
let buf = r.try_fill_buf().unwrap();
|
||||||
|
assert_eq!(buf, &[42, 43]);
|
||||||
|
r.consume(1);
|
||||||
|
let buf = r.try_fill_buf().unwrap();
|
||||||
|
assert_eq!(buf, &[43]);
|
||||||
|
r.consume(1);
|
||||||
|
assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty));
|
||||||
|
assert_eq!(w.try_write(&[44, 45, 46]), Ok(1));
|
||||||
|
assert_eq!(w.try_write(&[45, 46]), Ok(2));
|
||||||
|
let buf = r.try_fill_buf().unwrap();
|
||||||
|
assert_eq!(buf, &[44]); // only one byte due to wraparound.
|
||||||
|
r.consume(1);
|
||||||
|
let buf = r.try_fill_buf().unwrap();
|
||||||
|
assert_eq!(buf, &[45, 46]);
|
||||||
|
assert!(w.try_write(&[47]).is_ok());
|
||||||
|
let buf = r.try_fill_buf().unwrap();
|
||||||
|
assert_eq!(buf, &[45, 46, 47]);
|
||||||
|
r.consume(3);
|
||||||
|
}
|
||||||
|
|
||||||
#[futures_test::test]
|
#[futures_test::test]
|
||||||
async fn receiver_receives_given_try_write_async() {
|
async fn receiver_receives_given_try_write_async() {
|
||||||
let executor = ThreadPool::new().unwrap();
|
let executor = ThreadPool::new().unwrap();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user