Skip to content

Commit d77af8b

Browse files
timviseeithinuel
authored andcommitted
Add functions to take/release just producer or consumer
See jamesmunns#40 See jamesmunns#67
1 parent d9f647f commit d77af8b

File tree

1 file changed

+236
-12
lines changed

1 file changed

+236
-12
lines changed

Diff for: core/src/bbbuffer.rs

+236-12
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,17 @@ use core::{
1212
result::Result as CoreResult,
1313
slice::from_raw_parts_mut,
1414
sync::atomic::{
15-
AtomicBool, AtomicUsize,
15+
AtomicBool, AtomicU8, AtomicUsize,
1616
Ordering::{AcqRel, Acquire, Release},
1717
},
1818
};
19+
20+
/// Bit to define producer taken.
21+
const BIT_PRODUCER: u8 = 1 << 0;
22+
23+
/// Bit to define consumer taken.
24+
const BIT_CONSUMER: u8 = 1 << 1;
25+
1926
#[derive(Debug)]
2027
/// A backing structure for a BBQueue. Can be used to create either
2128
/// a BBQueue or a split Producer/Consumer pair
@@ -47,8 +54,10 @@ pub struct BBBuffer<const N: usize> {
4754
/// Is there an active write grant?
4855
write_in_progress: AtomicBool,
4956

50-
/// Have we already split?
51-
already_split: AtomicBool,
57+
/// Whether we have split the producer and/or consumer parts off.
58+
///
59+
/// See the `BIT_PRODUCER` and `BIT_CONSUMER` bits which define what parts have been split off.
60+
split_prod_cons: AtomicU8,
5261
}
5362

5463
unsafe impl<const A: usize> Sync for BBBuffer<A> {}
@@ -63,7 +72,7 @@ impl<'a, const N: usize> BBBuffer<N> {
6372
/// is placed at `static` scope within the `.bss` region, the explicit initialization
6473
/// will be elided (as it is already performed as part of memory initialization)
6574
///
66-
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
75+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
6776
/// while splitting.
6877
///
6978
/// ```rust
@@ -86,7 +95,12 @@ impl<'a, const N: usize> BBBuffer<N> {
8695
/// # }
8796
/// ```
8897
pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
89-
if atomic::swap(&self.already_split, true, AcqRel) {
98+
// Set producer/consumer taken bit, error and reset if one was already set
99+
let prev = self
100+
.split_prod_cons
101+
.fetch_or(BIT_PRODUCER | BIT_CONSUMER, AcqRel);
102+
if prev > 0 {
103+
self.split_prod_cons.store(prev, Release);
90104
return Err(Error::AlreadySplit);
91105
}
92106

@@ -123,14 +137,82 @@ impl<'a, const N: usize> BBBuffer<N> {
123137
/// is placed at `static` scope within the `.bss` region, the explicit initialization
124138
/// will be elided (as it is already performed as part of memory initialization)
125139
///
126-
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
140+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
127141
/// section while splitting.
128142
pub fn try_split_framed(&'a self) -> Result<(FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
129143
let (producer, consumer) = self.try_split()?;
130144
Ok((FrameProducer { producer }, FrameConsumer { consumer }))
131145
}
132146

133-
/// Attempt to release the Producer and Consumer
147+
/// Attempt to take a `Producer` from the `BBBuffer` to gain access to the
148+
/// buffer. If a producer has already been taken, an error will be returned.
149+
///
150+
/// NOTE: When splitting, the underlying buffer will be explicitly initialized
151+
/// to zero. This may take a measurable amount of time, depending on the size
152+
/// of the buffer. This is necessary to prevent undefined behavior. If the buffer
153+
/// is placed at `static` scope within the `.bss` region, the explicit initialization
154+
/// will be elided (as it is already performed as part of memory initialization)
155+
///
156+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
157+
/// while splitting.
158+
pub fn try_take_producer(&'a self) -> Result<Producer<'a, N>> {
159+
// Set producer taken bit, error if already set
160+
if self.split_prod_cons.fetch_or(BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 {
161+
return Err(Error::AlreadySplit);
162+
}
163+
164+
unsafe {
165+
// TODO: do we need to zero buffer here, like try_split?
166+
// // Explicitly zero the data to avoid undefined behavior.
167+
// // This is required, because we hand out references to the buffers,
168+
// // which mean that creating them as references is technically UB for now
169+
// let mu_ptr = self.buf.get();
170+
// (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);
171+
172+
let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
173+
174+
Ok(Producer {
175+
bbq: nn1,
176+
pd: PhantomData,
177+
})
178+
}
179+
}
180+
181+
/// Attempt to take a `Consumer` from the `BBBuffer` to gain access to the
182+
/// buffer. If a consumer has already been taken, an error will be returned.
183+
///
184+
/// NOTE: When splitting, the underlying buffer will be explicitly initialized
185+
/// to zero. This may take a measurable amount of time, depending on the size
186+
/// of the buffer. This is necessary to prevent undefined behavior. If the buffer
187+
/// is placed at `static` scope within the `.bss` region, the explicit initialization
188+
/// will be elided (as it is already performed as part of memory initialization)
189+
///
190+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
191+
/// while splitting.
192+
pub fn try_take_consumer(&'a self) -> Result<Consumer<'a, N>> {
193+
// Set producer taken bit, error if already set
194+
if self.split_prod_cons.fetch_or(BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 {
195+
return Err(Error::AlreadySplit);
196+
}
197+
198+
unsafe {
199+
// TODO: do we need to zero buffer here, like try_split?
200+
// // Explicitly zero the data to avoid undefined behavior.
201+
// // This is required, because we hand out references to the buffers,
202+
// // which mean that creating them as references is technically UB for now
203+
// let mu_ptr = self.buf.get();
204+
// (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);
205+
206+
let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
207+
208+
Ok(Consumer {
209+
bbq: nn1,
210+
pd: PhantomData,
211+
})
212+
}
213+
}
214+
215+
/// Attempt to release the `Producer` and `Consumer`
134216
///
135217
/// This re-initializes the buffer so it may be split in a different mode at a later
136218
/// time. There must be no read or write grants active, or an error will be returned.
@@ -204,7 +286,7 @@ impl<'a, const N: usize> BBBuffer<N> {
204286
self.last.store(0, Release);
205287

206288
// Mark the buffer as ready to be split
207-
self.already_split.store(false, Release);
289+
self.split_prod_cons.store(0, Release);
208290

209291
Ok(())
210292
}
@@ -227,6 +309,148 @@ impl<'a, const N: usize> BBBuffer<N> {
227309
(FrameProducer { producer }, FrameConsumer { consumer })
228310
})
229311
}
312+
313+
/// Attempt to release the `Producer`.
314+
///
315+
/// This re-initializes the buffer so it may be split in a different mode at a later
316+
/// time. There must be no read or write grants active, or an error will be returned.
317+
///
318+
/// The `Producer` ust be from THIS `BBBuffer`, or an error will be returned.
319+
///
320+
/// ```rust
321+
/// # // bbqueue test shim!
322+
/// # fn bbqtest() {
323+
/// use bbqueue::BBBuffer;
324+
///
325+
/// // Create and split a new buffer
326+
/// let buffer: BBBuffer<6> = BBBuffer::new();
327+
/// let (prod, cons) = buffer.try_split().unwrap();
328+
///
329+
/// // Not possible to split twice
330+
/// assert!(buffer.try_split().is_err());
331+
///
332+
/// // Release the producer and consumer
333+
/// assert!(buffer.try_release(prod, cons).is_ok());
334+
///
335+
/// // Split the buffer in framed mode
336+
/// let (fprod, fcons) = buffer.try_split_framed().unwrap();
337+
/// # // bbqueue test shim!
338+
/// # }
339+
/// #
340+
/// # fn main() {
341+
/// # #[cfg(not(feature = "thumbv6"))]
342+
/// # bbqtest();
343+
/// # }
344+
/// ```
345+
pub fn try_release_producer(
346+
&'a self,
347+
prod: Producer<'a, N>,
348+
) -> CoreResult<(), Producer<'a, N>> {
349+
// Note: Re-entrancy is not possible because we require ownership
350+
// of the producer, which are not cloneable. We also
351+
// can assume the buffer has been split, because
352+
353+
// Is this our producer?
354+
let our_prod = prod.bbq.as_ptr() as *const Self == self;
355+
356+
if !(our_prod) {
357+
// Can't release, not our producer
358+
return Err(prod);
359+
}
360+
361+
let wr_in_progress = self.write_in_progress.load(Acquire);
362+
let rd_in_progress = self.read_in_progress.load(Acquire);
363+
364+
if wr_in_progress || rd_in_progress {
365+
// Can't release, active grant(s) in progress
366+
return Err(prod);
367+
}
368+
369+
// Drop the producer
370+
drop(prod);
371+
372+
// Re-initialize the buffer (not totally needed, but nice to do)
373+
self.write.store(0, Release);
374+
self.read.store(0, Release);
375+
self.reserve.store(0, Release);
376+
self.last.store(0, Release);
377+
378+
// Mark the buffer as ready to retake producer
379+
self.split_prod_cons.fetch_and(!BIT_PRODUCER, Release);
380+
381+
Ok(())
382+
}
383+
384+
/// Attempt to release the `Consumer`.
385+
///
386+
/// This re-initializes the buffer so it may be split in a different mode at a later
387+
/// time. There must be no read or write grants active, or an error will be returned.
388+
///
389+
/// The `Consumer` must be from THIS `BBBuffer`, or an error will be returned.
390+
///
391+
/// ```rust
392+
/// # // bbqueue test shim!
393+
/// # fn bbqtest() {
394+
/// use bbqueue::BBBuffer;
395+
///
396+
/// // Create and split a new buffer
397+
/// let buffer: BBBuffer<6> = BBBuffer::new();
398+
/// let (prod, cons) = buffer.try_split().unwrap();
399+
///
400+
/// // Not possible to split twice
401+
/// assert!(buffer.try_split().is_err());
402+
///
403+
/// // Release the producer and consumer
404+
/// assert!(buffer.try_release(prod, cons).is_ok());
405+
///
406+
/// // Split the buffer in framed mode
407+
/// let (fprod, fcons) = buffer.try_split_framed().unwrap();
408+
/// # // bbqueue test shim!
409+
/// # }
410+
/// #
411+
/// # fn main() {
412+
/// # #[cfg(not(feature = "thumbv6"))]
413+
/// # bbqtest();
414+
/// # }
415+
/// ```
416+
pub fn try_release_consumer(
417+
&'a self,
418+
cons: Consumer<'a, N>,
419+
) -> CoreResult<(), Consumer<'a, N>> {
420+
// Note: Re-entrancy is not possible because we require ownership
421+
// of the consumer, which are not cloneable. We also
422+
// can assume the buffer has been split, because
423+
424+
// Is this our consumer?
425+
let our_cons = cons.bbq.as_ptr() as *const Self == self;
426+
427+
if !(our_cons) {
428+
// Can't release, not our consumer
429+
return Err(cons);
430+
}
431+
432+
let wr_in_progress = self.write_in_progress.load(Acquire);
433+
let rd_in_progress = self.read_in_progress.load(Acquire);
434+
435+
if wr_in_progress || rd_in_progress {
436+
// Can't release, active grant(s) in progress
437+
return Err(cons);
438+
}
439+
440+
// Drop the consumer
441+
drop(cons);
442+
443+
// Re-initialize the buffer (not totally needed, but nice to do)
444+
self.write.store(0, Release);
445+
self.read.store(0, Release);
446+
self.reserve.store(0, Release);
447+
self.last.store(0, Release);
448+
449+
// Mark the buffer as ready to retake consumer
450+
self.split_prod_cons.fetch_and(!BIT_CONSUMER, Release);
451+
452+
Ok(())
453+
}
230454
}
231455

232456
impl<const A: usize> BBBuffer<A> {
@@ -280,7 +504,7 @@ impl<const A: usize> BBBuffer<A> {
280504
write_in_progress: AtomicBool::new(false),
281505

282506
// We haven't split at the start
283-
already_split: AtomicBool::new(false),
507+
split_prod_cons: AtomicU8::new(0),
284508
}
285509
}
286510
}
@@ -744,7 +968,7 @@ impl<'a, const N: usize> GrantW<'a, N> {
744968
/// If `used` is larger than the given grant, the maximum amount will
745969
/// be commited
746970
///
747-
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
971+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
748972
/// section while committing.
749973
pub fn commit(mut self, used: usize) {
750974
self.commit_inner(used);
@@ -861,7 +1085,7 @@ impl<'a, const N: usize> GrantR<'a, N> {
8611085
/// If `used` is larger than the given grant, the full grant will
8621086
/// be released.
8631087
///
864-
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
1088+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
8651089
/// section while releasing.
8661090
pub fn release(mut self, used: usize) {
8671091
// Saturate the grant release
@@ -969,7 +1193,7 @@ impl<'a, const N: usize> SplitGrantR<'a, N> {
9691193
/// If `used` is larger than the given grant, the full grant will
9701194
/// be released.
9711195
///
972-
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
1196+
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
9731197
/// section while releasing.
9741198
pub fn release(mut self, used: usize) {
9751199
// Saturate the grant release

0 commit comments

Comments
 (0)