use crate::permissions::Perm;
use crate::{Config, get_user_login};
use axum::{
Router,
extract::ws::{WebSocket, WebSocketUpgrade},
extract::{Json, Path, State},
response::Response,
routing::{any, get},
};
use axum_extra::extract::SignedCookieJar;
use diesel::{
BoolExpressionMethods, ExpressionMethods, NullableExpressionMethods, OptionalExtension,
QueryDsl, Queryable, QueryableByName, Selectable, SelectableHelper,
};
use diesel_async::RunQueryDsl;
use futures::StreamExt;
use inotify::WatchMask;
use serde_derive::*;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, BufReader};
use tracing::*;
/// Builds the router for job pages: the job list of a repository, the
/// details of a single job, and the WebSocket endpoint streaming a job's
/// output.
pub fn router() -> Router<Config> {
    let listing = Router::new().route("/{owner}/{repo}", get(list_jobs));
    let with_details = listing.route("/{owner}/{repo}/{job_id}", get(job));
    with_details.route("/{owner}/{repo}/{job_id}/ws", any(ws_handler))
}
/// Path parameters of the routes that address a single job
/// (`/{owner}/{repo}/{job_id}` and `/{owner}/{repo}/{job_id}/ws`).
#[derive(Debug, Deserialize)]
pub struct JobPath {
/// Compared with `users::login` by the `job` handler.
owner: String,
/// Compared with `repos::name` by the `job` handler.
repo: String,
/// Identifier of the job; the only field read by `ws_handler`.
job_id: uuid::Uuid,
}
/// Path parameters of the job-list route (`/{owner}/{repo}`).
#[derive(Debug, Deserialize)]
pub struct JobsPath {
/// Compared with `users::login` by `list_jobs`.
owner: String,
/// Compared with `repos::name` by `list_jobs`.
repo: String,
}
/// Job columns loaded through `Job::as_select()` by the handlers in this
/// file and serialized into their JSON responses.
///
/// The field order and names are consumed by the diesel derives; keep them
/// in sync with the table definition.
#[derive(Debug, Selectable, Queryable, QueryableByName, Serialize)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(table_name = crate::db::jobs::dsl)]
struct Job {
/// Identifier of the job.
id: uuid::Uuid,
/// Start time; `list_jobs` orders by this column, newest first.
started: chrono::DateTime<chrono::Utc>,
/// End time. NOTE(review): presumably `None` while the job is still running — confirm.
ended: Option<chrono::DateTime<chrono::Utc>>,
/// NOTE(review): presumably the exit status of the job — confirm against the writer.
status: Option<i32>,
}
/// JSON response of `list_jobs`.
#[derive(Debug, Serialize)]
pub struct Jobs {
/// Login of the user making the request, or `None` when `get_user_login`
/// finds no logged-in user.
login: Option<String>,
/// Jobs returned by the query, ordered by `started` descending.
jobs: Vec<Job>,
}
/// Lists the jobs of the repository `{owner}/{repo}`, most recently started
/// first.
///
/// A repository contributes rows only when its owner is the logged-in user
/// or when the `has_permissions!` check with `Perm::READ_JOBS` passes; for
/// anonymous requests the nil UUID is handed to that check. The response
/// also carries the login of the requesting user, if any.
///
/// # Errors
///
/// Propagates failures from `get_user_login`, from acquiring a database
/// connection, and from the query itself.
pub async fn list_jobs(
    State(config): State<Config>,
    jar: SignedCookieJar,
    Path(path): Path<JobsPath>,
) -> Result<Json<Jobs>, crate::Error> {
    use crate::db::jobs::dsl as jobs;
    use crate::db::repositories::dsl as repos;
    use crate::db::users::dsl as users;

    // Split the optional `(id, login)` pair into two independent options.
    let (uid, login) = get_user_login(&jar, &config).await?.unzip();
    let mut db = config.db.get().await?;

    let rows = repos::repositories
        .inner_join(jobs::jobs)
        .inner_join(users::users)
        .filter(users::login.eq(path.owner))
        .filter(repos::name.eq(path.repo))
        .filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
            uid.unwrap_or(uuid::Uuid::nil()),
            repos::id,
            Perm::READ_JOBS.bits()
        )))
        .select(Job::as_select())
        .order_by(jobs::started.desc())
        .get_results::<Job>(&mut db)
        .await?;

    Ok(Json(Jobs { login, jobs: rows }))
}
/// JSON response of the `job` handler: the job's own fields, flattened into
/// the top-level object, next to the requesting user's login.
#[derive(Debug, Serialize)]
pub struct Job_ {
/// Login of the user making the request, or `None` when `get_user_login`
/// finds no logged-in user.
login: Option<String>,
/// The job itself; `flatten` puts its fields at the same level as `login`.
#[serde(flatten)]
job: Job,
}
pub async fn job(
State(config): State<Config>,
jar: SignedCookieJar,
Path(path): Path<JobPath>,
) -> Result<Json<Job_>, crate::Error> {
let (uid, login) = if let Some((a, b)) = get_user_login(&jar, &config).await? {
(Some(a), Some(b))
} else {
(None, None)
};
let mut db = config.db.get().await?;
use crate::db::jobs::dsl as jobs;
use crate::db::repositories::dsl as repos;
use crate::db::users::dsl as users;
if let Some(job) = repos::repositories
.inner_join(jobs::jobs)
.inner_join(users::users)
.filter(users::login.eq(path.owner))
.filter(repos::name.eq(path.repo))
.filter(repos::owner.nullable().eq(uid).or(crate::has_permissions!(
uid.unwrap_or(uuid::Uuid::nil()),
repos::id,
Perm::READ_JOBS.bits()
)))
.filter(jobs::id.eq(path.job_id))
.select(Job::as_select())
.order_by(jobs::started.desc())
.get_result::<Job>(&mut db)
.await
.optional()?
{
Ok(Json(Job_ { login, job }))
} else {
Err(crate::Error::NotFound)
}
}
/// Upgrades the request to a WebSocket and hands the connection to
/// `handle_socket` for the job named in the path.
///
/// NOTE(review): only `job_id` is read from the path, and no login or
/// permission check is made here, unlike in `job` and `list_jobs` — confirm
/// that streaming output to any caller knowing the job id is intended.
pub async fn ws_handler(
    State(config): State<Config>,
    Path(path): Path<JobPath>,
    ws: WebSocketUpgrade,
) -> Response {
    let job_id = path.job_id;
    ws.on_upgrade(move |socket| handle_socket(config, job_id, socket))
}
/// Messages exchanged as JSON text frames on the job WebSocket.
///
/// No serde tagging attribute is set, so the default externally tagged
/// representation applies (for instance `{"Chunk": {...}}`).
#[derive(Debug, Deserialize, Serialize)]
enum Msg<'a> {
/// Byte counts for both output channels.
/// NOTE(review): never constructed in this file; presumably sent by the
/// client to report how much it already has — confirm.
State {
stdout: usize,
stderr: usize,
},
/// A piece of job output.
Chunk {
/// `0` for the `.stdout` file, `1` for the `.stderr` file.
channel: i32,
/// Every sender in this file sets this to `0`.
offset: usize,
/// The output text itself.
content: &'a str,
},
/// End-of-job notification. The same variant with `ended` set to the Unix
/// epoch and `status` set to `None` is sent when the connection is being
/// closed without a real status.
Status {
ended: chrono::DateTime<chrono::Utc>,
status: Option<i32>,
},
}
async fn handle_socket(config: Config, id: uuid::Uuid, mut socket: WebSocket) {
let mut remote_stdout = 0;
let mut remote_stderr = 0;
send_all(
&config,
id,
&mut remote_stdout,
&mut remote_stderr,
&mut socket,
)
.await
.unwrap();
let Some(mut status) = config
.jobs
.lock()
.unwrap()
.get(&id)
.map(|(_, _, w)| w.clone())
else {
debug!("closing");
socket
.send(
serde_json::to_string(&Msg::Status {
ended: chrono::DateTime::UNIX_EPOCH,
status: None,
})
.unwrap()
.into(),
)
.await
.unwrap_or(());
return;
};
let mut status_ok = true;
let mut notify_ok = true;
let mut notify_buffer = [0; 1024];
let mut notify = if let Some(ref path) = config.ci.filesystem {
let inotify = inotify::Inotify::init().expect("Error while initializing inotify instance");
let mut w = inotify.watches();
w.add(&path.join(&format!("{}.stdout", id)), WatchMask::MODIFY)
.unwrap();
w.add(&path.join(&format!("{}.stderr", id)), WatchMask::MODIFY)
.unwrap();
inotify.into_event_stream(&mut notify_buffer).unwrap()
} else {
inotify::Inotify::init()
.unwrap()
.into_event_stream(&mut notify_buffer)
.unwrap()
};
let mut stdout = String::new();
let mut stderr = String::new();
while notify_ok || status_ok {
debug!("waiting job");
tokio::select! {
n = notify.next(), if notify_ok => {
debug!("{:?}", n);
if n.is_none() {
notify_ok = false;
continue
}
if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
.await
.unwrap()
{
socket.send(a.into()).await.unwrap_or(());
socket.send(b.into()).await.unwrap_or(());
}
}
x = status.changed(), if status_ok => {
debug!("status {:?}", x);
if x.is_err() {
status_ok = false
}
let status = status.borrow_and_update().clone();
if let Some((a, b)) = send_remaining(&config, id, &mut remote_stdout, &mut remote_stderr, &mut stdout, &mut stderr)
.await
.unwrap()
{
socket.send(a.into()).await.unwrap_or(());
socket.send(b.into()).await.unwrap_or(());
}
debug!("status {:?}", status);
if let Some((ended, status)) = status {
socket.send(serde_json::to_string(&Msg::Status {
ended,
status
}).unwrap().into()).await.unwrap_or(());
} else {
debug!("Nothing to send");
}
}
else =>{
socket.send(serde_json::to_string(&Msg::Status {
ended: chrono::DateTime::UNIX_EPOCH,
status: None,
}).unwrap().into()).await.unwrap_or(());
break
}
}
}
}
async fn send_all(
config: &Config,
id: uuid::Uuid,
remote_stdout: &mut usize,
remote_stderr: &mut usize,
socket: &mut WebSocket,
) -> Result<(), crate::Error> {
use crate::db::jobs::dsl as jobs;
if let Some((ended, status)) = jobs::jobs
.find(id)
.select((jobs::ended, jobs::status))
.get_result::<(Option<chrono::DateTime<chrono::Utc>>, Option<i32>)>(
&mut config.db.get().await.unwrap(),
)
.await
.optional()
.unwrap()
{
if let Some(ended) = ended {
socket
.send(
serde_json::to_string(&Msg::Status { ended, status })
.unwrap()
.into(),
)
.await
.unwrap();
}
let Some(ref path) = config.ci.filesystem else {
return Ok(());
};
let mut outf = BufReader::new(
tokio::fs::File::open(&path.join(&format!("{}.stdout", id)))
.await
.unwrap(),
);
outf.seek(std::io::SeekFrom::Start(*remote_stdout as u64))
.await?;
let mut buf = String::with_capacity(8192);
while let Ok(n) = outf.read_line(&mut buf).await {
*remote_stdout += n;
if buf.len() >= 4096 || n == 0 {
socket
.send(
serde_json::to_string(&Msg::Chunk {
channel: 0,
offset: 0,
content: &buf,
})
.unwrap()
.into(),
)
.await
.unwrap();
buf.clear()
}
if n == 0 {
break;
}
}
buf.clear();
let mut errf = BufReader::new(
tokio::fs::File::open(&path.join(&format!("{}.stderr", id)))
.await
.unwrap(),
);
errf.seek(std::io::SeekFrom::Start(*remote_stderr as u64))
.await?;
while let Ok(n) = errf.read_line(&mut buf).await {
*remote_stderr += n;
if buf.len() >= 4096 || n == 0 {
socket
.send(
serde_json::to_string(&Msg::Chunk {
channel: 1,
offset: 0,
content: &buf,
})
.unwrap()
.into(),
)
.await
.unwrap();
buf.clear()
}
if n == 0 {
break;
}
}
}
Ok(())
}
/// Reads the output of job `id` written since the offsets `*remote_stdout`
/// and `*remote_stderr`, and returns it as two serialized `Msg::Chunk`
/// frames (channel `0` for stdout, channel `1` for stderr).
///
/// `stdout` and `stderr` are scratch buffers: they are cleared and refilled
/// with the new content. Both offsets are advanced only after both files
/// were read successfully. Frames are returned even when nothing new was
/// written, in which case their content is empty.
///
/// Returns `Ok(None)` when `config.ci.filesystem` is not set.
///
/// # Errors
///
/// Propagates I/O errors from opening, seeking in, or reading the output
/// files; reading fails if the new content is not valid UTF-8.
async fn send_remaining(
    config: &Config,
    id: uuid::Uuid,
    remote_stdout: &mut usize,
    remote_stderr: &mut usize,
    stdout: &mut String,
    stderr: &mut String,
) -> Result<Option<(String, String)>, crate::Error> {
    let Some(ref path) = config.ci.filesystem else {
        return Ok(None);
    };

    read_tail(
        path.join(&format!("{}.stdout", id)),
        *remote_stdout,
        stdout,
    )
    .await?;
    read_tail(
        path.join(&format!("{}.stderr", id)),
        *remote_stderr,
        stderr,
    )
    .await?;
    *remote_stdout += stdout.len();
    *remote_stderr += stderr.len();

    let out_frame = serde_json::to_string(&Msg::Chunk {
        channel: 0,
        offset: 0,
        content: stdout.as_str(),
    })
    .unwrap();
    let err_frame = serde_json::to_string(&Msg::Chunk {
        channel: 1,
        offset: 0,
        content: stderr.as_str(),
    })
    .unwrap();
    Ok(Some((out_frame, err_frame)))
}

/// Replaces the content of `buf` with everything in `file` from byte
/// `offset` to the end of the file.
async fn read_tail(
    file: impl AsRef<std::path::Path>,
    offset: usize,
    buf: &mut String,
) -> std::io::Result<()> {
    let mut f = tokio::fs::File::open(file).await?;
    f.seek(std::io::SeekFrom::Start(offset as u64)).await?;
    buf.clear();
    f.read_to_string(buf).await?;
    Ok(())
}