1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
/* ringq.rs
 *
 * Developed by Tim Walls <tim.walls@snowgoons.com>
 * Copyright (c) All Rights Reserved, Tim Walls
 */
//! A simple ring queue implementation for internal use within Oxide.

// Imports ===================================================================
use core::mem::MaybeUninit;
use avr_oxide::concurrency::{Isolated, scheduler, thread};
use avr_oxide::concurrency::scheduler::ThreadState;
use avr_oxide::concurrency::util::ThreadSet;
use avr_oxide::OxideResult;
use avr_oxide::util::datatypes::{BoundedIncDec, BoundedMaths, Volatile};
use avr_oxide::OxideResult::{Ok,Err};

// Declarations ==============================================================

/**
 * A simple ring-queue implementation.  We make the maximum size 255 (i.e.
 * represented as a u8) so we can be sure that reading/writing that value
 * will be atomic without needing a lock.
 *
 * Note that in this implementation we can store 1-less than the size (we
 * need a buffer between start and end of queue so we can tell the difference
 * between full and empty.  An alternative is to maintain a separate length
 * counter, but then I have to make that atomic, whereas in this implementation
 * I can be sure tail is only ever written by the consumer, and head is only
 * ever written by the producer, and both are atomic.
 */
pub struct RingQ<E, const SIZE: usize>
  where
    E: Clone + Coalesce + Copy
{
  queue: [MaybeUninit<E>; SIZE],
  head: Volatile<u8>,
  tail: Volatile<u8>,
  blocked_consumers: ThreadSet,
  blocked_producers: ThreadSet,
}

/**
 * A trait implemented by objects that can be coalesced - i.e. two objects
 * replaced by one that represents the same semantic event/action/whatever.
 */
pub trait Coalesce : Sized {
  /**
   * Mutate myself so that I represent the result of coalescing with the
   * provided element.
   */
  fn coalesced(&self, with: &Self) -> OxideResult<Self,QueueError>;
}

/**
 * Marker trait that allows a default implementation for Coalesce for types
 * that don't want the functionality.
 */
pub trait DoesNotCoalesce {}

/**
 * Errors which can be generated by our Ring Queue.
 */
pub enum QueueError {
  /// There is no more space in the queue to append an item
  QueueFull,

  /// An attempt to coalesce objects that cannot be combined was made
  CannotCoalesce
}


// Code ======================================================================
impl DoesNotCoalesce for u8 {}

impl<T: DoesNotCoalesce> Coalesce for T {
  fn coalesced(&self, _with: &Self) -> OxideResult<Self, QueueError> {
    Err(QueueError::CannotCoalesce)
  }
}

impl<E, const SIZE: usize> Default for RingQ<E,SIZE>
where
  E: Clone + Coalesce + Copy
{
  fn default() -> Self {
    RingQ::<E,SIZE>::new()
  }
}

impl<E, const SIZE: usize> RingQ<E,SIZE>
where
  E: Clone + Coalesce + Copy
{
  pub fn new() -> Self {
    Self {
      queue: [MaybeUninit::uninit(); SIZE],
      head: 0u8.into(),
      tail: 0u8.into(),
      blocked_consumers: ThreadSet::new(),
      blocked_producers: ThreadSet::new(),
    }
  }

  pub fn new_with(event: E) -> Self {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut new = Self::new();
      new.append(isotoken, event).unwrap();
      new
    })
  }

  /**
   * Return the number of elements in the queue
   */
  #[allow(dead_code)]
  pub fn len(&self) -> u8 {
    self.head.bsub::<SIZE>(&self.tail)
  }

  /**
   * Consume an entry from the ring queue (if there is one :).)  This is
   * the single-consumer version of the method, that does no locking at all -
   * it will only be safe to use if no other thread is also reading.
   */
  pub fn consume(&mut self, isotoken: Isolated) -> Option<E> {
    let tail = self.tail.read();

    if self.head == tail {
      None
    } else {
      unsafe {
        // Extract the current value *before* we modify the tail pointer
        let consumed = self.queue[tail as usize].assume_init_read();

        self.tail.binc_isolated::<SIZE>(isotoken);

        // We can release any producers that were waiting for space
        scheduler::release_all_threads_and_clear(isotoken, &mut self.blocked_producers);

        Some(consumed)
      }
    }
  }

  /**
   * Consume an entry from the ring queue.  If there is nothing to consume,
   * block until there is.
   */
  pub fn consume_blocking(&mut self) -> E {
    loop {
      let element =
        avr_oxide::concurrency::interrupt::isolated(|isotoken|{
          match self.consume(isotoken) {
            None => {
              // If there was nothing to consume, then add myself to the blocked
              // list (we'll yield once interrupts are enabled again)
              self.blocked_consumers.add_current_thread(isotoken);
              scheduler::set_current_thread_state(isotoken, ThreadState::BlockedOnQueue);
              None
            },
            Some(e) => {
              // If we got something though, we can just return it
              Some(e)
            }
          }
        });

      match element {
        Some(value) => {
          return value;
        },
        None => {
          // Yield before trying to go round the thread again
          thread::yield_now();
        }
      }
    }
  }

  /**
   * Insert an entry into the ring queue, if possible.
   */
  #[optimize(speed)]
  pub fn append(&mut self, isotoken: Isolated, element: E) -> OxideResult<(), QueueError> {
    let head = self.head.read();

    if head != self.tail.read() { // Queue is not empty
      let prev = head.bsub_isolated::<SIZE>(isotoken, 1);

      // OK, attempt coalescing
      unsafe {
        let queued_element = &self.queue[prev as usize].assume_init();

        match queued_element.coalesced(&element) {
          Ok(coalesced_element) => {
            *(self.queue[prev as usize].assume_init_mut()) = coalesced_element;
            return Ok(())
          },
          Err(_) => {
            // Fall through
          }
        }
      }
    }

    if head != (self.tail.bsub_isolated::<SIZE>(isotoken, 1)) { // There is space
      self.queue[head as usize].write(element);
      self.head.binc_isolated::<SIZE>(isotoken);

      // We can release any waiting consumers as well
      scheduler::release_all_threads_and_clear(isotoken, &mut self.blocked_consumers);
      Ok(())
    } else {
      Err(QueueError::QueueFull)
    }
  }

  /**
   * Insert an entry into the ring queue.  If the queue is full, block until
   * there is space.
   */
  pub fn append_blocking(&mut self, element: E) {
    loop {
      if avr_oxide::concurrency::interrupt::isolated(|isotoken|{
        if self.append(isotoken, element).is_ok() {
          true
        } else {
          // Add ourselves to the list of blocked producers
          self.blocked_producers.add_current_thread(isotoken);
          scheduler::set_current_thread_state(isotoken, ThreadState::BlockedOnQueue);
          false
        }
      }) {
        return;
      } else {
        // If we got here, then we are waiting - so yield
        thread::yield_now();
      }
    }
  }
}

// Tests =====================================================================
#[cfg(test)]
mod tests {
  use avr_oxide::OxideResult;
  use avr_oxide::private::ringq::{RingQ, DoesNotCoalesce, Coalesce, QueueError, BoundedMaths};
  use avr_oxide::OxideResult::{Err,Ok};

  #[derive(Clone,Copy,PartialEq,Eq,Debug)]
  struct TestEvent {
    num: u8
  }
  impl DoesNotCoalesce for TestEvent {}

  #[derive(Clone,Copy,PartialEq,Eq,Debug)]
  struct CoalescingTestEvent {
    num: u8
  }
  impl Coalesce for CoalescingTestEvent {
    fn coalesced(&self, with: &Self) -> OxideResult<Self, QueueError> {
      println!("Self.num = {}, adding {}", self.num, with.num);
      Ok(Self {
        num: self.num + with.num
      })
    }
  }



  #[test]
  fn test_ringq() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue : RingQ<TestEvent,10> = RingQ::new();

      println!("Initial queue length: {}", queue.len());
      assert_eq!(queue.len(), 0);

      let test_ev1 = TestEvent { num: 0 };
      queue.append(isotoken, test_ev1);

      println!("Queue length after adding 1 event: {}", queue.len());
      assert_eq!(queue.len(), 1);
    });
  }

  #[test]
  #[should_panic]
  fn test_ringq_bounds() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue: RingQ<TestEvent,10> = RingQ::new();

      for i in 1..=9 {
        let test_ev1 = TestEvent { num: 0 };
        queue.append(isotoken, test_ev1);
        println!("Appended element {}", i);
      }

      println!("Queue length: {}", queue.len());
      println!("Succesfully added 9 events; next should panic");
      let test_ev1 = TestEvent { num: 0 };

      if let Ok(_) = queue.append(isotoken,test_ev1) {
      } else {
        panic!();
      }
    })
  }

  #[test]
  fn test_ringq_consume() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue : RingQ<TestEvent,4> = RingQ::new();

      println!("Initial queue length: {}", queue.len());
      assert_eq!(queue.len(), 0);

      let test_ev1 = TestEvent { num: 1 };
      let test_ev2 = TestEvent { num: 2 };
      let test_ev3 = TestEvent { num: 3 };
      queue.append(isotoken, test_ev1.clone());
      queue.append(isotoken, test_ev2.clone());
      queue.append(isotoken, test_ev3.clone());

      println!("Queue length after adding 3 events: {}", queue.len());
      assert_eq!(queue.len(), 3);

      let consumed_ev1 = queue.consume(isotoken).unwrap();
      let consumed_ev2 = queue.consume(isotoken).unwrap();

      println!("Queue length after consuming 2 events: {}", queue.len());
      assert_eq!(queue.len(), 1);

      let test_ev4 = TestEvent { num: 4 };
      let test_ev5 = TestEvent { num: 5 };
      queue.append(isotoken, test_ev4.clone());
      queue.append(isotoken, test_ev5.clone());

      println!("Queue length after adding 2 more events: {}", queue.len());
      assert_eq!(queue.len(), 3);

      let consumed_ev3 = queue.consume(isotoken).unwrap();
      let consumed_ev4 = queue.consume(isotoken).unwrap();
      let consumed_ev5 = queue.consume(isotoken).unwrap();

      println!("Queue length after consuming 3 more events: {}", queue.len());
      assert_eq!(queue.len(), 0);

      // Check we got the data out in the same order we inserted it
      let inserted = [ test_ev1, test_ev2, test_ev3, test_ev4, test_ev5 ];
      let consumed = [ consumed_ev1, consumed_ev2, consumed_ev3, consumed_ev4, consumed_ev5 ];
      assert_eq!(inserted, consumed);
    })
  }

  #[test]
  fn test_ringq_coalesce() {
    avr_oxide::concurrency::interrupt::isolated(|isotoken|{
      let mut queue : RingQ<CoalescingTestEvent, 4> = RingQ::new();

      let test_ev1 = CoalescingTestEvent { num: 1 };
      let test_ev2 = CoalescingTestEvent { num: 2 };
      let test_ev3 = CoalescingTestEvent { num: 3 };
      let test_ev4 = CoalescingTestEvent { num: 4 };
      let test_ev5 = CoalescingTestEvent { num: 5 };

      println!("Initial queue length: {}", queue.len());
      assert_eq!(queue.len(), 0);

      queue.append(isotoken, test_ev1.clone());
      queue.append(isotoken, test_ev2.clone());
      queue.append(isotoken, test_ev3.clone());
      queue.append(isotoken, test_ev4.clone());
      queue.append(isotoken, test_ev5.clone());

      println!("Queue length after adding 5 coalescable events: {}", queue.len());
      assert_eq!(queue.len(), 1);

      let consumed_ev1 = queue.consume(isotoken).unwrap();
      println!("Consumed event: {:?}", &consumed_ev1);
      assert_eq!(consumed_ev1.num, 15);
    });
  }


}