use crate::repository::*;
use ::replication::Update;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use libpijul::changestore::ChangeStore;
use libpijul::fs::FsErrorC;
use libpijul::output::{FileError, OutputError};
use libpijul::pristine::sanakirja::MutTxn0;
use libpijul::pristine::sanakirja::SanakirjaError;
use libpijul::pristine::{ForkError, TreeErr, TxnErr};
use libpijul::{
ApplyError, ArcTxn, ChannelMutTxnT, ChannelRef, MutTxnT, MutTxnTExt, TxnT, TxnTExt,
UnrecordError,
};
use serde_derive::*;
use std::pin::Pin;
use thiserror::*;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tracing::*;
/// Replication handler state.
///
/// The handler is cloned into every future it spawns, so all fields are expected to be
/// cheaply cloneable handles.
#[derive(Clone)]
pub struct H {
    /// CI settings; `deploy` reads the optional `filesystem` directory from it to decide
    /// where job output logs are written.
    ci: crate::config_file::CiConfig,
    /// Registry of running deployment jobs, keyed by job id (locked with `lock()`).
    jobs: crate::config::Jobs,
    /// Per-repository handles; `get` yields the repository's pristine and change store.
    locks: RepositoryLocks,
    /// Database handle; `get` yields a connection.
    db: crate::config::Db,
    /// A permit from this semaphore is held for the duration of each deployment job.
    builders: std::sync::Arc<tokio::sync::Semaphore>,
}
impl H {
    /// Builds a handler from its collaborators; every argument is stored as-is in the
    /// field of the same name.
    pub fn new(
        ci: crate::config_file::CiConfig,
        jobs: crate::config::Jobs,
        locks: RepositoryLocks,
        db: crate::config::Db,
        builders: std::sync::Arc<tokio::sync::Semaphore>,
    ) -> Self {
        Self { ci, jobs, locks, db, builders }
    }
}
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Sk(#[from] SanakirjaError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Txn(#[from] TxnErr<SanakirjaError>),
#[error(transparent)]
Tree(#[from] TreeErr<SanakirjaError>),
#[error(transparent)]
File(#[from] FileError<crate::repository::changestore::Error, MutTxn0>),
#[error(transparent)]
Fs(#[from] FsErrorC<crate::repository::changestore::Error, MutTxn0>),
#[error(transparent)]
Fork(#[from] ForkError<SanakirjaError>),
#[error(transparent)]
Unrec(#[from] UnrecordError<crate::repository::changestore::Error, std::io::Error, MutTxn0>),
#[error(transparent)]
Apply(#[from] ApplyError<crate::repository::changestore::Error, MutTxn0>),
#[error(transparent)]
Output(#[from] OutputError<crate::repository::changestore::Error, MutTxn0, std::io::Error>),
#[error("Join error")]
Join,
#[error(transparent)]
Diesel(#[from] diesel::result::Error),
#[error(transparent)]
Utf8(#[from] std::string::FromUtf8Error),
}
impl ::replication::Handler for H {
type Error = Error;
type F =
Pin<Box<dyn futures::Future<Output = Result<Option<Self::Error>, Self::Error>> + Send>>;
fn update(&self, is_source: bool, update: replication::Update) -> Self::F {
let s = self.clone();
Box::pin(async move {
match update {
Update::Change { .. } => {}
Update::Apply {
repo,
channel,
hash,
} => {
let repo_ = s.locks.get(&repo).await.unwrap();
match tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let txn = pri.arc_txn_begin()?;
let channel_ = format!("{}_{}", repo, channel);
let mut txn_ = txn.write();
let channel_ = txn_.open_or_create_channel(&channel_)?;
let mut channel__ = channel_.write();
let result = txn_.apply_change(&repo_.changes, &mut *channel__, &hash);
match result {
Err(libpijul::ApplyError::LocalChange(
libpijul::LocalApplyError::ChangeAlreadyOnChannel { .. },
)) => {
error!(
"ignored error, apply {:?} to {:?}:{:?} = {:?}",
hash, repo, channel, result
);
return Ok(None);
}
Err(libpijul::ApplyError::LocalChange(
libpijul::LocalApplyError::DependencyMissing { hash },
)) => {
error!(
"apply {:?} to {:?}:{:?} = {:?}",
hash, repo, channel, result
);
return Ok(Some(
libpijul::ApplyError::LocalChange(
libpijul::LocalApplyError::DependencyMissing { hash },
)
.into(),
));
}
Ok(_) => {
debug!("apply {:?} to {:?}:{:?}", hash, repo, channel);
}
Err(e) => {
error!("apply {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
return Err(e.into());
}
}
txn_.touch_channel(&mut *channel__, None);
std::mem::drop(channel__);
std::mem::drop(txn_);
txn.commit()?;
Ok::<_, Error>(None)
})
.await
{
Ok(Ok(Some(x))) => return Ok(Some(x)),
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(Error::Join),
}
}
Update::Eof { repo, channel } => {
if cfg!(feature = "jobs") {
let repo_ = s.locks.get(&repo).await.unwrap();
tokio::task::spawn_blocking(move || {
if let Some((temp, depl)) = {
let pri = repo_.pristine.blocking_write();
let txn = pri.arc_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let channel_ = {
let mut txn_ = txn.write();
txn_.open_or_create_channel(&channel)?
};
let changes = repo_.changes.clone();
s.output_for_deployment(&txn, &channel_, &changes)?
} {
tokio::spawn(async move {
use crate::db::jobs::dsl as jobs;
let id = diesel::insert_into(jobs::jobs)
.values((jobs::repo.eq(repo),))
.returning(jobs::id)
.get_result::<uuid::Uuid>(&mut s.db.get().await.unwrap())
.await
.unwrap();
let permit = s.builders.acquire().await.unwrap();
{
s.deploy(id, temp, depl).await?;
}
std::mem::drop(permit);
Ok::<_, Error>(())
});
}
Ok::<_, Error>(())
});
}
}
Update::Unrecord {
repo,
channel,
hash,
..
} => {
let repo_ = s.locks.get(&repo).await.unwrap();
match tokio::task::spawn_blocking(move || {
let pri = repo_.pristine.blocking_write();
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let mut channel_ = txn.open_or_create_channel(&channel)?;
let result = txn.unrecord(
&repo_.changes,
&mut channel_,
&hash,
0,
&libpijul::working_copy::sink(),
);
match result {
Err(libpijul::UnrecordError::ChangeNotInChannel { .. }) | Ok(_) => {
debug!("unrecord {:?} to {:?}:{:?}", hash, repo, channel);
}
Err(e) => {
error!("unrecord {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
return Err(e.into());
}
}
let mut in_other_channels = false;
for ch in txn.channels("")? {
if txn.get_revchanges(&ch, &hash.into())?.is_some() {
in_other_channels = true;
break;
}
}
txn.touch_channel(&mut *channel_.write(), None);
txn.commit()?;
Ok::<_, Error>(in_other_channels)
})
.await
{
Ok(Ok(false)) => {
let mut p = crate::repository::nest_changes_path(&s.locks.config, repo);
libpijul::changestore::filesystem::push_filename(&mut p, &hash);
if let Ok(meta) = tokio::fs::metadata(&p).await {
std::fs::remove_file(&p).unwrap_or(());
std::fs::remove_dir(p.parent().unwrap()).unwrap_or(());
if is_source {
crate::repository::free_used_storage(
&mut *s.db.get().await.unwrap(),
repo,
meta.len(),
)
.await
.unwrap();
}
}
}
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(Error::Join),
}
}
Update::NewChannel { repo, channel } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
txn.open_or_create_channel(&channel)?;
txn.commit()?;
}
Update::Fork { repo, channel, new } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
let chan = txn.open_or_create_channel(&channel)?;
let new = format!("{}_{}", repo, new);
match txn.fork(&chan, &new) {
Ok(_) => txn.commit()?,
Err(ForkError::ChannelNameExists(_)) => {}
Err(e) => return Err(e.into()),
}
}
Update::Prune { repo, channel } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
txn.drop_channel(&channel)?;
txn.commit()?;
}
Update::Rename { repo, channel, new } => {
let repo_ = s.locks.get(&repo).await.unwrap();
let pri = repo_.pristine.write().await;
let mut txn = pri.mut_txn_begin()?;
let channel = format!("{}_{}", repo, channel);
if let Some(mut c) = txn.load_channel(&channel)? {
let new = format!("{}_{}", repo, new);
debug!("rename {:?} to {:?}", channel, new);
txn.rename_channel(&mut c, &new)?;
txn.commit()?;
} else {
debug!("not rename");
}
}
}
Ok(None)
})
}
}
/// The subset of a repository's `pijul.toml` that this module reads.
#[derive(Debug, Deserialize)]
struct Config {
    /// Path of the deployment program, relative to the repository root; `deploy` joins
    /// it onto the output directory and executes it. No deployment happens when absent.
    deployment: Option<String>,
}
fn get_file<C: ChangeStore<Error = crate::repository::changestore::Error>>(
txn: &ArcTxn<libpijul::pristine::sanakirja::MutTxn0>,
channel: &ChannelRef<libpijul::pristine::sanakirja::MutTxn0>,
changes: &C,
path: &str,
) -> Result<Option<String>, Error> {
let txn_ = txn.read();
let channel_ = channel.read();
let (pos, is_dir) = txn_.follow_oldest_path(changes, &channel, path)?;
if is_dir {
return Ok(None);
}
let mut out = crate::repository::RawVertexBuf { out: Vec::new() };
use libpijul::ChannelTxnT;
let mut graph = libpijul::alive::retrieve(&*txn_, txn_.graph(&*channel_), pos, false)?;
let mut forward = Vec::new();
std::mem::drop(channel_);
std::mem::drop(txn_);
libpijul::alive::output_graph(changes, &txn, &channel, &mut out, &mut graph, &mut forward)
.map_err(|x| Error::File(x))?;
debug!("{:?}", out);
Ok(Some(String::from_utf8(out.out)?))
}
impl H {
fn output_for_deployment<
C: ChangeStore<Error = crate::repository::changestore::Error> + Clone + Send + Sync + 'static,
>(
&self,
txn: &ArcTxn<libpijul::pristine::sanakirja::MutTxn0>,
channel: &ChannelRef<libpijul::pristine::sanakirja::MutTxn0>,
changes: &C,
) -> Result<Option<(tempfile::TempDir, String)>, Error> {
if let Some(config) = get_file(txn, channel, changes, "pijul.toml")? {
debug!("config = {:?}", config);
if let Ok(parsed) = toml::from_str::<Config>(&config) {
debug!("{:?}", parsed);
if let Some(depl) = parsed.deployment {
let txn = txn.clone();
let channel = channel.clone();
let changes = changes.clone();
let tmp_dir = tempfile::tempdir().unwrap();
let wc =
libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path());
libpijul::output::output_repository_no_pending(
&wc, &changes, &txn, &channel, "", true, None, 1, 0,
)
.unwrap();
return Ok(Some((tmp_dir, depl)));
}
}
}
Ok(None)
}
/// Runs the deployment program `depl` (relative to `tmp_dir`) as job `id`.
///
/// The program runs with `tmp_dir` as its working directory, no stdin, and piped
/// stdout/stderr. While it runs, the job is registered in `self.jobs` with a kill
/// sender and a status channel. Output lines are buffered and appended to
/// `<id>.stdout` / `<id>.stderr` under `self.ci.filesystem` (when configured): on the
/// first line, whenever a line arrives at least one second after the previous write,
/// and at end of stream.
///
/// A kill request (or the kill sender being dropped) terminates the process at any
/// point, including while it is still producing output. On completion the exit code
/// (`None` when killed or terminated by a signal) and end time are stored in the `jobs`
/// table and published on the status channel.
///
/// # Errors
///
/// Fails on I/O errors (spawning, reading output, writing logs, waiting) and on database
/// errors. The job is removed from `self.jobs` on every exit path, and the child
/// process is killed if this function returns before it was reaped.
///
/// # Panics
///
/// Panics if the `self.jobs` mutex is poisoned or no database connection is available.
async fn deploy(
    &self,
    id: uuid::Uuid,
    tmp_dir: tempfile::TempDir,
    depl: String,
) -> Result<(), Error> {
    use crate::db::jobs::dsl as jobs;
    use std::process::Stdio;
    use std::time::{Duration, Instant};
    use tokio::io::AsyncBufReadExt;

    /// Appends `buf` to `file` (if log files are enabled) and empties `buf`.
    async fn flush_log(
        file: Option<&mut tokio::fs::File>,
        buf: &mut String,
    ) -> std::io::Result<()> {
        if let Some(file) = file {
            file.write_all(buf.as_bytes()).await?;
            file.flush().await?;
        }
        buf.clear();
        Ok(())
    }

    let (status_tx, status_rx) = tokio::sync::watch::channel(None);
    let (kill_tx, mut kill_rx) = tokio::sync::oneshot::channel();
    let p = tmp_dir.path().join(depl);
    debug!("launching {:?}", p);
    let mut cmd = tokio::process::Command::new(p)
        .current_dir(tmp_dir.path())
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .stdin(Stdio::null())
        // Do not leave an orphan behind if we bail out with an error below.
        .kill_on_drop(true)
        .spawn()?;

    self.jobs
        .lock()
        .unwrap()
        .insert(id, (kill_tx, status_tx.clone(), status_rx));

    // Everything fallible runs inside this block so that the job is unregistered
    // afterwards no matter how the block exits.
    let outcome = async {
        let mut stdout = tokio::io::BufReader::new(
            cmd.stdout.take().expect("stdout is configured as piped"),
        )
        .lines();
        let mut stderr = tokio::io::BufReader::new(
            cmd.stderr.take().expect("stderr is configured as piped"),
        )
        .lines();

        let mut files = match self.ci.filesystem {
            Some(ref path) => Some((
                OpenOptions::new()
                    .append(true)
                    .create(true)
                    .open(path.join(format!("{}.stdout", id)))
                    .await?,
                OpenOptions::new()
                    .append(true)
                    .create(true)
                    .open(path.join(format!("{}.stderr", id)))
                    .await?,
            )),
            None => None,
        };

        // Minimum delay between two writes of the same stream to its log file.
        let bound = Duration::from_secs(1);
        let mut buf_stdout = String::new();
        let mut buf_stderr = String::new();
        // `None` until the first write, so the first line is written immediately.
        // `Instant` is monotonic, unlike the wall clock.
        let mut last_stdout: Option<Instant> = None;
        let mut last_stderr: Option<Instant> = None;
        let mut stdout_ok = true;
        let mut stderr_ok = true;
        let mut killed = false;

        while stdout_ok || stderr_ok {
            debug!(
                "stdout || stderr {:?} {:?}",
                buf_stdout.len(),
                buf_stderr.len()
            );
            tokio::select! {
                line = stdout.next_line(), if stdout_ok => {
                    let eof = match line? {
                        Some(line) => {
                            buf_stdout.push_str(&line);
                            buf_stdout.push('\n');
                            false
                        }
                        None => true,
                    };
                    if eof || last_stdout.map_or(true, |t| t.elapsed() >= bound) {
                        debug!("writing stdout log {:?}", buf_stdout.len());
                        flush_log(files.as_mut().map(|(out, _)| out), &mut buf_stdout).await?;
                        last_stdout = Some(Instant::now());
                    }
                    stdout_ok = !eof;
                }
                line = stderr.next_line(), if stderr_ok => {
                    let eof = match line? {
                        Some(line) => {
                            buf_stderr.push_str(&line);
                            buf_stderr.push('\n');
                            false
                        }
                        None => true,
                    };
                    if eof || last_stderr.map_or(true, |t| t.elapsed() >= bound) {
                        debug!("writing stderr log {:?}", buf_stderr.len());
                        flush_log(files.as_mut().map(|(_, err)| err), &mut buf_stderr).await?;
                        last_stderr = Some(Instant::now());
                    }
                    stderr_ok = !eof;
                }
                // A running process normally keeps its pipes open until it exits, so
                // the kill request has to be observed here, not only after EOF.
                _ = &mut kill_rx => {
                    cmd.kill().await?;
                    killed = true;
                    break;
                }
            }
        }

        // Only non-empty when the loop was left through the kill branch.
        if !buf_stdout.is_empty() {
            flush_log(files.as_mut().map(|(out, _)| out), &mut buf_stdout).await?;
        }
        if !buf_stderr.is_empty() {
            flush_log(files.as_mut().map(|(_, err)| err), &mut buf_stderr).await?;
        }

        let status = if killed {
            // `kill` already reaped the child, and `kill_rx` must not be polled again.
            None
        } else {
            tokio::select! {
                status = cmd.wait() => {
                    status?.code()
                }
                _ = &mut kill_rx => {
                    cmd.kill().await?;
                    None
                }
            }
        };
        debug!("process exited with {:?}", status);

        let now = chrono::Utc::now();
        diesel::update(jobs::jobs.find(id))
            .set((jobs::status.eq(status), jobs::ended.eq(&now)))
            .execute(&mut self.db.get().await.unwrap())
            .await?;
        // Sending only fails when nobody listens any more, which is not an error here.
        if status_tx.send(Some((now, status))).is_err() {
            debug!("no status listener left for job {}", id);
        }
        Ok::<_, Error>(())
    }
    .await;

    self.jobs.lock().unwrap().remove(&id);
    outcome
}
}