VR6XFOE7TKQQYWIE26QTRB2XWYCZTNBWZ32SB43M2JVOW6EN5KJQC
(* | Some ("join", vchan) -> (
let guild_id = Option.get_exn guild_id in
let vchan = M.Snowflake.of_string vchan in
let voice = Client.voice client in
let call = Voice.Manager.get ~guild_id voice in
Voice.Call.join call ~channel_id:vchan >>= function
| Error e ->
let msg =
Msg.fmt "⚠️ Couldn't join voice channel: %a" Voice.Error.pp e
in
Client.send_message channel_id msg client
| Ok () -> Lwt.return_unit)
| Some ("leave", _) ->
let guild_id = Option.get_exn guild_id in
let voice = Client.voice client in
let call = Voice.Manager.get ~guild_id voice in
Voice.Call.leave call *)
| Some ("join", vchan) -> (
let guild_id = Option.get_exn guild_id in
let vchan = M.Snowflake.of_string vchan in
let voice = Client.voice client in
let call = Voice.Manager.get ~guild_id voice in
try Voice.Call.join call ~channel_id:vchan
with exn ->
let msg =
Msg.fmt "⚠️ Couldn't join voice channel: %s"
(Printexc.to_string exn)
in
Client.send_message channel_id msg client)
| Some ("leave", _) ->
let guild_id = Option.get_exn guild_id in
let voice = Client.voice client in
let call = Voice.Manager.get ~guild_id voice in
Voice.Call.leave call
(* open! Disco_core.Globals
open Lwt.Infix
module Snowflake = Disco_models.Snowflake
module E = Disco_core.Events
module Gateway = Disco_core.Gateway
module F = Relog.Field
open! Disco_core.Globals
module Snowflake = Disco_models.Snowflake
module E = Disco_core.Events
module Gateway = Disco_core.Gateway
module F = Relog.Field
module L = (val Relog.logger ~namespace:__MODULE__ ())
and gw_update = Srv of srv_info | Sess of sess_info | Dc
(** [wait_for_session conn] returns the session stored in [conn] once the
    connection state is [Live].

    While the state is [Init] or [Detached], a notifier is chained after the
    request callback already stored in the state; the caller then waits for
    that callback to fire and inspects the state again. *)
let wait_for_session conn =
  let backoff = Backoff.create () in
  (* Pairs a fresh promise with a callback that first runs [prev] and then
     resolves the promise. *)
  let chain prev =
    let promise, resolver = Promise.create () in
    let notify res =
      prev res;
      Promise.resolve resolver ()
    in
    (promise, notify)
  in
  (* Installs [next] only if the state is still [seen]; waits on [promise]
     when the swap succeeded, backs off otherwise. *)
  let swap_then_wait ~seen ~next promise =
    if Atomic.compare_and_set conn seen next then Promise.await promise
    else Backoff.once backoff
  in
  let rec poll () =
    match Atomic.get conn with
    | Live session -> session
    | Init ({ req = prev; _ } as init) as seen ->
        let promise, req = chain prev in
        swap_then_wait ~seen ~next:(Init { init with req }) promise;
        poll ()
    | Detached prev as seen ->
        let promise, notify = chain prev in
        swap_then_wait ~seen ~next:(Detached notify) promise;
        poll ()
  in
  poll ()
and conn_info = Empty | Got_srv of srv_info | Got_sess of sess_info
(** [manage ~net ~dmgr t] runs the state machine of the call [t].

    Two fibers run side by side inside a fresh switch:
    - the main thread takes events from [t.evs] and moves [t.conn] between
      [Detached], [Init] and [Live];
    - the mixer thread runs [Mixer.run] through [dmgr] and sends every frame
      over the current session.

    State transitions made while a request callback is stored in the state go
    through [cas], because [wait_for_session] may swap the state concurrently.

    Every arm of [handle] hands control back to the event loop. Earlier, the
    arms for a gateway disconnect during [Init] and for a join request
    targeting the channel that is already live returned without doing so,
    which silently stopped event processing.

    @raise Exit once both fibers have returned. *)
let manage ~net ~dmgr t =
  Switch.run @@ fun sw ->
  let user_id = (Gateway.user t.gw).id in
  (* Opens a voice session from a server-info / session-info pair. *)
  let connect ~srv ~sess =
    Session.create ~sw ~net ~guild_id:t.guild_id ~user_id
      ~channel_id:sess.channel_id ~session_id:sess.session_id ~token:srv.token
      srv.endpoint
  in
  let mixer_thread () =
    Switch.on_release sw (fun () -> Mixer.destroy t.mixer);
    Eio.Domain_manager.run dmgr @@ fun () ->
    (* Sends one frame over the live session; the result tells the mixer
       whether it may keep sending within the current burst. *)
    let play i f =
      let s = wait_for_session t.conn in
      Session.start_speaking s;
      Session.send_rtp s f;
      i < 15
    in
    let stop () = wait_for_session t.conn |> Session.stop_speaking in
    Mixer.run ~play ~stop t.mixer
  in
  let b = Backoff.create () in
  let rec main_thread () =
    let ev = Eio.Stream.take t.evs in
    handle ev
  (* Runs [fn] if the state could be swapped from [ost] to [nst]; otherwise
     backs off and handles [ev] again against the fresh state. *)
  and cas ~ev ost nst fn =
    if Atomic.compare_and_set t.conn ost nst then fn ()
    else (
      Backoff.once b;
      handle ev)
  and handle ev =
    match (Atomic.get t.conn, ev) with
    | (Detached o_req as st), Req_join (cid, n_req) ->
        let req res =
          o_req res;
          n_req res
        in
        cas ~ev st (Init { cid; req; info = Empty }) @@ fun () ->
        (* TODO timeout *)
        L.info (fun m -> m "join request while detached");
        Gateway.send_voice_state_update t.gw ~channel_id:cid ~self_mute:t.muted
          ~self_deaf:t.deafened t.guild_id;
        main_thread ()
    | Detached _, (Gw_dc | Gw_sess _ | Gw_srv _) ->
        L.warn (fun m -> m "gateway event on detached call");
        main_thread ()
    | Init { info = Got_sess sess; req; _ }, Gw_srv srv
    | Init { info = Got_srv srv; req; _ }, Gw_sess sess ->
        (* any race conditions with mixer that require a CAS & extra state ? *)
        let res =
          match connect ~srv ~sess with
          | sess ->
              Atomic.set t.conn @@ Live sess;
              Ok ()
          | exception exn ->
              Atomic.set t.conn @@ Detached stub;
              Error exn
        in
        req res;
        main_thread ()
    | (Init ({ info = Got_sess _ | Empty; _ } as init) as st), Gw_sess sess ->
        cas ~ev st (Init { init with info = Got_sess sess }) @@ main_thread
    | (Init ({ info = Got_srv _ | Empty; _ } as init) as st), Gw_srv srv ->
        cas ~ev st (Init { init with info = Got_srv srv }) @@ main_thread
    | (Init { req; _ } as st), Gw_dc ->
        cas ~ev st (Detached stub) @@ fun () ->
        req (Error Detached);
        (* Keep serving events after failing the pending request. *)
        main_thread ()
    | (Init ({ cid; req = o_req; _ } as init) as st), Req_join (n_cid, n_req)
      when Snowflake.(cid = n_cid) ->
        let req res =
          o_req res;
          n_req res
        in
        cas ~ev st (Init { init with req }) @@ main_thread
    | (Init ({ req = o_req; _ } as init) as st), Req_join _ ->
        (* Join for another channel: queue the event again once the pending
           request has been answered. *)
        let req res =
          o_req res;
          Eio.Stream.add t.evs ev
        in
        cas ~ev st (Init { init with req }) @@ main_thread
    | Live sess, Req_join (cid, req)
      when Snowflake.(cid = Session.channel_id sess) ->
        req (Ok ());
        (* Already in the requested channel: answer and keep serving. *)
        main_thread ()
    | (Live sess as st), Req_join _ ->
        cas ~ev st (Detached stub) @@ fun () ->
        L.info (fun m -> m "join request while live, switching channels");
        Session.disconnect sess;
        handle ev
    | (Live sess as st), Gw_sess nsess
      when Snowflake.(Session.channel_id sess <> nsess.channel_id)
           || String.(Session.session_id sess <> nsess.session_id) ->
        let srv = Session.{ token = token sess; endpoint = endpoint sess } in
        cas ~ev st
          (Init { req = stub; cid = nsess.channel_id; info = Got_srv srv })
        @@ fun () ->
        L.warn (fun m -> m "new voice session info, reconnecting");
        Session.disconnect sess;
        handle ev
    | (Live sess as st), Gw_dc ->
        cas ~ev st (Detached stub) @@ fun () ->
        L.info (fun m -> m "disconnected");
        Session.disconnect sess;
        main_thread ()
    | Live _, (Gw_sess _ | Gw_srv _) -> main_thread ()
  in
  Fiber.both main_thread mixer_thread;
  Mixer.destroy t.mixer;
  raise Exit
let make ?(muted = false) ?(deafened = false) ~guild_id gw =
let ( let+ ) = Result.( let+ ) in
let op_rx = Lwt_pipe.create () in
let mixer_rx = Lwt_pipe.create () in
let+ mixer = Mixer.create (Lwt_pipe.write mixer_rx) in
let t = { gw; guild_id; muted; deafened; tx = op_rx; mixer } in
(** [make ~sw ~net ~dmgr ?muted ?deafened ~guild_id gw] builds a call for
    [guild_id] that starts out [Detached], forks its manager on [sw] and
    returns the call. An [Exit] raised by the manager is absorbed by the
    forked fiber. *)
let make ~sw ~net ~dmgr ?(muted = false) ?(deafened = false) ~guild_id gw =
  let conn = Atomic.make (Detached stub) in
  let evs = Eio.Stream.create max_int in
  let mixer = Mixer.create () in
  let call = { gw; guild_id; mixer; conn; muted; deafened; evs } in
  let supervise () =
    match manage ~net ~dmgr call with
    | () -> ()
    | exception Exit -> ()
  in
  Fiber.fork ~sw supervise;
  call
let read_op () =
Lwt_pipe.read op_rx >|= Option.get_exn >|= function
| Req r -> `Op r
| Gw g -> `Gw g
in
let _read_mixer () =
Lwt_pipe.read mixer_rx >|= Option.get_exn >|= fun mix -> `Mixer mix
in
(** [update_server ~token ~endpoint t] queues a [Gw_srv] event carrying the
    given voice server info on the event stream of [t]. *)
let update_server ~token ~endpoint t =
  Gw_srv { token; endpoint } |> Eio.Stream.add t.evs
let do_connect ~srv ~sess =
let user_id = (Gateway.user t.gw).id in
let guild_id = t.guild_id in
Session.create ~guild_id ~user_id ~channel_id:sess.channel_id
~session_id:sess.session_id ~token:srv.token srv.endpoint
in
(** [update_session ?channel_id ~session_id t] queues a gateway event on the
    event stream of [t]: [Gw_sess] when a channel id is given, [Gw_dc] when it
    is absent. *)
let update_session ?channel_id ~session_id t =
  let to_event = function
    | Some channel_id -> Gw_sess { channel_id; session_id }
    | None -> Gw_dc
  in
  Eio.Stream.add t.evs (to_event channel_id)
let run () =
let conn = ref Detached in
let rec poll q =
Lwt.nchoose_split q >>= function
| [], [] -> Lwt_result.return ()
| rs, ps -> (
match handle ps rs with
| Ok [] -> Lwt_result.return ()
| Ok q -> poll q
| Error _ as err -> Lwt.return err)
and handle out rs =
match (rs, !conn) with
| [], _ -> Ok out
| `Noop :: xs, _ -> handle out xs
| `Cancel :: xs, Init { req; _ } ->
Lwt.wakeup_later (snd req) @@ Error.msg "timed out";
conn := Detached;
handle out xs
| `Cancel :: xs, _ -> handle out xs
| `Connected (Ok session) :: xs, (Init { req; _ } | Connecting { req; _ })
->
conn := Live session;
L.info (fun m -> m "connected");
Lwt.wakeup_later (snd req) (Ok ());
handle out xs
| ( `Connected (Error _ as err) :: xs,
(Init { req; _ } | Connecting { req; _ }) ) ->
conn := Detached;
L.err (fun m -> m "error connecting");
Lwt.wakeup_later (snd req) err;
handle out xs
| `Connected (Ok session) :: xs, _ ->
conn := Live session;
handle out xs
| `Connected (Error _) :: xs, _ ->
conn := Detached;
handle out xs
| ( (`Op (Join (cid, nreq)) as op) :: xs,
(Init { req; channel_id; _ } | Connecting { req; channel_id; _ }) ) ->
L.info (fun m -> m "join request while connecting");
if Snowflake.(cid = channel_id) then (
Lwt.async (fun () ->
fst req >|= fun res -> Lwt.wakeup_later (snd nreq) res);
handle (read_op () :: out) xs)
else handle (Lwt.return op :: read_op () :: out) xs
| `Op (Join (channel_id, req)) :: xs, Live session ->
L.info (fun m -> m "join request while connected");
if Snowflake.(channel_id = Session.channel_id session) then (
Lwt.wakeup_later (snd req) (Ok ());
handle (read_op () :: out) xs)
else (
conn := Init { channel_id; req; info = Empty };
let reconn =
Session.disconnect session >>= fun () ->
Gateway.send_voice_state_update t.gw ~channel_id
~self_mute:t.muted ~self_deaf:t.deafened t.guild_id
>|= fun () -> `Noop
in
handle (reconn :: read_op () :: out) xs)
| `Op (Join (channel_id, req)) :: xs, Detached ->
L.info (fun m -> m "join request while detached");
conn := Init { channel_id; req; info = Empty };
let send =
Gateway.send_voice_state_update t.gw ~channel_id ~self_mute:t.muted
~self_deaf:t.deafened t.guild_id
>|= fun () -> `Noop
in
let timeout =
Lwt.pick
[
(Lwt_unix.sleep 5. >|= fun () -> `Cancel);
(fst req >|= fun _ -> `Noop);
]
in
handle (send :: timeout :: read_op () :: out) xs
| `Gw Dc :: xs, Live session ->
L.info (fun m -> m "disconnected");
conn := Detached;
let dc = Session.disconnect session >|= fun () -> `Noop in
handle (dc :: read_op () :: out) xs
| `Gw (Srv srv) :: xs, Live s ->
if
String.(
Session.endpoint s <> srv.endpoint || Session.token s <> srv.token)
then (
L.warn (fun m -> m "new voice server info, reconnecting");
let req = Lwt.wait () in
let sess =
{
session_id = Session.session_id s;
channel_id = Session.channel_id s;
}
in
conn := Connecting { channel_id = sess.channel_id; req };
let reconn =
Session.disconnect s >>= fun () ->
do_connect ~srv ~sess >|= fun res -> `Connected res
in
handle (reconn :: read_op () :: out) xs)
else handle (read_op () :: out) xs
| `Gw (Sess sess) :: xs, Live s ->
if
Snowflake.(Session.channel_id s <> sess.channel_id)
|| String.(Session.session_id s <> sess.session_id)
then (
L.warn (fun m -> m "new voice session info, reconnecting");
let req = Lwt.wait () in
let srv =
{ token = Session.token s; endpoint = Session.endpoint s }
in
conn := Connecting { channel_id = sess.channel_id; req };
let reconn =
Session.disconnect s >>= fun () ->
do_connect ~srv ~sess >|= fun res -> `Connected res
in
handle (reconn :: read_op () :: out) xs)
else handle (read_op () :: out) xs
| `Gw _ :: xs, Detached ->
L.warn (fun m -> m "gateway event on detached call");
handle (read_op () :: out) xs
| (`Gw _ as ev) :: xs, Connecting _ ->
L.info (fun m -> m "buffering gw event while connecting");
handle (Lwt.return ev :: read_op () :: out) xs
| `Gw (Srv srv) :: xs, Init { info = Got_sess sess; channel_id; req }
| `Gw (Sess sess) :: xs, Init { info = Got_srv srv; channel_id; req } ->
conn := Connecting { channel_id; req };
let connect = do_connect ~srv ~sess >|= fun res -> `Connected res in
handle (connect :: read_op () :: out) xs
| `Gw (Srv nsrv) :: xs, Init ({ info = Got_srv _ | Empty; _ } as init) ->
conn := Init { init with info = Got_srv nsrv };
handle (read_op () :: out) xs
| `Gw (Sess nsess) :: xs, Init ({ info = Got_sess _ | Empty; _ } as init)
->
conn := Init { init with info = Got_sess nsess };
handle (read_op () :: out) xs
| `Gw Dc :: xs, Init { req; _ } ->
Lwt.wakeup_later (snd req) (Error.msg "dc'ed when connecting?");
handle (read_op () :: out) xs
in
poll [ read_op () ]
in
let evloop =
run () >|= function
| Ok () -> ()
| Error e -> L.err (fun m -> m "call evloop crashed: %a" Error.pp e)
in
Lwt_pipe.keep op_rx evloop;
Lwt.on_termination evloop (fun () ->
Lwt_pipe.close_nonblock op_rx;
Mixer.destroy mixer;
Lwt_pipe.close_nonblock mixer_rx);
t
(** [update_server ~token ~endpoint t] writes a gateway [Srv] update carrying
    the given voice server info to the pipe [t.tx]. *)
let update_server ~token ~endpoint t =
  let update = Srv { token; endpoint } in
  Lwt_pipe.write_exn t.tx (Gw update)
(** [update_session ?channel_id ~session_id t] writes a gateway update to the
    pipe [t.tx]: [Sess] when a channel id is given, [Dc] when it is absent. *)
let update_session ?channel_id ~session_id t =
  let to_update = function
    | Some channel_id -> Sess { channel_id; session_id }
    | None -> Dc
  in
  Lwt_pipe.write_exn t.tx (Gw (to_update channel_id))
(** [join ~channel_id t] queues a [Req_join] for [channel_id] on the event
    stream of [t] and waits for the reply.

    The wait uses [Promise.await_exn], so an [Error] reply is presumably
    re-raised as an exception - TODO confirm. *)
let join ~channel_id t =
  let outcome, resolver = Promise.create () in
  let reply res = Promise.resolve resolver res in
  Eio.Stream.add t.evs (Req_join (channel_id, reply));
  Promise.await_exn outcome
(** [join ~channel_id t] writes a [Join] request for [channel_id] to the pipe
    [t.tx] and returns the promise half of the request, which is resolved by
    whoever handles the request. *)
let join ~channel_id t =
  let ((promise, _) as req) = Lwt.wait () in
  Lwt_pipe.write_exn t.tx (Req (Join (channel_id, req))) >>= fun () -> promise
let leave t =
Lwt_pipe.write t.tx @@ Gw Dc >>= fun _ ->
Gateway.send_voice_state_update t.gw ~self_mute:t.muted ~self_deaf:t.deafened
t.guild_id *)
(** [leave t] queues a [Gw_dc] event on the event stream of [t] and then sends
    a voice state update for the guild without a channel id. *)
let leave t =
  let announce_departure () =
    Gateway.send_voice_state_update t.gw ~self_mute:t.muted
      ~self_deaf:t.deafened t.guild_id
  in
  Eio.Stream.add t.evs Gw_dc;
  announce_departure ()
(* open! Disco_core.Globals
module Snowflake = Disco_models.Snowflake
module E = Disco_core.Events
module Sf_map = Map.Make (Snowflake)
module Gateway = Disco_core.Gateway
module L = (val Relog.logger ~namespace:__MODULE__ ())
module F = Relog.Field
open! Disco_core.Globals
module Snowflake = Disco_models.Snowflake
module E = Disco_core.Events
module Sf_map = Map.Make (Snowflake)
module Gateway = Disco_core.Gateway
module L = (val Relog.logger ~namespace:__MODULE__ ())
module F = Relog.Field
(** [server_update t srv] forwards the token and endpoint of a voice server
    update to the call registered for its guild, if there is one. The lookup
    and the forwarding both happen while holding [t.mtx]. *)
let server_update t srv =
  let forward () =
    L.info (fun m -> m "srv update");
    match Sf_map.get srv.E.Voice_server_update.guild_id t.calls with
    | Some call ->
        Call.update_server ~token:srv.token ~endpoint:srv.endpoint call
    | None -> ()
  in
  Eio_mutex.with_ t.mtx forward
(** [server_update t srv] forwards the token and endpoint of a voice server
    update to the call registered for its guild, if there is one. The lookup
    and the forwarding both happen while holding [t.mtx]. *)
let server_update t srv =
  let forward () =
    L.info (fun m -> m "srv update");
    match Sf_map.get srv.E.Voice_server_update.guild_id t.calls with
    | Some call ->
        Call.update_server ~token:srv.token ~endpoint:srv.endpoint call
    | None -> ()
  in
  Eio_mutex.with_ t.mtx forward
(** [state_update t vst] forwards a voice state update to the call registered
    for its guild. Updates without a guild id, updates for a user other than
    [user_id t], and updates for guilds without a registered call are ignored.
    Everything runs while holding [t.mtx]. *)
let state_update t vst =
  let forward () =
    L.info (fun m -> m "state update");
    match vst.E.Voice_state.guild_id with
    | None -> ()
    | Some guild_id ->
        if Snowflake.(user_id t = vst.user_id) then (
          match Sf_map.get guild_id t.calls with
          | Some call ->
              Call.update_session ?channel_id:vst.channel_id
                ~session_id:vst.session_id call
          | None -> ())
  in
  Eio_mutex.with_ t.mtx forward
(** [state_update t vst] forwards a voice state update to the call registered
    for its guild. Updates without a guild id, updates for a user other than
    [user_id t], and updates for guilds without a registered call are ignored.
    Everything runs while holding [t.mtx]. *)
let state_update t vst =
  let forward () =
    L.info (fun m -> m "state update");
    match vst.E.Voice_state.guild_id with
    | None -> ()
    | Some guild_id ->
        if Snowflake.(user_id t = vst.user_id) then (
          match Sf_map.get guild_id t.calls with
          | Some call ->
              Call.update_session ?channel_id:vst.channel_id
                ~session_id:vst.session_id call
          | None -> ())
  in
  Eio_mutex.with_ t.mtx forward
(** [make gw] creates a manager with no calls and subscribes it to [gw], so
    that voice server updates and voice state updates are forwarded to
    [server_update] and [state_update]. All other events are ignored.

    The voice state arm now passes the event payload on; previously it
    applied [state_update] to the manager only, so the update was dropped. *)
let make gw =
  let t = { mtx = Eio_mutex.make (); calls = Sf_map.empty; gw } in
  Gateway.sub gw ~fn:(function
    | E.Voice_server_update v -> server_update t v
    | Voice_state_update v -> state_update t v
    | _ -> ());
  t
(** [make ~env ~sw gw] creates a voice manager bound to the gateway [gw].

    The manager subscribes to [gw] for voice server and voice state updates
    and forks a fiber on [sw] that serves two kinds of requests taken from an
    internal stream: spawning the call for a guild, and a poison request that
    makes every registered call leave and ends the serving loop. *)
let make ~env ~sw gw =
let net = Eio.Stdenv.net env in
let dmgr = Eio.Stdenv.domain_mgr env in
(* Request stream shared by [spawn], [poison] and the serving loop; it is
   created with capacity 0. *)
let evs = Eio.Stream.create 0 in
let poison () = Eio.Stream.add evs `Poison in
(* The record and [spawn] are defined together because the record stores
   [spawn] as a field. *)
let rec t =
{
mtx = Eio_mutex.make ();
calls = Sf_map.empty;
gw;
spawn;
poison;
dead = false;
}
(* Asks the serving loop for the call of [guild_id] and waits for the reply
   through [Promise.await_exn]. *)
and spawn guild_id =
let p, u = Promise.create () in
Eio.Stream.add evs @@ `Spawn (guild_id, Promise.resolve u);
Promise.await_exn p
in
Gateway.sub gw ~fn:(function
| E.Voice_server_update v -> server_update t v
| Voice_state_update v -> state_update t v
| _ -> ());
let run () =
(* This [sw] shadows the outer one; calls are created on the inner switch. *)
Switch.run @@ fun sw ->
let rec loop () =
match Eio.Stream.take evs with
| `Poison ->
(* Leave every registered call, clear the registry, and return without
   looping again. *)
Eio_mutex.with_ t.mtx @@ fun () ->
Sf_map.to_seq t.calls
|> Seq.map (fun (_, call) () -> Call.leave call)
|> Seq.to_list |> Fiber.all;
t.calls <- Sf_map.empty
| `Spawn (guild_id, req) ->
Eio_mutex.with_ t.mtx (fun () ->
match Sf_map.get guild_id t.calls with
| Some call -> req @@ Ok call
| None ->
(* No call yet: create one, register it on success, and reply with the
   outcome either way. *)
let res =
Result.guard @@ fun () ->
Call.make ~sw ~net ~dmgr ~guild_id gw
in
Result.iter
(fun call -> t.calls <- Sf_map.add guild_id call t.calls)
res;
req res);
loop ()
in
loop ();
(* Raised once the loop has ended; the forked fiber below catches it. *)
raise Exit
in
Fiber.fork ~sw (fun () -> try run () with Exit -> ());
t
let get t ~guild_id =
Eio_mutex.with_ t.mtx @@ fun () ->
match Sf_map.get guild_id t.calls with
| Some call -> call
| None ->
let call = Call.make t.gw ~guild_id |> Result.get_exn in
t.calls <- Sf_map.add guild_id call t.calls;
call *)
(** [get t ~guild_id] returns the call registered for [guild_id], or asks
    [t.spawn] for one when none is registered. The registry is read while
    holding [t.mtx]; the lock is released before [t.spawn] is called. *)
let get t ~guild_id =
  Eio_mutex.lock t.mtx;
  let registered = Sf_map.get guild_id t.calls in
  Eio_mutex.unlock t.mtx;
  match registered with
  | Some call -> call
  | None -> t.spawn guild_id
(** One-dimensional, C-layout bigarray of 32-bit floats. Going by the name it
    holds one frame of PCM samples; the channel layout is not visible here. *)
type pcm_f32_frame =
(float, Bigarray.float32_elt, Bigarray.c_layout) Bigarray.Array1.t
(** [spin_sleep ?acc dur] blocks the calling thread for [dur] nanoseconds.

    When [dur] exceeds [acc], the first [dur - acc] nanoseconds are spent in
    [Unix.sleepf]; the remainder is spent spinning with [Backoff.once] until
    the monotonic clock shows that [dur] nanoseconds have elapsed since the
    call started. [acc] defaults to 100_000 ns. *)
let spin_sleep ?(acc = 100_000L) dur =
  let backoff = Backoff.create () in
  let started = Mtime_clock.now () in
  let elapsed_ns () =
    Mtime.Span.to_uint64_ns (Mtime.span started (Mtime_clock.now ()))
  in
  if Int64.(dur > acc) then Unix.sleepf (Int64.(to_float (dur - acc)) /. 1e9);
  let rec spin () =
    if Int64.(elapsed_ns () < dur) then (
      Backoff.once backoff;
      spin ())
  in
  spin ()
and state = Idle | Playing of stream
and stream = { waker : waker; rx : bigstring Lwt_pipe.Reader.t }
and waker = bool Lwt.t * bool Lwt.u
and 'a res = ('a, exn) result -> unit
and state = Idle | Playing of stream
and stream = { waker : bool -> unit; rx : Audio_stream.t }
(* Creates a fresh waker pair; this is Lwt.task under a narrower type. *)
let waker : unit -> waker = Lwt.task
(* Builds a silence source by delegating to Audio_stream.n_silence_pipe. *)
let make_silence ?n () = Audio_stream.n_silence_pipe ?n ()
(* A driver that is idle, has no pending silence, and is not dead. *)
let create () = { state = Idle; silence = None; dead = false }
(* Current reading of Mtime_clock.elapsed_ns. *)
let now () = Mtime_clock.elapsed_ns ()
(* The stream being played, if any. *)
let playing t = match t.state with Playing s -> Some s | _ -> None
(* Whether a stream is being played. *)
let is_playing t = match t.state with Playing _ -> true | _ -> false
(* Builds a silence source by delegating to Audio_stream.silence_frames.
   Shadows the definition of make_silence a few lines up. *)
let make_silence ?n () = Audio_stream.silence_frames ?n ()
(** [destroy_active t] stops the stream being played, if any: the resolver
    half of its waker is woken with [false] and the state becomes [Idle].
    Does nothing when already idle. *)
let destroy_active t =
  match t.state with
  | Idle -> ()
  | Playing active ->
      let _, resolver = active.waker in
      Lwt.wakeup_later resolver false;
      t.state <- Idle
(** [create ()] returns a mixer with an empty request queue, in the [Idle]
    state, with no pending silence, not dead, and with no waker installed. *)
let create () =
  let reqs = Lf_queue.create () in
  let dead = Atomic.make false in
  let wakeup = Atomic.make None in
  { reqs; state = Idle; silence = None; dead; wakeup }
(* [stop] is another name for [destroy_active]. *)
let stop = destroy_active
(** [wakeup t] takes the waker installed in [t.wakeup], if any, and calls it.

    The waker is removed with a compare-and-set before it is called; when the
    swap fails, the slot is read again after a backoff. Does nothing when no
    waker is installed. *)
let wakeup t =
  let backoff = Backoff.create () in
  let rec claim () =
    let seen = Atomic.get t.wakeup in
    match seen with
    | None -> ()
    | Some notify ->
        if Atomic.compare_and_set t.wakeup seen None then notify ()
        else (
          Backoff.once backoff;
          claim ())
  in
  claim ()
(** [play ~s t] makes [s] the stream being played.

    When another stream was playing, the resolver half of its waker is woken
    with [false] and, unless silence is already pending, a silence source is
    scheduled before the state is replaced. *)
let play ~s t =
  (match t.state with
  | Idle -> ()
  | Playing previous -> (
      Lwt.wakeup_later (snd previous.waker) false;
      match t.silence with
      | None -> t.silence <- Some (make_silence ())
      | Some _ -> ()));
  t.state <- Playing s
(** [check t] does nothing while [t] is alive.
    @raise Destroyed if [t.dead] is set. *)
let check t =
  match Atomic.get t.dead with
  | true -> raise Destroyed
  | false -> ()
(** [run ~yield ~play ~stop t] is the driver loop of the Lwt-based mixer.

    It repeatedly drains the pending silence source or the stream being
    played, handing every frame to [play]. When [play] resolves to [false]
    the loop pauses until the next tick; while idle it waits on [yield t].
    The loop ends once [t.dead] is set. *)
let run ~yield ~play ~stop t =
(* Reads frames from the pipe and hands each to [play] together with its
   1-based position. Ends with `Yield when [play] resolves to false and with
   `Closed when the pipe has no more frames; both carry the frame count. *)
let rec exhaust ?(i = 0) p =
Lwt_pipe.read p >>= function
| Some frame ->
play (i + 1) frame >>= fun ok ->
let i = i + 1 in
if ok then exhaust ~i p else Lwt.return (`Yield i)
| None -> Lwt.return (`Closed i)
in
(* Frame length in seconds: the constant is divided by 1e3. *)
let framelen = float Rtp._FRAME_LEN /. 1e3 in
(* Sleeps for [frames] frame lengths, shortened by the time elapsed since
   [drift] (never below zero), then resolves to `Tick. *)
let schedule_next ?drift ?(frames = 1) () =
let timeout = float frames *. framelen in
let with_drift d =
let delta = Int64.(now () - d |> to_float) /. 1e9 in
timeout -. Float.min delta timeout
in
let timeout =
Option.map with_drift drift |> Option.get_or ~default:timeout
in
L.trace (fun m ->
m "sent %d frames, next tick in %f seconds" frames timeout);
Lwt_unix.sleep timeout >|= fun () -> `Tick
in
(* Pending promise from [yield], kept so it can be reused across waits. *)
let last_yield = ref None in
let last_tick = ref (now ()) in
(* Waits for [tick] while also serving [yield]; stops early, cancelling the
   tick, when a yield leaves the driver dead. *)
let rec yield_till tick =
let y =
match !last_yield with
| Some y -> y
| None ->
let y = yield t in
last_yield := Some y;
y
in
Lwt.choose [ tick; (y >|= fun () -> `Yield) ] >>= function
| `Yield when t.dead ->
Lwt.cancel tick;
Lwt.return_unit
| `Yield ->
last_yield := None;
yield_till tick
| `Tick ->
let t = now () in
L.trace (fun m ->
let delta = Int64.(t - !last_tick |> to_float) in
m "tick +%2fms" (delta /. 1e6));
last_tick := t;
Lwt.return_unit
in
(* Main loop: pending silence is drained before the stream being played. *)
let rec f' ?(drift = now ()) ?(i = 0) () =
match (t.silence, t.state) with
| _ when t.dead ->
destroy_active t;
Lwt.return_unit
| Some s, st -> (
exhaust ~i s >>= function
| `Closed i ->
t.silence <- None;
(* [stop] is only called when nothing is playing after the silence. *)
(match st with Idle -> stop () | _ -> Lwt.return_unit)
>>= f' ~drift ~i
| `Yield frames -> yield_till (schedule_next ~drift ~frames ()) >>= f'
)
| None, Idle ->
(match !last_yield with
| Some y ->
last_yield := None;
y
| None -> yield t)
>>= f'
| None, Playing s -> (
exhaust ~i s.rx >>= function
| `Closed i ->
(* Stream finished: schedule silence, go idle, wake its waker with true. *)
t.silence <- Some (make_silence ());
t.state <- Idle;
Lwt.wakeup_later (snd s.waker) true;
f' ~drift ~i ()
| `Yield frames ->
yield_till (schedule_next ~drift ~frames ()) >>= fun () -> f' ())
in
f' ()
end
(** [now_playing t] sends a [Now_playing] request to the mixer through [req]
    and waits for its reply with [Promise.await_exn]. *)
let now_playing t =
  let reply, resolver = Promise.create () in
  let answer res = Promise.resolve resolver res in
  req t (Now_playing answer);
  Promise.await_exn reply
(** [is_playing t] is [true] exactly when [now_playing t] returns a stream. *)
let is_playing t =
  match now_playing t with
  | Some _ -> true
  | None -> false
(** [stop t] sends a [Stop] request to the mixer through [req] and waits for
    its reply with [Promise.await_exn]. *)
let stop t =
  let reply, resolver = Promise.create () in
  let answer res = Promise.resolve resolver res in
  req t (Stop answer);
  Promise.await_exn reply
(** Handle on the Lwt-based mixer: the driver plus the writer end used to send
    it messages. A message either stops playback or starts playing a pipe of
    frames, paired with the waker for that playback. *)
type t = { driver : Driver.t; evloop_tx : evloop_msg Lwt_pipe.Writer.t }
and evloop_msg = Stop | Play of pcm_s16_frame Lwt_pipe.Reader.t * Driver.waker
(** [play ~s t] sends a [Play] request for the stream [s] to the mixer through
    [req] and waits for its reply with [Promise.await_exn]. *)
let play ~s t =
  let reply, resolver = Promise.create () in
  let answer res = Promise.resolve resolver res in
  req t (Play (s, answer));
  Promise.await_exn reply
(** [create ?burst out] builds the Lwt-based mixer and starts its driver.

    Frames are handed to [out] as `Play values and the end of playback as
    `Stop. [burst] (15 by default) bounds the frame index up to which the
    driver is told to keep sending. *)
let create ?(burst = 15) out =
let chan = Lwt_pipe.create () in
(* Writer handed to clients; every message is wrapped as `Req. *)
let evloop_tx = Lwt_pipe.Writer.map ~f:(fun msg -> `Req msg) chan in
let driver = Driver.create () in
(* Reads one message and applies it to the driver; a read that yields no
   message is treated as `Poison and destroys the driver. *)
let yield driver =
Lwt_pipe.read chan >|= Option.get_or ~default:`Poison >|= function
| `Req (Play (s, waker)) ->
let s = Lwt_pipe.Reader.filter_map ~f:encode s in
Driver.play driver ~s:{ waker; rx = s }
| `Req Stop -> Driver.stop driver
| `Poison -> Driver.destroy driver
in
(* Keep sending only while [out] accepted the frame and the burst is not
   used up. *)
let play i frame = out (`Play frame) >|= fun r -> r && i < burst in
let stop () = out `Stop >|= ignore in
let f = Driver.run ~yield ~play ~stop driver in
Lwt_pipe.keep chan f;
Lwt_pipe.link_close chan ~after:evloop_tx;
(* Close the channel once the driver loop has terminated. *)
Lwt.on_termination f (fun () -> Lwt_pipe.close_nonblock chan);
{ driver; evloop_tx }
(** [destroy t] marks the mixer as dead and then calls [wakeup t], which runs
    the installed waker if there is one. *)
let destroy t =
  let () = Atomic.set t.dead true in
  wakeup t
(** [play ?k t stream] asks the mixer to play [stream] by writing a [Play]
    message to [t.evloop_tx].

    [k] is called with the resolved status and [stream] once the promise half
    of the playback waker succeeds; by default it closes [stream]. *)
let play ?(k = fun ~status:_ s -> Lwt_pipe.close_nonblock s) t stream =
  let waker = Driver.waker () in
  let on_done status = k ~status stream in
  Lwt.on_success (fst waker) on_done;
  Lwt_pipe.write_exn t.evloop_tx (Play (stream, waker))
(** [destroy_active t] stops the stream being played, if any: its waker is
    called with [false] and the state becomes [Idle]. Does nothing when
    already idle. *)
let destroy_active t =
  match t.state with
  | Idle -> ()
  | Playing { waker; _ } ->
      waker false;
      t.state <- Idle
let destroy t = Lwt_pipe.close_nonblock t.evloop_tx *)
(** [run ~play ~stop t] is the blocking loop of the mixer.

    Each iteration first serves the queued requests and then drains either
    the pending silence source or the stream being played, handing every
    frame to [play] together with its 1-based position in the current burst.
    When [play] returns [false] the loop sleeps until the frames sent so far
    are due and starts a new burst. [stop] is called when a silence source
    ends while nothing is playing. While idle with nothing to drain, the loop
    installs a waker in [t.wakeup] and waits for it to be called.

    Returns once [t.dead] is set; the stream being played is stopped first.

    Changes from the previous version:
    - the frame length is converted to nanoseconds with a factor of
      1_000_000 (it was 100_000, which made every tick ten times too short);
    - [play] receives the number of frames sent including the current one
      (it used to receive that number plus one). *)
let run ~play ~stop t =
  let now () = Mtime_clock.elapsed_ns () in
  (* Drains [src] through [play]. Ends with `Yield when [play] declines to
     continue and with `Closed when [src] has no more frames; both carry the
     number of frames sent so far. *)
  let rec exhaust ?(i = 0) src =
    match Audio_stream.read src with
    | Some frame ->
        let i = i + 1 in
        if play i frame then exhaust ~i src else `Yield i
    | None -> `Closed i
  in
  (* Frame length in nanoseconds. Assumes Rtp._FRAME_LEN is expressed in
     milliseconds - TODO confirm against its definition. *)
  let framelen = Int64.(of_int Rtp._FRAME_LEN * 1_000_000L) in
  (* Sleeps for [frames] frame lengths, shortened by the time elapsed since
     [drift] (never below zero). *)
  let schedule_next ?drift ?(frames = 1) () =
    let timeout = Int64.(of_int frames * framelen) in
    let with_drift d =
      let delta = Int64.(now () - d) in
      Int64.(timeout - min delta timeout)
    in
    let timeout =
      Option.map with_drift drift |> Option.get_or ~default:timeout
    in
    L.trace (fun m ->
        m "sent %d frames, next tick in %Ld ns (%f s)" frames timeout
          (Int64.to_float timeout /. 1e9));
    spin_sleep timeout
  in
  let rec loop ?(drift = now ()) ?(i = 0) () =
    if Atomic.get t.dead then (
      destroy_active t;
      (* TODO @quartz55: cancel/cleanup queued requests (how?) *)
      raise Exit);
    handle_reqs ();
    process ~drift ~i
  (* Serves every queued request, replying to each one. *)
  and handle_reqs () =
    match Lf_queue.pop t.reqs with
    | Some (Play (s, res)) ->
        (match t.state with
        | Playing os ->
            (* Replacing a stream: tell the old one it was cut short and
               schedule silence unless some is already pending. *)
            os.waker false;
            if Option.is_none t.silence then t.silence <- Some (make_silence ());
            t.state <- Playing s
        | Idle -> t.state <- Playing s);
        res (Ok ());
        handle_reqs ()
    | Some (Stop res) ->
        destroy_active t;
        res (Ok ());
        handle_reqs ()
    | Some (Now_playing res) ->
        res @@ Ok (match t.state with Idle -> None | Playing s -> Some s);
        handle_reqs ()
    | None -> ()
  and process ~drift ~i =
    match (t.silence, t.state) with
    | Some s, st -> (
        match exhaust ~i s with
        | `Closed i ->
            t.silence <- None;
            (match st with Idle -> stop () | _ -> ());
            loop ~drift ~i ()
        | `Yield frames ->
            schedule_next ~drift ~frames ();
            loop ())
    | None, Playing s -> (
        match exhaust ~i s.rx with
        | `Closed i ->
            (* Stream finished: schedule silence, go idle, report success. *)
            t.silence <- Some (make_silence ());
            t.state <- Idle;
            s.waker true;
            loop ~drift ~i ()
        | `Yield frames ->
            schedule_next ~drift ~frames ();
            loop ())
    | None, Idle ->
        (* NOTE(review): a request queued, or a destroy issued, after
           handle_reqs ran but before the waker below is installed would find
           no waker to call - confirm how the request side signals this. *)
        assert (Option.is_none @@ Atomic.get t.wakeup);
        let p, u = Promise.create () in
        Atomic.set t.wakeup (Some (Promise.resolve u));
        Promise.await p;
        loop ()
  in
  try loop () with Exit -> ()
(* Field accessors: each one returns the record field of the same name. *)
let guild_id t = t.guild_id
let user_id t = t.user_id
let channel_id t = t.channel_id
let session_id t = t.session_id
let token t = t.token
let endpoint t = t.endpoint
let discord_addr = `Udp (Eio.Net.Ipaddr.of_raw ip, port) in
let sock = Eio.Net.datagram_socket ~sw net discord_addr in
L.info (fun m ->
m "discovering external ip using discord's voice server: %s:%d" ip port);
let ipaddr = Eio_unix.Ipaddr.of_unix @@ Unix.inet_addr_of_string ip in
let discord_addr = `Udp (ipaddr, port) in
L.dbg (fun m ->
m "discovering external ip using discord's voice server: %a"
Eio.Net.Sockaddr.pp discord_addr);
let sock =
Eio.Net.datagram_socket ~sw net @@ `Udp (Eio.Net.Ipaddr.V4.any, 0)
in
L.info (fun m -> m "discovered ip and port: %s:%d" ip_d port_d);
let local_addr = `Udp (Eio.Net.Ipaddr.of_raw ip_d, port_d) in
L.dbg (fun m -> m "discovered ip and port: %s:%d" ip_d port_d);
let ipaddr_d = Eio_unix.Ipaddr.of_unix @@ Unix.inet_addr_of_string ip_d in
let local_addr = `Udp (ipaddr_d, port_d) in