use crossbeam::{
channel::{Receiver, RecvError, Sender, TryRecvError},
sync::WaitGroup,
};
use std::{
marker::PhantomData,
ops::Deref,
sync::{Arc, Mutex},
thread::JoinHandle,
};
use tracing::{debug, trace};
use crate::{
commands::{DeviceCommand, Frame, HostCommand, RawCommandData},
driver::{CommandSequence, CommandSequenceContext, CommandSequenceError, Driver},
};
/// Consumer-side handle for a driver of type `T` that is run on a background thread.
///
/// The handle does not hold the driver itself; it only keeps the receiving end of the
/// channel into which the worker thread pushes the frames it reads from the driver.
pub struct Device<T: Driver> {
    /// Zero-sized marker recording the driver type, since no `T` value is stored here.
    worker_type: PhantomData<T>,
    /// Receiving end of the channel of frames forwarded by the worker thread.
    output_receiver: Receiver<Frame<DeviceCommand>>,
}
/// A frame queued for transmission, paired with the channel used to report the outcome.
struct PendingFrame {
/// The frame to hand to the driver, already converted to raw command data.
frame: Frame<RawCommandData>,
/// One-shot sender on which the worker reports the result of sending `frame`.
response: oneshot::Sender<Result<(), CommandSequenceError>>,
}
/// Command-side handle: wraps the channel sender used to queue [`PendingFrame`]s for the
/// worker thread.
pub struct DeviceCommands(Sender<PendingFrame>);
impl<T> Device<T>
where
    T: Driver + Send + CommandSequenceContext + 'static,
{
    /// Moves `driver` onto a freshly spawned worker thread and returns the handles used to
    /// communicate with it.
    ///
    /// The returned tuple contains:
    ///
    /// * the [`Device`], holding the receiving end of the channel the worker forwards
    ///   incoming frames into;
    /// * a [`DeviceCommands`], holding the sending end of the channel the worker takes
    ///   outgoing frames from;
    /// * a [`WaitGroup`], a clone of which is moved into the worker and dropped when the
    ///   worker loop returns.
    ///
    /// The thread's `JoinHandle` is discarded, so the worker runs detached.
    pub fn new(driver: T) -> (Self, DeviceCommands, WaitGroup) {
        let (sender, output_receiver) = crossbeam::channel::unbounded();
        let (send_sender, receiver) = crossbeam::channel::unbounded();
        let complete = WaitGroup::new();
        let child_complete = complete.clone();
        // TODO: Figure out if we can catch any panics
        std::thread::spawn(move || Self::execute(driver, sender, receiver, child_complete));
        (
            Self {
                output_receiver,
                worker_type: PhantomData,
            },
            DeviceCommands(send_sender),
            complete,
        )
    }

    /// Worker loop: alternates between reading one frame from the driver and servicing at
    /// most one queued outgoing frame.
    ///
    /// The loop ends when any of the following happens:
    ///
    /// * forwarding an incoming frame fails (the output channel has no receiver left);
    /// * the command channel is disconnected (every sender has been dropped);
    /// * reporting a send result fails (the requester dropped its one-shot receiver).
    ///
    /// `_complete` is never read; it is only held so that it is dropped when this function
    /// returns.
    fn execute(
        mut driver: T,
        sender: Sender<Frame<DeviceCommand>>,
        receiver: Receiver<PendingFrame>,
        _complete: WaitGroup,
    ) {
        loop {
            // A failed read is deliberately ignored so the loop can still service commands.
            // NOTE(review): presumably `read_frame` errors are timeouts - confirm.
            if let Ok(incoming) = driver.read_frame() {
                trace!("got frame on serial port: {:?}", incoming);
                if sender.send(incoming).is_err() {
                    break;
                }
            }

            // Non-blocking poll so that an empty command queue never delays the next read.
            match receiver.try_recv() {
                Ok(pending) => {
                    let outcome = driver.send(pending.frame);
                    if pending.response.send(outcome).is_err() {
                        break;
                    }
                }
                Err(TryRecvError::Disconnected) => break,
                Err(TryRecvError::Empty) => {}
            }
        }
    }
}
/// Lets a [`Device`] be used directly as the receiver of frames forwarded by the worker.
impl<T: Driver> Deref for Device<T> {
    type Target = Receiver<Frame<DeviceCommand>>;

    /// Borrows the receiving end of the output channel.
    fn deref(&self) -> &Self::Target {
        let Self {
            output_receiver, ..
        } = self;
        output_receiver
    }
}
impl CommandSequenceContext for DeviceCommands {
    /// Queues `command` for the worker thread and blocks until the worker reports the result.
    ///
    /// The payload is converted to raw command data while keeping the request/response
    /// variant of the frame.
    ///
    /// # Errors
    ///
    /// Returns `CommandSequenceError::Cancel` if the frame cannot be queued or if the reply
    /// channel is closed before a result arrives; otherwise returns whatever result the
    /// worker sent back.
    fn send<T>(&mut self, command: Frame<T>) -> Result<(), crate::driver::CommandSequenceError>
    where
        T: Into<crate::commands::RawCommandData>,
    {
        let (response, wait) = oneshot::channel();

        let frame = match command {
            Frame::Request(payload) => Frame::Request(payload.into()),
            Frame::Response(payload) => Frame::Response(payload.into()),
        };

        let queued = self.0.send(PendingFrame { frame, response });
        if queued.is_err() {
            return Err(CommandSequenceError::Cancel);
        }

        match wait.recv() {
            Ok(outcome) => outcome,
            Err(_) => Err(CommandSequenceError::Cancel),
        }
    }
}