Module Picos_sync

Basic communication and synchronization primitives for Picos.

This library essentially provides a conventional set of communication and synchronization primitives for concurrent programming with any Picos compatible scheduler.

For the examples we open some modules:

open Picos_structured
open Picos_sync


module Mutex : sig ... end

A mutex implementation for Picos.

module Condition : sig ... end

A condition implementation for Picos.

module Lazy : sig ... end

A lazy implementation for Picos.

module Event : sig ... end

An implementation of first-class synchronous communication for Picos.


A simple bounded queue

Here is an example of a simple bounded (blocking) queue using a mutex and condition variables:

module Bounded_q : sig
  type 'a t
  val create : capacity:int -> 'a t
  val push : 'a t -> 'a -> unit
  val pop : 'a t -> 'a
end = struct
  type 'a t = {
    mutex : Mutex.t;
    queue : 'a Queue.t;
    capacity : int;
    not_empty : Condition.t;
    not_full : Condition.t;

  let create ~capacity =
    if capacity < 0 then
      invalid_arg "negative capacity"
    else {
      mutex = Mutex.create ();
      queue = Queue.create ();
      not_empty = Condition.create ();
      not_full = Condition.create ();

  let is_full_unsafe t =
    t.capacity <= Queue.length t.queue

  let push t x =
    let was_empty =
      Mutex.protect t.mutex @@ fun () ->
      while is_full_unsafe t do
        Condition.wait t.not_full t.mutex
      Queue.push x t.queue;
      Queue.length t.queue = 1
    if was_empty then
      Condition.signal t.not_empty

  let pop t =
    let elem, was_full =
      Mutex.protect t.mutex @@ fun () ->
      while Queue.length t.queue = 0 do
          t.not_empty t.mutex
      let was_full = is_full_unsafe t in
      Queue.pop t.queue, was_full
    if was_full then
      Condition.signal t.not_full;

The above is definitely not the fastest nor the most scalable bounded queue, but we can now demonstrate it with the cooperative Picos_fifos scheduler:

# @@ fun () ->

  let bq =
    Bounded_q.create ~capacity:3

  Bundle.join_after begin fun bundle ->
    Bundle.fork bundle begin fun () ->
      while true do
        Printf.printf "Popped %d\n%!"
          (Bounded_q.pop bq)

    for i=1 to 5 do
      Printf.printf "Pushing %d\n%!" i;
      Bounded_q.push bq i

    Printf.printf "All done?\n%!";

    Control.yield ();

    Bundle.terminate bundle

  Printf.printf "Pushing %d\n%!" 101;
  Bounded_q.push bq 101;

  Printf.printf "Popped %d\n%!"
    (Bounded_q.pop bq)
Pushing 1
Pushing 2
Pushing 3
Pushing 4
Popped 1
Popped 2
Popped 3
Pushing 5
All done?
Popped 4
Popped 5
Pushing 101
Popped 101
- : unit = ()

Notice how the producer was able to push three elements to the queue after which the fourth push blocked and the consumer was started. Also, after canceling the consumer, the queue could still be used just fine.


The optional padded argument taken by several constructor functions, e.g. Mutex.create and Condition.create, defaults to false. When explicitly specified as ~padded:true the object is allocated in a way to avoid false sharing. For relatively long lived objects this can improve performance and make performance more stable at the cost of using more memory. It is not recommended to use ~padded:true for short lived objects.