/usr/lib/ocaml/equeue/uq_engines.mli is in libocamlnet-ocaml-dev 4.1.2-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 | (*
* $Id$
*)
(** An {b engine} performs a certain task in an autonomous way. Engines
* are attached to a {!Unixqueue.event_system}, and do their task by
* generating events for resources of the operating system, and
* by handling such events. Engines are in one of four states: They
* may be still {b working}, they may be {b done}, they may be
* {b aborted}, or they may be in an {b error} state. The three latter
* states are called {b final states}, because they indicate that the
* engine has stopped operation.
*
* It is possible to ask an engine to notify another object when it
* changes its state. For simplicity, notification is done by invoking
* a callback function, and not by issuing notification events.
*
* Effectively, engines provide a calculus for cooperative microthreading.
* This calculus includes combinators for sequential execution and
* synchronization. Moreover, it is easy to connect it with callback-style
* microthreading - one can arrange callbacks when an engine is done, and
* one can catch callbacks and turn them into engines.
*)
(** {1 Exceptions} *)
exception Closed_channel
(** Raised when a method of a closed channel object is called (only channel
 * methods count).
 *
 * This exception should be regarded as equivalent to
 * [Netchannels.Closed_channel], but it is not guaranteed to be the
 * same exception.
 *)
exception Broken_communication
(** Some engines indicate this error when they cannot continue because the
 * other endpoint of the communication signals an error.
 *
 * This exception is not raised. It is only used as the argument of the
 * [`Error] state, i.e. the engine state becomes
 * [`Error Broken_communication].
 *)
exception Watchdog_timeout
(** Used by the [watchdog] engine to indicate a timeout, i.e. that the
 * watched engine made no progress for the configured number of seconds.
 *
 * This exception is not raised. It is only used as the argument of the
 * [`Error] state, i.e. the watchdog state becomes
 * [`Error Watchdog_timeout].
 *)
exception Timeout
(** Used by [input_engine] and [output_engine] to indicate timeouts: if
 * the file descriptor does not become ready within the given number of
 * seconds, these engines transition to [`Error Timeout].
 *)
exception Addressing_method_not_supported
(** Raised by [client_endpoint_connector] and [server_endpoint_acceptor] to
 * indicate that the kind of address passed in is not supported by the
 * respective class.
 *)
exception Cancelled
(** The callback function of a [multiplex_controller] is invoked with this
 * exception as argument if the operation is cancelled.
 *)
(** {1 Engine definition} *)
type 't engine_state =
[ `Working of int
| `Done of 't
| `Error of exn
| `Aborted
]
(** The type of engine states, with result values of type ['t]:
 * - [`Working n]: The engine is still working. The number [n] counts the
 *   events that have been processed so far.
 * - [`Done arg]: The engine has completed its task without errors.
 *   The argument [arg] is the result value of the engine.
 * - [`Error exn]: The engine has stopped because of an error. The
 *   argument [exn] describes the error as an exception.
 * - [`Aborted]: The engine has stopped because the [abort] method
 *   was called.
 *)
(* `Done, `Error, and `Aborted are final states, i.e. the state will
 * not change again.
 * CHECK: This is a bit strict, and hard to implement. At least `Done
 * must be final, but it is ok when `Error and `Aborted change, however
 * they must not change back to `Working.
 *)
;;
type 't final_state =
[ `Done of 't
| `Error of exn
| `Aborted
]
(** Same as [engine_state] but without the [`Working] case, i.e. this
 * type contains exactly the final states. The three constructors have
 * the same meaning as in [engine_state].
 *)
val string_of_state : 'a engine_state -> string
(** For debugging purposes: returns a string describing the passed
 * engine state.
 *)
(** This class type defines the interface an engine must support. The
 * class parameter ['t] is the type of the result values (i.e. the
 * argument of the state [`Done] the engine finally reaches when it is
 * successful).
 *)
class type [ 't ] engine = object
(** Requirements for engines *)
method state : 't engine_state
(** Returns the current state of the engine *)
method abort : unit -> unit
(** Forces the engine to abort its operation. If the state is already
 * [`Done], [`Aborted], or [`Error], this method must do nothing (you
 * cannot abort an already finished engine).
 *)
method request_notification : (unit -> bool) -> unit
(** Requests notification about state changes.
 *
 * After the notification has been requested, the passed function must
 * be called whenever [state] changes its value (or might change
 * its value; it is allowed to call the notification function more
 * frequently than necessary). The function returns [true] if there
 * is still interest in notification, and [false] if notification must
 * be disabled; in the latter case the function must not be called
 * any longer.
 *
 * There can be any number of parallel active notifications. It is
 * allowed that a notification callback function requests further
 * notifications.
 *
 * If the callback raises an exception, this exception is
 * propagated to the caller of {!Unixqueue.run}.
 *)
method request_proxy_notification : ('t engine -> bool) -> unit
(** Requests to call back the function when there is another engine
 * that can be used as proxy for this object. Note that this is a pure
 * optimization for [qseq_engine], and is normally not implemented
 * for any other engine construction. It is ok to define this method
 * as a no-op.
 *)
method event_system : Unixqueue.event_system
(** Returns the event system the engine is attached to *)
end
;;
class ['t] delegate_engine : 't #engine -> ['t] engine
(** Turns an engine value into a class: [new delegate_engine e] takes an
 * already existing engine object [e] and yields an engine with the same
 * result type.
 *
 * NOTE(review): presumably all methods are simply forwarded to [e] -
 * confirm against the implementation.
 *)
(** {1 Engines and callbacks} *)
val when_state : ?is_done:('a -> unit) ->
?is_error:(exn -> unit) ->
?is_aborted:(unit -> unit) ->
?is_progressing:(int -> unit) ->
'a #engine ->
unit
(** Watches the state of the argument engine, and arranges that one of
 * the functions is called when the corresponding state change is done.
 * Once a final state is reached, the engine is no longer watched.
 *
 * Note that [when_state] only observes future state changes: if the
 * engine is already in a final state at the time [when_state] is
 * called, the functions are not called for this state (this is
 * different from [map_engine]).
 *
 * If one of the functions raises an exception, this exception is
 * propagated to the caller of {!Unixqueue.run}.
 *
 * @param is_done Called when the state transitions to [`Done]. The
 *   argument of [is_done] is the argument of the [`Done] state.
 * @param is_error Called when the state transitions to [`Error]. The
 *   argument of [is_error] is the argument of the [`Error] state.
 * @param is_aborted Called when the state transitions to [`Aborted].
 * @param is_progressing Called when the [`Working] state changes.
 *   The int argument is the new argument of [`Working].
 *)
class ['a] signal_engine : Unixqueue.event_system ->
object
inherit ['a] engine
method signal : 'a final_state -> unit
end
(** [let se = new signal_engine esys]: The engine [se] remains in
 * [`Working 0] until the method [se # signal x] is called. At this point
 * [se] transitions to [x]. Any further call of [signal] does not
 * have any effect.
 *
 * Also, if [se] is aborted, [signal] does not have any effect.
 *
 * The function [signal] may be called from a different thread.
 * The signalling event is forwarded to the thread running the
 * event system.
 *)
val signal_engine : Unixqueue.event_system ->
'a engine * ('a final_state -> unit)
(** [let (se, signal) = signal_engine esys]: Same as the class
 * [signal_engine], but as a function. It returns the engine [se]
 * together with the function [signal] that moves [se] to a final state.
 *)
(** {1 Combinators} *)
(** The following combinators serve as the control structures to connect
primitive engines with each other.
*)
class ['a,'b] map_engine : map_done:('a -> 'b engine_state) ->
?map_error:(exn -> 'b engine_state) ->
?map_aborted:(unit -> 'b engine_state) ->
?propagate_working : bool ->
'a #engine ->
['b] engine
(** The [map_engine] observes the argument engine, and when the
 * state changes to [`Done], [`Error], or [`Aborted], the corresponding
 * mapping function is called, and the resulting state becomes the state
 * of the mapped engine. If the engine is already in one of the
 * mentioned states, the map functions are also called (unlike
 * [when_state]).
 *
 * After the state change to [`Done], [`Error], or [`Aborted] has been
 * observed, the map engine detaches from the argument engine,
 * and no further state changes are recognized.
 *
 * The state [`Working] cannot be mapped to another state. It is an
 * error to map final states to [`Working].
 * The result type of the [map_*] functions is [engine_state]
 * and not [final_state] because of historic reasons.
 *
 * If the mapped engine is aborted, this request will be forwarded
 * to the argument engine.
 *
 * If one of the mapping functions raises an exception, this causes
 * a transition to [`Error].
 *
 * @param map_done Maps the [`Done] state of the argument engine to
 *   another state. The argument of [map_done] is the argument of the
 *   [`Done] state. Note that [map_done] is non-optional only because
 *   of typing. If it were optional, the type checker would infer ['a = 'b].
 * @param map_error Maps the [`Error] state of the argument engine to
 *   another state. The argument of [map_error] is the argument of the
 *   [`Error] state.
 * @param map_aborted Maps the [`Aborted] state of the argument engine to
 *   another state.
 * @param propagate_working Specifies whether changes of the [`Working]
 *   state in the argument engine are propagated. Defaults to [true].
 *   If set to [false], the mapped engine remains in [`Working 0] until
 *   it transitions to a final state.
 *
 *)
val map_engine : map_done:('a -> 'b engine_state) ->
?map_error:(exn -> 'b engine_state) ->
?map_aborted:(unit -> 'b engine_state) ->
?propagate_working : bool ->
'a #engine ->
'b engine
(** Same as the class [map_engine], but as a function taking the same
 * arguments.
 *)
class ['a,'b] fmap_engine : 'a #engine ->
('a final_state -> 'b final_state) ->
['b] engine
(** Similar to [map_engine] but with different calling conventions: The
 * mapping function is called when the argument engine reaches a
 * final state, and this state can be mapped to another final state.
 * There is only a single mapping function for all three final states.
 *)
val fmap_engine : 'a #engine ->
('a final_state -> 'b final_state) ->
'b engine
(** Same as the class [fmap_engine], but as a function.
 *
 * After opening {!Uq_engines.Operators}, this is also available
 * as operator [>>], e.g.
 *
 * {[
 *    e >>
 *      (function
 *        | `Done r -> ...
 *        | `Error error -> ...
 *        | `Aborted -> ...
 *      )
 * ]}
 *)
class ['a] meta_engine : 'a #engine -> ['a final_state] engine
(** Maps the final state [s] of the argument engine to [`Done s], i.e.
 * the final state of the argument (including [`Error] and [`Aborted])
 * becomes the result value of the meta engine.
 *)
val meta_engine : 'a #engine -> 'a final_state engine
(** Same as the class [meta_engine], but as a function *)
class ['t] epsilon_engine :
't engine_state -> Unixqueue.event_system -> ['t] engine
(** This engine transitions from its initial state [`Working 0] in one
 * step ("epsilon time") to the passed constant state. During this time
 * event processing will continue, so concurrently running engines can
 * make progress. For performance reasons, however, external resources
 * like file descriptors are not watched for new events.
 *
 * In previous versions of this library the class was called [const_engine].
 * However, this is not a constant thing. In particular, it is possible
 * that this engine is aborted, so that the passed state is never reached.
 * To avoid programming errors because of the misnomer, this class has been
 * renamed.
 *)
val epsilon_engine :
't engine_state -> Unixqueue.event_system -> 't engine
(** Same as the class [epsilon_engine], but as a function. Also
 * available as [eps_e] in {!Uq_engines.Operators}.
 *)
class ['a, 'b] seq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
(** This engine runs two engines in sequential order. It is called as
 *
 * {[ let eng_s = new seq_engine eng_a f ]}
 *
 * When [eng_a] goes to the state [`Done arg], the function [f] is called to
 * obtain
 *
 * {[ let eng_b = f arg ]}
 *
 * [eng_b] runs until it is also in state [`Done].
 *
 * If [eng_a] or [eng_b] go to states [`Aborted] or [`Error], the
 * sequential engine [eng_s] does so, too. If [eng_s] is aborted,
 * this request will be forwarded to the currently active engine,
 * [eng_a] or [eng_b].
 *
 * If calling [f] results in an exception, this is handled as if [eng_a]
 * signaled an exception.
 *)
val seq_engine : 'a #engine -> ('a -> 'b #engine) -> 'b engine
(** Same as the class [seq_engine], but as a function.
 *
 * After opening {!Uq_engines.Operators}, sequential execution is also
 * available as operator [++], e.g.
 * {[ e1 ++ (fun r1 -> e2) ]}
 * (when [e1] and [e2] are engines, and [r1] is the result of [e1]).
 * Note that, according to the documentation of the [Operators] module,
 * [++] is [qseq_engine] since OCamlnet-3.6.4 and no longer [seq_engine].
 *)
class ['a, 'b] qseq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
val qseq_engine : 'a #engine -> ('a -> 'b #engine) -> 'b engine
(** Almost the same as [seq_engine], but this version does not
 * propagate the working state (i.e. there is no progress reporting).
 * [qseq_engine] should be preferred for recursive chains of engines.
 *
 * The class and the function take the same arguments. After opening
 * {!Uq_engines.Operators}, this is also available as operator [++].
 *)
class ['a] stream_seq_engine : 'a -> ('a -> 'a #engine) Stream.t ->
Unixqueue.event_system -> ['a] engine
(** [let se = new stream_seq_engine x0 s esys]: The constructed engine [se]
 * fetches functions [f : 'a -> 'a #engine] from the stream [s], and
 * runs the engines obtained by calling these functions [e = f x] one
 * after the other. Each function call gets the result of the previous
 * engine as argument. The first call gets [x0] as argument.
 *
 * If one of the engines [e] transitions into an error or aborted state,
 * [se] will also do that. If [se] is aborted, this is passed down to
 * the currently running engine [e].
 *)
val stream_seq_engine : 'a -> ('a -> 'a #engine) Stream.t ->
Unixqueue.event_system -> 'a engine
(** Same as the class [stream_seq_engine], but as a function *)
class ['a, 'b] sync_engine : 'a #engine -> 'b #engine -> ['a * 'b] engine
(** This engine runs two engines in parallel, and waits until both
 * are [`Done] (synchronization). The pair of the two [`Done] arguments
 * is taken as the combined result.
 *
 * If one of the engines goes to the states [`Aborted] or [`Error],
 * the combined engine will follow this transition. The other,
 * non-aborted and non-erroneous engine is aborted in this case.
 * [`Error] has higher precedence than [`Aborted].
 *
 * If the combined engine is aborted, this request is forwarded
 * to both member engines.
 *)
val sync_engine : 'a #engine -> 'b #engine -> ('a * 'b) engine
(** Same as the class [sync_engine], but as a function *)
class ['a,'b] msync_engine : 'a #engine list ->
('a -> 'b -> 'b) ->
'b ->
Unixqueue.event_system ->
['b] engine
(** Multiple synchronization:
 * [let me = new msync_engine el f x0 esys] - Runs the engines in [el] in
 * parallel, and waits until all are [`Done]. The result of [me] is
 * then computed by folding the results of the part engines using
 * [f], with an initial accumulator [x0].
 *
 * If one of the engines goes to the states [`Aborted] or [`Error],
 * the combined engine will follow this transition. The other,
 * non-aborted and non-erroneous engines are aborted in this case.
 * [`Error] has higher precedence than [`Aborted].
 *
 * If calling [f] results in an exception, this is handled as if
 * the part engine signals an error.
 *
 * If the combined engine is aborted, this request is forwarded
 * to all member engines.
 *)
val msync_engine : 'a #engine list ->
('a -> 'b -> 'b) ->
'b ->
Unixqueue.event_system ->
'b engine
(** Same as the class [msync_engine], but as a function *)
class ['a ] delay_engine : float -> (unit -> 'a #engine) ->
Unixqueue.event_system ->
['a] engine
(** [let de = new delay_engine d f esys]: The engine [e = f()] is created
 * after [d] seconds, and the result of [e] becomes the result of [de].
 *)
val delay_engine : float -> (unit -> 'a #engine) ->
Unixqueue.event_system ->
'a engine
(** Same as the class [delay_engine], but as a function:
 * [let de = delay_engine d f esys].
 *)
class ['a] timeout_engine : float -> exn -> 'a engine -> ['a] engine
(** [new timeout_engine d x e]: If the engine [e] finishes within [d]
 * seconds, the result remains unchanged. If the engine takes longer,
 * though, it is aborted, and the state transitions to
 * [`Error x].
 *
 * Note that the argument [e] has the type ['a engine] and not
 * ['a #engine] as for most other combinators, so objects of engine
 * subtypes have to be coerced explicitly.
 *)
val timeout_engine : float -> exn -> 'a engine -> 'a engine
(** Same as the class [timeout_engine], but as a function:
 * [timeout_engine d x e].
 *)
class watchdog : float ->
'a #engine ->
[unit] engine
(** A watchdog engine checks whether the argument engine makes
 * progress, and if there is no progress for the passed number of
 * seconds, the argument engine is aborted, and the watchdog state changes
 * to [`Error Watchdog_timeout].
 *
 * The current implementation is not very exact, and it may take
 * a little longer than the passed period of inactivity until the
 * watchdog recognizes inactivity.
 *
 * If the argument engine terminates, the watchdog changes its state to
 * [`Done ()].
 *
 * Important note: The watchdog assumes that the [`Working] state
 * of the target engine really counts events that indicate progress.
 * This does not work for:
 * - [poll_process_engine]: there is no way to check whether a subprocess
 *   makes progress
 * - [connector]: It is usually not possible to reflect the progress
 *   on packet level
 * - [listener]: It is usually not possible to reflect the progress
 *   on packet level
 *)
val watchdog : float -> 'a #engine -> unit engine
(** Same as the class [watchdog], but as a function *)
(** A serializer queues up engines, and starts the next engine when the
 * previous one finishes, so that at most one of the queued engines is
 * running at any time.
 *)
class type ['a] serializer_t =
object
method serialized : (Unixqueue.event_system -> 'a engine) -> 'a engine
(** [let se = serialized f]: Waits until all the previous engines reach
 * a final state, and then runs [e = f esys].
 *
 * [se] enters a final state when [e] does.
 *)
end
class ['a] serializer : Unixqueue.event_system -> ['a] serializer_t
(** [new serializer esys]: Creates a serializer for the event system
 * [esys].
 *)
val serializer : Unixqueue.event_system -> 'a serializer_t
(** Same as the class [serializer], but as a function *)
(** A prioritizer allows one to prioritize the execution of engines: At any
 * time, only engines of a certain priority [p] can be executed. If an
 * engine with a higher priority [ph] wants to start, it prevents further
 * engines with priority level [p] from being started until the higher
 * prioritized engines with level [ph] are done. On the same priority level,
 * there is no limit for the number of executed engines.
 *
 * Here, higher priorities have lower numbers.
 *)
class type ['a] prioritizer_t =
object
method prioritized : (Unixqueue.event_system -> 'a engine) -> int -> 'a engine
(** [let pe = prioritized f p]: Queues up [f] on priority level [p].
 * The engine [e = f esys] can start when there is no waiting
 * engine on a higher priority level (i.e. with a number less than
 * [p]), and all running engines on lower priority levels are done.
 *
 * [pe] enters a final state when [e] does.
 *)
end
class ['a] prioritizer : Unixqueue.event_system -> ['a] prioritizer_t
(** [new prioritizer esys]: Creates a prioritizer for the event system
 * [esys].
 *)
val prioritizer : Unixqueue.event_system -> 'a prioritizer_t
(** Same as the class [prioritizer], but as a function *)
(** A cache contains a mutable value that is obtained by running an
 * engine.
 *)
class type ['a] cache_t =
object
method get_engine : unit -> 'a engine
(** Requests the value. If it is not already in the cache,
 * the engine for getting the value is started, and the returned
 * engine waits until the value is available.
 *)
method get_opt : unit -> 'a option
(** Returns the cached value if available, and [None] otherwise *)
method put : 'a -> unit
(** Puts a value immediately into the cache. It replaces an existing
 * value. If an engine is currently running to obtain a new value,
 * this engine is kept running, and [get_engine] will
 * return its result. Only future calls of [get_engine] will return
 * the value just put into the cache.
 *)
method invalidate : unit -> unit
(** Invalidates the cache - if a value exists in the cache, it is removed.
 * If in the future the cache value is requested via [get_engine]
 * the engine will be started anew to get the value.
 *
 * Note that (as for [put]) any already running [get_engine] is not
 * interrupted.
 *)
method abort : unit -> unit
(** Any engine running to get the cache value is aborted, and the contents
 * of the cache are invalidated. Note that the engines returned
 * by [get_engine] are aborted, too.
 *)
end
class ['a] cache : (Unixqueue.event_system -> 'a engine) ->
Unixqueue.event_system ->
['a] cache_t
(** [new cache f esys]: A cache that runs the engine [f esys] to obtain
 * its value.
 *)
val cache : (Unixqueue.event_system -> 'a engine) ->
Unixqueue.event_system ->
'a cache_t
(** Same as the class [cache], but as a function: [cache f esys] *)
class ['t] engine_mixin : 't engine_state -> Unixqueue.event_system ->
object
method state : 't engine_state
method private set_state : 't engine_state -> unit
method request_notification : (unit -> bool) -> unit
method request_proxy_notification : ('t engine -> bool) -> unit
method private notify : unit -> unit
method event_system : Unixqueue.event_system
end
(** A useful class fragment that implements [state] and
 * [request_notification].
 *
 * Note that the fragment does not define [abort]: a class inheriting
 * from it has to add this method in order to get a complete [engine].
 *
 * NOTE(review): the state argument is presumably the initial state, and
 * the private methods [set_state] and [notify] are presumably meant for
 * the inheriting class to change the state and to invoke the registered
 * notification callbacks - confirm against the implementation.
 *)
(** Handy operators: [++], [>>], and [eps_e] *)
module Operators : sig
(** The most important operators. This module should be opened. *)
val ( ++ ) : 'a #engine -> ('a -> 'b #engine) -> 'b engine
(** Another name for [qseq_engine]. Use this operator to run engines in
 * sequence:
 *
 * {[
 *    e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ...
 * ]}
 *
 * Here [rK] is the result of engine [eK].
 *
 * Change in OCamlnet-3.6.4: [++] is now [qseq_engine], and no longer
 * [seq_engine], and hence it does not support progress reporting anymore.
 * Redefine [++] as [seq_engine] in your own code if you need the old
 * behavior.
 *)
val ( >> ) : 'a #engine ->
('a final_state -> 'b final_state) ->
'b engine
(** Another name for [fmap_engine]. Use this operator to map the
 * final state of an engine:
 *
 * {[
 *    e >> (function `Done x -> ... | `Error e -> ... | `Aborted -> ...)
 * ]}
 *)
val eps_e : 't engine_state -> Unixqueue.event_system -> 't engine
(** Same as [epsilon_engine] *)
end
(** {1 Basic I/O engines} *)
class poll_engine : ?extra_match:(exn -> bool) ->
(Unixqueue.operation * float) list ->
Unixqueue.event_system ->
object
inherit [Unixqueue.event] engine
(** {1 Additional methods} *)
method restart : unit -> unit
(** Activates the engine again when it is already in a final state.
 * This method violates the engine protocol (which does not allow an
 * engine to leave a final state), and should be used with care.
 *
 * The notification lists are kept, but note that observers often
 * detach when final states are reached. This may cause problems.
 *)
method group : Unixqueue.group
(** Returns the group the engine is a member of *)
end ;;
(** This engine waits until one of the passed operations can be
 * carried out, or until one of the operations times out.
 * In these cases, the state of the engine changes to [`Done ev], where
 * [ev] is the corresponding event.
 *
 * The argument list enumerates the operations to watch for. For every
 * operation there may be a positive timeout value, or a negative number
 * to indicate that no timeout is specified.
 *
 * After one event has been caught, the engine terminates operation.
 * The method [restart] can be called to activate it again (with the
 * same event condition, and the same notification list). See the
 * description of [restart] for possible problems.
 *
 * @param extra_match This function is called when an [Extra] event is
 *   found. If the function returns [true] for the argument exception
 *   of [Extra], the event is caught; otherwise it is rejected.
 *)
class ['a] input_engine : (Unix.file_descr -> 'a) ->
Unix.file_descr -> float -> Unixqueue.event_system ->
['a] engine
(** Generic input engine for reading from a file descriptor:
 * [let e = new input_engine f fd tmo esys] - Waits until the file
 * descriptor [fd] becomes readable, and then calls [let x = f fd] to
 * read from the descriptor. The result [x] is the result of the engine.
 *
 * If the file descriptor does not become readable within [tmo] seconds,
 * the resulting engine transitions to [`Error Timeout].
 *
 * Use this class to construct engines reading via [Unix.read] or
 * comparable I/O functions:
 *
 * {[
 *    let read_engine fd tmo esys =
 *      new input_engine (fun fd ->
 *                          let buf = Bytes.create 4096 in
 *                          let n = Unix.read fd buf 0 (Bytes.length buf) in
 *                          Bytes.sub_string buf 0 n
 *                       )
 *                       fd tmo esys
 * ]}
 *
 * This engine returns the read data as string.
 *
 * See also {!Uq_io.input_e} for a more generic way of reading with
 * engines.
 *)
class ['a] output_engine : (Unix.file_descr -> 'a) ->
Unix.file_descr -> float -> Unixqueue.event_system ->
['a] engine
(** Generic output engine for writing to a file descriptor:
 * [let e = new output_engine f fd tmo esys] - Waits until the file
 * descriptor [fd] becomes writable, and then calls [let x = f fd] to
 * write to the descriptor. The result [x] is the result of the engine.
 *
 * If the file descriptor does not become writable within [tmo] seconds,
 * the resulting engine transitions to [`Error Timeout].
 *
 * Use this class to construct engines writing via [Unix.single_write] or
 * comparable I/O functions:
 *
 * {[
 *    let write_engine fd s tmo esys =
 *      new output_engine (fun fd ->
 *                           Unix.single_write_substring
 *                             fd s 0 (String.length s)
 *                        )
 *                        fd tmo esys
 * ]}
 *
 * This engine returns the number of written bytes.
 *
 * See also {!Uq_io.output_e} for a more generic way of writing with
 * engines.
 *)
class poll_process_engine : ?period:float ->
pid:int ->
Unixqueue.event_system ->
[Unix.process_status] engine ;;
(** {b This class is deprecated!} Use the classes in {!Shell_uq} instead.
 *
 * This engine waits until the process with the ID [pid] terminates.
 * When this happens, the state of the engine changes to
 * [`Done], and the argument of [`Done] is the process status.
 *
 * The engine does not catch stopped processes.
 *
 * The engine checks the process status every [period] seconds, and
 * whenever there is a [Signal] event on the queue. The idea of the
 * latter is that the user of this engine can increase the responsiveness
 * by defining a signal handler for SIGCHLD signals (the handler need
 * not perform any special action, it must just be defined). When
 * the sub process terminates, a SIGCHLD signal is sent to the current
 * process. If the event loop happens to wait for new conditions (which
 * is usually very likely), a [Signal] event will be generated, and
 * the engine will check the process status very soon. Note that it is
 * not guaranteed that a terminating process triggers a [Signal] event,
 * although it is very likely.
 *
 * You can define an empty SIGCHLD handler with:
 *
 * {[ Sys.set_signal Sys.sigchld (Sys.Signal_handle (fun _ -> ())) ]}
 *
 * @param period Every [period] seconds the process status is checked.
 *   Defaults to 0.1 seconds.
 *)
(** {2 More I/O}
The module {!Uq_io} provides a bunch of functions to read and write
data via various "devices". All these functions return engines, and
are easy to use. Devices can be file descriptors, but also other
data structures. In particular, there is also support for buffered I/O
and for reading line-by-line from an input device.
*)
(** {1 Recursion} *)
(** When programming with engines, it is normal to use recursion for any
kind of loops. For example, to read the lines from a file:
{[
open Uq_engines.Operators (* for ">>" and "++" *)
let fd =
Unix.openfile filename [Unix.O_RDONLY] 0 in
let d =
`Buffer_in(Uq_io.create_in_buffer(`Polldescr(`Read_write,fd,esys))) in
let rec read_lines acc =
Uq_io.input_line_e d >>
(function (* catch exception End_of_file *)
| `Done line -> `Done(Some line)
| `Error End_of_file -> `Done None
| `Error error -> `Error error
| `Aborted -> `Aborted
) ++
(function
| Some line ->
read_lines (line :: acc)
| None ->
eps_e (`Done (List.rev acc)) esys
) in
let e = read_lines []
]}
There is generally the question whether this style leads to stack
overflows. This depends on the mechanisms that come into play:
- The engine mechanism passing control from one engine to the next is
not tail-recursive, and thus the stack can overflow when the
recursion becomes too deep
- The event queue mechanism, however, does not have this problem.
Control falls automatically back to the event queue whenever I/O
needs to be done.
In this example, this means that only the engine mechanism is used
as long as the data is read from the buffer. When the buffer needs
to be refilled, however, control is passed back to the event queue
(so the stack is cleaned), and the continuation of the execution
is only managed via closures (which only allocate memory on the
heap, not on the stack). Usually, this is a good compromise: The
engine mechanism is a lot faster, but I/O is an indicator for using
the better but slower technique.
Also note another difference: The event queue mechanism allows that
other asynchronous code attached to the same event queue may run
(control may be yielded to unrelated execution contexts). The
pure engine mechanism does not allow that. This may be handy when
exclusive access to variables is needed. (But be careful here -
this is very sensitive to minimal changes of the implementation.)
Certain engines enforce using the event queue mechanisms although they
are unrelated to I/O. Especially {!Uq_engines.delay_engine} is
useful here: A "delay" of 0 seconds is already sufficient to
go back to the event queue. If recursions sometimes lead to
stack overflows the solution is to include such a zero delay
before doing the self call.
*)
(** {1 More Engines} *)
(**
Pointers to other modules related to engines:
- {!Uq_client}
- {!Uq_server}
- {!Uq_multiplex}
- {!Uq_transfer}
- {!Uq_datagram}
- {!Uq_io}
- RPC clients: The function {!Rpc_proxy.ManagedClient.rpc_engine} allows
calling an RPC via an engine. When the call is done, the engine transitions
to [`Done r], and [r] is the result of the remote call.
- Subprograms: The class {!Shell_uq.call_engine} allows starting an
external program and monitoring it via an engine.
*)
(** {1 Moved} *)
(** OCamlnet-4.0 moves a number of definitions to the modules
- {!Uq_transfer}
- {!Uq_multiplex}
- {!Uq_client}
- {!Uq_server}
- {!Uq_datagram}
For convenience, the types are still also exported here, but
functions and classes are now defined in these modules.
See also the module {!Uq_engines_compat}.
*)
(** Output side of an asynchronous channel.

    Moved to {!Uq_transfer.async_out_channel}.  The class type is still
    exported from this module for convenience; refer to {!Uq_transfer}
    for the authoritative description of the individual methods. *)
class type async_out_channel =
  object
    method output : Bytes.t -> int -> int -> int
    method close_out : unit -> unit
    method pos_out : int
    method flush : unit -> unit
    method can_output : bool
    method request_notification : (unit -> bool) -> unit
  end
(** Input side of an asynchronous channel.

    Moved to {!Uq_transfer.async_in_channel}.  The class type is still
    exported from this module for convenience; refer to {!Uq_transfer}
    for the authoritative description of the individual methods. *)
class type async_in_channel =
  object
    method input : Bytes.t -> int -> int -> int
    method close_in : unit -> unit
    method pos_in : int
    method can_input : bool
    method request_notification : (unit -> bool) -> unit
  end
(** Moved to {!Uq_transfer.async_out_channel_engine}.

    An object of this type is both a [unit engine] and an
    [async_out_channel].  The class type is still exported from this
    module for convenience. *)
class type async_out_channel_engine =
  object
    inherit [unit] engine
    inherit async_out_channel
  end
(** Moved to {!Uq_transfer.copy_task}.

    Describes which file descriptors take part in a copy operation.
    Every tag carries a tuple of descriptors: two for
    [`Unidirectional], [`Uni_socket] and [`Bidirectional], three for
    [`Tridirectional].  The role of each descriptor is documented in
    {!Uq_transfer}. *)
type copy_task =
  [ `Unidirectional of Unix.file_descr * Unix.file_descr
  | `Uni_socket of Unix.file_descr * Unix.file_descr
  | `Bidirectional of Unix.file_descr * Unix.file_descr
  | `Tridirectional of
      Unix.file_descr * Unix.file_descr * Unix.file_descr
  ]
(** Moved to {!Uq_transfer.async_in_channel_engine}.

    An object of this type is both a [unit engine] and an
    [async_in_channel].  The class type is still exported from this
    module for convenience. *)
class type async_in_channel_engine =
  object
    inherit [unit] engine
    inherit async_in_channel
  end
(** This definition has now been moved to
    {!Uq_multiplex.multiplex_controller}.

    The class type is still exported from this module for convenience;
    refer to {!Uq_multiplex} for the authoritative description of the
    individual methods. *)
class type multiplex_controller =
  object
    (* General state *)
    method alive : bool
    method mem_supported : bool
    method event_system : Unixqueue.event_system

    (* TLS-related accessors *)
    method tls_session_props : Nettls_support.tls_session_props option
    method tls_session : (string * string) option
    method tls_stashed_endpoint : unit -> exn

    (* Reading *)
    method reading : bool
    method start_reading :
      ?peek:(unit -> unit) ->
      when_done:(exn option -> int -> unit) ->
      Bytes.t -> int -> int -> unit
    method start_mem_reading :
      ?peek:(unit -> unit) ->
      when_done:(exn option -> int -> unit) ->
      Netsys_mem.memory -> int -> int -> unit
    method cancel_reading : unit -> unit

    (* Writing *)
    method writing : bool
    method start_writing :
      when_done:(exn option -> int -> unit) ->
      Bytes.t -> int -> int -> unit
    method start_mem_writing :
      when_done:(exn option -> int -> unit) ->
      Netsys_mem.memory -> int -> int -> unit
    method supports_half_open_connection : bool
    method start_writing_eof :
      when_done:(exn option -> unit) -> unit -> unit
    method cancel_writing : unit -> unit

    (* End-of-file flags *)
    method read_eof : bool
    method wrote_eof : bool

    (* Shutdown and deactivation *)
    method shutting_down : bool
    method start_shutting_down :
      ?linger:float ->
      when_done:(exn option -> unit) -> unit -> unit
    method cancel_shutting_down : unit -> unit
    method inactivate : unit -> unit
  end
exception Mem_not_supported
(** Moved to {!Uq_multiplex.Mem_not_supported}.  The exception is still
    exported from this module for convenience.
    NOTE(review): the conditions under which it is raised are not
    visible in this interface; see {!Uq_multiplex}. *)
(** Moved to {!Uq_multiplex.datagram_multiplex_controller}.

    Extends [multiplex_controller] with two methods that deal with
    socket addresses.  The class type is still exported from this
    module for convenience. *)
class type datagram_multiplex_controller =
  object
    inherit multiplex_controller
    method received_from : Unix.sockaddr
    method send_to : Unix.sockaddr -> unit
  end
(** Moved to {!Uq_transfer.onshutdown_out_spec}.

    Either one of the constant tags [`Ignore] and [`Initiate_shutdown],
    or [`Action f], where [f] is a callback receiving the output channel
    engine, the multiplex controller and a [unit engine_state]. *)
type onshutdown_out_spec =
  [ `Ignore
  | `Initiate_shutdown
  | `Action of
      (async_out_channel_engine ->
       multiplex_controller ->
       unit engine_state ->
       unit)
  ]
(** Moved to {!Uq_transfer.onshutdown_in_spec}.

    Either one of the constant tags [`Ignore] and [`Initiate_shutdown],
    or [`Action f], where [f] is a callback receiving the input channel
    engine, the multiplex controller and a [unit engine_state]. *)
type onshutdown_in_spec =
  [ `Ignore
  | `Initiate_shutdown
  | `Action of
      (async_in_channel_engine ->
       multiplex_controller ->
       unit engine_state ->
       unit)
  ]
(** Moved to {!Uq_client.inetspec}.

    An Internet socket specification: a socket type together with
    either an [Unix.inet_addr] ([`Sock_inet]) or a string
    ([`Sock_inet_byname]), followed by an integer.
    NOTE(review): the string is presumably a host name and the integer
    the port number; confirm in {!Uq_client}. *)
type inetspec =
  [ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
  | `Sock_inet_byname of Unix.socket_type * string * int
  ]
(** Moved to {!Uq_client.sockspec}.

    Every [inetspec] value is also a [sockspec]; in addition there is
    [`Sock_unix], which carries a socket type and a string.
    NOTE(review): the string is presumably the path of a Unix domain
    socket; confirm in {!Uq_client}. *)
type sockspec =
  [ inetspec
  | `Sock_unix of Unix.socket_type * string
  ]
(** Moved to {!Uq_client.connect_address}.

    Describes the endpoint a client connects to: a socket (with
    options), a command, or a Win32 named pipe.  See {!Uq_client} for
    the meaning of the individual arguments. *)
type connect_address =
  [ `Socket of (sockspec * connect_options)
  | `Command of (string * (int -> Unixqueue.event_system -> unit))
  | `W32_pipe of (Netsys_win32.pipe_mode * string)
  ]

and connect_options =
  { conn_bind : sockspec option;
      (** Bind the connecting socket to this address (same family as
          the connected socket required). [None]: Use an anonymous
          port. *)
  }
(** Moved to {!Uq_client.connect_status}.

    Has the same three tags as [connect_address]; each one carries a
    file descriptor, plus a [sockspec] for [`Socket] and an integer for
    [`Command].  See {!Uq_client} for the meaning of these values. *)
type connect_status =
  [ `Socket of (Unix.file_descr * sockspec)
  | `Command of (Unix.file_descr * int)
  | `W32_pipe of Unix.file_descr
  ]
(** Moved to {!Uq_client.client_endpoint_connector}.

    The single method [connect] takes a [connect_address] and an event
    system, and returns an engine whose result is a [connect_status]. *)
class type client_endpoint_connector =
  object
    method connect :
      connect_address -> Unixqueue.event_system -> connect_status engine
  end
(** Moved to {!Uq_server.listen_address}.

    Describes the endpoint a server listens on: a socket or a Win32
    named pipe, in both cases together with [listen_options]. *)
type listen_address =
  [ `Socket of (sockspec * listen_options)
  | `W32_pipe of (Netsys_win32.pipe_mode * string * listen_options)
  ]

and listen_options =
  { lstn_backlog : int;
      (** NOTE(review): presumably the backlog length passed on to the
          listen call; confirm in {!Uq_server}. *)
    lstn_reuseaddr : bool;
      (** NOTE(review): presumably controls the address-reuse socket
          option; confirm in {!Uq_server}. *)
  }
(** Moved to {!Uq_server.server_endpoint_acceptor}.

    [accept ()] returns an engine whose result is a file descriptor
    paired with an optional [inetspec].  The class type is still
    exported from this module for convenience; see {!Uq_server} for the
    full description of the methods. *)
class type server_endpoint_acceptor =
  object
    method server_address : connect_address
    method multiple_connections : bool
    method accept :
      unit -> (Unix.file_descr * inetspec option) engine
    method shut_down : unit -> unit
  end
(** Moved to {!Uq_server.server_endpoint_listener}.

    The single method [listen] takes a [listen_address] and an event
    system, and returns an engine whose result is a
    [server_endpoint_acceptor]. *)
class type server_endpoint_listener =
  object
    method listen :
      listen_address ->
      Unixqueue.event_system ->
      server_endpoint_acceptor engine
  end
(** Moved to {!Uq_datagram.datagram_type}.

    One of three constant tags selecting the kind of datagram socket.
    The type is still exported from this module for convenience. *)
type datagram_type = [ `Unix_dgram | `Inet_udp | `Inet6_udp ]
(** Moved to {!Uq_datagram.wrapped_datagram_socket}.

    The class type is still exported from this module for convenience;
    refer to {!Uq_datagram} for the authoritative description of the
    individual methods. *)
class type wrapped_datagram_socket =
  object
    method descriptor : Unix.file_descr
    method sendto :
      Bytes.t -> int -> int -> Unix.msg_flag list -> sockspec -> int
    method recvfrom :
      Bytes.t -> int -> int -> Unix.msg_flag list -> (int * sockspec)
    method shut_down : unit -> unit
    method datagram_type : datagram_type
    method socket_domain : Unix.socket_domain
    method socket_type : Unix.socket_type
    method socket_protocol : int
  end
(** Moved to {!Uq_datagram.datagram_socket_provider}.

    The single method [create_datagram_socket] takes a [datagram_type]
    and an event system, and returns an engine whose result is a
    [wrapped_datagram_socket]. *)
class type datagram_socket_provider =
  object
    method create_datagram_socket :
      datagram_type ->
      Unixqueue.event_system ->
      wrapped_datagram_socket engine
  end
(** {1 Debugging} *)
module Debug : sig
  (** Enables {!Netlog}-style debugging *)
  val enable : bool ref
end
|