nrf/buffered_uart: refactor so rx/tx halves are independent.

This commit is contained in:
Dario Nieuwenhuis 2024-02-21 21:51:43 +01:00
parent c2e429205d
commit 1f17fdf84e
4 changed files with 269 additions and 231 deletions

View File

@ -1,6 +1,6 @@
//! Atomic reusable ringbuffer.
use core::slice;
use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use core::{ptr, slice};
/// Atomic reusable ringbuffer
///
@ -73,6 +73,7 @@ impl RingBuffer {
pub unsafe fn deinit(&self) {
// Ordering: it's OK to use `Relaxed` because this is not called
// concurrently with other methods.
self.buf.store(ptr::null_mut(), Ordering::Relaxed);
self.len.store(0, Ordering::Relaxed);
self.start.store(0, Ordering::Relaxed);
self.end.store(0, Ordering::Relaxed);
@ -82,20 +83,46 @@ impl RingBuffer {
///
/// # Safety
///
/// Only one reader can exist at a time.
/// - Only one reader can exist at a time.
/// - Ringbuffer must be initialized.
pub unsafe fn reader(&self) -> Reader<'_> {
Reader(self)
}
/// Try creating a reader, fails if not initialized.
///
/// # Safety
///
/// Only one reader can exist at a time.
pub unsafe fn try_reader(&self) -> Option<Reader<'_>> {
if self.buf.load(Ordering::Relaxed).is_null() {
return None;
}
Some(Reader(self))
}
/// Create a writer.
///
/// # Safety
///
/// Only one writer can exist at a time.
/// - Only one writer can exist at a time.
/// - Ringbuffer must be initialized.
pub unsafe fn writer(&self) -> Writer<'_> {
Writer(self)
}
/// Try creating a writer, fails if not initialized.
///
/// # Safety
///
/// Only one writer can exist at a time.
pub unsafe fn try_writer(&self) -> Option<Writer<'_>> {
if self.buf.load(Ordering::Relaxed).is_null() {
return None;
}
Some(Writer(self))
}
/// Return length of buffer.
pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)

View File

@ -22,13 +22,13 @@ use embassy_sync::waitqueue::AtomicWaker;
pub use pac::uarte0::{baudrate::BAUDRATE_A as Baudrate, config::PARITY_A as Parity};
use crate::gpio::sealed::Pin;
use crate::gpio::{self, AnyPin, Pin as GpioPin, PselBits};
use crate::gpio::{AnyPin, Pin as GpioPin, PselBits};
use crate::interrupt::typelevel::Interrupt;
use crate::ppi::{
self, AnyConfigurableChannel, AnyGroup, Channel, ConfigurableChannel, Event, Group, Ppi, PpiGroup, Task,
};
use crate::timer::{Instance as TimerInstance, Timer};
use crate::uarte::{apply_workaround_for_enable_anomaly, Config, Instance as UarteInstance};
use crate::uarte::{apply_workaround_for_enable_anomaly, drop_tx_rx, Config, Instance as UarteInstance};
use crate::{interrupt, pac, Peripheral};
mod sealed {
@ -86,10 +86,9 @@ impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for Interrupt
let r = U::regs();
let s = U::buffered_state();
if let Some(mut rx) = unsafe { s.rx_buf.try_writer() } {
let buf_len = s.rx_buf.len();
let half_len = buf_len / 2;
let mut tx = unsafe { s.tx_buf.reader() };
let mut rx = unsafe { s.rx_buf.writer() };
if r.events_error.read().bits() != 0 {
r.events_error.reset();
@ -179,9 +178,11 @@ impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for Interrupt
r.intenclr.write(|w| w.rxstarted().clear());
}
}
}
// =============================
if let Some(mut tx) = unsafe { s.tx_buf.try_reader() } {
// TX end
if r.events_endtx.read().bits() != 0 {
r.events_endtx.reset();
@ -208,6 +209,7 @@ impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for Interrupt
r.tasks_starttx.write(|w| unsafe { w.bits(1) });
}
}
}
//trace!("irq: end");
}
@ -215,11 +217,8 @@ impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for Interrupt
/// Buffered UARTE driver.
pub struct BufferedUarte<'d, U: UarteInstance, T: TimerInstance> {
_peri: PeripheralRef<'d, U>,
timer: Timer<'d, T>,
_ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 1>,
_ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 2>,
_ppi_group: PpiGroup<'d, AnyGroup>,
tx: BufferedUarteTx<'d, U>,
rx: BufferedUarteRx<'d, U, T>,
}
impl<'d, U: UarteInstance, T: TimerInstance> Unpin for BufferedUarte<'d, U, T> {}
@ -404,19 +403,23 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
U::Interrupt::pend();
unsafe { U::Interrupt::enable() };
let s = U::state();
s.tx_rx_refcount.store(2, Ordering::Relaxed);
Self {
tx: BufferedUarteTx {
_peri: unsafe { peri.clone_unchecked() },
},
rx: BufferedUarteRx {
_peri: peri,
timer,
_ppi_ch1: ppi_ch1,
_ppi_ch2: ppi_ch2,
_ppi_group: ppi_group,
},
}
}
fn pend_irq() {
U::Interrupt::pend()
}
/// Adjust the baud rate to the provided value.
pub fn set_baudrate(&mut self, baudrate: Baudrate) {
let r = U::regs();
@ -426,19 +429,52 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
/// Split the UART in reader and writer parts.
///
/// This allows reading and writing concurrently from independent tasks.
pub fn split(&mut self) -> (BufferedUarteRx<'_, U, T>, BufferedUarteTx<'_, U, T>) {
(BufferedUarteRx { inner: self }, BufferedUarteTx { inner: self })
pub fn split(self) -> (BufferedUarteRx<'d, U, T>, BufferedUarteTx<'d, U>) {
(self.rx, self.tx)
}
async fn inner_read(&self, buf: &mut [u8]) -> Result<usize, Error> {
let data = self.inner_fill_buf().await?;
let n = data.len().min(buf.len());
buf[..n].copy_from_slice(&data[..n]);
self.inner_consume(n);
Ok(n)
/// Split the UART in reader and writer parts, by reference.
///
/// The returned halves borrow from `self`, so you can drop them and go back to using
/// the "un-split" `self`. This allows temporarily splitting the UART.
pub fn split_by_ref(&mut self) -> (&mut BufferedUarteRx<'d, U, T>, &mut BufferedUarteTx<'d, U>) {
(&mut self.rx, &mut self.tx)
}
async fn inner_write<'a>(&'a self, buf: &'a [u8]) -> Result<usize, Error> {
/// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.rx.read(buf).await
}
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
self.rx.fill_buf().await
}
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
pub fn consume(&mut self, amt: usize) {
self.rx.consume(amt)
}
/// Write a buffer into this writer, returning how many bytes were written.
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
self.tx.write(buf).await
}
/// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
pub async fn flush(&mut self) -> Result<(), Error> {
self.tx.flush().await
}
}
/// Reader part of the buffered UARTE driver.
pub struct BufferedUarteTx<'d, U: UarteInstance> {
_peri: PeripheralRef<'d, U>,
}
impl<'d, U: UarteInstance> BufferedUarteTx<'d, U> {
/// Write a buffer into this writer, returning how many bytes were written.
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
poll_fn(move |cx| {
//trace!("poll_write: {:?}", buf.len());
let s = U::buffered_state();
@ -458,14 +494,15 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
//trace!("poll_write: queued {:?}", n);
compiler_fence(Ordering::SeqCst);
Self::pend_irq();
U::Interrupt::pend();
Poll::Ready(Ok(n))
})
.await
}
async fn inner_flush<'a>(&'a self) -> Result<(), Error> {
/// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
pub async fn flush(&mut self) -> Result<(), Error> {
poll_fn(move |cx| {
//trace!("poll_flush");
let s = U::buffered_state();
@ -479,8 +516,51 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
})
.await
}
}
async fn inner_fill_buf<'a>(&'a self) -> Result<&'a [u8], Error> {
impl<'a, U: UarteInstance> Drop for BufferedUarteTx<'a, U> {
fn drop(&mut self) {
let r = U::regs();
r.intenclr.write(|w| {
w.txdrdy().set_bit();
w.txstarted().set_bit();
w.txstopped().set_bit();
w
});
r.events_txstopped.reset();
r.tasks_stoptx.write(|w| unsafe { w.bits(1) });
while r.events_txstopped.read().bits() == 0 {}
let s = U::buffered_state();
unsafe { s.tx_buf.deinit() }
let s = U::state();
drop_tx_rx(r, s);
}
}
/// Reader part of the buffered UARTE driver.
pub struct BufferedUarteRx<'d, U: UarteInstance, T: TimerInstance> {
_peri: PeripheralRef<'d, U>,
timer: Timer<'d, T>,
_ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 1>,
_ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 2>,
_ppi_group: PpiGroup<'d, AnyGroup>,
}
impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarteRx<'d, U, T> {
/// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let data = self.fill_buf().await?;
let n = data.len().min(buf.len());
buf[..n].copy_from_slice(&data[..n]);
self.consume(n);
Ok(n)
}
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
poll_fn(move |cx| {
compiler_fence(Ordering::SeqCst);
//trace!("poll_read");
@ -532,7 +612,8 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
.await
}
fn inner_consume(&self, amt: usize) {
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
pub fn consume(&mut self, amt: usize) {
if amt == 0 {
return;
}
@ -542,69 +623,31 @@ impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
rx.pop_done(amt);
U::regs().intenset.write(|w| w.rxstarted().set());
}
/// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.inner_read(buf).await
}
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
self.inner_fill_buf().await
}
impl<'a, U: UarteInstance, T: TimerInstance> Drop for BufferedUarteRx<'a, U, T> {
fn drop(&mut self) {
self._ppi_group.disable_all();
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
pub fn consume(&mut self, amt: usize) {
self.inner_consume(amt)
}
let r = U::regs();
/// Write a buffer into this writer, returning how many bytes were written.
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
self.inner_write(buf).await
}
self.timer.stop();
/// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
pub async fn flush(&mut self) -> Result<(), Error> {
self.inner_flush().await
}
}
r.intenclr.write(|w| {
w.rxdrdy().set_bit();
w.rxstarted().set_bit();
w.rxto().set_bit();
w
});
r.events_rxto.reset();
r.tasks_stoprx.write(|w| unsafe { w.bits(1) });
while r.events_rxto.read().bits() == 0 {}
/// Reader part of the buffered UARTE driver.
pub struct BufferedUarteTx<'d, U: UarteInstance, T: TimerInstance> {
inner: &'d BufferedUarte<'d, U, T>,
}
let s = U::buffered_state();
unsafe { s.rx_buf.deinit() }
impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarteTx<'d, U, T> {
/// Write a buffer into this writer, returning how many bytes were written.
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
self.inner.inner_write(buf).await
}
/// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
pub async fn flush(&mut self) -> Result<(), Error> {
self.inner.inner_flush().await
}
}
/// Writer part of the buffered UARTE driver.
pub struct BufferedUarteRx<'d, U: UarteInstance, T: TimerInstance> {
inner: &'d BufferedUarte<'d, U, T>,
}
impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarteRx<'d, U, T> {
/// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.inner.inner_read(buf).await
}
/// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
self.inner.inner_fill_buf().await
}
/// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
pub fn consume(&mut self, amt: usize) {
self.inner.inner_consume(amt)
let s = U::state();
drop_tx_rx(r, s);
}
}
@ -625,91 +668,59 @@ mod _embedded_io {
type Error = Error;
}
impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::ErrorType for BufferedUarteTx<'d, U, T> {
impl<'d, U: UarteInstance> embedded_io_async::ErrorType for BufferedUarteTx<'d, U> {
type Error = Error;
}
impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::Read for BufferedUarte<'d, U, T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
self.inner_read(buf).await
self.read(buf).await
}
}
impl<'d: 'd, U: UarteInstance, T: TimerInstance> embedded_io_async::Read for BufferedUarteRx<'d, U, T> {
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
self.inner.inner_read(buf).await
self.read(buf).await
}
}
impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::BufRead for BufferedUarte<'d, U, T> {
async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
self.inner_fill_buf().await
self.fill_buf().await
}
fn consume(&mut self, amt: usize) {
self.inner_consume(amt)
self.consume(amt)
}
}
impl<'d: 'd, U: UarteInstance, T: TimerInstance> embedded_io_async::BufRead for BufferedUarteRx<'d, U, T> {
async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
self.inner.inner_fill_buf().await
self.fill_buf().await
}
fn consume(&mut self, amt: usize) {
self.inner.inner_consume(amt)
self.consume(amt)
}
}
impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::Write for BufferedUarte<'d, U, T> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.inner_write(buf).await
self.write(buf).await
}
async fn flush(&mut self) -> Result<(), Self::Error> {
self.inner_flush().await
self.flush().await
}
}
impl<'d: 'd, U: UarteInstance, T: TimerInstance> embedded_io_async::Write for BufferedUarteTx<'d, U, T> {
impl<'d: 'd, U: UarteInstance> embedded_io_async::Write for BufferedUarteTx<'d, U> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
self.inner.inner_write(buf).await
self.write(buf).await
}
async fn flush(&mut self) -> Result<(), Self::Error> {
self.inner.inner_flush().await
}
}
}
impl<'a, U: UarteInstance, T: TimerInstance> Drop for BufferedUarte<'a, U, T> {
fn drop(&mut self) {
self._ppi_group.disable_all();
let r = U::regs();
self.timer.stop();
r.inten.reset();
r.events_rxto.reset();
r.tasks_stoprx.write(|w| unsafe { w.bits(1) });
r.events_txstopped.reset();
r.tasks_stoptx.write(|w| unsafe { w.bits(1) });
while r.events_txstopped.read().bits() == 0 {}
while r.events_rxto.read().bits() == 0 {}
r.enable.write(|w| w.enable().disabled());
gpio::deconfigure_pin(r.psel.rxd.read().bits());
gpio::deconfigure_pin(r.psel.txd.read().bits());
gpio::deconfigure_pin(r.psel.rts.read().bits());
gpio::deconfigure_pin(r.psel.cts.read().bits());
let s = U::buffered_state();
unsafe {
s.rx_buf.deinit();
s.tx_buf.deinit();
self.flush().await
}
}
}

View File

@ -23,7 +23,7 @@ async fn main(_spawner: Spawner) {
let mut tx_buffer = [0u8; 1024];
let mut rx_buffer = [0u8; 1024];
let mut u = BufferedUarte::new(
let u = BufferedUarte::new(
p.UARTE0,
p.TIMER0,
p.PPI_CH0,

View File

@ -23,7 +23,7 @@ async fn main(_spawner: Spawner) {
let mut tx_buffer = [0u8; 1024];
let mut rx_buffer = [0u8; 1024];
let mut u = BufferedUarte::new(
let u = BufferedUarte::new(
p.UARTE0,
p.TIMER0,
p.PPI_CH0,