Lwt interface for Picos

This package provides an alternative direct style interface to programming with Lwt via Picos.

Libraries

Examples

Perhaps one of the main reasons someone might want to use a Picos based direct style interface to programming with Lwt is for the purpose of working with an existing codebase and gradually porting the codebase to be effects based and to use an effects based scheduler. So, as an example, let's construct a program that runs both Lwt and, in another domain, an effects based scheduler.

As an aside, Picos is specifically designed to allow an application to run multiple schedulers and for code running on those schedulers to be able to communicate and synchronize. Specifically, Picos is an interface to communicating with schedulers. A concurrent abstraction implemented in terms of the Picos interface automatically works with any Picos compatible scheduler. Furthermore, a correctly implemented scheduler allows certain operations, such as operations that cause fibers running on the scheduler to be resumed, to work across schedulers.

For the example, we first open a couple of libraries:

# open Picos_std_finally (* let@, finally, lastly *)
# open Picos_std_sync (* Stream, Ivar *)

The FIFO scheduler we will use normally automatically checks that the IO event loop library it uses for timeouts has been configured. However, as we will be spawning a new domain for the scheduler, we need to make sure to configure the IO library from the main thread of the application:

Picos_io_select.check_configured ()

Below is our example program:

let main () =
  let stream = Stream.create () in

  let@ _ =
    finally Domain.join @@ fun () ->
    let cursor = Stream.tap stream in
    Domain.spawn @@ fun () ->
    Picos_mux_fifo.run @@ fun () ->
    let rec loop cursor =
      let ((who, out), cursor) =
        Stream.read cursor
      in
      Printf.sprintf "Hello, %s!" who
      |> Ivar.fill out;
      loop cursor
    in
    try loop cursor with Exit -> ()
  in

  let@ _ = lastly @@ fun () ->
    Stream.poison stream Exit
  in

  ["Mii"; "Uuu"]
  |> List.iter (fun who ->
     let reply = Ivar.create () in
     Stream.push stream (who, reply);
     Ivar.read reply
     |> Lwt_io.printl
     |> Picos_lwt.await);

  Picos_lwt.await Lwt_io.(flush stdout)

The above program first creates a stream for communication. Then it spawns a domain for running the FIFO scheduler making sure that the domain will be joined and that a cursor to the stream is obtained before the rest of the program pushes messages to the stream. The loop running on the FIFO scheduler reads messages from the stream and responds to them until the stream is poisoned. The rest of the program runs on Lwt on the main thread. It first makes sure to poison the stream at the end. Then it runs a loop that sends a couple of messages to the stream, reads the responses, and prints them using Lwt.

Finally we run the program with Lwt:

# Picos_lwt_unix.run_main main
Hello, Mii!
Hello, Uuu!
- : unit = ()

Importantly, the above program shows that one can use communication and synchronization primitives like Stream and Ivar across schedulers.