This file is indexed.

/usr/lib/ocaml/lwt/lwt_stream.mli is in liblwt-ocaml-dev 2.7.1-4build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
(* Lightweight thread library for OCaml
 * http://www.ocsigen.org/lwt
 * Module Lwt_stream
 * Copyright (C) 2009 Jérémie Dimino
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, with linking exceptions;
 * either version 2.1 of the License, or (at your option) any later
 * version. See COPYING file for details.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 *)

(** Data streams *)

type 'a t
(** A stream holding values of type ['a].

    Naming convention: in this module, all functions applying a function
    to each element of a stream are suffixed by:

    - [_s] when the function returns a thread and calls are serialised
    - [_p] when the function returns a thread and calls are parallelised

    This module may undergo redesign or deprecation in the future. See
    {{:https://github.com/ocsigen/lwt/issues/250} Flaws of [Lwt_stream]}. In
    the meantime, you may want to consider using alternatives such as
    {{:https://github.com/c-cube/lwt-pipe} lwt-pipe}. *)

(** {2 Construction} *)

val from : (unit -> 'a option Lwt.t) -> 'a t
  (** [from f] creates a stream from the given input function. [f] is
      called each time more input is needed, and the stream ends when
      [f] returns [None].

      If [f], or the thread produced by [f], raises an exception, that exception
      is forwarded to the consumer of the stream (for example, a caller of
      {!get}). Note that this does not end the stream. A subsequent attempt to
      read from the stream will cause another call to [f], which may succeed
      with a value. *)

val from_direct : (unit -> 'a option) -> 'a t
  (** [from_direct f] does the same as {!from} but with a function
      that does not return a thread. It is preferred that this
      function be used rather than wrapping [f] into a function which
      returns a thread.

      The behavior when [f] raises an exception is the same as for {!from},
      except that [f] does not produce a thread. *)

exception Closed
  (** Exception raised by the push function of a push-stream when
      pushing an element after the end of stream ([= None]) has been
      pushed. *)

val create : unit -> 'a t * ('a option -> unit)
  (** [create ()] returns a new stream and a push function.

      To notify the stream's consumer of errors, either use a separate
      communication channel, or use a
      {{:http://caml.inria.fr/pub/docs/manual-ocaml/libref/Pervasives.html#TYPEresult}
      [result]} stream. There is no way to push an exception into a
      push-stream. *)

val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit)
  (** [create_with_reference ()] returns a new stream and a push
      function. The last function allows a reference to be set to an
      external source. This prevents the external source from being
      garbage collected.

      For example, to convert a reactive event to a stream:

      {[
        let stream, push, set_ref = Lwt_stream.create_with_reference () in
        set_ref (map_event push event)
      ]}
  *)

exception Full
  (** Exception raised by the push function of a bounded push-stream
      when the stream queue is full and a thread is already waiting to
      push an element. *)

(** Type of sources for bounded push-streams. *)
class type ['a] bounded_push = object
  method size : int
    (** Size of the stream queue, i.e. the maximum number of elements
        it can hold. *)

  method resize : int -> unit
    (** Change the size of the stream queue. Note that the new size
        can be smaller than the current stream queue size.

        It raises [Invalid_argument] if [size < 0]. *)

  method push : 'a -> unit Lwt.t
    (** Pushes a new element to the stream. If the stream is full then
        it will block until one element is consumed. If another thread
        is already blocked on {!push}, it raises {!Full}. *)

  method close : unit
    (** Closes the stream. Any thread currently blocked on {!push}
        fails with {!Closed}. *)

  method count : int
    (** Number of elements in the stream queue. *)

  method blocked : bool
    (** Is a thread blocked on {!push}? *)

  method closed : bool
    (** Is the stream closed? *)

  method set_reference : 'a. 'a -> unit
    (** Set the reference to an external source. As with
        {!create_with_reference}, this prevents the external source
        from being garbage collected. *)
end

val create_bounded : int -> 'a t * 'a bounded_push
  (** [create_bounded size] returns a new stream and a bounded push
      source. The stream can hold a maximum of [size] elements.  When
      this limit is reached, pushing a new element will block until
      one is consumed.

      Note that you cannot clone or parse (with {!parse}) a bounded
      stream. These functions will raise [Invalid_argument] if you try
      to do so.

      It raises [Invalid_argument] if [size < 0]. *)

val of_list : 'a list -> 'a t
(** [of_list l] creates a stream returning all elements of [l]. The elements are
    pushed into the stream immediately, resulting in a closed stream (in the
    sense of {!is_closed}). *)

val of_array : 'a array -> 'a t
(** [of_array a] creates a stream returning all elements of [a]. The elements
    are pushed into the stream immediately, resulting in a closed stream (in the
    sense of {!is_closed}). *)

val of_string : string -> char t
(** [of_string str] creates a stream returning all characters of [str]. The
    characters are pushed into the stream immediately, resulting in a closed
    stream (in the sense of {!is_closed}). *)

val clone : 'a t -> 'a t
  (** [clone st] clones the given stream. Operations on each stream
      will not affect the other.

      For example:

      {[
        # let st1 = Lwt_stream.of_list [1; 2; 3];;
        val st1 : int Lwt_stream.t = <abstr>
        # let st2 = Lwt_stream.clone st1;;
        val st2 : int Lwt_stream.t = <abstr>
        # lwt x = Lwt_stream.next st1;;
        val x : int = 1
        # lwt y = Lwt_stream.next st2;;
        val y : int = 1
      ]}

      It raises [Invalid_argument] if [st] is a bounded
      push-stream. *)

(** {2 Destruction} *)

val to_list : 'a t -> 'a list Lwt.t
  (** Returns the list of elements of the given stream *)

val to_string : char t -> string Lwt.t
  (** Returns the string composed of all characters of the given
      stream *)

(** {2 Data retrieval} *)

exception Empty
  (** Exception raised when trying to retrieve data from an empty
      stream. *)

val peek : 'a t -> 'a option Lwt.t
  (** [peek st] returns the first element of the stream, if any,
      without removing it. *)

val npeek : int -> 'a t -> 'a list Lwt.t
  (** [npeek n st] returns at most the first [n] elements of [st],
      without removing them. *)

val get : 'a t -> 'a option Lwt.t
  (** [get st] removes and returns the first element of the stream, if
      any. *)

val nget : int -> 'a t -> 'a list Lwt.t
  (** [nget n st] removes and returns at most the first [n] elements of
      [st]. *)

val get_while : ('a -> bool) -> 'a t -> 'a list Lwt.t
val get_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a list Lwt.t
  (** [get_while f st] returns the longest prefix of [st] where all
      elements satisfy [f]. *)

val next : 'a t -> 'a Lwt.t
  (** [next st] removes and returns the next element of the stream or
      fails with {!Empty}, if the stream is empty. *)

val last_new : 'a t -> 'a Lwt.t
  (** [last_new st] returns the last element that can be obtained
      without sleeping, or waits for one if none is available.

      It fails with {!Empty} if the stream has no more elements. *)

val junk : 'a t -> unit Lwt.t
  (** [junk st] removes the first element of [st]. *)

val njunk : int -> 'a t -> unit Lwt.t
  (** [njunk n st] removes at most the first [n] elements of the
      stream. *)

val junk_while : ('a -> bool) -> 'a t -> unit Lwt.t
val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.t
  (** [junk_while f st] removes all elements at the beginning of the
      stream which satisfy [f]. *)

val junk_old : 'a t -> unit Lwt.t
  (** [junk_old st] removes all elements that are ready to be read
      without yielding from [st].

      For example, the [read_password] function of [Lwt_read_line]
      uses it to flush keys previously typed by the user.
   *)

val get_available : 'a t -> 'a list
  (** [get_available st] returns all available elements of [st] without
      blocking. *)

val get_available_up_to : int -> 'a t -> 'a list
  (** [get_available_up_to n st] returns up to [n] elements of [st]
      without blocking. *)

val is_empty : 'a t -> bool Lwt.t
  (** [is_empty st] returns whether the given stream is empty. *)

val is_closed : 'a t -> bool
  (** [is_closed st] returns whether the given stream has been closed. A closed
      stream is not necessarily empty. It may still contain unread elements. If
      [is_closed s = true], then all subsequent reads until the end of the
      stream are guaranteed not to block.

      @since 2.6.0 *)

val closed : 'a t -> unit Lwt.t
  (** [closed st] returns a thread that will sleep until the stream has been
      closed.

      @since 2.6.0 *)

val on_termination : 'a t -> (unit -> unit) -> unit
  [@@ocaml.deprecated " Bind on Lwt_stream.closed."]
  (** [on_termination st f] executes [f] when the end of the stream [st]
      is reached. Note that the stream may still contain elements if
      {!peek} or similar was used.

      @deprecated Use {!closed}. *)

val on_terminate : 'a t -> (unit -> unit) -> unit
  [@@ocaml.deprecated " Bind on Lwt_stream.closed."]
  (** Same as {!on_termination}.

      @deprecated Use {!closed}. *)

(** {2 Stream traversal} *)

(** Note: all the following functions are destructive.

    For example:

    {[
      # let st1 = Lwt_stream.of_list [1; 2; 3];;
      val st1 : int Lwt_stream.t = <abstr>
      # let st2 = Lwt_stream.map string_of_int st1;;
      val st2 : string Lwt_stream.t = <abstr>
      # lwt x = Lwt_stream.next st1;;
      val x : int = 1
      # lwt y = Lwt_stream.next st2;;
      val y : string = "2"
    ]}
*)

val choose : 'a t list -> 'a t
  (** [choose l] creates a stream from a list of streams. The
      resulting stream will return elements returned by any stream of
      [l] in an unspecified order. *)

val map : ('a -> 'b) -> 'a t -> 'b t
val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t
  (** [map f st] maps the value returned by [st] with [f] *)

val filter : ('a -> bool) -> 'a t -> 'a t
val filter_s : ('a -> bool Lwt.t) -> 'a t -> 'a t
  (** [filter f st] keeps only values, [x], such that [f x] is [true] *)

val filter_map : ('a -> 'b option) -> 'a t -> 'b t
val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b t
  (** [filter_map f st] filters and maps [st] at the same time *)

val map_list : ('a -> 'b list) -> 'a t -> 'b t
val map_list_s : ('a -> 'b list Lwt.t) -> 'a t -> 'b t
  (** [map_list f st] applies [f] on each element of [st] and flattens
      the lists returned *)

val fold : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b Lwt.t
val fold_s : ('a -> 'b -> 'b Lwt.t) -> 'a t -> 'b -> 'b Lwt.t
  (** [fold f s x] folds [f] over the elements of the stream [s],
      with [x] as the initial accumulator. *)

val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
  (** [iter f s] iterates over all elements of the stream. *)

val find : ('a -> bool) -> 'a t -> 'a option Lwt.t
val find_s : ('a -> bool Lwt.t) -> 'a t -> 'a option Lwt.t
  (** [find f s] finds an element of the stream [s] satisfying [f],
      if any. *)

val find_map : ('a -> 'b option) -> 'a t -> 'b option Lwt.t
val find_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b option Lwt.t
  (** [find_map f s] finds and maps at the same time. *)

val combine : 'a t -> 'b t -> ('a * 'b) t
  (** [combine s1 s2] combines two streams. The stream will end when
      either stream ends. *)

val append : 'a t -> 'a t -> 'a t
  (** [append s1 s2] returns a stream which returns all elements of
      [s1], then all elements of [s2] *)

val concat : 'a t t -> 'a t
  (** [concat st] returns the concatenation of all streams of [st]. *)

val flatten : 'a list t -> 'a t
  (** [flatten st = map_list (fun l -> l) st] *)

val wrap_exn : 'a t -> 'a Lwt.result t
(** [wrap_exn s] is a stream [s'] such that each time [s] yields a value [v],
    [s'] yields [Result.Ok v], and when the source of [s] raises an exception
    [e], [s'] yields [Result.Error e].

    Note that push-streams (as returned by {!create}) never raise exceptions.

    If the stream source keeps raising the same exception [e] each time the
    stream is read, [s'] is unbounded. Reading it will produce [Result.Error e]
    indefinitely.

    @since 2.7.0 *)

(** {2 Parsing} *)

val parse : 'a t -> ('a t -> 'b Lwt.t) -> 'b Lwt.t
  (** [parse st f] parses [st] with [f]. If [f] raises an exception,
      [st] is restored to its previous state.

      It raises [Invalid_argument] if [st] is a bounded
      push-stream. *)

(** {2 Misc} *)

val hexdump : char t -> string t
  (** [hexdump byte_stream] returns a stream which is the same as the
      output of [hexdump -C].

      Basically, here is a simple implementation of [hexdump -C]:

      {[
        let () =
          Lwt_main.run
            (Lwt_io.write_lines Lwt_io.stdout
               (Lwt_stream.hexdump (Lwt_io.read_chars Lwt_io.stdin)))
      ]}
  *)

(** {2 Deprecated} *)

type 'a result =
  | Value of 'a
  | Error of exn
  [@@ocaml.deprecated
" This type is being replaced by Lwt.result and the corresponding function
 Lwt_stream.wrap_exn."]
(** A value or an error.

    @deprecated Replaced by {!wrap_exn}, which uses {!Lwt.result}. *)

[@@@ocaml.warning "-3"]
val map_exn : 'a t -> 'a result t
  [@@ocaml.deprecated " Use Lwt_stream.wrap_exn"]
(** [map_exn s] returns a stream that captures all exceptions raised
    by the source of the stream (the function passed to {!from}).

    Note that for push-streams (as returned by {!create}) all
    elements of the mapped streams are values.

    If the stream source keeps raising the same exception [e] each time the
    stream is read, the stream produced by [map_exn] is unbounded. Reading it
    will produce [Lwt_stream.Error e] indefinitely.

    @deprecated Use {!wrap_exn}. *)

[@@@ocaml.warning "+3"]