impl Lock {
    /// Acquires the mutable-transaction lock for the directory `p`.
    ///
    /// The lock path is `p` joined with `db_lock`; it is handed to `mut_txn`,
    /// and the value it returns is wrapped in `Lock::MutTxn`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `mut_txn`.
    pub async fn mut_txn_lock<P: AsRef<Path>>(p: P) -> Result<Self, anyhow::Error> {
        let lock_path = p.as_ref().join("db_lock");
        let inner = mut_txn(&lock_path).await?;
        Ok(Lock::MutTxn(inner))
    }

    /// Acquires the (non-mutable) transaction lock for the directory `p`.
    ///
    /// The lock path is `p` joined with `db_lock`; it is handed to `txn`,
    /// and the value it returns is wrapped in `Lock::Txn`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `txn`.
    pub async fn txn_lock<P: AsRef<Path>>(p: P) -> Result<Self, anyhow::Error> {
        let lock_path = p.as_ref().join("db_lock");
        let inner = txn(&lock_path).await?;
        Ok(Lock::Txn(inner))
    }

    /// Commits the underlying lock when it is a `Lock::MutTxn`.
    ///
    /// Every other variant is left untouched and reported as success.
    pub async fn commit(&mut self) -> Result<(), anyhow::Error> {
        if let Lock::MutTxn(inner) = self {
            inner.commit().await
        } else {
            Ok(())
        }
    }
}
// NOTE(review): this brace has no opener in the visible excerpt; it closes an
// enclosing scope that starts earlier in the file (presumably a module).
}
pristine: libpijul::pristine::sanakirja::Pristine::new(&pristine_dir)?,
lock: if mutable {lock::Lock::mut_txn_lock(&pristine_dir).await?} else {lock::Lock::txn_lock(&pristine_dir).await?},pristine: unsafe {libpijul::pristine::sanakirja::Pristine::new_nolock(&pristine_dir)?},
use anyhow::bail;use std::io::Read;use std::path::Path;use tokio::io::{AsyncReadExt, AsyncWriteExt};use tokio::net::UnixStream;pub struct TxnLock {_stream: UnixStream,}pub struct MutTxnLock {stream: UnixStream,}const TXN_BEGIN: u8 = 0;const MUT_TXN_BEGIN: u8 = 1;const COMMIT: u8 = 2;const ACK: u8 = 3;pub async fn mut_txn<P: AsRef<Path>>(file: P) -> Result<MutTxnLock, anyhow::Error> {let cmd = std::env::args().next().unwrap();let mut process = std::process::Command::new(&cmd).args(&["lock", file.as_ref().to_str().unwrap()]).stdout(std::process::Stdio::piped()).spawn()?;let s = process.stdout.as_mut().unwrap();s.read(&mut [0u8])?;let mut stream = UnixStream::connect(file).await?;stream.writable().await?;stream.write_all(&[MUT_TXN_BEGIN]).await?;let mut t = [0u8];stream.read_exact(&mut t).await?;if t[0] == ACK {Ok(MutTxnLock { stream })} else {bail!("Pristine locked")}}pub async fn txn<P: AsRef<Path>>(file: P) -> Result<TxnLock, anyhow::Error> {let cmd = std::env::args().next().unwrap();std::process::Command::new(&cmd).args(&["lock", file.as_ref().to_str().unwrap()]).spawn()?;let mut stream = UnixStream::connect(file).await?;stream.write_all(&[TXN_BEGIN]).await?;let mut t = [0u8];stream.read_exact(&mut t).await?;if t[0] == ACK {Ok(TxnLock { _stream: stream })} else {bail!("Pristine locked")}}impl MutTxnLock {pub async fn commit(&mut self) -> Result<(), anyhow::Error> {self.stream.write_all(&[COMMIT]).await?;Ok(())}}
use fs2::FileExt;use std::fs::OpenOptions;use std::path::Path;pub struct TxnLock {file: std::fs::File,}pub struct MutTxnLock {file: std::fs::File,}pub async fn mut_txn<P: AsRef<Path>>(file: P) -> Result<MutTxnLock, anyhow::Error> {let file = OpenOptions::new().write(true).create(true).open(file)?;file.lock_exclusive()?;Ok(MutTxnLock { file })}pub async fn txn<P: AsRef<Path>>(file: P) -> Result<TxnLock, anyhow::Error> {let file = OpenOptions::new().write(true).create(true).open(file)?;file.lock_shared()?;Ok(TxnLock { file })}impl MutTxnLock {pub async fn commit(&mut self) -> Result<(), anyhow::Error> {Ok(self.file.unlock()?)}}impl Drop for MutTxnLock {fn drop(&mut self) {self.file.unlock().unwrap_or(())}}impl Drop for TxnLock {fn drop(&mut self) {self.file.unlock().unwrap_or(())}}
SubCommand::Change(change) => change.run(),SubCommand::Channel(channel) => channel.run(),SubCommand::Protocol(protocol) => protocol.run(),
SubCommand::Change(change) => change.run().await,SubCommand::Channel(channel) => channel.run().await,SubCommand::Protocol(protocol) => protocol.run().await,
SubCommand::Git(git) => git.run(),SubCommand::Mv(mv) => mv.run(),SubCommand::Ls(ls) => ls.run(),SubCommand::Add(add) => add.run(),SubCommand::Remove(remove) => remove.run(),SubCommand::Reset(reset) => reset.run(),
SubCommand::Git(git) => git.run().await,SubCommand::Mv(mv) => mv.run().await,SubCommand::Ls(ls) => ls.run().await,SubCommand::Add(add) => add.run().await,SubCommand::Remove(remove) => remove.run().await,SubCommand::Reset(reset) => reset.run().await,
SubCommand::Debug(debug) => debug.run(),SubCommand::Fork(fork) => fork.run(),SubCommand::Unrecord(unrecord) => unrecord.run(),SubCommand::Apply(apply) => apply.run(),SubCommand::Remote(remote) => remote.run(),
SubCommand::Debug(debug) => debug.run().await,SubCommand::Fork(fork) => fork.run().await,SubCommand::Unrecord(unrecord) => unrecord.run().await,SubCommand::Apply(apply) => apply.run().await,SubCommand::Remote(remote) => remote.run().await,
use fs2::FileExt;use std::fs::OpenOptions;use std::io::Write;use std::path::PathBuf;use std::sync::Arc;use tokio::io::{AsyncReadExt, AsyncWriteExt};use tokio::net::UnixListener;use tokio::sync::{watch, Mutex};use clap::Clap;#[derive(Clap, Debug)]pub struct Lock {path: PathBuf,}const COMMIT: u8 = 2;const ACK: u8 = 3;const LOCKED: u8 = 4;impl Lock {pub async fn run(self) -> Result<(), anyhow::Error> {let lock = Arc::new(self.path.with_extension("lock"));let lockfile = OpenOptions::new().write(true).create(true).open(&lock.as_ref()).unwrap();if lockfile.try_lock_exclusive().is_err() {return Ok(());}let listener = UnixListener::bind(&self.path).unwrap();println!();let n_clients = Arc::new(Mutex::new(0usize));let file = Arc::new(self.path.clone());let muttxn = Arc::new(Mutex::new(()));let (tx, active_at_last_commit) = watch::channel(0);let tx = std::sync::Arc::new(tx);let clock = Arc::new(Mutex::new(0usize));let txn_counter = Arc::new(Mutex::new(0usize));let last_commit_date = Arc::new(Mutex::new(0usize));loop {let x = listener.accept().await;{let mut f = OpenOptions::new().write(true).create(true).append(true).open("log").unwrap();writeln!(f, "accepted").unwrap()}match x {Ok((mut stream, _addr)) => {*n_clients.lock().await += 1;{let mut f = OpenOptions::new().write(true).create(true).append(true).open("log").unwrap();writeln!(f, "n = {:?}", *n_clients.lock().await).unwrap();}let file = file.clone();let muttxn = muttxn.clone();let mut active_at_last_commit = active_at_last_commit.clone();let clock = clock.clone();let last_commit_date = last_commit_date.clone();let txn_counter = txn_counter.clone();let n_clients = n_clients.clone();let tx = tx.clone();let lock = lock.clone();tokio::spawn(async move {let mut t = [0u8];while let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == 1 {// muttxnlet lock = if let Ok(guard) = muttxn.try_lock() {guard} else {stream.write_all(&[LOCKED]).await.unwrap_or(());muttxn.lock().await};while *active_at_last_commit.borrow() > 0 
{stream.write_all(&[LOCKED]).await.unwrap_or(());active_at_last_commit.changed().await.unwrap();}stream.write_all(&[ACK]).await.unwrap_or(());if let Ok(n) = stream.read(&mut t).await {if n == 0 {break;}if t[0] == COMMIT {// commitlet mut clock = clock.lock().await;*clock += 1;*last_commit_date.lock().await = *clock;let counter = *txn_counter.lock().await;tx.send(counter).unwrap();}}std::mem::drop(lock);} else {// txn*txn_counter.lock().await += 1;let start_date = *clock.lock().await;stream.write_all(&[ACK]).await.unwrap_or(());let n = stream.read(&mut t).await.unwrap_or(0);if n == 0 {break;}*txn_counter.lock().await -= 1;if start_date < *last_commit_date.lock().await {let last = *active_at_last_commit.borrow();tx.send(last - 1).unwrap();}}}{let mut f = OpenOptions::new().write(true).create(true).append(true).open("log").unwrap();writeln!(f, "n = {:?}", *n_clients.lock().await).unwrap();}if *n_clients.lock().await == 1 {tokio::time::sleep(std::time::Duration::from_secs(1)).await;if *n_clients.lock().await == 1 {std::fs::remove_file(file.as_ref()).unwrap_or(());let lockfile = OpenOptions::new().write(true).create(true).open(lock.as_ref()).unwrap();lockfile.unlock().unwrap_or(());std::process::exit(0)}}*n_clients.lock().await -= 1});}Err(_) => break,}}std::fs::remove_file(file.as_ref()).unwrap_or(());let lockfile = OpenOptions::new().write(true).create(true).open(lock.as_ref()).unwrap();lockfile.unlock().unwrap_or(());std::process::exit(0)}}
pub fn run(self) -> Result<(), anyhow::Error> {let repo = if let Ok(repo) = Repository::find_root(self.repo_path.clone()) {
pub async fn run(self) -> Result<(), anyhow::Error> {let repo = if let Ok(repo) = Repository::find_root(self.repo_path.clone()).await {
pub fn run(self) -> Result<(), anyhow::Error> {let repo = Repository::find_root(self.repo_path)?;
pub async fn run(self) -> Result<(), anyhow::Error> {let repo = unsafe { Repository::find_root_immutable(self.repo_path).await? };
pub fn run(self) -> Result<(), anyhow::Error> {let repo = Repository::find_root(self.repo_path.clone())?;
pub async fn run(self) -> Result<(), anyhow::Error> {let repo = unsafe { Repository::find_root_immutable(self.repo_path.clone()).await? };