use core::mem::MaybeUninit;
use avr_oxide::concurrency::{Isolated, scheduler, thread};
use avr_oxide::concurrency::scheduler::ThreadState;
use avr_oxide::concurrency::util::ThreadSet;
use avr_oxide::OxideResult;
use avr_oxide::util::datatypes::{BoundedIncDec, BoundedMaths, Volatile};
use avr_oxide::OxideResult::{Ok,Err};
/// A fixed-capacity ring buffer of `Copy` elements backed by an inline array
/// of `SIZE` slots.
///
/// Elements are written at `head` and read from `tail`.  The queue is treated
/// as empty when the two indices are equal, and as full when `head` sits one
/// (bounded) step behind `tail`, so one slot is always left unused.
///
/// NOTE(review): the indices are stored as `u8`, so `SIZE` presumably has to
/// fit the `u8` index range - confirm against the `BoundedMaths` helpers.
pub struct RingQ<E: Clone + Coalesce + Copy, const SIZE: usize> {
    /// Backing storage; a slot only holds a valid `E` once it has been written
    /// by an append.
    queue: [MaybeUninit<E>; SIZE],
    /// Index of the slot the next appended element is written to.
    head: Volatile<u8>,
    /// Index of the slot the next consumed element is read from.
    tail: Volatile<u8>,
    /// Threads that found the queue empty in `consume_blocking`; handed to the
    /// scheduler for release whenever a new element is stored.
    blocked_consumers: ThreadSet,
    /// Threads whose `append_blocking` was rejected; handed to the scheduler
    /// for release whenever an element is consumed.
    blocked_producers: ThreadSet,
}
/// Merging of a newly arriving element into one that is already queued.
///
/// When the queue is not empty, `RingQ::append` calls [`Coalesce::coalesced`]
/// on the most recently queued element, passing the new element.  If that
/// returns `Ok`, the merged value replaces the queued element and nothing new
/// is stored; if it returns `Err`, the new element is appended normally.
pub trait Coalesce : Sized {
/// Attempts to combine `self` (the element already queued) with `with`
/// (the element being appended).
///
/// Returns `Ok` carrying the combined element, or `Err` when the two
/// elements cannot be merged.
fn coalesced(&self, with: &Self) -> OxideResult<Self,QueueError>;
}
/// Marker trait for element types that must never be merged.
///
/// Every type implementing this marker receives the blanket [`Coalesce`]
/// implementation in this file, whose `coalesced` always returns
/// `Err(QueueError::CannotCoalesce)`.
pub trait DoesNotCoalesce {}
/// Reasons a queue operation or a coalescing attempt can fail.
///
/// The type is a plain field-less enum, so it derives the usual value-type
/// traits; this lets callers and tests compare, copy and print errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueError {
    /// The queue has no free slot for another element.
    QueueFull,
    /// The two elements cannot be merged into a single element.
    CannotCoalesce,
}
/// Plain bytes are never merged: through the blanket `Coalesce` implementation
/// for `DoesNotCoalesce` types, every `u8` is queued as its own element.
impl DoesNotCoalesce for u8 {}
/// Blanket [`Coalesce`] implementation for every [`DoesNotCoalesce`] type.
///
/// It lets marker types be used as queue elements without writing their own
/// `coalesced`: every merge attempt is refused.
impl<T> Coalesce for T
where
    T: DoesNotCoalesce,
{
    /// Always refuses to merge, returning `Err(QueueError::CannotCoalesce)`.
    fn coalesced(&self, _other: &Self) -> OxideResult<Self, QueueError> {
        Err(QueueError::CannotCoalesce)
    }
}
/// The default queue is an empty queue.
impl<E, const SIZE: usize> Default for RingQ<E, SIZE>
where
    E: Clone + Coalesce + Copy,
{
    /// Builds an empty queue by delegating to `RingQ::new`.
    fn default() -> Self {
        Self::new()
    }
}
impl<E, const SIZE: usize> RingQ<E, SIZE>
where
    E: Clone + Coalesce + Copy,
{
    /// Creates an empty queue.
    ///
    /// Both indices start at zero and both blocked-thread sets start empty.
    /// The backing slots are left uninitialised; a slot is only read after an
    /// append has written it.
    pub fn new() -> Self {
        let queue: [MaybeUninit<E>; SIZE] = [MaybeUninit::uninit(); SIZE];
        Self {
            queue,
            head: 0u8.into(),
            tail: 0u8.into(),
            blocked_consumers: ThreadSet::new(),
            blocked_producers: ThreadSet::new(),
        }
    }

    /// Creates a queue that already contains `event`.
    ///
    /// The queue is built and filled inside an interrupt-isolated section so
    /// that an isolation token is available for the append.
    ///
    /// NOTE(review): the append result is unwrapped, so this presumably panics
    /// if the queue cannot hold even one element - confirm `OxideResult::unwrap`.
    pub fn new_with(event: E) -> Self {
        avr_oxide::concurrency::interrupt::isolated(|isotoken| {
            let mut queue = Self::new();
            queue.append(isotoken, event).unwrap();
            queue
        })
    }

    /// Returns the number of queued elements, computed as the bounded
    /// difference between `head` and `tail`.
    // Kept although not every build configuration calls it.
    #[allow(dead_code)]
    pub fn len(&self) -> u8 {
        self.head.bsub::<SIZE>(&self.tail)
    }

    /// Removes and returns the oldest element, or `None` if the queue is
    /// empty.
    ///
    /// After a successful removal the blocked-producer set is handed to
    /// `scheduler::release_all_threads_and_clear`, since a slot has just been
    /// freed.
    pub fn consume(&mut self, isotoken: Isolated) -> Option<E> {
        let tail = self.tail.read();
        if self.head == tail {
            return None;
        }

        // SAFETY: `head != tail`, so the slot at `tail` was written by an
        // earlier append and has not been consumed since.
        let consumed = unsafe { self.queue[tail as usize].assume_init_read() };
        self.tail.binc_isolated::<SIZE>(isotoken);
        scheduler::release_all_threads_and_clear(isotoken, &mut self.blocked_producers);
        Some(consumed)
    }

    /// Removes and returns the oldest element, waiting until one is
    /// available.
    ///
    /// Each attempt runs inside an interrupt-isolated section.  If the queue
    /// is empty, the current thread is added to the blocked-consumer set and
    /// marked `BlockedOnQueue` within that same section; it then yields and
    /// tries again once it runs next.
    pub fn consume_blocking(&mut self) -> E {
        loop {
            let element = avr_oxide::concurrency::interrupt::isolated(|isotoken| {
                let taken = self.consume(isotoken);
                if taken.is_none() {
                    self.blocked_consumers.add_current_thread(isotoken);
                    scheduler::set_current_thread_state(isotoken, ThreadState::BlockedOnQueue);
                }
                taken
            });

            if let Some(value) = element {
                return value;
            }
            thread::yield_now();
        }
    }

    /// Adds `element` to the queue without blocking.
    ///
    /// If the queue is not empty, the most recently queued element is first
    /// asked to coalesce with `element`; on success the merged value replaces
    /// that element and no new slot is used.  Otherwise `element` is written
    /// at `head`, and the blocked-consumer set is handed to
    /// `scheduler::release_all_threads_and_clear`.
    ///
    /// Returns `Err(QueueError::QueueFull)` when no coalescing happened and
    /// `head` is one bounded step behind `tail`.
    #[optimize(speed)]
    pub fn append(&mut self, isotoken: Isolated, element: E) -> OxideResult<(), QueueError> {
        let head = self.head.read();

        // Coalescing is only possible when something is already queued.
        if head != self.tail.read() {
            let prev = head.bsub_isolated::<SIZE>(isotoken, 1);
            // SAFETY: the queue is not empty, so the slot just before `head`
            // holds the most recently appended element.
            let newest = unsafe { self.queue[prev as usize].assume_init() };
            if let Ok(merged) = newest.coalesced(&element) {
                // `E` is `Copy`, so overwriting the slot needs no drop of the
                // previous value.
                self.queue[prev as usize].write(merged);
                return Ok(());
            }
        }

        if head == self.tail.bsub_isolated::<SIZE>(isotoken, 1) {
            return Err(QueueError::QueueFull);
        }

        self.queue[head as usize].write(element);
        self.head.binc_isolated::<SIZE>(isotoken);
        scheduler::release_all_threads_and_clear(isotoken, &mut self.blocked_consumers);
        Ok(())
    }

    /// Adds `element` to the queue, waiting until it is accepted.
    ///
    /// Each attempt runs inside an interrupt-isolated section.  If the append
    /// is rejected, the current thread is added to the blocked-producer set
    /// and marked `BlockedOnQueue` within that same section; it then yields
    /// and tries again once it runs next.
    pub fn append_blocking(&mut self, element: E) {
        loop {
            let appended = avr_oxide::concurrency::interrupt::isolated(|isotoken| {
                let accepted = self.append(isotoken, element).is_ok();
                if !accepted {
                    self.blocked_producers.add_current_thread(isotoken);
                    scheduler::set_current_thread_state(isotoken, ThreadState::BlockedOnQueue);
                }
                accepted
            });

            if appended {
                return;
            }
            thread::yield_now();
        }
    }
}
#[cfg(test)]
mod tests {
    use avr_oxide::OxideResult;
    use avr_oxide::private::ringq::{RingQ, DoesNotCoalesce, Coalesce, QueueError, BoundedMaths};
    use avr_oxide::OxideResult::{Err,Ok};

    /// Element type that is never coalesced.
    #[derive(Clone,Copy,PartialEq,Eq,Debug)]
    struct TestEvent {
        num: u8
    }
    impl DoesNotCoalesce for TestEvent {}

    /// Element type that always coalesces by summing the `num` fields.
    #[derive(Clone,Copy,PartialEq,Eq,Debug)]
    struct CoalescingTestEvent {
        num: u8
    }
    impl Coalesce for CoalescingTestEvent {
        fn coalesced(&self, with: &Self) -> OxideResult<Self, QueueError> {
            println!("Self.num = {}, adding {}", self.num, with.num);
            Ok(Self {
                num: self.num + with.num
            })
        }
    }

    /// A fresh queue is empty and grows by one per appended element.
    #[test]
    fn test_ringq() {
        avr_oxide::concurrency::interrupt::isolated(|isotoken|{
            let mut queue : RingQ<TestEvent,10> = RingQ::new();
            println!("Initial queue length: {}", queue.len());
            assert_eq!(queue.len(), 0);

            let test_ev1 = TestEvent { num: 0 };
            // Unwrap so a rejected append fails here, not at the length check.
            queue.append(isotoken, test_ev1).unwrap();
            println!("Queue length after adding 1 event: {}", queue.len());
            assert_eq!(queue.len(), 1);
        });
    }

    /// A queue of 10 slots accepts 9 elements and rejects the 10th with
    /// `QueueFull`.
    ///
    /// This asserts on the returned error instead of using `#[should_panic]`,
    /// which would also pass if one of the first nine appends failed.
    #[test]
    fn test_ringq_bounds() {
        avr_oxide::concurrency::interrupt::isolated(|isotoken|{
            let mut queue: RingQ<TestEvent,10> = RingQ::new();

            for i in 1..=9 {
                let test_ev1 = TestEvent { num: 0 };
                queue.append(isotoken, test_ev1).unwrap();
                println!("Appended element {}", i);
            }
            println!("Queue length: {}", queue.len());
            assert_eq!(queue.len(), 9);
            println!("Successfully added 9 events; next should be rejected");

            let test_ev1 = TestEvent { num: 0 };
            assert!(matches!(
                queue.append(isotoken, test_ev1),
                Err(QueueError::QueueFull)
            ));
        })
    }

    /// Elements come out in insertion order, including across a wrap of the
    /// indices.
    #[test]
    fn test_ringq_consume() {
        avr_oxide::concurrency::interrupt::isolated(|isotoken|{
            let mut queue : RingQ<TestEvent,4> = RingQ::new();
            println!("Initial queue length: {}", queue.len());
            assert_eq!(queue.len(), 0);

            let test_ev1 = TestEvent { num: 1 };
            let test_ev2 = TestEvent { num: 2 };
            let test_ev3 = TestEvent { num: 3 };
            queue.append(isotoken, test_ev1).unwrap();
            queue.append(isotoken, test_ev2).unwrap();
            queue.append(isotoken, test_ev3).unwrap();
            println!("Queue length after adding 3 events: {}", queue.len());
            assert_eq!(queue.len(), 3);

            let consumed_ev1 = queue.consume(isotoken).unwrap();
            let consumed_ev2 = queue.consume(isotoken).unwrap();
            println!("Queue length after consuming 2 events: {}", queue.len());
            assert_eq!(queue.len(), 1);

            let test_ev4 = TestEvent { num: 4 };
            let test_ev5 = TestEvent { num: 5 };
            queue.append(isotoken, test_ev4).unwrap();
            queue.append(isotoken, test_ev5).unwrap();
            println!("Queue length after adding 2 more events: {}", queue.len());
            assert_eq!(queue.len(), 3);

            let consumed_ev3 = queue.consume(isotoken).unwrap();
            let consumed_ev4 = queue.consume(isotoken).unwrap();
            let consumed_ev5 = queue.consume(isotoken).unwrap();
            println!("Queue length after consuming 3 more events: {}", queue.len());
            assert_eq!(queue.len(), 0);

            let inserted = [ test_ev1, test_ev2, test_ev3, test_ev4, test_ev5 ];
            let consumed = [ consumed_ev1, consumed_ev2, consumed_ev3, consumed_ev4, consumed_ev5 ];
            assert_eq!(inserted, consumed);
        })
    }

    /// Coalescable elements collapse into a single queued element whose value
    /// is the running sum.
    #[test]
    fn test_ringq_coalesce() {
        avr_oxide::concurrency::interrupt::isolated(|isotoken|{
            let mut queue : RingQ<CoalescingTestEvent, 4> = RingQ::new();
            let test_ev1 = CoalescingTestEvent { num: 1 };
            let test_ev2 = CoalescingTestEvent { num: 2 };
            let test_ev3 = CoalescingTestEvent { num: 3 };
            let test_ev4 = CoalescingTestEvent { num: 4 };
            let test_ev5 = CoalescingTestEvent { num: 5 };
            println!("Initial queue length: {}", queue.len());
            assert_eq!(queue.len(), 0);

            queue.append(isotoken, test_ev1).unwrap();
            queue.append(isotoken, test_ev2).unwrap();
            queue.append(isotoken, test_ev3).unwrap();
            queue.append(isotoken, test_ev4).unwrap();
            queue.append(isotoken, test_ev5).unwrap();
            println!("Queue length after adding 5 coalescable events: {}", queue.len());
            assert_eq!(queue.len(), 1);

            let consumed_ev1 = queue.consume(isotoken).unwrap();
            println!("Consumed event: {:?}", &consumed_ev1);
            assert_eq!(consumed_ev1.num, 15);
        });
    }
}