An attempt to implement the Raft distributed protocol using OCaml 5 Eio, Effect Handlers, and capnp-rpc, which is a RPC library based on Eio(This is still in a branch
waiting to be merged). Everything involved in this has to be learned.
Simple Eio Client/Server and RPC
Client
The details of the setup using dune to build this will eventually be added to the README of the Git repository. Code is now directly posted here.
open Eio . Std
open Capnp_rpc_lwt
open Lwt . Infix
(* Prefix all trace output with "client: " *)
let traceln fmt = traceln ( "client: " ^^ fmt )
module Read = Eio . Buf_read
module Write = Eio . Buf_write
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
module Echo = Api . Client . Echo
let ping t msg =
let open Echo . Ping in
let request , params = Capability . Request . create Params . init_pointer in
Params . msg_set params msg ;
Capability . call_for_value_exn t method_id request >|= Results . reply_get
let rec loop clock to_server from_server =
Write . string to_server "Request" ;
Write . char to_server ' ' ;
Write . string to_server "from client \n " ;
Eio . Time . sleep clock 0 . 5 ;
traceln "Waiting for server " ;
let reply = Read . line from_server in
traceln "Got reply %S" reply ;
Eio . Time . sleep clock 0 . 5 ;
loop clock to_server from_server
let run ~ net ~ clock ~ addr =
Eio . Time . sleep clock 0 . 5 ;
traceln "Connecting to server at %a..." Eio . Net . Sockaddr . pp addr ;
Switch . run @@ fun sw ->
let flow = Eio . Net . connect ~ sw net addr in
Write . with_flow flow @@ fun to_server ->
let from_server = Read . of_flow flow ~ max_size : 100 in
loop clock to_server from_server
Server
open Eio . Std
(* Prefix all trace output with "server: " *)
let traceln fmt = traceln ( "server: " ^^ fmt )
module Read = Eio . Buf_read
module Write = Eio . Buf_write
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
open Capnp_rpc_lwt
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
module Echo = Api . Client . Echo
let local =
let module Echo = Api . Service . Echo in
Echo . local @@ object
inherit Echo . service
method ping_impl params release_param_caps =
let open Echo . Ping in
let msg = Params . msg_get params in
release_param_caps () ;
let response , results = Service . Response . create Results . init_pointer in
Results . reply_set results ( "echo:" ^ msg );
Service . return response
end
(* Read one line from [client] and respond with "OK". *)
let rec handle_client to_client from_client =
traceln "Received: %S" ( Read . line from_client );
Write . string to_client "OK \n " ;
Write . flush to_client ;
traceln "Written to client " ;
handle_client to_client from_client
let rec server_loop socket =
Switch . run @@ fun sw ->
let flow , addr = Eio . Net . accept ~ sw socket in
traceln "Accepted connection from %a" Eio . Net . Sockaddr . pp addr ;
Fiber . fork ~ sw ( fun () ->
Write . with_flow flow @@ fun to_client ->
let from_client = Read . of_flow flow ~ max_size : 100 in
handle_client to_client from_client
);
server_loop socket
(* Accept incoming client connections on [socket].
We can handle multiple clients at the same time.
Never returns (but can be cancelled). *)
let run socket =
server_loop socket
main.ml
This file has code that implements
A busy wait loop to continuously send and receive messages.( Commented now )
A RPC call.( This would be the relevant design for implementing the Raft distributed consensus protocol )
open Eio . Std
open Lwt . Infix
let addr = `Tcp ( Eio . Net . Ipaddr . V4 . loopback , 8080 )
module Api = Echo_api . MakeRPC ( Capnp_rpc_lwt )
module Echo = Api . Client . Echo
(* Run a server and a test client, communicating using [net]. *)
let main ~ net ~ clock =
Switch . run @@ fun sw ->
(* We create the listening socket first so that we can be sure it is ready
as soon as the client wants to use it. *)
let listening_socket = Eio . Net . listen net ~ sw ~ reuse_addr : true ~ backlog : 5 addr in
(* Start the server running in a new fiber.
Using [fork_daemon] here means that it will be stopped once the client is done
(we don't wait for it to finish because it will keep accepting new connections forever). *)
Fiber . fork_daemon ~ sw ( fun () -> Server . run listening_socket );
(* Test the server: *)
Fiber . fork ~ sw ( fun () -> Client . run ~ net ~ clock ~ addr );
()
let () =
Logs . set_level ( Some Logs . Warning );
Logs . set_reporter ( Logs_fmt . reporter () )
let () =
(* Eio_main.run @@ fun env -> *)
(* let net = Eio.Stdenv.net env in *)
(* let clock = Eio.Stdenv.clock env in *)
(* main ~net ~clock *)
Lwt_main . run begin
let service = Server . local in
Client . ping service "foo" >>= fun reply ->
Fmt . pr "Got reply %S@." reply ;
Lwt . return_unit
end
(* main.ml *)
Consensus module
This is the current state of this module.
(* open Eio.Std *)
module type CONSENSUS = sig
type ' cms cmstate =
[ `Leader
| `Follower
| `Candidate
| `Dead
]
val election_reset_time : unit -> Mtime . t
end
module RaftConsensus ( Params : CONSENSUS ) =
struct
let get_state = function
| `Leader -> "leader."
| `Follower -> "follower."
| `Candidate -> "candidate."
| `Dead -> " dead."
type ' a cmid = int
type ' a peerids = ' a list
type ' a currentterm = int
type ' a votedfor = int
type ' cmstate state
type election_reset_time = Mtime . t
let election_reset_time =
Eio_main . run @@ fun env ->
let clock = Eio . Stdenv . clock env in
let current_time = Eio . Time . now clock in
Mtime . of_uint64_ns ( Int64 . of_float ( current_time *. 1_000_000_000 . ))
end