// This website
use crate::permissions::Perm;
use crate::{Config, get_user_login};
use axum::{
    Router,
    extract::ws::{WebSocket, WebSocketUpgrade},
    extract::{Json, Path, State},
    response::Response,
    routing::{any, get},
};
use axum_extra::extract::SignedCookieJar;
use diesel::{
    BoolExpressionMethods, ExpressionMethods, NullableExpressionMethods, OptionalExtension,
    QueryDsl, Queryable, QueryableByName, Selectable, SelectableHelper,
};
use diesel_async::RunQueryDsl;
use futures::StreamExt;
use inotify::WatchMask;
use serde_derive::*;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
use tracing::*;

/// Builds the job-related routes of the application.
///
/// * `/{owner}/{repo}` – `GET`, handled by [`list_jobs`]
/// * `/{owner}/{repo}/{job_id}` – `GET`, handled by [`job`]
/// * `/{owner}/{repo}/{job_id}/ws` – any method, handled by [`ws_handler`]
pub fn router() -> Router<Config> {
    let routes = Router::new();
    let routes = routes.route("/{owner}/{repo}", get(list_jobs));
    let routes = routes.route("/{owner}/{repo}/{job_id}", get(job));
    routes.route("/{owner}/{repo}/{job_id}/ws", any(ws_handler))
}

/// Path parameters of the `/{owner}/{repo}/{job_id}` routes (used by `job` and
/// `ws_handler` through axum's `Path` extractor).
#[derive(Debug, Deserialize)]
pub struct JobPath {
    /// Compared with `users::login` when the repository is looked up.
    owner: String,
    /// Compared with `repositories::name`.
    repo: String,
    /// Identifier of the job (`jobs::id`).
    job_id: uuid::Uuid,
}

/// Path parameters of the `/{owner}/{repo}` route (used by `list_jobs`).
#[derive(Debug, Deserialize)]
pub struct JobsPath {
    /// Compared with `users::login` when the repository is looked up.
    owner: String,
    /// Compared with `repositories::name`.
    repo: String,
}

/// Columns of a job row that are returned to clients.
///
/// Loaded with `Job::as_select()` and serialized to JSON by the `list_jobs`
/// and `job` handlers.
#[derive(Debug, Selectable, Queryable, QueryableByName, Serialize)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(table_name = crate::db::jobs::dsl)]
struct Job {
    /// Identifier of the job.
    id: uuid::Uuid,
    /// Start time; `list_jobs` orders its results by this column, newest first.
    started: chrono::DateTime<chrono::Utc>,
    /// End time. NOTE(review): presumably `None` while the job is still running — confirm.
    ended: Option<chrono::DateTime<chrono::Utc>>,
    /// NOTE(review): presumably the exit status of the job, `None` when unknown — confirm.
    status: Option<i32>,
}

/// JSON body returned by `list_jobs`.
#[derive(Debug, Serialize)]
pub struct Jobs {
    /// Login returned by `get_user_login` for the request, `None` when it returns nothing.
    login: Option<String>,
    /// Jobs of the requested repository, most recently started first.
    jobs: Vec<Job>,
}

/// `GET /{owner}/{repo}`: lists the jobs of a repository, newest first.
///
/// Only rows of repositories the caller may see are returned: the caller is
/// either the repository owner or holds `Perm::READ_JOBS` on it (anonymous
/// callers are checked with the nil UUID). A repository that does not exist
/// or is not visible simply yields an empty list.
///
/// # Errors
///
/// Propagates failures from `get_user_login`, from acquiring a database
/// connection and from running the query.
pub async fn list_jobs(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobsPath>,
) -> Result<Json<Jobs>, crate::Error> {
    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    // Split the optional `(id, login)` pair into two independent options.
    let (uid, login) = match get_user_login(&jar, &config).await? {
        Some((id, name)) => (Some(id), Some(name)),
        None => (None, None),
    };
    let mut db = config.db.get().await?;

    // Visible when the caller owns the repository or was granted READ_JOBS.
    let may_read = repos::owner.nullable().eq(uid).or(crate::has_permissions!(
        uid.unwrap_or(uuid::Uuid::nil()),
        repos::id,
        Perm::READ_JOBS.bits()
    ));

    let rows = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(may_read)
        .select(Job::as_select())
        .order_by(jobs::started.desc())
        .get_results::<Job>(&mut db)
        .await?;

    Ok(Json(Jobs { login, jobs: rows }))
}

/// JSON body returned by the `job` handler: a single job plus the caller's login.
#[derive(Debug, Serialize)]
pub struct Job_ {
    /// Login returned by `get_user_login` for the request, `None` when it returns nothing.
    login: Option<String>,
    /// The job itself; `flatten` serializes its fields at the same level as `login`.
    #[serde(flatten)]
    job: Job,
}

pub async fn job(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobPath>,
) -> Result<Json<Job_>, crate::Error> {
    let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
        (Some(a), Some(b))
    } else {
        (None, None)
    };
    let mut db = config.db.get().await?;

    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    if let Some(job) = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::READ_JOBS.bits()
        )))
        .filter(jobs::id.eq(path.job_id))
        .select(Job::as_select())
        .order_by(jobs::started.desc())
        .get_result::<Job>(&mut db)
        .await
        .optional()?
    {
        Ok(Json(Job_ { login, job }))
    } else {
        Err(crate::Error::NotFound)
    }
}

pub async fn ws_handler(
    State(config): State<Config>,
    Path(path): Path<JobPath>,
    ws: WebSocketUpgrade,
) -> Response {
    ws.on_upgrade(move |socket| handle_socket(config, path.job_id, socket))
}

/// Messages exchanged on the job WebSocket, encoded as JSON text frames.
///
/// No serde attributes are set, so the derive's default enum representation
/// is used.
#[derive(Debug, Deserialize, Serialize)]
enum Msg<'a> {
    /// Not constructed anywhere in this file.
    /// NOTE(review): presumably lets a client report how many bytes of each
    /// stream it already has — confirm against the client code.
    State {
        stdout: usize,
        stderr: usize,
    },
    /// A piece of job output.
    Chunk {
        /// `0` for the `.stdout` file, `1` for the `.stderr` file.
        channel: i32,
        /// Always sent as `0` by the code in this file.
        offset: usize,
        /// The output text itself.
        content: &'a str,
    },
    /// Job status. The server also sends this with `ended` set to the Unix
    /// epoch and `status: None` right before it stops serving a job that is
    /// not (or no longer) tracked in `config.jobs`.
    Status {
        ended: chrono::DateTime<chrono::Utc>,
        status: Option<i32>,
    },
}

async fn handle_socket(config: Config, id: uuid::Uuid, mut socket: WebSocket) {
    let mut remote_stdout = 0;
    let mut remote_stderr = 0;
    send_all(
        &config,
        id,
        &mut remote_stdout,
        &mut remote_stderr,
        &mut socket,
    )
    .await
    .unwrap();

    let Some(mut status) = config
        .jobs
        .lock()
        .unwrap()
        .get(&id)
        .map(|(_, _, w)| w.clone())
    else {
        debug!("closing");
        socket
            .send(
                serde_json::to_string(&Msg::Status {
                    ended: chrono::DateTime::UNIX_EPOCH,
                    status: None,
                })
                .unwrap()
                .into(),
            )
            .await
            .unwrap_or(());
        return;
    };
    let mut status_ok = true;
    let mut notify_ok = true;

    let mut notify_buffer = [0; 1024];
    let mut notify = if let Some(ref path) = config.ci.filesystem {
        let inotify = inotify::Inotify::init().expect("Error while initializing inotify instance");
        let mut w = inotify.watches();
        w.add(&path.join(&format!("{}.stdout", id)), WatchMask::MODIFY)
            .unwrap();
        w.add(&path.join(&format!("{}.stderr", id)), WatchMask::MODIFY)
            .unwrap();
        inotify.into_event_stream(&mut notify_buffer).unwrap()
    } else {
        inotify::Inotify::init()
            .unwrap()
            .into_event_stream(&mut notify_buffer)
            .unwrap()
    };

    let mut stdout = String::new();
    let mut stderr = String::new();
    while notify_ok || status_ok {
        debug!("waiting job");
        tokio::select! {
            n = notify.next(), if notify_ok => {
                debug!("{:?}", n);
                if n.is_none() {
                    notify_ok = false;
                    continue
                }
                if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
                    .await
                    .unwrap()
                {
                    socket.send(a.into()).await.unwrap_or(());
                    socket.send(b.into()).await.unwrap_or(());
                }
            }
            x = status.changed(), if status_ok => {
                debug!("status {:?}", x);
                if x.is_err() {
                    status_ok = false
                }
                let status = status.borrow_and_update().clone();
                if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
                    .await
                    .unwrap()
                {
                    socket.send(a.into()).await.unwrap_or(());
                    socket.send(b.into()).await.unwrap_or(());
                }
                debug!("status {:?}", status);
                if let Some((ended, status)) = status {
                    socket.send(serde_json::to_string(&Msg::Status {
                        ended,
                        status
                    }).unwrap().into()).await.unwrap_or(());
                } else {
                    debug!("Nothing to send");
                }
            }
            else =>{
                socket.send(serde_json::to_string(&Msg::Status {
                    ended: chrono::DateTime::UNIX_EPOCH,
                    status: None,
                }).unwrap().into()).await.unwrap_or(());
                break
            }

        }
    }
}

async fn send_all(
    config: &Config,
    id: uuid::Uuid,
    remote_stdout: &mut usize,
    remote_stderr: &mut usize,
    socket: &mut WebSocket,
) -> Result<(), crate::Error> {
    use crate::db::jobs::dsl as jobs;
    if let Some((ended, status)) = jobs::jobs
        .find(id)
        .select((jobs::ended, jobs::status))
        .get_result::<(Option<chrono::DateTime<chrono::Utc>>, Option<i32>)>(
            &mut config.db.get().await.unwrap(),
        )
        .await
        .optional()
        .unwrap()
    {
        if let Some(ended) = ended {
            socket
                .send(
                    serde_json::to_string(&Msg::Status { ended, status })
                        .unwrap()
                        .into(),
                )
                .await
                .unwrap();
        }
        let Some(ref path) = config.ci.filesystem else {
            return Ok(());
        };

        let mut outf = BufReader::new(
            tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
                .await
                .unwrap(),
        );
        outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
            .await?;

        let mut buf = String::with_capacity(8192);
        while let Ok(n) = outf.read_line(&mut buf).await {
            *remote_stdout += n;
            if buf.len() >= 4096 || n == 0 {
                socket
                    .send(
                        serde_json::to_string(&Msg::Chunk {
                            channel: 0,
                            offset: 0,
                            content: &buf,
                        })
                        .unwrap()
                        .into(),
                    )
                    .await
                    .unwrap();
                buf.clear()
            }
            if n == 0 {
                break;
            }
        }
        buf.clear();
        let mut errf = BufReader::new(
            tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
                .await
                .unwrap(),
        );
        errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
            .await?;
        while let Ok(n) = errf.read_line(&mut buf).await {
            *remote_stderr += n;
            if buf.len() >= 4096 || n == 0 {
                socket
                    .send(
                        serde_json::to_string(&Msg::Chunk {
                            channel: 1,
                            offset: 0,
                            content: &buf,
                        })
                        .unwrap()
                        .into(),
                    )
                    .await
                    .unwrap();
                buf.clear()
            }
            if n == 0 {
                break;
            }
        }
    }
    Ok(())
}

/// Reads whatever was appended to the job's log files since the last call.
///
/// `stdout` / `stderr` are overwritten with the text found after byte
/// `*remote_stdout` / `*remote_stderr` in `<id>.stdout` / `<id>.stderr`, and
/// the offsets are advanced by the number of bytes consumed. The result is a
/// pair of serialized `Chunk` messages (channel 0, then channel 1), or `None`
/// when `config.ci.filesystem` is not set.
///
/// The files may be read while they are being written. An incomplete
/// multi-byte UTF-8 sequence at the very end is therefore left for the next
/// call instead of being treated as an error.
///
/// # Errors
///
/// Returns the I/O error when a file cannot be opened, positioned or read,
/// and an `InvalidData` I/O error when it contains bytes that are not UTF-8.
async fn send_remaining(
    config: &Config,
    id: uuid::Uuid,
    remote_stdout: &mut usize,
    remote_stderr: &mut usize,
    stdout: &mut String,
    stderr: &mut String,
) -> Result<Option<(String, String)>, crate::Error> {
    /// Serializes one `Chunk` message.
    fn chunk_json(channel: i32, content: &str) -> String {
        serde_json::to_string(&Msg::Chunk {
            channel,
            offset: 0,
            content,
        })
        .expect("`Msg` always serializes to JSON")
    }

    let Some(ref path) = config.ci.filesystem else {
        return Ok(None);
    };

    read_appended(&path.join(&format!("{}.stdout", id)), remote_stdout, stdout).await?;
    read_appended(&path.join(&format!("{}.stderr", id)), remote_stderr, stderr).await?;

    Ok(Some((
        chunk_json(0, stdout.as_str()),
        chunk_json(1, stderr.as_str()),
    )))
}

/// Replaces `text` with the complete UTF-8 text found in `file` after byte
/// `*offset` and advances `*offset` past the bytes that were consumed.
async fn read_appended(
    file: impl AsRef<std::path::Path>,
    offset: &mut usize,
    text: &mut String,
) -> std::io::Result<()> {
    let mut f = tokio::fs::File::open(file).await?;
    f.seek(std::io::SeekFrom::Start(*offset as u64)).await?;
    let mut raw = Vec::new();
    f.read_to_end(&mut raw).await?;

    let complete = match std::str::from_utf8(&raw) {
        Ok(s) => s,
        // `error_len() == None` means the input ended in the middle of a
        // character: the writer has not flushed the rest yet. Keep the valid
        // prefix and pick the character up on the next call.
        Err(e) if e.error_len().is_none() => std::str::from_utf8(&raw[..e.valid_up_to()])
            .expect("bytes before `valid_up_to()` are valid UTF-8"),
        Err(e) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
    };

    text.clear();
    text.push_str(complete);
    *offset += complete.len();
    Ok(())
}