/usr/lib/ocaml/lwt/lwt_stream.mli is in liblwt-ocaml-dev 2.7.1-4build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 | (* Lightweight thread library for OCaml
* http://www.ocsigen.org/lwt
* Module Lwt_stream
* Copyright (C) 2009 Jérémie Dimino
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, with linking exceptions;
* either version 2.1 of the License, or (at your option) any later
* version. See COPYING file for details.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
* 02111-1307, USA.
*)
(** Data streams *)
type 'a t
(** A stream holding values of type ['a].
Naming convention: in this module, all functions applying a function
to each element of a stream are suffixed by:
- [_s] when the function returns a thread and calls are serialised
- [_p] when the function returns a thread and calls are parallelised
This module may undergo redesign or deprecation in the future. See
{{:https://github.com/ocsigen/lwt/issues/250} Flaws of [Lwt_stream]}. In
the meantime, you may want to consider using alternatives such as
{{:https://github.com/c-cube/lwt-pipe} lwt-pipe}. *)
(** {2 Construction} *)
val from : (unit -> 'a option Lwt.t) -> 'a t
(** [from f] creates a stream from the given input function. [f] is
called each time more input is needed, and the stream ends when
[f] returns [None].
If [f], or the thread produced by [f], raises an exception, that exception
is forwarded to the consumer of the stream (for example, a caller of
{!get}). Note that this does not end the stream. A subsequent attempt to
read from the stream will cause another call to [f], which may succeed
with a value. *)
val from_direct : (unit -> 'a option) -> 'a t
(** [from_direct f] does the same as {!from} but with a function
that does not return a thread. It is preferred that this
function be used rather than wrapping [f] into a function which
returns a thread.
The behavior when [f] raises an exception is the same as for {!from},
except that [f] does not produce a thread. *)
exception Closed
(** Exception raised by the push function of a push-stream when
pushing an element after the end of stream ([= None]) has been
pushed. *)
val create : unit -> 'a t * ('a option -> unit)
(** [create ()] returns a new stream and a push function.
To notify the stream's consumer of errors, either use a separate
communication channel, or use a
{{:http://caml.inria.fr/pub/docs/manual-ocaml/libref/Pervasives.html#TYPEresult}
[result]} stream. There is no way to push an exception into a
push-stream. *)
val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit)
(** [create_with_reference ()] returns a new stream and a push
function. The last function allows a reference to be set to an
external source. This prevents the external source from being
garbage collected.
For example, to convert a reactive event to a stream:
{[
let stream, push, set_ref = Lwt_stream.create_with_reference () in
set_ref (map_event push event)
]}
*)
exception Full
(** Exception raised by the push function of a bounded push-stream
when the stream queue is full and a thread is already waiting to
push an element. *)
(** Type of sources for bounded push-streams. *)
class type ['a] bounded_push = object
method size : int
(** Size of the stream. *)
method resize : int -> unit
(** Change the size of the stream queue. Note that the new size
can smaller than the current stream queue size.
It raises [Invalid_argument] if [size < 0]. *)
method push : 'a -> unit Lwt.t
(** Pushes a new element to the stream. If the stream is full then
it will block until one element is consumed. If another thread
is already blocked on {!push}, it raises {!Full}. *)
method close : unit
(** Closes the stream. Any thread currently blocked on {!push}
fails with {!Closed}. *)
method count : int
(** Number of elements in the stream queue. *)
method blocked : bool
(** Is a thread is blocked on {!push} ? *)
method closed : bool
(** Is the stream closed ? *)
method set_reference : 'a. 'a -> unit
(** Set the reference to an external source. *)
end
val create_bounded : int -> 'a t * 'a bounded_push
(** [create_bounded size] returns a new stream and a bounded push
source. The stream can hold a maximum of [size] elements. When
this limit is reached, pushing a new element will block until
one is consumed.
Note that you cannot clone or parse (with {!parse}) a bounded
stream. These functions will raise [Invalid_argument] if you try
to do so.
It raises [Invalid_argument] if [size < 0]. *)
val of_list : 'a list -> 'a t
(** [of_list l] creates a stream returning all elements of [l]. The elements are
pushed into the stream immediately, resulting in a closed stream (in the
sense of {!is_closed}). *)
val of_array : 'a array -> 'a t
(** [of_array a] creates a stream returning all elements of [a]. The elements
are pushed into the stream immediately, resulting in a closed stream (in the
sense of {!is_closed}). *)
val of_string : string -> char t
(** [of_string str] creates a stream returning all characters of [str]. The
characters are pushed into the stream immediately, resulting in a closed
stream (in the sense of {!is_closed}). *)
val clone : 'a t -> 'a t
(** [clone st] clone the given stream. Operations on each stream
will not affect the other.
For example:
{[
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.clone st1;;
val st2 : int Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : int = 1
]}
It raises [Invalid_argument] if [st] is a bounded
push-stream. *)
(** {2 Destruction} *)
val to_list : 'a t -> 'a list Lwt.t
(** Returns the list of elements of the given stream *)
val to_string : char t -> string Lwt.t
(** Returns the word composed of all characters of the given
stream *)
(** {2 Data retrieval} *)
exception Empty
(** Exception raised when trying to retrieve data from an empty
stream. *)
val peek : 'a t -> 'a option Lwt.t
(** [peek st] returns the first element of the stream, if any,
without removing it. *)
val npeek : int -> 'a t -> 'a list Lwt.t
(** [npeek n st] returns at most the first [n] elements of [st],
without removing them. *)
val get : 'a t -> 'a option Lwt.t
(** [get st] removes and returns the first element of the stream, if
any. *)
val nget : int -> 'a t -> 'a list Lwt.t
(** [nget n st] removes and returns at most the first [n] elements of
[st]. *)
val get_while : ('a -> bool) -> 'a t -> 'a list Lwt.t
val get_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a list Lwt.t
(** [get_while f st] returns the longest prefix of [st] where all
elements satisfy [f]. *)
val next : 'a t -> 'a Lwt.t
(** [next st] removes and returns the next element of the stream or
fails with {!Empty}, if the stream is empty. *)
val last_new : 'a t -> 'a Lwt.t
(** [last_new st] returns the last element that can be obtained
without sleeping, or wait for one if none is available.
It fails with {!Empty} if the stream has no more elements. *)
val junk : 'a t -> unit Lwt.t
(** [junk st] removes the first element of [st]. *)
val njunk : int -> 'a t -> unit Lwt.t
(** [njunk n st] removes at most the first [n] elements of the
stream. *)
val junk_while : ('a -> bool) -> 'a t -> unit Lwt.t
val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.t
(** [junk_while f st] removes all elements at the beginning of the
streams which satisfy [f]. *)
val junk_old : 'a t -> unit Lwt.t
(** [junk_old st] removes all elements that are ready to be read
without yielding from [st].
For example, the [read_password] function of [Lwt_read_line]
uses it to flush keys previously typed by the user.
*)
val get_available : 'a t -> 'a list
(** [get_available st] returns all available elements of [l] without
blocking. *)
val get_available_up_to : int -> 'a t -> 'a list
(** [get_available_up_to n st] returns up to [n] elements of [l]
without blocking. *)
val is_empty : 'a t -> bool Lwt.t
(** [is_empty st] returns whether the given stream is empty. *)
val is_closed : 'a t -> bool
(** [is_closed st] returns whether the given stream has been closed. A closed
stream is not necessarily empty. It may still contain unread elements. If
[is_closed s = true], then all subsequent reads until the end of the
stream are guaranteed not to block.
@since 2.6.0 *)
val closed : 'a t -> unit Lwt.t
(** [closed st] returns a thread that will sleep until the stream has been
closed.
@since 2.6.0 *)
val on_termination : 'a t -> (unit -> unit) -> unit
[@@ocaml.deprecated " Bind on Lwt_stream.closed."]
(** [on_termination st f] executes [f] when the end of the stream [st]
is reached. Note that the stream may still contain elements if
{!peek} or similar was used.
@deprecated Use {!closed}. *)
val on_terminate : 'a t -> (unit -> unit) -> unit
[@@ocaml.deprecated " Bind on Lwt_stream.closed."]
(** Same as {!on_termination}.
@deprecated Use {!closed}. *)
(** {2 Stream transversal} *)
(** Note: all the following functions are destructive.
For example:
{[
# let st1 = Lwt_stream.of_list [1; 2; 3];;
val st1 : int Lwt_stream.t = <abstr>
# let st2 = Lwt_stream.map string_of_int st1;;
val st2 : string Lwt_stream.t = <abstr>
# lwt x = Lwt_stream.next st1;;
val x : int = 1
# lwt y = Lwt_stream.next st2;;
val y : string = "2"
]}
*)
val choose : 'a t list -> 'a t
(** [choose l] creates an stream from a list of streams. The
resulting stream will return elements returned by any stream of
[l] in an unspecified order. *)
val map : ('a -> 'b) -> 'a t -> 'b t
val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t
(** [map f st] maps the value returned by [st] with [f] *)
val filter : ('a -> bool) -> 'a t -> 'a t
val filter_s : ('a -> bool Lwt.t) -> 'a t -> 'a t
(** [filter f st] keeps only values, [x], such that [f x] is [true] *)
val filter_map : ('a -> 'b option) -> 'a t -> 'b t
val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b t
(** [filter_map f st] filter and map [st] at the same time *)
val map_list : ('a -> 'b list) -> 'a t -> 'b t
val map_list_s : ('a -> 'b list Lwt.t) -> 'a t -> 'b t
(** [map_list f st] applies [f] on each element of [st] and flattens
the lists returned *)
val fold : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b Lwt.t
val fold_s : ('a -> 'b -> 'b Lwt.t) -> 'a t -> 'b -> 'b Lwt.t
(** [fold f s x] fold_like function for streams. *)
val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
(** [iter f s] iterates over all elements of the stream. *)
val find : ('a -> bool) -> 'a t -> 'a option Lwt.t
val find_s : ('a -> bool Lwt.t) -> 'a t -> 'a option Lwt.t
(** [find f s] find an element in a stream. *)
val find_map : ('a -> 'b option) -> 'a t -> 'b option Lwt.t
val find_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b option Lwt.t
(** [find_map f s] find and map at the same time. *)
val combine : 'a t -> 'b t -> ('a * 'b) t
(** [combine s1 s2] combines two streams. The stream will end when
either stream ends. *)
val append : 'a t -> 'a t -> 'a t
(** [append s1 s2] returns a stream which returns all elements of
[s1], then all elements of [s2] *)
val concat : 'a t t -> 'a t
(** [concat st] returns the concatenation of all streams of [st]. *)
val flatten : 'a list t -> 'a t
(** [flatten st = map_list (fun l -> l) st] *)
val wrap_exn : 'a t -> 'a Lwt.result t
(** [wrap_exn s] is a stream [s'] such that each time [s] yields a value [v],
[s'] yields [Result.Ok v], and when the source of [s] raises an exception
[e], [s'] yields [Result.Error e].
Note that push-streams (as returned by {!create}) never raise exceptions.
If the stream source keeps raising the same exception [e] each time the
stream is read, [s'] is unbounded. Reading it will produce [Result.Error e]
indefinitely.
@since 2.7.0 *)
(** {2 Parsing} *)
val parse : 'a t -> ('a t -> 'b Lwt.t) -> 'b Lwt.t
(** [parse st f] parses [st] with [f]. If [f] raise an exception,
[st] is restored to its previous state.
It raises [Invalid_argument] if [st] is a bounded
push-stream. *)
(** {2 Misc} *)
val hexdump : char t -> string t
(** [hexdump byte_stream] returns a stream which is the same as the
output of [hexdump -C].
Basically, here is a simple implementation of [hexdump -C]:
{[
let () = Lwt_main.run (Lwt_io.write_lines Lwt_io.stdout (Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin)))
]}
*)
(** {2 Deprecated} *)
type 'a result =
| Value of 'a
| Error of exn
[@@ocaml.deprecated
" This type is being replaced by Lwt.result and the corresponding function
Lwt_stream.wrap_exn."]
(** A value or an error.
@deprecated Replaced by {!wrap_exn}, which uses {!Lwt.result}. *)
[@@@ocaml.warning "-3"]
val map_exn : 'a t -> 'a result t
[@@ocaml.deprecated " Use Lwt_stream.wrap_exn"]
(** [map_exn s] returns a stream that captures all exceptions raised
by the source of the stream (the function passed to {!from}).
Note that for push-streams (as returned by {!create}) all
elements of the mapped streams are values.
If the stream source keeps raising the same exception [e] each time the
stream is read, the stream produced by [map_exn] is unbounded. Reading it
will produce [Lwt_stream.Error e] indefinitely.
@deprecated Use {!wrap_exn}. *)
[@@@ocaml.warning "+3"]
|