HMJFS6WWD2JJZB57HBXRHYS2A2N25YDDAJ443KRMT6K3ALHIBRVQC
let cli_fmter = Relog.Formatter.default ~color:true ~oneline:false () in
let cli_fmt = Format.std_formatter in
Relog.Sink.make (fun r ->
if Relog.(Level.compare (Record.level r) Level.Debug) <= 0 then
cli_fmter cli_fmt r
else ())
let open Relog in
let cli_fmter = Formatter.default ~color:true ~oneline:false () in
let cli_fmt = Format.formatter_of_out_channel stderr in
Sink.make (fun r ->
if Level.Infix.(Record.level r <= Debug) then cli_fmter cli_fmt r else ())
module Mpmc = Dp_utils.Mpmc
let () =
let source, sink = Mpmc.create () in
let sink2 = Mpmc.Sink.(clone sink |> map ~f:(fun i -> i * 2)) in
let _cancel =
Mpmc.Sink.iter sink2 ~f:(function
| Some v -> Printf.printf "(sink 2) %d\n%!" v
| None -> Printf.printf "(sink 2) dropped\n%!")
in
let _cancel =
Mpmc.Sink.iter sink ~f:(function
| Some v -> Printf.printf "(sink 1) %d\n%!" v
| None -> Printf.printf "(sink 1) dropped\n%!")
in
Seq.(1 -- 10 |> iter (fun v -> Mpmc.Source.write source v |> ignore));
Seq.(1 -- 5 |> iter (fun v -> Mpmc.Source.write source v |> ignore));
Mpmc.Source.close source
"ocaml@4.11.2000@d41d8cd9", "@reason-native/pastel@0.3.0@d41d8cd9",
"@opam/yojson@opam:1.7.0@7056d985",
"@opam/ptime@opam:0.8.5@0051d642", "@opam/dune@opam:2.8.4@ee414d6c",
"@opam/containers@opam:3.2@c4e3f662",
"@esy-ocaml/reason@3.7.0@d41d8cd9"
"ocaml@4.11.2000@d41d8cd9", "@opam/yojson@opam:1.7.0@7056d985",
"@opam/reason@opam:3.7.0@191be014",
"@opam/ptime@opam:0.8.5@0051d642", "@opam/dune@opam:2.8.4@1490e2a1",
"@opam/containers@opam:3.2@c4e3f662"
},
"@reason-native/pastel@0.3.0@d41d8cd9": {
"id": "@reason-native/pastel@0.3.0@d41d8cd9",
"name": "@reason-native/pastel",
"version": "0.3.0",
"source": {
"type": "install",
"source": [
"archive:https://registry.npmjs.org/@reason-native/pastel/-/pastel-0.3.0.tgz#sha1:07da3c5a0933e61bc3b353bc85aa71ac7c0f311c"
]
},
"overrides": [],
"dependencies": [
"ocaml@4.11.2000@d41d8cd9", "@opam/re@opam:1.9.0@d4d5e13d",
"@opam/dune@opam:2.8.4@ee414d6c", "@esy-ocaml/reason@3.7.0@d41d8cd9"
],
"devDependencies": []
"ocaml@4.11.2000@d41d8cd9", "@opam/dune@opam:2.8.4@ee414d6c",
"ocaml@4.11.2000@d41d8cd9", "@opam/dune@opam:2.8.4@1490e2a1",
"@esy-ocaml/substs@0.0.1@d41d8cd9"
],
"devDependencies": [
"ocaml@4.11.2000@d41d8cd9", "@opam/dune@opam:2.8.4@1490e2a1"
]
},
"@opam/reason@opam:3.7.0@191be014": {
"id": "@opam/reason@opam:3.7.0@191be014",
"name": "@opam/reason",
"version": "opam:3.7.0",
"source": {
"type": "install",
"source": [
"archive:https://opam.ocaml.org/cache/md5/7e/7eb8cbbff8565b93ebfabf4eca7254d4#md5:7eb8cbbff8565b93ebfabf4eca7254d4",
"archive:https://registry.npmjs.org/@esy-ocaml/reason/-/reason-3.7.0.tgz#md5:7eb8cbbff8565b93ebfabf4eca7254d4"
],
"opam": {
"name": "reason",
"version": "3.7.0",
"path": "esy.lock/opam/reason.3.7.0"
}
},
"overrides": [],
"dependencies": [
"ocaml@4.11.2000@d41d8cd9", "@opam/result@opam:1.5@6b753c82",
"@opam/ppx_derivers@opam:1.2.1@ecf0aa45",
"@opam/ocamlfind@opam:1.8.1@b7dc3072",
"@opam/merlin-extend@opam:0.6@404f814c",
"@opam/menhir@opam:20210310@50de9216",
"@opam/fix@opam:20201120@5c318621", "@opam/dune@opam:2.8.4@1490e2a1",
"ocaml@4.11.2000@d41d8cd9", "@opam/dune@opam:2.8.4@ee414d6c"
"ocaml@4.11.2000@d41d8cd9", "@opam/result@opam:1.5@6b753c82",
"@opam/ppx_derivers@opam:1.2.1@ecf0aa45",
"@opam/merlin-extend@opam:0.6@404f814c",
"@opam/menhir@opam:20210310@50de9216",
"@opam/fix@opam:20201120@5c318621", "@opam/dune@opam:2.8.4@1490e2a1"
"archive:https://opam.ocaml.org/cache/sha256/4e/4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac#sha256:4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac",
"archive:https://github.com/ocaml/dune/releases/download/2.8.4/dune-2.8.4.tbz#sha256:4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac"
"archive:https://opam.ocaml.org/cache/sha256/79/79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629#sha256:79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629",
"archive:https://github.com/ocaml/dune/releases/download/2.8.5/dune-2.8.5.tbz#sha256:79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629"
"archive:https://opam.ocaml.org/cache/sha256/4e/4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac#sha256:4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac",
"archive:https://github.com/ocaml/dune/releases/download/2.8.4/dune-2.8.4.tbz#sha256:4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac"
"archive:https://opam.ocaml.org/cache/sha256/79/79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629#sha256:79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629",
"archive:https://github.com/ocaml/dune/releases/download/2.8.5/dune-2.8.5.tbz#sha256:79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629"
"@esy-ocaml/reason@3.7.0@d41d8cd9": {
"id": "@esy-ocaml/reason@3.7.0@d41d8cd9",
"name": "@esy-ocaml/reason",
"version": "3.7.0",
"source": {
"type": "install",
"source": [
"archive:https://registry.npmjs.org/@esy-ocaml/reason/-/reason-3.7.0.tgz#sha1:36f92c1c854477c4da37e4769a045ffe60e4fb10"
]
},
"overrides": [],
"dependencies": [
"ocaml@4.11.2000@d41d8cd9", "@opam/result@opam:1.5@6b753c82",
"@opam/ppx_derivers@opam:1.2.1@ecf0aa45",
"@opam/ocamlfind@opam:1.8.1@b7dc3072",
"@opam/merlin-extend@opam:0.6@404f814c",
"@opam/menhir@opam:20210310@50de9216",
"@opam/fix@opam:20201120@5c318621", "@opam/dune@opam:2.8.4@ee414d6c"
],
"devDependencies": []
},
"sha256=4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac"
"sha512=efc1834c4add40138a101734665a1f462c19fe76d1cbb457b1fc20f95991118a50b24d485fb98d39046e41bec03885a8dc071bf8add51083ac9780bff9f6668a"
"sha256=79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629"
"sha512=4ef6cdea0768a29de0108cb61b04ef471cb494762c865265f20d7d15ed65a39557f7e34f2dbd466352a6567cce29d7ba21be6569afafbcfc2871720b9466dcae"
"sha256=4e6420177584aabdc3b7b37aee3026b094b82bf5d7ed175344a68e321f72e8ac"
"sha512=efc1834c4add40138a101734665a1f462c19fe76d1cbb457b1fc20f95991118a50b24d485fb98d39046e41bec03885a8dc071bf8add51083ac9780bff9f6668a"
"sha256=79011283fb74c7a27eb17ad752efbcc39b39633cbacc8d7be97e8ea869443629"
"sha512=4ef6cdea0768a29de0108cb61b04ef471cb494762c865265f20d7d15ed65a39557f7e34f2dbd466352a6567cce29d7ba21be6569afafbcfc2871720b9466dcae"
opam-version: "2.0"
maintainer: "Jordan Walke <jordojw@gmail.com>"
authors: [ "Jordan Walke <jordojw@gmail.com>" ]
license: "MIT"
homepage: "https://github.com/facebook/reason"
doc: "http://reasonml.github.io/"
bug-reports: "https://github.com/facebook/reason/issues"
dev-repo: "git://github.com/facebook/reason.git"
tags: [ "syntax" ]
build: [
["dune" "build" "-p" name "-j" jobs]
]
depends: [
"ocaml" {>= "4.03" & < "4.13"}
"dune" {>= "1.4"}
"ocamlfind" {build}
"menhir" {>= "20170418"}
"merlin-extend" {>= "0.6"}
"ppx_derivers" {< "2.0"}
"fix"
"result"
]
synopsis: "Reason: Syntax & Toolchain for OCaml"
description: """
Reason gives OCaml a new syntax that is remniscient of languages like
JavaScript. It's also the umbrella project for a set of tools for the OCaml &
JavaScript ecosystem."""
url {
src: "https://registry.npmjs.org/@esy-ocaml/reason/-/reason-3.7.0.tgz"
checksum: "md5=7eb8cbbff8565b93ebfabf4eca7254d4"
}
let y = yield t >|= fun () -> if t.dead then `Dead else `Yield in
Lwt.choose [ tick; y ] >>= function
| `Dead ->
let y =
match !last_yield with
| Some y -> y
| None ->
let y = yield t in
last_yield := Some y;
y
in
Lwt.choose [ tick; (y >|= fun () -> `Yield) ] >>= function
| `Yield when t.dead ->
(library
(name dp_utils)
(public_name discopotty.utils)
(preprocess
(pps ppx_deriving.show ppx_deriving.ord))
(libraries containers containers-data ke stdint lwt relog.lib))
open Containers
type 'a sink = {
mutable readers : 'a reader Ke.Fke.t;
mutable buf : 'a Ke.Fke.t;
mutable dropped : bool;
clone : unit -> 'a sink;
}
and 'a reader = { mutable canceled : bool; dispatch : 'a option -> unit }
type 'a source = { mutable sinks : 'a sink Weak.t; mutable closed : bool }
let stub = Sys.opaque_identity (fun () -> ())
module Sink = struct
type 'a t = 'a sink
let clone t =
if t.dropped then failwith "cannot clone dropped sink";
let s = t.clone () in
s.buf <- t.buf;
s
let rec make ?clone () =
{
readers = Ke.Fke.empty;
buf = Ke.Fke.empty;
dropped = false;
clone = clone |> Option.get_or ~default:(fun () -> make ());
}
let peek t = Ke.Fke.peek t.buf
let read_opt t =
match Ke.Fke.pop t.buf with
| Some (a, tl) ->
t.buf <- tl;
Some a
| None -> None
let schedule_read t ~f =
match read_opt t with
| Some a ->
f (Some a);
stub
| None when t.dropped ->
f None;
stub
| None ->
let r = { canceled = false; dispatch = f } in
t.readers <- Ke.Fke.push t.readers r;
fun () -> if not r.canceled then r.canceled <- true
let push t v =
if t.dropped then false
else
let rec f q =
match Ke.Fke.pop q with
| Some ({ canceled = true; _ }, tl) ->
print_endline "skipping cancelled reader";
f tl
| Some ({ dispatch; _ }, tl) ->
print_endline "waking up reader";
t.readers <- tl;
dispatch (Some v)
| None ->
print_endline "pushed to buffer";
t.readers <- Ke.Fke.empty;
t.buf <- Ke.Fke.push t.buf v
in
f t.readers;
true
let drop t =
if not t.dropped then (
t.dropped <- true;
Ke.Fke.iter
(function
| { canceled = true; _ } -> () | { dispatch; _ } -> dispatch None)
t.readers;
t.readers <- Ke.Fke.empty)
let map ~f t =
let o = make () in
let rec fwd = function
| Some v ->
if push o (f v) then schedule_read t ~f:fwd |> ignore else drop o
| None -> drop o
in
schedule_read t ~f:fwd |> ignore;
o
let filter ~f t =
let o = make () in
let rec fwd = function
| Some v when f v ->
if push o v then schedule_read t ~f:fwd |> ignore else drop o
| Some _ -> schedule_read t ~f:fwd |> ignore
| None -> drop o
in
schedule_read t ~f:fwd |> ignore;
o
let iter t ~f =
let cancel = ref stub in
let rec iter' = function
| Some v ->
f (Some v);
cancel := schedule_read t ~f:iter'
| None -> f None
in
cancel := schedule_read t ~f:iter';
fun () -> !cancel ()
let rec merge sinks =
let ks = ref @@ ((1 lsl List.length sinks) - 1) in
let o = make ~clone:(fun () -> List.map clone sinks |> merge) () in
let c = Array.make (List.length sinks) stub in
let cancel () = Array.iter (fun c -> c ()) c in
let rec fwd i k = function
| Some v ->
if push o v then c.(i) <- schedule_read k ~f:(fwd i k)
else (
cancel ();
List.iter drop sinks)
| None ->
ks := !ks land lnot (1 lsl i);
if !ks = 0 then drop o
in
List.iteri (fun i k -> c.(i) <- schedule_read k ~f:(fwd i k)) sinks;
o
module Lwt = struct
let read t =
let p, u = Lwt.task () in
let cancel = schedule_read t ~f:(Lwt.wakeup_later u) in
Lwt.on_cancel p cancel;
p
let iter_s ~f t =
let open Lwt.Infix in
let rec iter () =
read t >>= function Some v -> f v >>= iter | None -> Lwt.return_unit
in
iter ()
let iter_p ~f t =
let open Lwt.Infix in
let rec cleanup ?(acc = []) = function
| [] -> Ok acc
| p :: ps -> (
match Lwt.state p with
| Lwt.Fail e -> Error e
| Lwt.Sleep -> cleanup ~acc:(p :: acc) ps
| Lwt.Return _ -> cleanup ~acc ps)
in
let rec join ?(acc = []) () =
read t >>= function
| None -> Lwt.join acc
| Some v -> (
match cleanup acc with
| Ok acc -> join ~acc:(f v :: acc) ()
| Error exn -> Lwt.fail exn)
in
join ()
end
end
module Source = struct
type 'a t = 'a source
let sinks t =
Seq.(0 --^ Weak.length t.sinks) |> Seq.filter_map (Weak.get t.sinks)
let write t x =
if t.closed then false
else
sinks t
|> Seq.filter (fun s -> not @@ s.dropped)
|> Seq.fold (fun flag s -> flag || Sink.push s x) false
let close t =
t.closed <- true;
sinks t |> Seq.iter Sink.drop
end
let grow t =
let len = Weak.length t.sinks in
if Obj.Ephemeron.max_ephe_length - len < len then
failwith "reached maximum number of sinks";
let w = Weak.create (len * 2) in
Weak.blit t.sinks 0 w 0 len;
t.sinks <- w
let get_free_sink_ref t =
let len = Weak.length t.sinks in
let rec f' ?(i = 0) w =
if i >= len then None
else
match Weak.get w i with
| None -> Some i
| Some c when c.dropped -> Some i
| Some _ -> f' ~i:(i + 1) w
in
match f' t.sinks with
| Some i -> i
| None ->
grow t;
len
let create () =
let sinks = Weak.create 4 in
let source = { sinks; closed = false } in
let rec make_sink () =
let r = get_free_sink_ref source in
let sink =
{
readers = Ke.Fke.empty;
buf = Ke.Fke.empty;
dropped = false;
clone = make_sink;
}
in
Gc.finalise
(fun k ->
Printf.printf "gc'ing sink with %d buffered elements and %d readers\n%!"
(Ke.Fke.length k.buf) (Ke.Fke.length k.readers);
Sink.drop k)
sink;
Weak.set source.sinks r (Some sink);
sink
in
(source, make_sink ())