An attempt to implement the Raft distributed protocol using OCaml 5 Eio, Effect Handlers, and capnp-rpc-lwt, which is a RPC library based on Eio
. Eio allows integration of Lwt framework.
The Eio capnp-rpc library is still in a branch waiting to be merged. This will be a RPC library entirely based on eio .
The older LWT capnp-rpc-lwt is used as Eio can integrate with it. This is done because the code started using Lwt-Eio integration.
The newer capnp-rpc based entirely on Eio will be used later.
Everything involved in this has to be learned. Code is incrementally developed and updated as new logic is added.
Simple Eio Client/Server and LWT RPC
Client
The details of the setup using dune to build this will eventually be added to the README of the Git repository. Code is now directly posted here.
A simple echo RPC call is included to test the capability of the capnp-rpc_lwt library.
open Eio . Std
open Capnp_rpc_lwt
open Lwt . Infix
(* Prefix all trace output with "client: " *)
let traceln fmt = traceln ( "client: " ^^ fmt )
module Read = Eio . Buf_read
module Write = Eio . Buf_write
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
module Echo = Api . Client . Echo
let ping t msg =
let open Echo . Ping in
let request , params = Capability . Request . create Params . init_pointer in
Params . msg_set params msg ;
Capability . call_for_value_exn t method_id request >|= Results . reply_get
let rec loop clock to_server from_server =
Write . string to_server "Request" ;
Write . char to_server ' ' ;
Write . string to_server "from client \n " ;
Eio . Time . sleep clock 0 . 5 ;
traceln "Waiting for server " ;
let reply = Read . line from_server in
traceln "Got reply %S" reply ;
Eio . Time . sleep clock 0 . 5 ;
loop clock to_server from_server
let run ~ net ~ clock ~ addr =
Eio . Time . sleep clock 0 . 5 ;
traceln "Connecting to server at %a..." Eio . Net . Sockaddr . pp addr ;
Switch . run @@ fun sw ->
let flow = Eio . Net . connect ~ sw net addr in
Write . with_flow flow @@ fun to_server ->
let from_server = Read . of_flow flow ~ max_size : 100 in
loop clock to_server from_server
Server
open Eio . Std
(* Prefix all trace output with "server: " *)
let traceln fmt = traceln ( "server: " ^^ fmt )
module Read = Eio . Buf_read
module Write = Eio . Buf_write
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
open Capnp_rpc_lwt
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
module Echo = Api . Client . Echo
let local =
let module Echo = Api . Service . Echo in
Echo . local @@ object
inherit Echo . service
method ping_impl params release_param_caps =
let open Echo . Ping in
let msg = Params . msg_get params in
release_param_caps () ;
let response , results = Service . Response . create Results . init_pointer in
Results . reply_set results ( "echo:" ^ msg );
Service . return response
end
(* Read one line from [client] and respond with "OK". *)
let rec handle_client to_client from_client =
traceln "Received: %S" ( Read . line from_client );
Write . string to_client "OK \n " ;
Write . flush to_client ;
traceln "Written to client " ;
handle_client to_client from_client
let rec server_loop socket =
Switch . run @@ fun sw ->
let flow , addr = Eio . Net . accept ~ sw socket in
traceln "Accepted connection from %a" Eio . Net . Sockaddr . pp addr ;
Fiber . fork ~ sw ( fun () ->
Write . with_flow flow @@ fun to_client ->
let from_client = Read . of_flow flow ~ max_size : 100 in
handle_client to_client from_client
);
server_loop socket
(* Accept incoming client connections on [socket].
We can handle multiple clients at the same time.
Never returns (but can be cancelled). *)
let run socket =
server_loop socket
main.ml
This file has code that implements
A busy wait loop to continuously send and receive messages.( Commented now )
A RPC call.( This would be the relevant design for implementing the Raft distributed consensus protocol )
open Eio . Std
open Lwt . Infix
let addr = `Tcp ( Eio . Net . Ipaddr . V4 . loopback , 8080 )
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
module Echo = Api . Client . Echo
(* Run a server and a test client, communicating using [net]. *)
let main ~ net ~ clock =
Switch . run @@ fun sw ->
(* We create the listening socket first so that we can be sure it is ready
as soon as the client wants to use it. *)
let listening_socket = Eio . Net . listen net ~ sw ~ reuse_addr : true ~ backlog : 5 addr in
(* Start the server running in a new fiber.
Using [fork_daemon] here means that it will be stopped once the client is done
(we don't wait for it to finish because it will keep accepting new connections forever). *)
Fiber . fork_daemon ~ sw ( fun () -> Server . run listening_socket );
(* Test the server: *)
Fiber . fork ~ sw ( fun () -> Client . run ~ net ~ clock ~ addr );
()
let () =
Logs . set_level ( Some Logs . Warning );
Logs . set_reporter ( Logs_fmt . reporter () )
let () =
(* Eio_main.run @@ fun env -> *)
(* let net = Eio.Stdenv.net env in *)
(* let clock = Eio.Stdenv.clock env in *)
(* main ~net ~clock *)
Lwt_main . run begin
let service = Server . local in
Client . ping service "foo" >>= fun reply ->
Fmt . pr "Got reply %S@." reply ;
Lwt . return_unit
end
(* main.ml *)
Consensus module
This module holds the core logic to elect a leader.
This is the current state of this module.
open Eio . Std
module RaftConsensus =
struct
let get_state = function
| `Leader -> "leader."
| `Follower -> "follower."
| `Candidate -> "candidate."
| `Dead -> " dead."
type ' a cmid = int
type ' a peerids = ' a list
(* Mutable term tracker *)
type ' a currentterm = int ref
type ' a votedfor = int
type ' cms cmstate =
[ `Leader
| `Follower
| `Candidate
| `Dead
]
type election_reset_time = Mtime . t
let election_reset_time =
Eio_main . run @@ fun env ->
let clock = Eio . Stdenv . clock env in
let current_time = Eio . Time . now clock in
Mtime . to_uint64_ns ( Mtime . of_uint64_ns ( Int64 . of_float ( current_time *. 1_000_000_000 . )))
let election_timeout =
let time = Random . int 150 in
time + Int64 . to_int election_reset_time
let currentterm = ref 0
let setterm mutex value =
let termstarted = ref 0 in
Eio . Mutex . use_rw ~ protect : false mutex ( fun () -> termstarted := ! value );
termstarted
let periodic_timer () =
Eio_main . run @@ fun env ->
let cond = Eio . Condition . create () in
Fiber . fork ( fun () ->
let rec loop () =
let timeout = Eio . Time . Timeout . seconds ( Eio . Stdenv . mono_clock env ) ( float_of_int election_timeout ) in
Eio . Time . Timeout . run_exn timeout ( fun () ->
Eio . Condition . broadcast cond ;
Printf . printf "Broadcast \n "
);
Fiber . yield () ;
loop ()
in
loop ()
)
end
Timer events
My first attempt to raise a timer event and wait for it needs two loops, a mutex and a Eio.Condition . I did explore
other ways to set up a reactive, event-driven program but that doesn’t seem to be built into eio. We have to develop a framework
on top of eio.
The code does seem complicated and the mutex may not be required. I need to investigate the exact reason for a mutex( Go
code I drew inspiration from uses mutexes but eio works with Fibers and domains). I will take a look at this later.
Fibers are lightweight-threads.
let await_timeout timeout_mutex =
Eio . Condition . await_no_mutex timeout_mutex ;
traceln "Timeout condition is set"
let periodic_timer () =
Eio_main . run @@ fun env ->
let cond = Eio . Condition . create () in
let clock = Eio . Stdenv . clock env in
Fiber . both ( fun () ->
let rec loop () =
Eio . Time . sleep clock 1 . 0 ;
Eio . Condition . broadcast cond ;
Fiber . yield () ;
loop ()
in
loop ()
)
( fun () ->
let rec loop () =
traceln "Waiting for timeout event " ;
await_timeout cond ;
traceln "timeout event " ;
Fiber . yield () ;
loop ()
in
loop ()
);
Election Timer
So the function that executes the election timer based on some conditions is this.
let checkelection_state mutex timeout_duration term_started =
Eio . Mutex . use_rw ~ protect : false mutex ( fun () ->
( match current_state with
| state when state <> `Candidate && state <> `Follower ->
traceln "in election timer state is %s" ( get_state state );
| state ->
traceln "in election timer state is %s" ( get_state state );
);
traceln "checkelection_state" ;
if term_started <> current_term . currentterm then (
traceln "in election timer term changed from %d to %d" term_started
current_term . currentterm );
let elapsed_time = Mtime . span ( now () ) ( Mtime . of_uint64_ns election_reset_event_time ) in
if ( Mtime . Span . compare elapsed_time ( Mtime . Span . of_uint64_ns ( Int64 . of_int timeout_duration )) < 0 ) then
traceln " timeout_duration %.3f < elapsed_time %.3f"
( Mtime . Span . to_float_ns ( Mtime . Span . of_uint64_ns ( Int64 . of_int timeout_duration )))
( Mtime . Span . to_float_ns elapsed_time )
else
traceln " Start election" ;
)
let periodic_timer () =
Eio_main . run @@ fun env ->
let timeout_duration = election_timeout in
let term_started = current_term . currentterm in
let mutex = Eio . Mutex . create () in
let cond = Eio . Condition . create () in
let clock = Eio . Stdenv . clock env in
Fiber . both ( fun () ->
let rec loop () =
Eio . Time . sleep clock 1 . 0 ;
Eio . Condition . broadcast cond ;
Fiber . yield () ;
loop ()
in
loop ()
)
( fun () ->
let rec loop () =
traceln "Waiting for timeout event " ;
await_timeout cond ;
checkelection_state mutex timeout_duration term_started ;
traceln "timeout event " ;
Fiber . yield () ;
loop ()
in
loop ()
);
Refactoring timer event code
The key requirement that lead to this is a mechanism to pause and restart the timer at various stages. Further testing
may disprove this hypothesis but for now we will try to reuse the timer instead of discarding and starting a new one.
The OCaml eio library has just such a facility. It is cleaner than the original timer shown above.
I have shown only that part of the code below.
let unpaused = ref ( Promise . create_resolved () )
let unpaused_resolver = ref None
let pause () =
let p , r = Promise . create () in
unpaused := p ;
unpaused_resolver := Some r
let unpause () =
match ! unpaused_resolver with
| Some r ->
Promise . resolve r ()
| None -> ()
let periodic_timer env =
Eio . Switch . run @@ fun _ ->
let timeout_duration = election_timeout in
let term_started = current_term . currentterm in
let mutex = Eio . Mutex . create () in
let cond = Eio . Condition . create () in
let clock = Eio . Stdenv . clock env in
Fiber . both ( fun () ->
while true do
Promise . await ! unpaused ;
Eio . Condition . broadcast cond ;
Eio . Time . sleep clock 0 . 5 ;
done
)
( fun () ->
let rec loop () =
traceln "Waiting for timer event " ;
await_timeout cond ;
checkelection_state mutex timeout_duration term_started ;
traceln "timer event " ;
Fiber . yield () ;
loop ()
in
loop ()
);
This code facilitates a mechanism that allows use to pause the timer when we don’t need it and start
it again.