Module Picos_std_structured

Basic structured concurrency primitives for Picos.

This library provides one possible application programming interface (API) for structuring fibers that works with any Picos-compatible scheduler.

For the examples we open some modules:

open Picos_io
open Picos_std_event
open Picos_std_finally
open Picos_std_structured
open Picos_std_sync

Modules

module Control : sig ... end

Basic control operations and exceptions for structured concurrency.

module Promise : sig ... end

A cancelable promise.

module Bundle : sig ... end

An explicit dynamic bundle of fibers guaranteed to be joined at the end.

module Flock : sig ... end

An implicit dynamic flock of fibers guaranteed to be joined at the end.

module Run : sig ... end

Operations for running fibers in specific patterns.
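To make the difference between the explicit Bundle and the implicit Flock concrete, here is a minimal sketch that performs the same small computation with both. The function names with_bundle and with_flock are made up for illustration, and the sketch assumes that Bundle.fork_as_promise takes the bundle as its first argument, mirroring Flock.fork_as_promise, and that the Picos_mux_fifo scheduler is available.

```ocaml
open Picos_std_structured

(* With a bundle, the scope is passed around explicitly as a value. *)
let with_bundle () =
  Bundle.join_after @@ fun bundle ->
  let promise = Bundle.fork_as_promise bundle (fun () -> 20 + 1) in
  2 * Promise.await promise

(* With a flock, the scope is implicit: fibers join the current flock. *)
let with_flock () =
  Flock.join_after @@ fun () ->
  let promise = Flock.fork_as_promise (fun () -> 20 + 1) in
  2 * Promise.await promise

(* Both functions must run inside a scheduler. *)
let results = Picos_mux_fifo.run (fun () -> (with_bundle (), with_flock ()))

let () = Printf.printf "%d %d\n" (fst results) (snd results)
```

In both cases join_after returns only after every forked fiber has finished, so the promise cannot outlive its scope.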

Examples

Understanding cancelation

Consider the following program:

let main () =
  Flock.join_after @@ fun () ->
  let promise = Flock.fork_as_promise @@ fun () -> Control.block () in

  begin
    Flock.fork @@ fun () -> Promise.await promise
  end;

  begin
    Flock.fork @@ fun () ->
    let condition = Condition.create () and mutex = Mutex.create () in
    Mutex.protect mutex @@ fun () ->
    while true do
      Condition.wait condition mutex
    done
  end;

  begin
    Flock.fork @@ fun () ->
    let sem = Semaphore.Binary.make false in
    Semaphore.Binary.acquire sem
  end;

  begin
    Flock.fork @@ fun () ->
    let sem = Semaphore.Counting.make 0 in
    Semaphore.Counting.acquire sem
  end;

  begin
    Flock.fork @@ fun () -> Event.sync (Event.choose [])
  end;

  begin
    Flock.fork @@ fun () ->
    let latch = Latch.create 1 in
    Latch.await latch
  end;

  begin
    Flock.fork @@ fun () ->
    let ivar = Ivar.create () in
    Ivar.read ivar
  end;

  begin
    Flock.fork @@ fun () ->
    let stream = Stream.create () in
    Stream.read (Stream.tap stream) |> ignore
  end;

  begin
    Flock.fork @@ fun () ->
    let@ inn, out =
      finally Unix.close_pair @@ fun () ->
      Unix.socketpair ~cloexec:true PF_UNIX SOCK_STREAM 0
    in
    Unix.set_nonblock inn;
    let n = Unix.read inn (Bytes.create 1) 0 1 in
    assert (n = 1)
  end;

  begin
    Flock.fork @@ fun () ->
    let a_month = 60.0 *. 60.0 *. 24.0 *. 30.0 in
    Control.sleep ~seconds:a_month
  end;

  (* Let the children get stuck *)
  Control.sleep ~seconds:0.1;

  Flock.terminate ()

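First of all, note that the Mutex, Condition, and Semaphore modules used above come from the Picos_std_sync library and the Unix module comes from the Picos_io library. They do not come from the standard OCaml libraries.

The above program creates a flock of fibers and forks several fibers to the flock that all block in various ways: awaiting a promise that never completes, waiting on a condition variable that is never signaled, acquiring semaphores that are never released, synchronizing on an empty choice of events, awaiting a latch that is never decremented, reading an ivar that is never filled, reading from a stream to which nothing is pushed, reading from a socket to which nothing is written, and sleeping for a month.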

Fibers forked to a flock can be canceled in various ways. In the above program we call Flock.terminate to cancel all of the fibers and effectively close the flock. This allows the program to return normally immediately and without leaking or leaving anything in an invalid state:

# Picos_mux_random.run_on ~n_domains:2 main
- : unit = ()

Now, the point of the above example isn't that you should just call terminate when your program gets stuck. 😅

What the above example hopefully demonstrates is that concurrent abstractions like mutexes and condition variables, asynchronous IO libraries, and others can be designed to support cancelation.

Cancelation is a signaling mechanism that allows structured concurrent abstractions, like the Flock abstraction, to (hopefully) gracefully tear down concurrent fibers in case of errors. Indeed, one of the basic ideas behind the Flock abstraction is that if any fiber forked to the flock raises an unhandled exception, the whole flock is terminated and the error is raised from the flock. This lets you understand what went wrong instead of having to debug a program that mysteriously gets stuck, for example.
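The following minimal sketch illustrates that idea: one fiber blocks indefinitely while its sibling fails. The names sibling_was_canceled and outcome are made up for illustration, and the sketch assumes that the failure surfaces from the flock as Control.Errors and that the Picos_mux_fifo scheduler starts fibers in the order they were forked.

```ocaml
open Picos_std_structured

(* Set when the blocked sibling is woken up by an exception. *)
let sibling_was_canceled = ref false

let outcome =
  match
    Picos_mux_fifo.run @@ fun () ->
    Flock.join_after @@ fun () ->
    (* This fiber blocks forever unless something cancels it. *)
    Flock.fork (fun () ->
        try Control.block ()
        with exn ->
          sibling_was_canceled := true;
          raise exn);
    (* This fiber fails, which terminates the whole flock. *)
    Flock.fork (fun () -> failwith "boom")
  with
  | () -> "returned normally"
  | exception Control.Errors _ -> "raised Control.Errors"

let () =
  Printf.printf "flock %s; sibling canceled: %b\n" outcome
    !sibling_was_canceled
```

Without the failing fiber, the program would simply hang on the blocked sibling, which is the kind of situation the flock is designed to turn into a visible error.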

Cancelation can also, with some care, be used as a mechanism to terminate fibers once they are no longer needed. However, just like sleep, for example, cancelation is inherently prone to races: it is usually non-deterministic, and it is difficult to know the exact point and state at which a fiber gets canceled. Cancelation is therefore not recommended for use as a general synchronization or communication mechanism.

Errors and cancelation

Consider the following program:

let many_errors () =
  Flock.join_after @@ fun () ->
  let latch = Latch.create 1 in

  let fork_raising exn =
    Flock.fork @@ fun () ->
    begin
      Control.protect @@ fun () -> Latch.await latch
    end;
    raise exn
  in

  fork_raising Exit;
  fork_raising Not_found;
  fork_raising Control.Terminate;

  Latch.decr latch

The above program starts three fibers and uses a latch to ensure that all of them have been started, before two of them raise errors and the third raises Terminate, which is not considered an error in this library. Running the program

# Picos_mux_fifo.run many_errors
Exception: Errors[Stdlib.Exit; Not_found]

raises a collection of all of the errors.
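If you want to handle the collected errors rather than let them escape, one option is to match on the exception. The sketch below uses a hypothetical helper, error_messages, and assumes that Control.Errors carries a list of exceptions paired with their backtraces.

```ocaml
open Picos_std_structured

(* Hypothetical helper: runs [main] under a scheduler and returns the
   collected errors rendered as strings. *)
let error_messages main =
  match Picos_mux_fifo.run main with
  | () -> []
  | exception Control.Errors exn_bts ->
      List.map (fun (exn, _backtrace) -> Printexc.to_string exn) exn_bts

let messages =
  error_messages @@ fun () ->
  Flock.join_after @@ fun () ->
  Flock.fork (fun () -> raise Exit);
  (* Terminate is not treated as an error, so it is not collected. *)
  Flock.fork (fun () -> raise Control.Terminate)

let () = List.iter print_endline messages
```

Only the Exit exception ends up in the list, consistent with Terminate not being considered an error.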

A simple echo server and clients

Let's build a simple TCP echo server and run it with some clients.

We first define a function for the server:

let run_server server_fd =
  Flock.join_after @@ fun () ->
  while true do
    let@ client_fd =
      instantiate Unix.close @@ fun () ->
      Unix.accept ~cloexec:true server_fd |> fst
    in

    (* Fork a fiber for client *)
    Flock.fork @@ fun () ->
    let@ client_fd = move client_fd in
    Unix.set_nonblock client_fd;

    let bs = Bytes.create 100 in
    let n = Unix.read client_fd bs 0 (Bytes.length bs) in
    Unix.write client_fd bs 0 n |> ignore
  done

The server function expects a listening socket. For each accepted client the server forks a new fiber to handle it. The client socket is moved from the server fiber to the client fiber to avoid leaks and to ensure that the socket will be closed.
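The same instantiate and move pattern can be tried without sockets. The sketch below replaces the file descriptor with a plain string and counts releases in a hypothetical counter named released. It mirrors the structure of the server loop and is not a description of how the library implements the hand-over.

```ocaml
open Picos_std_finally
open Picos_std_structured

(* Counts how many times the stand-in resource has been released. *)
let released = ref 0

let () =
  Picos_mux_fifo.run @@ fun () ->
  Flock.join_after @@ fun () ->
  (* The parent acquires the resource as a movable instance. *)
  let@ resource =
    instantiate (fun (_ : string) -> incr released) @@ fun () -> "resource"
  in
  Flock.fork @@ fun () ->
  (* The child takes over ownership and releases the resource when done. *)
  let@ resource = move resource in
  ignore (String.length resource)

let () = Printf.printf "released %d time(s)\n" !released
```

The point of the pattern is that the resource has exactly one owner at any time, so it is released once, regardless of which fiber ends up owning it.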

Let's then define a function for the clients:

let run_client server_addr =
  let@ socket =
    finally Unix.close @@ fun () ->
    Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
  in
  Unix.set_nonblock socket;
  Unix.connect socket server_addr;

  let msg = "Hello!" in
  Unix.write_substring socket msg 0 (String.length msg) |> ignore;

  let bytes = Bytes.create (String.length msg) in
  let n = Unix.read socket bytes 0 (Bytes.length bytes) in

  Printf.printf "Received: %s\n%!" (Bytes.sub_string bytes 0 n)

The client function takes the address of the server and connects a socket to the server address. It then writes a message to the server and reads a reply from the server and prints it.

Here is the main program:

let main () =
  let@ server_fd =
    finally Unix.close @@ fun () ->
    Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
  in
  Unix.set_nonblock server_fd;
  (* Let system determine the port *)
  Unix.bind server_fd Unix.(ADDR_INET (inet_addr_loopback, 0));
  Unix.listen server_fd 8;

  let server_addr = Unix.getsockname server_fd in

  Flock.join_after ~on_return:`Terminate @@ fun () ->
  (* Start server *)
  begin
    Flock.fork @@ fun () -> run_server server_fd
  end;

  (* Run clients concurrently *)
  Flock.join_after @@ fun () ->
  for _ = 1 to 5 do
    Flock.fork @@ fun () -> run_client server_addr
  done

The main program creates a socket for the server and configures it. The server is then started as a fiber in a flock terminated on return. Then the clients are started to run concurrently in an inner flock.
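The effect of ~on_return:`Terminate can be seen in isolation in the following minimal sketch, where a fiber that blocks forever stands in for the server. The name answer is made up for illustration.

```ocaml
open Picos_std_structured

let answer =
  Picos_mux_fifo.run @@ fun () ->
  Flock.join_after ~on_return:`Terminate @@ fun () ->
  (* Stands in for a server loop that never finishes on its own. *)
  Flock.fork (fun () -> Control.block ());
  (* Returning from the scope terminates the fiber above. *)
  42

let () = Printf.printf "%d\n" answer
```

Without ~on_return:`Terminate the scope would wait for the blocked fiber and never return, which is why the echo example starts the server in the outer, terminating flock and the clients in an inner flock that is joined normally.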

Finally we run the main program with a scheduler:

# Picos_mux_random.run_on ~n_domains:1 main
Received: Hello!
Received: Hello!
Received: Hello!
Received: Hello!
Received: Hello!
- : unit = ()

As an exercise, you might want to refactor the server to avoid moving the file descriptors and use a recursive accept loop instead. You could also terminate the whole flock at the end instead of just terminating the server.