Raft

An attempt to implement the Raft distributed protocol using OCaml 5 Eio, Effect Handlers, and capnp-rpc-lwt, which is a RPC library based on Eio . Eio allows integration of Lwt framework.

  1. The Eio capnp-rpc library is still in a branch waiting to be merged. This will be a RPC library entirely based on eio.
  2. The older LWT capnp-rpc-lwt is used as Eio can integrate with it. This is done because the code started using Lwt-Eio integration.
  3. The newer capnp-rpc based entirely on Eio will be used later.

    Everything involved in this has to be learned. Code is incrementally developed and updated as new logic is added.

Simple Eio Client/Server and LWT RPC

Client

The details of the setup using dune to build this will eventually be added to the README of the Git repository. Code is now directly posted here. A simple echo RPC call is included to test the capability of the capnp-rpc_lwt library.

open Eio.Std
open Capnp_rpc_lwt
open Lwt.Infix
(* Prefix all trace output with "client: " *)
let traceln fmt = traceln ("client: " ^^ fmt)

module Read = Eio.Buf_read
module Write = Eio.Buf_write
module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)
module Echo = Api.Client.Echo

let ping t msg =
  let open Echo.Ping in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.msg_set params msg;
  Capability.call_for_value_exn t method_id request >|= Results.reply_get

let rec loop clock to_server from_server =
   Write.string to_server "Request";
   Write.char to_server ' ';
   Write.string to_server "from client\n";
   Eio.Time.sleep clock 0.5;
   traceln "Waiting for server " ;
   let reply =  Read.line from_server  in
   traceln "Got reply %S" reply;
   Eio.Time.sleep clock 0.5;
   loop clock to_server from_server

let run ~net ~clock ~addr  =
  Eio.Time.sleep clock 0.5;
  traceln "Connecting to server at %a..." Eio.Net.Sockaddr.pp addr;
  Switch.run  @@ fun sw ->
  let flow = Eio.Net.connect ~sw net addr in
  Write.with_flow flow @@ fun to_server ->
  let from_server = Read.of_flow flow ~max_size:100 in
  loop clock to_server from_server

Server

open Eio.Std

(* Prefix all trace output with "server: " *)
let traceln fmt = traceln ("server: " ^^ fmt)

module Read = Eio.Buf_read
module Write = Eio.Buf_write
module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)

open Capnp_rpc_lwt

module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)
module Echo = Api.Client.Echo
let local =
  let module Echo = Api.Service.Echo in
  Echo.local @@ object
    inherit Echo.service

    method ping_impl params release_param_caps =
      let open Echo.Ping in
      let msg = Params.msg_get params in
      release_param_caps ();
      let response, results = Service.Response.create Results.init_pointer in
      Results.reply_set results ("echo:" ^ msg);
      Service.return response
  end
(* Read one line from [client] and respond with "OK". *)
let rec handle_client to_client from_client =
  traceln "Received: %S" (Read.line from_client);
  Write.string to_client "OK\n";
  Write.flush to_client;
  traceln "Written to client ";
  handle_client to_client from_client


let rec server_loop socket =

  Switch.run @@ fun sw ->
  let flow,addr = Eio.Net.accept ~sw socket in
  traceln "Accepted connection from %a" Eio.Net.Sockaddr.pp addr;
  Fiber.fork ~sw (fun () ->
    Write.with_flow flow @@ fun to_client ->
    let from_client = Read.of_flow flow ~max_size:100 in
    handle_client to_client from_client
  );
  server_loop socket

(* Accept incoming client connections on [socket].
   We can handle multiple clients at the same time.
   Never returns (but can be cancelled). *)
let run socket =
 server_loop  socket

 

main.ml

This file has code that implements

  1. A busy wait loop to continuously send and receive messages.( Commented now )

  2. A RPC call.( This would be the relevant design for implementing the Raft distributed consensus protocol )

open Eio.Std
open Lwt.Infix

let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080)

module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)
module Echo = Api.Client.Echo

(* Run a server and a test client, communicating using [net]. *)
let main ~net ~clock =
  Switch.run  @@ fun sw ->
  (* We create the listening socket first so that we can be sure it is ready
     as soon as the client wants to use it. *)
  let listening_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
  (* Start the server running in a new fiber.
     Using [fork_daemon] here means that it will be stopped once the client is done
     (we don't wait for it to finish because it will keep accepting new connections forever). *)
  Fiber.fork_daemon ~sw (fun () -> Server.run  listening_socket);
  (* Test the server: *)
  Fiber.fork ~sw ( fun () -> Client.run ~net ~clock ~addr);
  ()

let () =
  Logs.set_level (Some Logs.Warning);
  Logs.set_reporter (Logs_fmt.reporter ())

let () =
(* Eio_main.run @@ fun env -> *)

  (* let net = Eio.Stdenv.net env in *)
  (* let clock = Eio.Stdenv.clock env in *)
  (* main ~net ~clock *)

  Lwt_main.run begin
    let service = Server.local in
    Client.ping service "foo" >>= fun reply ->
    Fmt.pr "Got reply %S@." reply;
    Lwt.return_unit
  end
(* main.ml *)

Consensus module

This module holds the core logic to elect a leader. This is the current state of this module.

open Eio.Std


module  RaftConsensus =
struct

  let get_state = function
    | `Leader -> "leader."
    | `Follower -> "follower."
    | `Candidate -> "candidate."
    | `Dead -> " dead."

  type 'a cmid =  int

  type 'a peerids = 'a list


  (* Mutable term tracker *)
  type  'a currentterm = int ref

  type 'a votedfor =  int

  type 'cms cmstate =
  [ `Leader
  | `Follower
  | `Candidate
  | `Dead
  ]
  type election_reset_time = Mtime.t


  let election_reset_time =
    Eio_main.run @@ fun env ->
    let clock = Eio.Stdenv.clock env in
    let current_time = Eio.Time.now clock in
    Mtime.to_uint64_ns (Mtime.of_uint64_ns (Int64.of_float(current_time *. 1_000_000_000.)))

  let election_timeout =
    let time = Random.int 150 in
    time + Int64.to_int election_reset_time

  let currentterm = ref 0

  let setterm mutex value=
      let termstarted = ref 0 in
      Eio.Mutex.use_rw ~protect:false mutex (fun () -> termstarted := !value);
      termstarted

let periodic_timer () =
  Eio_main.run @@ fun env ->
  let cond = Eio.Condition.create () in
  Fiber.fork (fun () ->
    let rec loop () =
      let timeout = Eio.Time.Timeout.seconds (Eio.Stdenv.mono_clock env) (float_of_int election_timeout) in
      Eio.Time.Timeout.run_exn timeout (fun () ->
        Eio.Condition.broadcast cond;
        Printf.printf "Broadcast\n"
      );
      Fiber.yield ();
      loop ()
    in
    loop ()
  )
end

Timer events

My first attempt to raise a timer event and wait for it needs two loops, a mutex and a Eio.Condition. I did explore other ways to set up a reactive, event-driven program but that doesn’t seem to be built into eio. We have to develop a framework on top of eio. The code does seem complicated and the mutex may not be required. I need to investigate the exact reason for a mutex( Go code I drew inspiration from uses mutexes but eio works with Fibers and domains). I will take a look at this later.

Fibers are lightweight-threads.

  let await_timeout timeout_mutex =
    Eio.Condition.await_no_mutex timeout_mutex;
    traceln "Timeout condition is set"

  
  let periodic_timer () =
   Eio_main.run @@ fun env ->
   let cond = Eio.Condition.create () in
   let clock = Eio.Stdenv.clock env in
   Fiber.both (fun () ->
    let rec loop () =
        Eio.Time.sleep clock 1.0;

        Eio.Condition.broadcast cond;

      Fiber.yield ();
      loop ()
    in
    loop ()
  )
  (fun () ->
    let rec loop () =
       traceln "Waiting for timeout event ";
       await_timeout cond;
       traceln "timeout event ";
      Fiber.yield ();
      loop ()
    in
    loop ()
  );

Election Timer

So the function that executes the election timer based on some conditions is this.

  let checkelection_state mutex timeout_duration term_started =

   Eio.Mutex.use_rw ~protect:false mutex (fun () ->
    (match current_state with
      | state when state <> `Candidate && state <> `Follower  ->
			   traceln  "in election timer state is %s" (get_state state);
      | state ->
			   traceln  "in election timer state is %s" (get_state state);
    );
    traceln "checkelection_state";
	  if term_started <> current_term.currentterm then(
		  traceln "in election timer term changed from %d to %d" term_started
                                                              current_term.currentterm);
    let elapsed_time =  Mtime.span(  now()  ) (Mtime.of_uint64_ns election_reset_event_time) in
    if (Mtime.Span.compare elapsed_time (Mtime.Span.of_uint64_ns (Int64.of_int timeout_duration))  < 0) then
      traceln " timeout_duration  %.3f < elapsed_time %.3f"
        (Mtime.Span.to_float_ns (Mtime.Span.of_uint64_ns (Int64.of_int timeout_duration)))
        (Mtime.Span.to_float_ns elapsed_time)
    else
      traceln " Start election";

    )


  let periodic_timer () =
   Eio_main.run @@ fun env ->
   let timeout_duration = election_timeout in
   let term_started = current_term.currentterm in
   let mutex = Eio.Mutex.create() in
   let cond = Eio.Condition.create () in
   let clock = Eio.Stdenv.clock env in
   Fiber.both (fun () ->
    let rec loop () =
        Eio.Time.sleep clock 1.0;

        Eio.Condition.broadcast cond;

      Fiber.yield ();
      loop ()
    in
    loop ()
  )
  (fun () ->
    let rec loop () =
       traceln "Waiting for timeout event ";
       await_timeout cond;
       checkelection_state mutex timeout_duration term_started;
       traceln "timeout event ";
      Fiber.yield ();
      loop ()
    in
    loop ()
  );

Refactoring timer event code

The key requirement that lead to this is a mechanism to pause and restart the timer at various stages. Further testing may disprove this hypothesis but for now we will try to reuse the timer instead of discarding and starting a new one.

The OCaml eio library has just such a facility. It is cleaner than the original timer shown above. I have shown only that part of the code below.

  let unpaused = ref (Promise.create_resolved ())
  let unpaused_resolver = ref None

  let pause () =
    let p, r  = Promise.create() in
     unpaused := p;
     unpaused_resolver := Some r

 let unpause () =
   match !unpaused_resolver with
   | Some r ->
    Promise.resolve r ()
   | None -> ()

 let periodic_timer env =
   Eio.Switch.run @@ fun _ ->
   let timeout_duration = election_timeout in
   let term_started = current_term.currentterm in
   let mutex = Eio.Mutex.create() in
   let cond = Eio.Condition.create () in
   let clock = Eio.Stdenv.clock env in
   Fiber.both  (fun () ->
     while true do
        Promise.await !unpaused;
        Eio.Condition.broadcast cond;
        Eio.Time.sleep clock 0.5;
      done
  )

  (fun () ->
    let rec loop () =
       traceln "Waiting for timer event ";
       await_timeout cond;
       checkelection_state mutex timeout_duration term_started;

       traceln "timer event ";
      Fiber.yield ();
      loop ()
    in
    loop ()
  );

This code facilitates a mechanism that allows use to pause the timer when we don’t need it and start it again.

Written on November 5, 2024