K7NUNACPGSZU4TMH5OIPO74ANZZK2P2VVZMPJ46Q3NTZJIU4H6NAC
AE4PHS6EEAG647L3MTTR6IKZJZGRTP6TGOUEBSTUYKKDFIOPF47QC
WHBCEBCDWG6ZK57VRW2IPOVRR4AYOMNPAAVKAFXGRT7QEVQUBJ4AC
MGOB3WXKVDFPE5PPKWC4EEST2CR4NQ7B2FASVYHO5JNNGXC5NNPAC
FEHWT3MOW3STJXNBQWXSY6ZDSJIJWVU6EWCXA6KARH7DCYFNM5NQC
SM5TELBWYEM67PHIK2RSGJ3E4TCUT2XLVEZIJGM2GEI45RHSBH6AC
AW3IHN2SW4JYIT4Y26Q45BLD5RFJ3POVSTECR4GQ3ZQVB463EZVQC
VR6XFOE7TKQQYWIE26QTRB2XWYCZTNBWZ32SB43M2JVOW6EN5KJQC
LILEG6KRUBQN7AIFHDK5N3X4JPBOLK2ATIO5JGRU3AEG5CCIWBNAC
G4U6J7TOHDXUL6ECIGJUSTQPURB54PC4VIAW2UGXDJPPFQLR64LQC
P35HCXDOT4NKKPTM6OMKEVKBLB3VLEODS7LNKRG7777RLOFDGU2AC
# pijul
.ignore
.pijul
(** [play ~audio t] pushes a [Req_play] request carrying [audio] onto [t.evs],
    together with a callback that settles a fresh promise, then waits on that
    promise with [Promise.await_exn]. *)
let play ~audio t =
  let reply, resolver = Promise.create () in
  let respond = Promise.resolve resolver in
  Eio.Stream.add t.evs (Req_play (audio, respond));
  Promise.await_exn reply
(** [stop t] pushes a [Req_stop] request onto [t.evs], handing it a callback
    that settles a fresh promise, then waits on that promise with
    [Promise.await_exn]. *)
let stop t =
  let reply, resolver = Promise.create () in
  let respond = Promise.resolve resolver in
  Eio.Stream.add t.evs (Req_stop respond);
  Promise.await_exn reply
module Parser = struct
(** [frame ~sample_p ~channels ~size ~kind] builds a parser for one raw,
    interleaved audio frame.

    - [sample_p] is the Angstrom parser for a single sample value.
    - [channels] is the number of values read per sample step.
    - [size] is the number of sample steps per frame; the backing array holds
      [channels * size] values of the given Bigarray [kind].

    The result is a function from an input bigstring to the filled array.
    The array is allocated once, when the parser is built, so every call
    returns the same array with its contents overwritten. If the input holds
    fewer values than the array, the remaining slots keep whatever they
    contained before.

    @raise Failure if the input cannot be parsed, including when it holds more
    values than fit in the frame. *)
let frame ~sample_p ~channels ~size ~kind =
  let capacity = channels * size in
  let buf = Bigarray.Array1.create kind Bigarray.c_layout capacity in
  let p =
    let open Angstrom in
    let sample = count channels sample_p in
    let rec samples ?(i = 0) () =
      end_of_input >>| (fun () -> `eof) <|> (sample >>| fun s -> `sample s)
      >>= function
      | `eof -> return ()
      | `sample chans ->
          (* Refuse to write past the end of the frame: report it as a parse
             error rather than an out-of-bounds array access. *)
          if i + channels > capacity then
            fail
              (Printf.sprintf "input holds more than %d sample values"
                 capacity)
          else begin
            List.iteri (fun ci c -> buf.{i + ci} <- c) chans;
            samples ~i:(i + channels) ()
          end
    in
    samples () >>| fun () -> buf
  in
  fun input ->
    match Angstrom.parse_bigstring ~consume:Angstrom.Consume.All p input with
    | Ok frame -> frame
    | Error str -> failwith ("error parsing raw audio frame " ^ str)
exception Invalid_input of string
(** [s16le ?channels ?frame_size] is a {!frame} parser reading samples with
    [Angstrom.LE.any_int16] into a [Bigarray.int16_signed] array. Defaults:
    2 channels, 48000 sample steps per frame. *)
let s16le ?(channels = 2) ?(frame_size = 48000) =
  let sample_p = Angstrom.LE.any_int16 in
  let kind = Bigarray.int16_signed in
  frame ~kind ~sample_p ~channels ~size:frame_size
(** Parsers that decode raw interleaved PCM bytes into [Bigarray.Array1]
    frames. Each format-specific parser is {!Parser.frame} specialised with a
    sample parser and a matching Bigarray kind. *)
module Parser = struct
  (** [frame ~sample_p ~channels ~size ~kind] builds a parser for one raw,
      interleaved audio frame.

      - [sample_p] is the Angstrom parser for a single sample value.
      - [channels] is the number of values read per sample step.
      - [size] is the number of sample steps per frame; the backing array
        holds [channels * size] values of the given Bigarray [kind].

      The result is a function from an input bigstring to the filled array.
      The array is allocated once, when the parser is built, so every call
      returns the same array with its contents overwritten. If the input
      holds fewer values than the array, the remaining slots keep whatever
      they contained before.

      @raise Failure if the input cannot be parsed, including when it holds
      more values than fit in the frame. *)
  let frame ~sample_p ~channels ~size ~kind =
    let capacity = channels * size in
    let buf = Bigarray.Array1.create kind Bigarray.c_layout capacity in
    let p =
      let open Angstrom in
      let sample = count channels sample_p in
      let rec samples ?(i = 0) () =
        end_of_input >>| (fun () -> `eof) <|> (sample >>| fun s -> `sample s)
        >>= function
        | `eof -> return ()
        | `sample chans ->
            (* Refuse to write past the end of the frame: report it as a
               parse error rather than an out-of-bounds array access. *)
            if i + channels > capacity then
              fail
                (Printf.sprintf "input holds more than %d sample values"
                   capacity)
            else begin
              List.iteri (fun ci c -> buf.{i + ci} <- c) chans;
              samples ~i:(i + channels) ()
            end
      in
      samples () >>| fun () -> buf
    in
    fun input ->
      match
        Angstrom.parse_bigstring ~consume:Angstrom.Consume.All p input
      with
      | Ok frame -> frame
      | Error str -> failwith ("error parsing raw audio frame " ^ str)

  (** Frame parser using [Angstrom.LE.any_int16] and [Bigarray.int16_signed].
      Defaults: 2 channels, 48000 sample steps per frame. *)
  let s16le ?(channels = 2) ?(frame_size = 48000) =
    frame ~sample_p:Angstrom.LE.any_int16 ~channels ~size:frame_size
      ~kind:Bigarray.int16_signed

  (** Frame parser using [Angstrom.BE.any_int16] and [Bigarray.int16_signed].
      Defaults: 2 channels, 48000 sample steps per frame. *)
  let s16be ?(channels = 2) ?(frame_size = 48000) =
    frame ~sample_p:Angstrom.BE.any_int16 ~channels ~size:frame_size
      ~kind:Bigarray.int16_signed

  (** Frame parser using [Angstrom.LE.any_float] and [Bigarray.float32].
      Defaults: 2 channels, 48000 sample steps per frame. *)
  let f32le ?(channels = 2) ?(frame_size = 48000) =
    frame ~sample_p:Angstrom.LE.any_float ~channels ~size:frame_size
      ~kind:Bigarray.float32

  (** Frame parser using [Angstrom.BE.any_float] and [Bigarray.float32].
      Defaults: 2 channels, 48000 sample steps per frame. *)
  let f32be ?(channels = 2) ?(frame_size = 48000) =
    frame ~sample_p:Angstrom.BE.any_float ~channels ~size:frame_size
      ~kind:Bigarray.float32

  (** Frame parser using [Angstrom.LE.any_double] and [Bigarray.float64].
      Defaults: 2 channels, 48000 sample steps per frame. *)
  let f64le ?(channels = 2) ?(frame_size = 48000) =
    frame ~sample_p:Angstrom.LE.any_double ~channels ~size:frame_size
      ~kind:Bigarray.float64

  (** Frame parser using [Angstrom.BE.any_double] and [Bigarray.float64].
      Defaults: 2 channels, 48000 sample steps per frame. *)
  let f64be ?(channels = 2) ?(frame_size = 48000) =
    frame ~sample_p:Angstrom.BE.any_double ~channels ~size:frame_size
      ~kind:Bigarray.float64
end
(** [f64le ?channels ?frame_size] is a {!frame} parser reading samples with
    [Angstrom.LE.any_double] into a [Bigarray.float64] array. Defaults:
    2 channels, 48000 sample steps per frame. *)
let f64le ?(channels = 2) ?(frame_size = 48000) =
  let sample_p = Angstrom.LE.any_double in
  let kind = Bigarray.float64 in
  frame ~kind ~sample_p ~channels ~size:frame_size
(** Command-line arguments appended after the input arguments when invoking
    ffmpeg. Flags are grouped with their values below; the resulting flat list
    is what gets passed on the command line. *)
let pcm_args =
  List.concat
    [
      [ "-nostdin" ];
      [ "-analyzeduration"; "0" ];
      [ "-f"; "s16le" ];
      [ "-ar"; "48000" ];
      [ "-ac"; "2" ];
      [ "-flush_packets"; "1" ];
    ]
(** [f64be ?channels ?frame_size] is a {!frame} parser reading samples with
    [Angstrom.BE.any_double] into a [Bigarray.float64] array. Defaults:
    2 channels, 48000 sample steps per frame. *)
let f64be ?(channels = 2) ?(frame_size = 48000) =
  let sample_p = Angstrom.BE.any_double in
  let kind = Bigarray.float64 in
  frame ~kind ~sample_p ~channels ~size:frame_size
end
(** [frame_reader ()] allocates one scratch buffer of
    [Rtp._FRAME_SIZE * Rtp._CHANNELS * 2] bytes and returns a function that
    fills it completely from an Lwt input channel and decodes it with
    [Parser.s16le].

    The returned function resolves to [`Frame f] on success, [`Eof] when the
    read fails with [End_of_file], and [`Exn e] for any other exception. The
    scratch buffer is reused across calls. *)
let frame_reader () =
  let scratch = Bigstringaf.create (Rtp._FRAME_SIZE * Rtp._CHANNELS * 2) in
  let scratch_len = Bigstringaf.length scratch in
  let decode =
    Parser.s16le ~channels:Rtp._CHANNELS ~frame_size:Rtp._FRAME_SIZE
  in
  let classify = function
    | Ok () -> `Frame (decode scratch)
    | Error End_of_file -> `Eof
    | Error exn -> `Exn exn
  in
  fun ic ->
    Lwt_io.read_into_exactly_bigstring ic scratch 0 scratch_len
    |> Lwt_result.catch
    >|= classify
(** [writer input] returns a function that performs one write step on an Lwt
    output channel [oc].

    For [`Stream s], a step reads one chunk from the pipe [s] and writes it in
    full to [oc], resolving to [true] if the write succeeded and [false] if it
    failed; when the pipe yields [None], [oc] is closed and the step resolves
    to [false]. For any other input, the step just closes [oc] and resolves to
    [false]. *)
let writer = function
  | `Stream s ->
      fun oc ->
        Lwt_pipe.read s >>= (function
        | None -> Lwt_io.close oc >|= Fun.const false
        | Some d ->
            let len = Bigstringaf.length d in
            Lwt_io.write_from_exactly_bigstring oc d 0 len
            |> Lwt_result.catch
            >|= (function Ok () -> true | Error _ -> false))
  | _ -> fun oc -> Lwt_io.close oc >|= Fun.const false
(** [close_input input] closes the underlying pipe (without blocking) when
    [input] is a [`Stream]; every other kind of input is left untouched. *)
let close_input input =
  match input with
  | `Stream s -> Lwt_pipe.close_nonblock s
  | _ -> ()
let create input =
let cmd = "ffmpeg" in
let i =
match input with
| `Stream _ -> [ "-i"; "-" ]
| `File path -> [ "-i"; path ]
| `Url url -> [ "-i"; url ]
in
let args = List.concat [ i; pcm_args; stdout_args ] in
Switch.run @@ fun sw ->
let cmd_str = cmd :: args |> List.to_string ~sep:" " Fun.id in
L.debug (fun m -> m "running cmd: %s" cmd_str);
let init () = Eio_proc.spawn ~sw in
let stop proc = Eio_proc.close proc |> ignore in
let pull proc = None in
let p = Lwt_pipe.create () in
let p_p, u_p = Lwt.wait () in
let spawn () =
let cmd_str = cmd :: args |> List.to_string ~sep:" " Fun.id in
L.debug (fun m -> m "running cmd: %s" cmd_str);
Lwt_process.with_process_full
("", Array.of_list @@ (cmd :: args))
(fun proc ->
L.debug (fun m -> m "ffmpeg running with pid=%d" proc#pid);
let read_logs () =
Lwt_io.read ~count:0x100 proc#stderr >|= fun s ->
`Logs (String.length s = 0)
in
let read_frame () = frame_reader () proc#stdout in
let write () = writer input proc#stdin >|= fun r -> `Write r in
let rec poll' q =
Lwt.nchoose_split q >>= function
| [], [] -> Lwt_result.return ()
| rs, ps -> (
match handle ~out:ps rs with
| Ok [] -> Lwt_result.return ()
| Ok q -> poll' q
| Error _ as e -> Lwt.return e)
and handle ?(out = []) = function
| [] -> Ok out
| (`Write false | `Eof | `Logs true) :: xs -> handle ~out xs
| `Logs false :: xs -> handle ~out:(read_logs () :: out) xs
| `Frame frame :: xs ->
if Lwt.is_sleeping p_p then Lwt.wakeup_later u_p (Ok p);
let fwd = Lwt_pipe.write p frame >|= fun r -> `Fwd r in
handle ~out:(fwd :: out) xs
| `Fwd true :: xs ->
let r = read_frame () in
handle ~out:(r :: out) xs
| `Fwd false :: _ ->
L.warn (fun m -> m "receiving end of pipe closed, cleaning up...");
List.iter Lwt.cancel out;
let clean =
Lwt_io.abort proc#stdin >>= fun () ->
Lwt_io.abort proc#stdout >>= fun () ->
Lwt_io.abort proc#stderr >|= fun () ->
proc#terminate;
`Eof
in
Ok [ clean ]
| `Write true :: xs ->
let w = write () in
handle ~out:(w :: out) xs
| (`Exn _ as e) :: _ ->
List.iter Lwt.cancel out;
L.err (fun m -> m "boop %a" Error.pp e);
Error e
in
(** Trailing ffmpeg arguments naming the output target. It is concatenated
    after the input arguments and [pcm_args] when the command line is built.
    NOTE(review): ["pipe:"] presumably makes ffmpeg write to its standard
    output, which is where frames are read from; confirm against the ffmpeg
    protocol documentation. *)
let stdout_args = [ "pipe:" ]
let res =
let open Lwt_result.Syntax in
let* () =
poll' [ read_frame (); write () ] >>= function
| Error _ as e ->
Lwt_pipe.close_nonblock p;
close_input input;
proc#close >|= Fun.const e
| Ok () as o -> Lwt.return o
in
L.info (fun m -> m "closing the process");
proc#close >|= function
| Unix.WEXITED 0 when Lwt.is_sleeping p_p ->
Error.msg "0 byte stream?"
| Unix.WEXITED 0 -> Ok ()
| Unix.WEXITED n -> Error.msgf "non 0 status code: %d" n
| Unix.WSIGNALED n | Unix.WSTOPPED n ->
L.warn (fun m -> m "stopped with code: %d" n);
Ok ()
in
(** [frame_reader ()] allocates one scratch buffer of
    [Rtp._FRAME_SIZE * Rtp._CHANNELS * 2] bytes and returns a function that
    fills it completely from an Eio flow and decodes it with [Parser.s16le].

    The returned function yields [Some frame] after a full read and [None]
    when the read raises [End_of_file]. Exceptions raised while decoding are
    not caught. The scratch buffer is reused across calls. *)
let frame_reader () =
  let raw = Bigstringaf.create (Rtp._FRAME_SIZE * Rtp._CHANNELS * 2) in
  let view = Cstruct.of_bigarray raw in
  let decode =
    Parser.s16le ~channels:Rtp._CHANNELS ~frame_size:Rtp._FRAME_SIZE
  in
  fun flow ->
    match Eio.Flow.read_exact flow view with
    | exception End_of_file -> None
    | () -> Some (decode raw)
res >|= fun r ->
L.err (fun m -> m "oyy m8");
match (Lwt.is_sleeping p_p, r) with
| true, (Error _ as e) -> Lwt.wakeup_later u_p e
| true, Ok () -> failwith "unreachable"
| false, Error e -> L.err (fun m -> m "%s" (Error.to_string e))
| false, Ok () -> ())
in
let k =
Lwt.catch spawn (fun e ->
L.err (fun m -> m "spawn error: %a" Error.pp (`Exn e));
Lwt.return_unit)
in
Lwt_pipe.keep p k;
Lwt.on_termination k (fun () ->
L.err (fun m -> m "HALP");
Lwt_pipe.close_nonblock p);
p_p |> Lwt_result.map of_pipe *)
(** [process ~sw input] runs [ffmpeg] on [input] and returns its decoded
    output wrapped with [Audio_stream.of_raw_s16].

    [input] is one of:
    - [`Stream flow]: [flow] is copied to ffmpeg's stdin;
    - [`File path] / [`Url url]: passed to ffmpeg with [-i]; stdin is closed.

    The process is started in a fiber forked on [sw]. One frame is read
    eagerly to check that the input is usable before a source is handed back;
    further frames are read on demand. Stopping the source closes the process.

    @raise Invalid_input if no first frame could be read. The message carries
    the process status and the captured stderr output. *)
let process ~sw input =
  (* TODO @quartz55: better handle this for use cases like nix *)
  let cmd = "ffmpeg" in
  let i =
    match input with
    | `Stream _ -> [ "-analyzeduration"; "0"; "-i"; "-" ]
    (* [-analyzeduration] takes a value. Without the "0", ffmpeg would consume
       "-i" as that value and treat [path] as an output file. *)
    | `File path -> [ "-analyzeduration"; "0"; "-i"; path ]
    | `Url url -> [ "-i"; url ]
  in
  let args = List.concat [ i; pcm_args; stdout_args ] in
  let p, u = Promise.create () in
  (* Human-readable rendering of a process status for error messages. *)
  let status_to_string = function
    | Unix.WEXITED n -> "exited " ^ string_of_int n
    | Unix.WSIGNALED n -> "signaled " ^ string_of_int n
    | Unix.WSTOPPED n -> "stopped " ^ string_of_int n
  in
  let spawn () =
    (* This switch deliberately shadows the outer [sw]: everything below is
       scoped to the lifetime of the ffmpeg process. *)
    Switch.run @@ fun sw ->
    let cmd_str = cmd :: args |> List.to_string ~sep:" " Fun.id in
    L.debug (fun m -> m "running cmd: %s" cmd_str);
    let proc = Eio_proc.spawn ~sw cmd ~args in
    L.debug (fun m -> m "ffmpeg running with pid=%d" (Eio_proc.pid proc));
    let stdin, stderr, stdout =
      Eio_proc.(stdin proc, stderr proc, stdout proc)
    in
    let read = frame_reader () in
    (* Collect everything ffmpeg writes to stderr so that it can be attached
       to error reports. *)
    let logs_thread =
      Fiber.fork_promise ~sw @@ fun () ->
      let logs_b = Buffer.create (80 * 40) in
      let logs = Eio.Flow.buffer_sink logs_b in
      Eio.Flow.copy stderr logs;
      Buffer.contents logs_b
    in
    (* Close the process and return its status along with the stderr logs. *)
    let cleanup () =
      let status = Eio_proc.close proc in
      match Promise.await_exn logs_thread with
      | logs -> (status, logs)
      | exception exc ->
          let reason = Printexc.to_string exc in
          L.warn (fun m -> m "couldn't get ffmpeg logs: %s" reason);
          (status, Format.sprintf "<no logs>: reason: %s" reason)
    in
    let write_thread () =
      match input with
      | `Stream flow -> Eio.Flow.copy flow stdin
      | _ -> Eio.Flow.close stdin
    in
    Fiber.fork ~sw write_thread;
    (* try to read at least one frame to check if valid input *)
    match read stdout with
    | Some f ->
        let done_p, done_u = Promise.create () in
        (* The source state is a flag telling whether the eagerly read first
           frame still has to be emitted. *)
        let init () = true in
        let pull first =
          (* TODO error handling *)
          if first then Some (f, false)
          else read stdout |> Option.map (fun f -> (f, false))
        in
        let stop _ =
          let (_ : Unix.process_status * string) = cleanup () in
          if not (Promise.is_resolved done_p) then Promise.resolve done_u ()
        in
        let src = Streaming.Source.make ~init ~pull ~stop () in
        Promise.resolve_ok u src;
        Promise.await done_p;
        (* Leave [Switch.run] by raising; the caller below catches [Exit]. *)
        raise Exit
    | None ->
        let status, logs = cleanup () in
        let msg =
          Format.sprintf "status: %s logs: %s" (status_to_string status) logs
        in
        Promise.resolve_error u (Invalid_input msg)
    | exception exc ->
        let status, logs = cleanup () in
        let msg =
          Format.sprintf "exc: %s status: %s logs: %s"
            (Printexc.to_string exc) (status_to_string status) logs
        in
        Promise.resolve_error u (Invalid_input msg)
  in
  Fiber.fork ~sw (fun () -> try spawn () with Exit -> ());
  Promise.await_exn p |> Audio_stream.of_raw_s16
(* module Make (C : Mirage_clock.MCLOCK) = struct
module Platform (M : Alcotest_engine.Monad.S) = struct
let time () = Duration.to_f @@ C.elapsed_ns ()
let getcwd () = ""
let stdout_isatty () = true
let stdout_columns () = None
let setup_std_outputs ?style_renderer:_ ?utf_8:_ () = ()
(* Pre-4.07 doesn't support empty variant types. *)
type file_descriptor = { empty : 'a. 'a }
let log_trap_supported = false
let prepare_log_trap ~root:_ = assert false
let file_exists _ = assert false
let open_write_only _ = assert false
let close = function (fd : file_descriptor) -> fd.empty
let with_redirect = function (fd : file_descriptor) -> fd.empty
let home_directory () =
Error (`Msg "Home directory not available for the MirageOS platform")
end
module Tester = Alcotest_engine.V1.Cli.Make (Platform) (Lwt)
include Tester
let test_case_sync n s f = test_case n s (fun x -> Lwt.return (f x))
let run_test fn args =
let async_ex, async_waker = Lwt.wait () in
let handle_exn ex =
Logs.debug (fun f -> f "Uncaught async exception: %a" Fmt.exn ex);
if Lwt.state async_ex = Lwt.Sleep then Lwt.wakeup_exn async_waker ex
in
Lwt.async_exception_hook := handle_exn;
Lwt_switch.with_switch (fun sw -> Lwt.pick [ fn sw args; async_ex ])
let test_case n s f = test_case n s (run_test f)
end *)
(* The tests
let test_lowercase () =
Alcotest.(check string) "same string" "hello!" (To_test.lowercase "hELLO!")
let test_capitalize () =
Alcotest.(check string) "same string" "World." (To_test.capitalize "world.")
let test_str_concat () =
Alcotest.(check string) "same string" "foobar" (To_test.str_concat ["foo"; "bar"])
let test_list_concat () =
Alcotest.(check (list int)) "same lists" [1; 2; 3] (To_test.list_concat [1] [2; 3])
(* Run it *)
let () =
let open Alcotest in
run "Utils" [
"string-case", [
test_case "Lower case" `Quick test_lowercase;
test_case "Capitalization" `Quick test_capitalize;
];
"string-concat", [ test_case "String mashing" `Quick test_str_concat ];
"list-concat", [ test_case "List mashing" `Slow test_list_concat ];
] *)