This website
use crate::repository::*;
use ::replication::Update;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use libpijul::changestore::ChangeStore;
use libpijul::fs::FsErrorC;
use libpijul::output::{FileError, OutputError};
use libpijul::pristine::sanakirja::MutTxn0;
use libpijul::pristine::sanakirja::SanakirjaError;
use libpijul::pristine::{ForkError, TreeErr, TxnErr};
use libpijul::{
    ApplyError, ArcTxn, ChannelMutTxnT, ChannelRef, MutTxnT, MutTxnTExt, TxnT, TxnTExt,
    UnrecordError,
};
use serde_derive::*;
use std::pin::Pin;
use thiserror::*;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tracing::*;

/// Replication handler: applies replicated updates to local repositories and
/// runs deployment jobs. It is `Clone` so that `update` can move a copy of it
/// into the future it returns.
#[derive(Clone)]
pub struct H {
    /// CI configuration; `deploy` reads its `filesystem` field to decide
    /// whether (and where) job output is written to `<id>.stdout` /
    /// `<id>.stderr` files.
    ci: crate::config_file::CiConfig,
    /// Registry of running jobs, keyed by job id. `deploy` inserts a
    /// `(kill sender, status sender, status receiver)` tuple when a job starts
    /// and removes it when the job has finished.
    jobs: crate::config::Jobs,
    /// Gives access to repositories (`locks.get(&repo)`) and to the
    /// configuration used to locate change files (`locks.config`).
    locks: RepositoryLocks,
    /// Database handle; connections are obtained with `db.get().await`.
    db: crate::config::Db,
    /// Semaphore from which a permit is acquired before each deployment job
    /// is run.
    builders: std::sync::Arc<tokio::sync::Semaphore>,
}

impl H {
    /// Assembles a handler from its collaborators. Each argument is stored
    /// unchanged in the field of the same name; nothing else happens here.
    pub fn new(
        ci: crate::config_file::CiConfig,
        jobs: crate::config::Jobs,
        locks: RepositoryLocks,
        db: crate::config::Db,
        builders: std::sync::Arc<tokio::sync::Semaphore>,
    ) -> Self {
        Self {
            ci,
            jobs,
            locks,
            db,
            builders,
        }
    }
}

/// Everything that can go wrong while handling a replicated update or running
/// a deployment job.
///
/// All variants except `Join` wrap an underlying error transparently and can
/// be produced from it with `?` (via the generated `From` impls).
#[derive(Debug, Error)]
pub enum Error {
    /// Error reported by the Sanakirja storage backend.
    #[error(transparent)]
    Sk(#[from] SanakirjaError),
    /// I/O error, e.g. while reading a job's output or writing its log files.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Transaction-level error from libpijul.
    #[error(transparent)]
    Txn(#[from] TxnErr<SanakirjaError>),
    /// Error on libpijul's tree tables.
    #[error(transparent)]
    Tree(#[from] TreeErr<SanakirjaError>),
    /// Error while outputting a file's contents (see `get_file`).
    #[error(transparent)]
    File(#[from] FileError<crate::repository::changestore::Error, MutTxn0>),
    /// Error while resolving a path inside a channel.
    #[error(transparent)]
    Fs(#[from] FsErrorC<crate::repository::changestore::Error, MutTxn0>),
    /// Error while forking a channel.
    #[error(transparent)]
    Fork(#[from] ForkError<SanakirjaError>),
    /// Error while unrecording a change.
    #[error(transparent)]
    Unrec(#[from] UnrecordError<crate::repository::changestore::Error, std::io::Error, MutTxn0>),
    /// Error while applying a change to a channel.
    #[error(transparent)]
    Apply(#[from] ApplyError<crate::repository::changestore::Error, MutTxn0>),
    /// Error while outputting a repository to a working copy.
    #[error(transparent)]
    Output(#[from] OutputError<crate::repository::changestore::Error, MutTxn0, std::io::Error>),
    /// A `spawn_blocking` task could not be joined; the underlying join error
    /// is discarded.
    #[error("Join error")]
    Join,
    /// Database error from diesel.
    #[error(transparent)]
    Diesel(#[from] diesel::result::Error),
    /// A file's contents were not valid UTF-8 (see `get_file`).
    #[error(transparent)]
    Utf8(#[from] std::string::FromUtf8Error),
}

impl ::replication::Handler for H {
    type Error = Error;
    type F =
        Pin<Box<dyn futures::Future<Output = Result<Option<Self::Error>, Self::Error>> + Send>>;

    fn update(&self, is_source: bool, update: replication::Update) -> Self::F {
        let s = self.clone();
        Box::pin(async move {
            match update {
                Update::Change { .. } => {}
                Update::Apply {
                    repo,
                    channel,
                    hash,
                } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    match tokio::task::spawn_blocking(move || {
                        let pri = repo_.pristine.blocking_write();
                        let txn = pri.arc_txn_begin()?;
                        let channel_ = format!("{}_{}", repo, channel);
                        let mut txn_ = txn.write();
                        let channel_ = txn_.open_or_create_channel(&channel_)?;
                        let mut channel__ = channel_.write();
                        let result = txn_.apply_change(&repo_.changes, &mut *channel__, &hash);
                        match result {
                            Err(libpijul::ApplyError::LocalChange(
                                libpijul::LocalApplyError::ChangeAlreadyOnChannel { .. },
                            )) => {
                                error!(
                                    "ignored error, apply {:?} to {:?}:{:?} = {:?}",
                                    hash, repo, channel, result
                                );
                                return Ok(None);
                            }
                            Err(libpijul::ApplyError::LocalChange(
                                libpijul::LocalApplyError::DependencyMissing { hash },
                            )) => {
                                error!(
                                    "apply {:?} to {:?}:{:?} = {:?}",
                                    hash, repo, channel, result
                                );
                                return Ok(Some(
                                    libpijul::ApplyError::LocalChange(
                                        libpijul::LocalApplyError::DependencyMissing { hash },
                                    )
                                    .into(),
                                ));
                            }
                            Ok(_) => {
                                debug!("apply {:?} to {:?}:{:?}", hash, repo, channel);
                            }
                            Err(e) => {
                                error!("apply {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
                                return Err(e.into());
                            }
                        }
                        txn_.touch_channel(&mut *channel__, None);
                        std::mem::drop(channel__);
                        std::mem::drop(txn_);
                        txn.commit()?;
                        Ok::<_, Error>(None)
                    })
                    .await
                    {
                        Ok(Ok(Some(x))) => return Ok(Some(x)),
                        Ok(Ok(None)) => {}
                        Ok(Err(e)) => return Err(e.into()),
                        Err(_) => return Err(Error::Join),
                    }
                }
                Update::Eof { repo, channel } => {
                    if cfg!(feature = "jobs") {
                        let repo_ = s.locks.get(&repo).await.unwrap();
                        tokio::task::spawn_blocking(move || {
                            if let Some((temp, depl)) = {
                                let pri = repo_.pristine.blocking_write();
                                let txn = pri.arc_txn_begin()?;
                                let channel = format!("{}_{}", repo, channel);
                                let channel_ = {
                                    let mut txn_ = txn.write();
                                    txn_.open_or_create_channel(&channel)?
                                };
                                let changes = repo_.changes.clone();

                                s.output_for_deployment(&txn, &channel_, &changes)?
                            } {
                                tokio::spawn(async move {
                                    use crate::db::jobs::dsl as jobs;
                                    let id = diesel::insert_into(jobs::jobs)
                                        .values((jobs::repo.eq(repo),))
                                        .returning(jobs::id)
                                        .get_result::<uuid::Uuid>(&mut s.db.get().await.unwrap())
                                        .await
                                        .unwrap();

                                    let permit = s.builders.acquire().await.unwrap();
                                    {
                                        s.deploy(id, temp, depl).await?;
                                    }
                                    std::mem::drop(permit);
                                    Ok::<_, Error>(())
                                });
                            }
                            Ok::<_, Error>(())
                        });
                    }
                }
                Update::Unrecord {
                    repo,
                    channel,
                    hash,
                    ..
                } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    match tokio::task::spawn_blocking(move || {
                        let pri = repo_.pristine.blocking_write();
                        let mut txn = pri.mut_txn_begin()?;
                        let channel = format!("{}_{}", repo, channel);
                        let mut channel_ = txn.open_or_create_channel(&channel)?;
                        let result = txn.unrecord(
                            &repo_.changes,
                            &mut channel_,
                            &hash,
                            0,
                            &libpijul::working_copy::sink(),
                        );
                        match result {
                            Err(libpijul::UnrecordError::ChangeNotInChannel { .. }) | Ok(_) => {
                                debug!("unrecord {:?} to {:?}:{:?}", hash, repo, channel);
                            }
                            Err(e) => {
                                error!("unrecord {:?} to {:?}:{:?} = {:?}", hash, repo, channel, e);
                                return Err(e.into());
                            }
                        }

                        let mut in_other_channels = false;
                        for ch in txn.channels("")? {
                            if txn.get_revchanges(&ch, &hash.into())?.is_some() {
                                in_other_channels = true;
                                break;
                            }
                        }
                        txn.touch_channel(&mut *channel_.write(), None);
                        txn.commit()?;
                        Ok::<_, Error>(in_other_channels)
                    })
                    .await
                    {
                        Ok(Ok(false)) => {
                            let mut p = crate::repository::nest_changes_path(&s.locks.config, repo);
                            libpijul::changestore::filesystem::push_filename(&mut p, &hash);
                            if let Ok(meta) = tokio::fs::metadata(&p).await {
                                std::fs::remove_file(&p).unwrap_or(());
                                std::fs::remove_dir(p.parent().unwrap()).unwrap_or(());
                                if is_source {
                                    crate::repository::free_used_storage(
                                        &mut *s.db.get().await.unwrap(),
                                        repo,
                                        meta.len(),
                                    )
                                    .await
                                    .unwrap();
                                }
                            }
                        }
                        Ok(Ok(_)) => {}
                        Ok(Err(e)) => return Err(e.into()),
                        Err(_) => return Err(Error::Join),
                    }
                }
                Update::NewChannel { repo, channel } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    txn.open_or_create_channel(&channel)?;
                    txn.commit()?;
                }
                Update::Fork { repo, channel, new } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    let chan = txn.open_or_create_channel(&channel)?;
                    let new = format!("{}_{}", repo, new);
                    match txn.fork(&chan, &new) {
                        Ok(_) => txn.commit()?,
                        Err(ForkError::ChannelNameExists(_)) => {}
                        Err(e) => return Err(e.into()),
                    }
                }
                Update::Prune { repo, channel } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    txn.drop_channel(&channel)?;
                    txn.commit()?;
                }
                Update::Rename { repo, channel, new } => {
                    let repo_ = s.locks.get(&repo).await.unwrap();
                    let pri = repo_.pristine.write().await;
                    let mut txn = pri.mut_txn_begin()?;
                    let channel = format!("{}_{}", repo, channel);
                    if let Some(mut c) = txn.load_channel(&channel)? {
                        let new = format!("{}_{}", repo, new);
                        debug!("rename {:?} to {:?}", channel, new);
                        txn.rename_channel(&mut c, &new)?;
                        txn.commit()?;
                    } else {
                        debug!("not rename");
                    }
                }
            }
            Ok(None)
        })
    }
}

/// The part of a repository's `pijul.toml` that this module reads
/// (see `H::output_for_deployment`).
#[derive(Debug, Deserialize)]
struct Config {
    /// Path of the deployment program. It is joined onto the directory the
    /// repository was output to and launched by `H::deploy`. `None` means the
    /// repository has no deployment.
    deployment: Option<String>,
}

fn get_file<C: ChangeStore<Error = crate::repository::changestore::Error>>(
    txn: &ArcTxn<libpijul::pristine::sanakirja::MutTxn0>,
    channel: &ChannelRef<libpijul::pristine::sanakirja::MutTxn0>,
    changes: &C,
    path: &str,
) -> Result<Option<String>, Error> {
    let txn_ = txn.read();
    let channel_ = channel.read();
    let (pos, is_dir) = txn_.follow_oldest_path(changes, &channel, path)?;
    if is_dir {
        return Ok(None);
    }
    let mut out = crate::repository::RawVertexBuf { out: Vec::new() };

    use libpijul::ChannelTxnT;
    let mut graph = libpijul::alive::retrieve(&*txn_, txn_.graph(&*channel_), pos, false)?;

    let mut forward = Vec::new();
    std::mem::drop(channel_);
    std::mem::drop(txn_);
    libpijul::alive::output_graph(changes, &txn, &channel, &mut out, &mut graph, &mut forward)
        .map_err(|x| Error::File(x))?;
    debug!("{:?}", out);
    Ok(Some(String::from_utf8(out.out)?))
}

impl H {
    /// Outputs `channel` to a fresh temporary directory if the repository
    /// declares a deployment.
    ///
    /// The `deployment` entry is read from the channel's `pijul.toml`.
    ///
    /// # Returns
    ///
    /// * `Ok(Some((dir, deployment)))` with the directory the channel was
    ///   output to and the declared deployment path.
    /// * `Ok(None)` when `pijul.toml` is missing (or is a directory), cannot
    ///   be parsed, or has no `deployment` entry.
    ///
    /// # Errors
    ///
    /// Fails when `pijul.toml` cannot be read, when the temporary directory
    /// cannot be created, or when outputting the repository fails.
    fn output_for_deployment<
        C: ChangeStore<Error = crate::repository::changestore::Error> + Clone + Send + Sync + 'static,
    >(
        &self,
        txn: &ArcTxn<libpijul::pristine::sanakirja::MutTxn0>,
        channel: &ChannelRef<libpijul::pristine::sanakirja::MutTxn0>,
        changes: &C,
    ) -> Result<Option<(tempfile::TempDir, String)>, Error> {
        let config = match get_file(txn, channel, changes, "pijul.toml")? {
            Some(config) => config,
            None => return Ok(None),
        };
        debug!("config = {:?}", config);

        // Best effort: an unparsable `pijul.toml` is treated like one without
        // a deployment.
        let parsed = match toml::from_str::<Config>(&config) {
            Ok(parsed) => parsed,
            Err(_) => return Ok(None),
        };
        debug!("{:?}", parsed);

        let depl = match parsed.deployment {
            Some(depl) => depl,
            None => return Ok(None),
        };

        let tmp_dir = tempfile::tempdir()?;
        let wc = libpijul::working_copy::filesystem::FileSystem::from_root(tmp_dir.path());
        libpijul::output::output_repository_no_pending(
            &wc, changes, txn, channel, "", true, None, 1, 0,
        )?;
        Ok(Some((tmp_dir, depl)))
    }

    /// Runs the deployment program `depl` (joined onto `tmp_dir`) as job `id`
    /// and waits for it to finish.
    ///
    /// While the job runs it is registered in `self.jobs` together with a
    /// kill sender and a status channel. The program's stdout and stderr are
    /// read line by line and, when `self.ci.filesystem` is set, appended to
    /// `<id>.stdout` and `<id>.stderr` in that directory (without a configured
    /// directory the output is discarded). Once the program has exited, or has
    /// been killed, its exit code and end time are stored in the job's
    /// database row and published on the status channel.
    ///
    /// # Errors
    ///
    /// Fails when the program cannot be started, on I/O errors while reading
    /// its output or writing the log files, and when the database update
    /// fails. On any failure after the program was started, it is killed
    /// (best effort). The job is always removed from `self.jobs` before
    /// returning.
    ///
    /// # Panics
    ///
    /// Panics if the `self.jobs` mutex is poisoned or if no database
    /// connection can be obtained.
    async fn deploy(
        &self,
        id: uuid::Uuid,
        tmp_dir: tempfile::TempDir,
        depl: String,
    ) -> Result<(), Error> {
        use std::process::Stdio;
        let (status_tx, status_rx) = tokio::sync::watch::channel(None);
        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
        let p = tmp_dir.path().join(depl);
        debug!("launching {:?}", p);

        // `depl` comes from the repository's contents, so a missing or
        // non-executable program is an ordinary error, not a bug.
        let mut cmd = tokio::process::Command::new(p)
            .current_dir(tmp_dir.path())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .stdin(Stdio::null())
            .spawn()?;

        use tokio::io::AsyncBufReadExt;
        let stdout = tokio::io::BufReader::new(
            cmd.stdout.take().expect("stdout was configured as piped"),
        );
        let mut stdout = stdout.lines();
        let stderr = tokio::io::BufReader::new(
            cmd.stderr.take().expect("stderr was configured as piped"),
        );
        let mut stderr = stderr.lines();

        self.jobs
            .lock()
            .unwrap()
            .insert(id, (kill_tx, status_tx.clone(), status_rx));

        // Everything that can fail while the job is registered runs inside
        // this block, so that the clean-up below is reached on every path.
        let outcome = async {
            let mut stdout_ok = true;
            let mut stderr_ok = true;
            let mut buf_stdout = String::new();
            // Starting at the epoch makes the first line flush immediately.
            let mut last_stdout = std::time::UNIX_EPOCH;
            let mut buf_stderr = String::new();
            let mut last_stderr = std::time::UNIX_EPOCH;
            // Minimum delay between two flushes of the same stream.
            let bound = std::time::Duration::from_secs(1);
            let mut files = if let Some(ref path) = self.ci.filesystem {
                Some((
                    OpenOptions::new()
                        .append(true)
                        .create(true)
                        .open(&path.join(&format!("{}.stdout", id)))
                        .await?,
                    OpenOptions::new()
                        .append(true)
                        .create(true)
                        .open(&path.join(&format!("{}.stderr", id)))
                        .await?,
                ))
            } else {
                None
            };
            // NOTE(review): a kill request is only observed once both streams
            // have reached EOF (see the `select!` after this loop) -- confirm
            // whether it should also interrupt the streaming.
            while stdout_ok || stderr_ok {
                debug!(
                    "stdout || stderr {:?} {:?}",
                    buf_stdout.len(),
                    buf_stderr.len()
                );
                tokio::select! {
                    line = stdout.next_line(), if stdout_ok => {
                        // `n == 0` means EOF: a line always counts at least
                        // its newline.
                        let n = if let Some(line) = line? {
                            buf_stdout.push_str(&line);
                            buf_stdout.push('\n');
                            line.len() + 1
                        } else {
                            0
                        };
                        // `elapsed` fails when the wall clock went backwards;
                        // flush in that case rather than panic.
                        if last_stdout.elapsed().map_or(true, |d| d >= bound) || n == 0 {
                            debug!("sending stdout to db {:?} {:?}", buf_stdout.len(), buf_stderr.len());

                            if let Some((ref mut stdout, _)) = files {
                                stdout.write_all(buf_stdout.as_bytes()).await?;
                                stdout.flush().await?;
                            }
                            buf_stdout.clear();
                            debug!("stdout/stderr {:?} {:?}", buf_stdout.len(), buf_stderr.len());
                            last_stdout = std::time::SystemTime::now();
                        }
                        if n == 0 {
                            stdout_ok = false
                        }
                    }
                    line = stderr.next_line(), if stderr_ok => {
                        let n = if let Some(line) = line? {
                            buf_stderr.push_str(&line);
                            buf_stderr.push('\n');
                            line.len() + 1
                        } else {
                            0
                        };
                        if last_stderr.elapsed().map_or(true, |d| d >= bound) || n == 0 {
                            debug!("sending stderr to db {:?}", buf_stderr.len());
                            if let Some((_, ref mut stderr)) = files {
                                stderr.write_all(buf_stderr.as_bytes()).await?;
                                stderr.flush().await?;
                            }
                            buf_stderr.clear();
                            last_stderr = std::time::SystemTime::now();
                        }
                        debug!("{:?}", buf_stderr.len());
                        if n == 0 {
                            stderr_ok = false
                        }
                    }
                }
            }

            // `None` stands for "no exit code", which includes a killed job.
            let status = tokio::select! {
                status = cmd.wait() => {
                    status?.code()
                }
                _ = kill_rx => {
                    cmd.kill().await?;
                    None
                }
            };
            debug!("process exited with {:?}", status);
            debug!("stderr {}", buf_stderr);
            debug!("stdout {}", buf_stdout);
            let now = chrono::Utc::now();
            use crate::db::jobs::dsl as jobs;
            diesel::update(jobs::jobs.find(id))
                .set((jobs::status.eq(status), jobs::ended.eq(&now)))
                .execute(&mut self.db.get().await.unwrap())
                .await?;
            // Sending only fails when nobody watches the job any more, which
            // is not an error for the job itself.
            if status_tx.send(Some((now, status))).is_err() {
                debug!("no watcher left for job {:?}", id);
            }
            Ok::<_, Error>(())
        }
        .await;

        if outcome.is_err() {
            // The job is no longer supervised: do not leave its process
            // behind. Killing fails harmlessly if it has already exited.
            if let Err(e) = cmd.kill().await {
                debug!("could not kill job {:?}: {:?}", id, e);
            }
        }
        // Unregister on success and on failure alike.
        self.jobs.lock().unwrap().remove(&id);
        outcome
    }
}