use super::Builder;use crate::{client,message::Message,store::Store,};use crossbeam_utils::sync::WaitGroup;use flume::{Receiver,Sender,};use log::warn;use std::{os::unix::net::UnixDatagram,path::PathBuf,sync::{atomic::{AtomicBool,Ordering,},Arc,},thread,};use thiserror::Error;const BUFFER_SIZE: usize = 65_527;#[derive(Error, Debug)]pub enum Error {#[error("can not receive message from socket: {0}")]ReceiveFromSocket(std::io::Error),#[error("can not send received data to processing: {0}")]SendBuffer(flume::SendError<Vec<u8>>),#[error("can not deserialize message: {0}")]DeserializeMessage(bincode::Error),#[error("can not receive data from channel: {0}")]ReceiveData(flume::RecvError),#[error("can not remove socket: {0}")]RemoveSocket(std::io::Error),#[error("can not setup ctrlc handler: {0}")]SetupCtrlHandler(ctrlc::Error),}pub struct Server {pub(super) entries: sled::Db,pub(super) socket: UnixDatagram,pub(super) socket_path: PathBuf,pub(super) store: Store,pub(super) stopping: Arc<AtomicBool>,pub(super) wait_group: WaitGroup,}pub fn builder(cache_dir: PathBuf, data_dir: PathBuf, socket: PathBuf) -> Builder {Builder {cache_dir,data_dir,socket,}}impl Server {pub fn run(self) -> Result<(), Error> {let data_sender =Self::start_processor(Arc::clone(&self.stopping), self.wait_group.clone())?;Self::start_receiver(Arc::clone(&self.stopping),self.wait_group.clone(),self.socket,data_sender,)?;Self::ctrl_c_watcher(self.stopping, self.socket_path.clone())?;self.wait_group.wait();std::fs::remove_file(&self.socket_path).map_err(Error::RemoveSocket)?;Ok(())}fn ctrl_c_watcher(stopping: Arc<AtomicBool>, socket_path: PathBuf) -> Result<(), Error> {ctrlc::set_handler(move || {stopping.store(true, Ordering::SeqCst);let client = client::new(socket_path.clone());if let Err(err) = client.send(&Message::Stop) {warn!("{}", err);}}).map_err(Error::SetupCtrlHandler)?;Ok(())}fn start_receiver(stopping: Arc<AtomicBool>,wait_group: WaitGroup,socket: UnixDatagram,data_sender: Sender<Vec<u8>>,) -> Result<(), Error> {thread::spawn(move || {loop {if dbg!(stopping.load(Ordering::SeqCst)) {dbg!("break loop");break;}if let Err(err) = Self::receive(&socket, &data_sender) {warn!("{}", err)}}drop(wait_group)});Ok(())}fn receive(socket: &UnixDatagram, data_sender: &Sender<Vec<u8>>) -> Result<(), Error> {dbg!("receive");let mut buffer = [0_u8; BUFFER_SIZE];let (written, _) = socket.recv_from(&mut buffer).map_err(Error::ReceiveFromSocket)?;data_sender.send(buffer[0..written].to_vec()).map_err(Error::SendBuffer)?;Ok(())}fn start_processor(stopping: Arc<AtomicBool>,wait_group: WaitGroup,) -> Result<Sender<Vec<u8>>, Error> {let (data_sender, data_receiver) = flume::unbounded();thread::spawn(move || {loop {if dbg!(stopping.load(Ordering::SeqCst)) {break;}if let Err(err) = Self::process(&stopping, &data_receiver) {warn!("{}", err)}}drop(wait_group)});Ok(data_sender)}fn process(stopping: &Arc<AtomicBool>, data_receiver: &Receiver<Vec<u8>>) -> Result<(), Error> {let data = data_receiver.recv().map_err(Error::ReceiveData)?;let message = bincode::deserialize(&data).map_err(Error::DeserializeMessage)?;dbg!(&message);match message {Message::Stop => stopping.store(true, Ordering::SeqCst),_ => todo!(),}Ok(())}}
use thiserror::Error;const BUFFER_SIZE: usize = 65_527;#[derive(Error, Debug)]pub enum Error {#[error("command for session already started")]SessionCommandAlreadyStarted,#[error("command for session not started yet")]SessionCommandNotStarted,impl Server {let (data_sender, data_receiver) = flume::unbounded();let (signal_sender, signal_receiver) = flume::unbounded();let stopping = Arc::new(AtomicBool::new(false));Self::ctrl_c_watcher(signal_sender)?;Self::signal_watcher(signal_receiver, Arc::clone(&stopping));Self::start_processor(data_receiver);loop {}}let writer = std::io::BufWriter::new(file);Ok(self)}fn signal_watcher(signal_receiver: Receiver<RunState>, stopping: Arc<AtomicBool>) {thread::spawn(move || loop {match signal_receiver.recv() {Ok(signal) => match signal {RunState::Stop => stopping.store(true, Ordering::SeqCst),RunState::Continue => continue,},Err(err) => warn!("{}", err),}});serde_json::to_writer(writer, &self.state).map_err(Error::SerializeState)?;let file = std::fs::File::create(&self.cache_path).map_err(Error::CreateCacheFile)?;}match message {Message::CommandStart(data) => self.command_start(data),Message::Running => self.command_running(),Message::Disable(uuid) => self.disable_session(uuid),Message::Enable(uuid) => self.enable_session(uuid),}}Ok(RunState::Continue)}}}fn disable_session(&mut self, session_id: Uuid) -> Result<RunState, Error> {self.state.entries.remove(&session_id).expect("already tested if exists");self.state.disabled_session.insert(session_id);Ok(RunState::Continue)}fn enable_session(&mut self, session_id: Uuid) -> Result<RunState, Error> {self.state.disabled_session.remove(&session_id);Ok(RunState::Continue)}fn command_running(&self) -> Result<RunState, Error> {info!("session_id={session_id}, command={command}",session_id = session_id,command = entry.command)});Ok(RunState::Continue)self.state.entries.iter().for_each(|(session_id, entry)| {self.store.add(&entry).map_err(Error::AddStore)?;}return Err(Error::SessionCommandAlreadyStarted);}}return Err(Error::SessionCommandNotStarted);}let start = self.entries.remove(&finish.session_id).expect("already tested if exists");let entry = Entry::from_messages(start, finish);.statefn command_finished(&mut self, finish: &CommandFinished) -> Result<RunState, Error> {if self.state.disabled_session.contains(&finish.session_id) {return Err(Error::DisabledSession(finish.session_id));}if !self.state.entries.contains_key(&finish.session_id) {Ok(RunState::Continue)if self.state.disabled_session.contains(&start.session_id) {return Err(Error::DisabledSession(start.session_id));}self.state.entries.insert(start.session_id, start);fn command_start(&mut self, start: CommandStart) -> Result<RunState, Error> {if self.state.entries.contains_key(&start.session_id) {fn start_processor(data_receiver: Receiver<Vec<u8>>) -> Result<(), Error> {loop {let buffer = match data_receiver.recv() {Ok(b) => b,Err(err) => {warn!("{}", err);continue;}};let message = bincode::deserialize(&buffer).map_err(Error::DeserializeMessage)?;}Ok(())}let mut buffer = [0_u8; BUFFER_SIZE];fn receive(socket: &UnixDatagram, data_sender: &Sender<Vec<u8>>) -> Result<(), Error> {Message::CommandFinished(data) => self.command_finished(&data),Message::Stop => Ok(RunState::Stop),let (written, _) = socket.recv_from(&mut buffer).map_err(Error::ReceiveFromSocket)?;data_sender.send(buffer[0..written].to_vec()).map_err(Error::SendBuffer)?;fn ctrl_c_watcher(signal_sender: Sender<RunState>) -> Result<(), Error> {ctrlc::set_handler(move || {if let Err(err) = signal_sender.send(RunState::Stop) {warn!("{}", err)}}).map_err(Error::SetupCtrlHandler)?;Ok(())}fn process(&mut self, message: Message) -> Result<RunState, Error> {std::fs::remove_file(&socket_path).map_err(Error::RemoveSocket)?;if let Err(err) = Self::receive(&socket, &data_sender) {warn!("{}", err)if stopping.load(Ordering::SeqCst) {break;}let socket_path_parent = socket_path.parent().ok_or(Error::NoSocketPathParent)?;std::fs::create_dir_all(socket_path_parent).map_err(Error::CreateSocketPathParent)?;info!("starting server listening on path {:?}", socket_path);let socket = UnixDatagram::bind(&socket_path).map_err(Error::BindSocket)?;pub fn start(mut self, socket_path: &PathBuf) -> Result<Self, Error> {fn from_cachefile(cache_path: PathBuf, data_dir: PathBuf) -> Result<Server, Error> {let reader = std::io::BufReader::new(file);let store = store::new(data_dir);Ok(Server {store,cache_path,})}state,let state = serde_json::from_reader(reader).map_err(Error::DeserializeState)?;let file = std::fs::File::open(&cache_path).map_err(Error::OpenCacheFile)?;}}pub fn new(cache_path: PathBuf, data_dir: PathBuf) -> Result<Server, Error> {if cache_path.exists() {from_cachefile(cache_path, data_dir)} else {Ok(Server {store: store::new(data_dir),cache_path,})state: State::default(),#[derive(Debug, Default, Serialize, Deserialize)]struct State {entries: HashMap<Uuid, CommandStart>,disabled_session: HashSet<Uuid>,}#[error("can not add to storeo: {0}")]AddStore(crate::store::Error),}match self {}}}#[derive(Debug)]pub struct Server {store: Store,cache_path: PathBuf,}state: State,Self::Stop => true,Self::Continue => false,#[derive(Debug)]enum RunState {Stop,Continue,}impl RunState {const fn is_stop(&self) -> bool {#[error("can not open cache file: {0}")]OpenCacheFile(std::io::Error),#[error("can not deserialize cache entries: {0}")]#[error("can not bind to socket: {0}")]BindSocket(std::io::Error),#[error("can not remove socket: {0}")]RemoveSocket(std::io::Error),#[error("can not create cache file: {0}")]CreateCacheFile(std::io::Error),#[error("can not serialize cache entries: {0}")]#[error("can not receive message from socket: {0}")]ReceiveFromSocket(std::io::Error),#[error("no parent directory for socket path")]NoSocketPathParent,#[error("can not create socket parent directory: {0}")]CreateSocketPathParent(std::io::Error),#[error("not recording because session {0} is disabled")]DisabledSession(Uuid),#[error("can not send received data to processing: {0}")]SendBuffer(flume::SendError<Vec<u8>>),#[error("can not setup ctrlc handler: {0}")]SetupCtrlHandler(ctrlc::Error),SerializeState(serde_json::Error),DeserializeState(serde_json::Error),use uuid::Uuid;use crate::{entry::Entry,message::{CommandFinished,CommandStart,Message,},store,use std::{os::unix::net::UnixDatagram,path::PathBuf,sync::{atomic::{AtomicBool,Ordering,},Arc,},thread,};collections::{HashMap,HashSet,},store::Store,};use flume::{Receiver,Sender,};use log::{info,warn,};use serde::{Deserialize,Serialize,};
use crate::message::CommandStart;use serde::Serialize;use std::path::Path;use thiserror::Error;use uuid::Uuid;#[derive(Error, Debug)]pub enum Error {#[error("can not open entries database: {0}")]OpenEntriesDatabase(sled::Error),#[error("can not open disabled_sessions database: {0}")]OpenDisabledSessionsDatabase(sled::Error),#[error("can not serialize data: {0}")]SerializeData(bincode::Error),#[error("can not deserialize entry: {0}")]DeserializeEntry(bincode::Error),#[error("{0}")]Sled(#[from] sled::Error),#[error("entry does not exist in db")]EntryNotExist,}pub fn new(path: impl AsRef<Path>) -> Result<Db, Error> {let entries = sled::open(path.as_ref().join("entries")).map_err(Error::OpenEntriesDatabase)?;let disabled_sessions = sled::open(path.as_ref().join("disabled_sessions")).map_err(Error::OpenDisabledSessionsDatabase)?;Ok(Db {entries,disabled_sessions,})}pub struct Db {entries: sled::Db,disabled_sessions: sled::Db,}impl Db {pub fn contains_entry(&self, uuid: &Uuid) -> Result<bool, Error> {let key = Self::serialize(uuid)?;let contains = self.entries.contains_key(key)?;Ok(contains)}pub fn is_session_disabled(&self, uuid: &Uuid) -> Result<bool, Error> {let key = Self::serialize(uuid)?;let contains = self.disabled_sessions.contains_key(key)?;Ok(contains)}pub fn add_entry(&self, entry: &CommandStart) -> Result<(), Error> {let key = Self::serialize(&entry.session_id)?;let value = Self::serialize(&entry)?;self.entries.insert(key, value)?;Ok(())}pub fn remove_entry(&self, uuid: &Uuid) -> Result<CommandStart, Error> {let key = Self::serialize(uuid)?;let data = self.entries.remove(key)?.ok_or(Error::EntryNotExist)?;let entry = Self::deserialize_entry(&data)?;Ok(entry)}pub fn disable_session(&self, uuid: &Uuid) -> Result<(), Error> {let key = Self::serialize(uuid)?;let value = Self::serialize(true)?;self.disabled_sessions.insert(key, value)?;self.remove_entry(&uuid)?;Ok(())}pub fn enable_session(&self, uuid: &Uuid) -> Result<(), Error> {let key = Self::serialize(uuid)?;self.disabled_sessions.remove(&key)?;Ok(())}fn serialize(data: impl Serialize) -> Result<Vec<u8>, Error> {let bytes = bincode::serialize(&data).map_err(Error::SerializeData)?;Ok(bytes)}fn deserialize_entry(data: &sled::IVec) -> Result<CommandStart, Error> {let entry = bincode::deserialize(&data).map_err(Error::DeserializeEntry)?;Ok(entry)}}
pub use server::{builder,Error as ServerError,Server,
pub use Error as ServerError;use crate::{client,entry::Entry,message::{CommandFinished,CommandStart,Message,},store::Store,};use crossbeam_utils::sync::WaitGroup;use db::Db;use flume::{Receiver,Sender,};use log::warn;use std::{os::unix::net::UnixDatagram,path::{Path,PathBuf,},sync::{atomic::{AtomicBool,Ordering,},Arc,},thread,
use thiserror::Error;use uuid::Uuid;const BUFFER_SIZE: usize = 65_527;#[derive(Error, Debug)]pub enum Error {#[error("can not receive message from socket: {0}")]ReceiveFromSocket(std::io::Error),#[error("can not send received data to processing: {0}")]SendBuffer(flume::SendError<Vec<u8>>),#[error("can not deserialize message: {0}")]DeserializeMessage(bincode::Error),#[error("can not receive data from channel: {0}")]ReceiveData(flume::RecvError),#[error("can not remove socket: {0}")]RemoveSocket(std::io::Error),#[error("can not setup ctrlc handler: {0}")]SetupCtrlHandler(ctrlc::Error),#[error("command for session already started")]SessionCommandAlreadyStarted,#[error("command for session not started yet")]SessionCommandNotStarted,#[error("can not check if key exists in db: {0}")]CheckContainsEntry(db::Error),#[error("can not check if session is disabled in db: {0}")]CheckDisabledSession(db::Error),#[error("not recording because session {0} is disabled")]DisabledSession(Uuid),#[error("can not add entry to db: {0}")]AddDbEntry(db::Error),#[error("can not remove entry from db: {0}")]RemoveDbEntry(db::Error),#[error("can not add to storeo: {0}")]AddStore(crate::store::Error),#[error("db error: {0}")]Db(#[from] db::Error),}pub struct Server {pub(super) db: Db,pub(super) socket: UnixDatagram,pub(super) socket_path: PathBuf,pub(super) store: Store,pub(super) stopping: Arc<AtomicBool>,pub(super) wait_group: WaitGroup,}pub fn builder(cache_dir: PathBuf, data_dir: PathBuf, socket: PathBuf) -> Builder {Builder {cache_dir,data_dir,socket,}}impl Server {pub fn run(self) -> Result<(), Error> {let data_sender = Self::start_processor(Arc::clone(&self.stopping),self.wait_group.clone(),self.db,self.store,self.socket_path.clone(),)?;Self::start_receiver(Arc::clone(&self.stopping),self.wait_group.clone(),self.socket,data_sender,)?;Self::ctrl_c_watcher(self.stopping, self.socket_path.clone())?;self.wait_group.wait();std::fs::remove_file(&self.socket_path).map_err(Error::RemoveSocket)?;Ok(())}fn ctrl_c_watcher(stopping: Arc<AtomicBool>, socket_path: PathBuf) -> Result<(), Error> {ctrlc::set_handler(move || {stopping.store(true, Ordering::SeqCst);let client = client::new(socket_path.clone());if let Err(err) = client.send(&Message::Stop) {warn!("{}", err);}}).map_err(Error::SetupCtrlHandler)?;Ok(())}fn start_receiver(stopping: Arc<AtomicBool>,wait_group: WaitGroup,socket: UnixDatagram,data_sender: Sender<Vec<u8>>,) -> Result<(), Error> {thread::spawn(move || {loop {if stopping.load(Ordering::SeqCst) {break;}if let Err(err) = Self::receive(&socket, &data_sender) {warn!("{}", err)}}drop(wait_group)});Ok(())}fn receive(socket: &UnixDatagram, data_sender: &Sender<Vec<u8>>) -> Result<(), Error> {let mut buffer = [0_u8; BUFFER_SIZE];let (written, _) = socket.recv_from(&mut buffer).map_err(Error::ReceiveFromSocket)?;data_sender.send(buffer[0..written].to_vec()).map_err(Error::SendBuffer)?;Ok(())}fn start_processor(stopping: Arc<AtomicBool>,wait_group: WaitGroup,db: Db,store: Store,socket_path: PathBuf,) -> Result<Sender<Vec<u8>>, Error> {let (data_sender, data_receiver) = flume::unbounded();thread::spawn(move || {loop {if stopping.load(Ordering::SeqCst) {break;}if let Err(err) =Self::process(&stopping, &data_receiver, &db, &store, &socket_path){warn!("{}", err)}}drop(wait_group)});Ok(data_sender)}fn process(stopping: &Arc<AtomicBool>,data_receiver: &Receiver<Vec<u8>>,db: &Db,store: &Store,socket_path: impl AsRef<Path>,) -> Result<(), Error> {let data = data_receiver.recv().map_err(Error::ReceiveData)?;let message = bincode::deserialize(&data).map_err(Error::DeserializeMessage)?;dbg!(&message);match message {Message::Stop => {stopping.store(true, Ordering::SeqCst);let client = client::new(socket_path.as_ref().to_path_buf());if let Err(err) = client.send(&Message::Stop) {warn!("{}", err);}Ok(())}Message::CommandStart(data) => Self::command_start(db, &data),Message::CommandFinished(data) => Self::command_finished(db, store, &data),Message::Disable(uuid) => Self::disable_session(db, &uuid),Message::Enable(uuid) => Self::enable_session(db, &uuid),}}fn command_start(db: &Db, data: &CommandStart) -> Result<(), Error> {if db.contains_entry(&data.session_id).map_err(Error::CheckContainsEntry)?{return Err(Error::SessionCommandAlreadyStarted);}if db.is_session_disabled(&data.session_id).map_err(Error::CheckDisabledSession)?{return Err(Error::DisabledSession(data.session_id));}db.add_entry(data).map_err(Error::AddDbEntry)?;Ok(())}fn command_finished(db: &Db, store: &Store, data: &CommandFinished) -> Result<(), Error> {if db.is_session_disabled(&data.session_id).map_err(Error::CheckDisabledSession)?{return Err(Error::DisabledSession(data.session_id));}if !db.contains_entry(&data.session_id).map_err(Error::CheckContainsEntry)?{return Err(Error::SessionCommandNotStarted);}let start = db.remove_entry(&data.session_id).map_err(Error::RemoveDbEntry)?;let entry = Entry::from_messages(start, data);store.add(&entry).map_err(Error::AddStore)?;Ok(())}fn disable_session(db: &Db, uuid: &Uuid) -> Result<(), Error> {db.disable_session(uuid)?;Ok(())}fn enable_session(db: &Db, uuid: &Uuid) -> Result<(), Error> {db.enable_session(uuid)?;Ok(())}}