Streaming.Sink
Module with definitions for sinks.
Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can acquire resources that are guaranteed to be terminated when the sink is filled.
Sinks are a great way to define decoupled consumers that can be filled with Stream.into.
Sinks are independent from sources and streams. You can think of them as packed arguments for folding functions with early termination. Formally, they can also be interpreted as Moore machines.
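As a rough illustration of the "folding function with early termination" view, the sketch below models a sink in plain OCaml. The record type, the into driver over lists, and take_list are hypothetical names made up for this sketch. The library's internal representation is not shown in this document and may well differ.

```ocaml
(* Hypothetical model of a sink; NOT the library's actual representation.
   The state type 's is exposed as a parameter to keep the sketch simple. *)
type ('a, 's, 'r) sink = {
  init : unit -> 's;      (* create the initial state *)
  push : 's -> 'a -> 's;  (* step: feed one element into the state *)
  full : 's -> bool;      (* early-termination check *)
  stop : 's -> 'r;        (* extract the final result from the state *)
}

(* Drive a sink with the elements of a list, stopping as soon as it is full. *)
let into sink xs =
  let rec loop state = function
    | x :: rest when not (sink.full state) -> loop (sink.push state x) rest
    | _ -> sink.stop state
  in
  loop (sink.init ()) xs

(* Collect at most [n] elements into a list. *)
let take_list n = {
  init = (fun () -> (0, []));
  push = (fun (count, acc) x -> (count + 1, x :: acc));
  full = (fun (count, _) -> count >= n);
  stop = (fun (_, acc) -> List.rev acc);
}

let first_three = into (take_list 3) [10; 20; 30; 40; 50]
```

Here the state plays the role of the machine's current state, push is the transition function, and stop is the output function. The remaining elements 40 and 50 are never pushed because the sink reports that it is full.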
type ('a, 'b) t = ('a, 'b) sink
Type for sinks that consume elements of type 'a and, once done, produce a value of type 'b.
Implementing custom sinks is useful to create a collection of reusable streaming consumers for your application.
The following example demonstrates a sink that consumes all elements into a list:
let list_sink =
  let init () = [] in
  let push acc x = x :: acc in
  let stop acc = List.rev acc in
  Sink.make ~init ~push ~stop ()
Alternatively, existing list/array/string/queue sinks, or others listed below, can be used.
val fill : 'r -> ('a, 'r) t
fill result uses result to fill the sink. This sink will not consume any input and will immediately produce result when used.
val fold : ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t
fold f init is a sink that reduces all input elements with the stepping function f starting with the accumulator value init.
val fold_while : ('r -> bool) -> ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t
fold_while full f init is similar to fold but can terminate early if full returns true.
val make : init:(unit -> 'acc) -> push:('acc -> 'a -> 'acc) -> ?full:('acc -> bool) ->
  stop:('acc -> 'r) -> unit -> ('a, 'r) t
Creates a sink from a function that initializes a state value, a stepping function to update that state and a stop function that produces the final result value. Optionally a full function can be passed to decide when the sink should terminate early.
Note: The calls to full should be cheap as this function will be called to avoid allocation of unnecessary resources. If the computation required to decide if the sink is full is expensive, consider caching it whenever possible.
val full : ('a, unit) t
A full sink that will not consume any input and will not produce any results.
val is_full : ('a, 'r) t -> bool
is_full sink is true if sink is full. Full sinks do not consume any elements but will be initialised to determine if they are full.
val is_empty : ('a, bool) t
is_empty is a sink that produces true when it is stopped without consuming any elements.
push x sink updates sink's internal state by pushing x into it. The internal sink's state will be initialised.
val each : ('a -> unit) -> ('a, unit) t
Applies an effectful action to all input elements producing nothing.
val len : ('a, int) t
Consumes and counts all input elements.
val first : ('a, 'a option) t
The first input element, or None if the sink did not receive enough input.
Equivalent to nth 0.
val last : ('a, 'a option) t
The last input element, or None if the sink did not receive enough input.
val nth : int -> ('a, 'a option) t
The n-th input element, or None if the sink did not receive enough input.
val drain : ('a, unit) t
Consumes all elements producing nothing. Useful for triggering actions in effectful streams.
val contains : where:('a -> bool) -> ('a, bool) t
contains ~where:pred is true if some input element satisfies pred, and false if there is no such element.
val find : where:('a -> bool) -> ('a, 'a option) t
find ~where:pred finds the first element that satisfies pred, returning None if there is no such element.
val index : where:('a -> bool) -> ('a, int option) t
Similar to find but returns the index of the element that satisfies the predicate.
val minimum : by:('a -> 'a -> bool) -> ('a, 'a option) t
Finds the minimum element in the sequence, using the given predicate as the comparison between the input elements.
val maximum : by:('a -> 'a -> bool) -> ('a, 'a option) t
Finds the maximum element in the sequence, using the given predicate as the comparison between the input elements.
val all : where:('a -> bool) -> ('a, bool) t
all ~where:pred is true if all input elements satisfy pred. Will stop consuming elements when the first element that does not satisfy pred is found. Results in true for empty input.
val any : where:('a -> bool) -> ('a, bool) t
any ~where:pred is true if at least one input element satisfies pred. Will stop consuming elements when such an element is found. Results in false for empty input.
val list : ('a, 'a list) t
Puts all input elements into a list.
val list_rev : ('a, 'a list) t
Puts all input elements into a list in the reverse order. Faster than list as no reversal is performed.
val array : ('a, 'a array) t
Puts all input elements into an array.
val queue : ('a, 'a Stdlib.Queue.t) t
Puts all input elements into a queue.
val string : (string, string) t
Consumes and concatenates strings.
val bytes : (bytes, bytes) t
Consumes and concatenates bytes.
val print : (string, unit) t
Prints all input string elements to standard output as lines.
val file : string -> (string, unit) t
file path is a sink that writes input strings as lines into a file located at path.
val stdout : (string, unit) t
A sink that writes input strings as lines to STDOUT.
val stderr : (string, unit) t
A sink that writes input strings as lines to STDERR.
val sum : (int, int) t
Adds all input integer values.
val product : (int, int) t
Product of input integer values. Stops if any input element is 0.
val mean : (float, float) t
Computes a numerically stable arithmetic mean of all input elements.
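The text above does not say which algorithm mean uses. One common way to get a numerically stable mean is the incremental update mean_n = mean_(n-1) + (x - mean_(n-1)) / n, which avoids building up a large running sum. The sketch below shows that technique over a plain list; running_mean is a hypothetical name and this is not claimed to be the library's implementation.

```ocaml
(* Incremental mean over a list (illustrative; not the library's code).
   State is (number of elements seen, mean of the elements seen so far). *)
let running_mean xs =
  let step (count, mean) x =
    let count = count + 1 in
    (* Move the mean towards x by 1/count of the distance. *)
    (count, mean +. ((x -. mean) /. float_of_int count))
  in
  let _, mean = List.fold_left step (0, 0.0) xs in
  mean

let m = running_mean [1.0; 2.0; 3.0; 4.0]
```

Note that this sketch returns 0.0 for empty input, which is a choice made here for simplicity.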
zip left right computes both left and right at the same time with the same input being sent to both sinks. The results of both sinks are produced.
both left right computes both left and right at the same time with the same input. Alias for zip.
zip_left left right is similar to zip, but only produces the result of the left sink.
zip_right left right is similar to zip, but only produces the result of the right sink.
zip_with f left right is similar to zip, but applies an aggregation function to results produced by left and right.
left <&> right is an operator version of zip left right.
left <& right is an operator version of zip_left left right.
left &> right is an operator version of zip_right left right.
many sinks computes all sinks at the same time with the same input. All sinks must produce the same result type.
The sinks will be computed until all sinks are full or until the sink is stopped explicitly.
unzip left right is a sink that receives pairs 'a * 'b, sending the first element into left and the second into right. Both sinks are computed at the same time and their results returned as an output pair.
The sink becomes full when either left or right get full.
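The behaviour of unzip described above can be sketched with a small plain-OCaml model. The record type, into, to_list, and first below are hypothetical stand-ins written for this example and are not the library's definitions. Only the rule that the combined sink is full when either side is full comes from the text above.

```ocaml
(* Hypothetical sink model with an explicit state type 's; an assumption of
   this sketch, not the library's definition. *)
type ('a, 's, 'r) sink = {
  init : unit -> 's;
  push : 's -> 'a -> 's;
  full : 's -> bool;
  stop : 's -> 'r;
}

(* Feed list elements into a sink until the input ends or the sink is full. *)
let into sink xs =
  let rec loop state = function
    | x :: rest when not (sink.full state) -> loop (sink.push state x) rest
    | _ -> sink.stop state
  in
  loop (sink.init ()) xs

(* Collects all elements into a list; never full. *)
let to_list = {
  init = (fun () -> []);
  push = (fun acc x -> x :: acc);
  full = (fun _ -> false);
  stop = List.rev;
}

(* Keeps only the first element; full as soon as it holds one. *)
let first = {
  init = (fun () -> None);
  push = (fun state x -> match state with None -> Some x | some -> some);
  full = (fun state -> state <> None);
  stop = (fun state -> state);
}

(* Route pair components to two sinks; full when either side is full. *)
let unzip left right = {
  init = (fun () -> (left.init (), right.init ()));
  push = (fun (l, r) (a, b) -> (left.push l a, right.push r b));
  full = (fun (l, r) -> left.full l || right.full r);
  stop = (fun (l, r) -> (left.stop l, right.stop r));
}

let names, first_score =
  into (unzip to_list first) [("a", 1); ("b", 2); ("c", 3)]
```

Because first fills after one element, the combined sink stops there and the left side only ever sees "a".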
unzip_left left right is similar to unzip, but only produces the result of the left sink.
If right terminates first, left will be forced to terminate.
unzip_right left right is similar to unzip, but only produces the result of the right sink.
If left terminates first, right will be forced to terminate.
unzip_with f left right is similar to unzip, but applies an aggregation function to results produced by left and right.
left <*> right is an operator version of unzip left right.
left <* right is an operator version of unzip_left left right.
left *> right is an operator version of unzip_right left right.
distribute left right is similar to zip but distributes the consumed elements over left and right alternating in a round robin fashion.
Type for race result values.
race left right runs both left and right sinks at the same time producing the result for the one that fills first.
If the sink is terminated prematurely, before either left or right is filled, both of their values are produced.
Examples
let sink = Sink.(race (find ~where:(fun x -> x > 10)) (nth 8)) in
let result = Stream.of_list [1; 9; 0; 8; 30; 4] |> Stream.into sink in
assert (result = Sink.Left (Some 30))
left <|> right is the operator version of race left right.
seq left right runs left and then right sequentially producing both of their results.
If the resulting sink is stopped before right was started, right will be forced to initialize and terminate.
seq_left left right is similar to seq, but only produces the result of the left sink.
seq_right left right is similar to seq, but only produces the result of the right sink.
left <+> right is an operator version of seq left right.
left <+ right is an operator version of seq_left left right.
left +> right is an operator version of seq_right left right.
flat_map f sink is a sink produced by applying f to the result of sink. This allows sequencing multiple sinks by inspecting the result value produced by the previous sinks.
flat_map is defined as the let* operator in Syntax.
Note: If sink terminates by exhausting the entire input, the sink produced by f will be initialized and immediately forced to terminate.
# let large_values () = Sink.flat_map
    (function
      | Some x -> Printf.printf "Found value: %d\n" x; Sink.list
      | None -> Printf.printf "No value found\n"; Sink.fill [])
    (Sink.find ~where:(fun x -> x > 100));;
val large_values : unit -> (int, int list) sink = <fun>
# let values = Stream.(90 -< 105) |> Stream.into (large_values ());;
Found value: 101
val values : int list = [102; 103; 104]
sink >>= f is an operator version of flat_map f sink.
map f sink is sink with its result transformed by f.
premap f sink is a sink that applies f to each input value before passing it to sink.
Examples
If sink consumes integers, but we have an input with strings, we can provide a conversion from strings to integers to premap:
let sink = Sink.(premap int_of_string sum) in
let result = Stream.of_list ["1"; "2"; "3"] |> Stream.into sink in
assert (result = 6)
prefilter predicate sink is a sink that filters the input values for sink, passing on only those that satisfy predicate.
prefilter_map f sink applies f to every input value x of sink, discarding it if f x produces None, and keeping the transformed value otherwise.
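To make the input-side transformations concrete, here is a minimal sketch of prefilter_map over a plain-OCaml sink model. The record type, into, and sum are hypothetical definitions written only for this example. Only the behaviour of dropping None and keeping the transformed value is taken from the description above.

```ocaml
(* Hypothetical sink model; an assumption of this sketch, not the library's
   definition. The state type 's is exposed to keep the code simple. *)
type ('a, 's, 'r) sink = {
  init : unit -> 's;
  push : 's -> 'a -> 's;
  full : 's -> bool;
  stop : 's -> 'r;
}

(* Feed list elements into a sink until the input ends or the sink is full. *)
let into sink xs =
  let rec loop state = function
    | x :: rest when not (sink.full state) -> loop (sink.push state x) rest
    | _ -> sink.stop state
  in
  loop (sink.init ()) xs

(* Adds all integers it receives. *)
let sum = {
  init = (fun () -> 0);
  push = (fun acc x -> acc + x);
  full = (fun _ -> false);
  stop = (fun acc -> acc);
}

(* Apply [f] to each input; push [y] when [f x = Some y], skip on [None]. *)
let prefilter_map f sink = {
  init = sink.init;
  push = (fun state x ->
    match f x with
    | Some y -> sink.push state y
    | None -> state);
  full = sink.full;
  stop = sink.stop;
}

(* Strings that do not parse as integers are silently discarded. *)
let total = into (prefilter_map int_of_string_opt sum) ["1"; "x"; "2"; ""; "3"]
```

In this model, premap and prefilter are the two special cases where f always returns Some or returns its own input conditionally.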
val stop : ('a, 'r) t -> 'r
Stops the sink and produces the currently accumulated result. Any internal state will be terminated. Terminating an uninitialized sink will initialize it.
In addition to using the sinks and operations defined above, it is possible to create sinks with a convenient (let) notation.
A common example of a composed sink is the sink that computes the arithmetic mean:
let mean =
  let open Sink.Syntax in
  let+ total = Sink.sum
  and+ count = Sink.len in
  total / count
The resulting sink has type (int, int) sink and will only consume the input once!
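To show why such a composed sink reads the input only once, the sketch below rebuilds the notation on a plain-OCaml sink model. It assumes that let+ behaves like map and and+ like zip, as is conventional for OCaml binding operators. This document only states that let* is flat_map, so treat that mapping, and every definition below, as an illustration rather than the contents of Sink.Syntax.

```ocaml
(* Hypothetical sink model; an assumption of this sketch, not the library's
   definition. *)
type ('a, 's, 'r) sink = {
  init : unit -> 's;
  push : 's -> 'a -> 's;
  full : 's -> bool;
  stop : 's -> 'r;
}

(* Feed list elements into a sink until the input ends or the sink is full. *)
let into sink xs =
  let rec loop state = function
    | x :: rest when not (sink.full state) -> loop (sink.push state x) rest
    | _ -> sink.stop state
  in
  loop (sink.init ()) xs

(* Transform the result of a sink. *)
let map f sink = {
  init = sink.init;
  push = sink.push;
  full = sink.full;
  stop = (fun state -> f (sink.stop state));
}

(* Run two sinks side by side on the same input, in a single pass. *)
let zip left right = {
  init = (fun () -> (left.init (), right.init ()));
  push = (fun (l, r) x ->
    (* Only feed a side that can still accept input. *)
    let l = if left.full l then l else left.push l x in
    let r = if right.full r then r else right.push r x in
    (l, r));
  full = (fun (l, r) -> left.full l && right.full r);
  stop = (fun (l, r) -> (left.stop l, right.stop r));
}

(* Assumed meaning of the binding operators. *)
let ( let+ ) sink f = map f sink
let ( and+ ) left right = zip left right

let sum = {
  init = (fun () -> 0);
  push = (fun acc x -> acc + x);
  full = (fun _ -> false);
  stop = (fun acc -> acc);
}

let len = {
  init = (fun () -> 0);
  push = (fun n _ -> n + 1);
  full = (fun _ -> false);
  stop = (fun n -> n);
}

(* Integer mean, written with the binding operators defined above. *)
let mean =
  let+ total = sum
  and+ count = len in
  total / count

let result = into mean [2; 4; 6]
```

The combined state is a pair of the two sub-states, so each element is pushed to both sinks during the same traversal. As written, stopping this mean on empty input divides by zero.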
module Syntax : sig ... end
Module with syntax definitions for sinks.
module Functor : sig ... end
Module that implements the "Functor" interface.
module Applicative : sig ... end
Module that implements the "Applicative" interface.