Raft

An attempt to implement the Raft distributed protocol using OCaml 5 Eio, Effect Handlers, and capnp-rpc, which is a RPC library based on Eio(This is still in a branch waiting to be merged). Everything involved in this has to be learned.

Simple Eio Client/Server and RPC

Client

The details of the setup using dune to build this will eventually be added to the README of the Git repository. Code is now directly posted here.

open Eio.Std
open Capnp_rpc_lwt
open Lwt.Infix
(* Prefix all trace output with "client: " *)
let traceln fmt = traceln ("client: " ^^ fmt)

module Read = Eio.Buf_read
module Write = Eio.Buf_write
module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)
module Echo = Api.Client.Echo

let ping t msg =
  let open Echo.Ping in
  let request, params = Capability.Request.create Params.init_pointer in
  Params.msg_set params msg;
  Capability.call_for_value_exn t method_id request >|= Results.reply_get

let rec loop clock to_server from_server =
   Write.string to_server "Request";
   Write.char to_server ' ';
   Write.string to_server "from client\n";
   Eio.Time.sleep clock 0.5;
   traceln "Waiting for server " ;
   let reply =  Read.line from_server  in
   traceln "Got reply %S" reply;
   Eio.Time.sleep clock 0.5;
   loop clock to_server from_server

let run ~net ~clock ~addr  =
  Eio.Time.sleep clock 0.5;
  traceln "Connecting to server at %a..." Eio.Net.Sockaddr.pp addr;
  Switch.run  @@ fun sw ->
  let flow = Eio.Net.connect ~sw net addr in
  Write.with_flow flow @@ fun to_server ->
  let from_server = Read.of_flow flow ~max_size:100 in
  loop clock to_server from_server

Server

open Eio.Std

(* Prefix all trace output with "server: " *)
let traceln fmt = traceln ("server: " ^^ fmt)

module Read = Eio.Buf_read
module Write = Eio.Buf_write
module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)

open Capnp_rpc_lwt

module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)
module Echo = Api.Client.Echo
let local =
  let module Echo = Api.Service.Echo in
  Echo.local @@ object
    inherit Echo.service

    method ping_impl params release_param_caps =
      let open Echo.Ping in
      let msg = Params.msg_get params in
      release_param_caps ();
      let response, results = Service.Response.create Results.init_pointer in
      Results.reply_set results ("echo:" ^ msg);
      Service.return response
  end
(* Read one line from [client] and respond with "OK". *)
let rec handle_client to_client from_client =
  traceln "Received: %S" (Read.line from_client);
  Write.string to_client "OK\n";
  Write.flush to_client;
  traceln "Written to client ";
  handle_client to_client from_client


let rec server_loop socket =

  Switch.run @@ fun sw ->
  let flow,addr = Eio.Net.accept ~sw socket in
  traceln "Accepted connection from %a" Eio.Net.Sockaddr.pp addr;
  Fiber.fork ~sw (fun () ->
    Write.with_flow flow @@ fun to_client ->
    let from_client = Read.of_flow flow ~max_size:100 in
    handle_client to_client from_client
  );
  server_loop socket

(* Accept incoming client connections on [socket].
   We can handle multiple clients at the same time.
   Never returns (but can be cancelled). *)
let run socket =
 server_loop  socket

 

main.ml

This file has code that implements

  1. A busy wait loop to continuously send and receive messages.( Commented now )

  2. A RPC call.( This would be the relevant design for implementing the Raft distributed consensus protocol )

open Eio.Std
open Lwt.Infix

let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 8080)

module Api = Echo_api.MakeRPC(Capnp_rpc_lwt)
module Echo = Api.Client.Echo

(* Run a server and a test client, communicating using [net]. *)
let main ~net ~clock =
  Switch.run  @@ fun sw ->
  (* We create the listening socket first so that we can be sure it is ready
     as soon as the client wants to use it. *)
  let listening_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
  (* Start the server running in a new fiber.
     Using [fork_daemon] here means that it will be stopped once the client is done
     (we don't wait for it to finish because it will keep accepting new connections forever). *)
  Fiber.fork_daemon ~sw (fun () -> Server.run  listening_socket);
  (* Test the server: *)
  Fiber.fork ~sw ( fun () -> Client.run ~net ~clock ~addr);
  ()

let () =
  Logs.set_level (Some Logs.Warning);
  Logs.set_reporter (Logs_fmt.reporter ())

let () =
(* Eio_main.run @@ fun env -> *)

  (* let net = Eio.Stdenv.net env in *)
  (* let clock = Eio.Stdenv.clock env in *)
  (* main ~net ~clock *)

  Lwt_main.run begin
    let service = Server.local in
    Client.ping service "foo" >>= fun reply ->
    Fmt.pr "Got reply %S@." reply;
    Lwt.return_unit
  end
(* main.ml *)

Consensus module

This is the current state of this module.

(* open Eio.Std *)

module type CONSENSUS = sig

type 'cms cmstate =
  [ `Leader
  | `Follower
  | `Candidate
  | `Dead
   ]
  val election_reset_time : unit -> Mtime.t

end

module  RaftConsensus(Params : CONSENSUS) =
struct

  let get_state = function
    | `Leader -> "leader."
    | `Follower -> "follower."
    | `Candidate -> "candidate."
    | `Dead -> " dead."

  type 'a cmid =  int

	type 'a peerids = 'a list


	type 'a currentterm = int
	type 'a votedfor =  int

	type 'cmstate state
	type election_reset_time = Mtime.t

  let election_reset_time =
    Eio_main.run @@ fun env ->
    let clock = Eio.Stdenv.clock env in
    let current_time = Eio.Time.now clock in
    Mtime.of_uint64_ns (Int64.of_float(current_time *. 1_000_000_000.))

end
Written on November 5, 2024