
Show that a server is available by periodically sending a message to all the other servers : https://martinfowler.com/articles/patterns-of-distributed-systems/heartbeat.html

Problem

When multiple servers form a cluster, the servers are responsible for storing some portion of the data, based on the partitioning and replication schemes used. Timely detection of server failures is important to make sure corrective actions can be taken by making some other server responsible for handling requests for the data on failed servers.


Solution

Periodically send a request to all the other servers indicating the liveness of the sending server. Choose the request interval to be greater than the network round trip time between the servers. All the servers wait for a timeout interval, which is a multiple of the request interval, before checking for missed heartbeats.

Timeout Interval > Request Interval > Network round trip time between the servers. It is useful to know the network round trip times within and between datacenters when deciding values for the heartbeat interval and timeout.

e.g. If the network round trip time between the servers is 20ms, heartbeats can be sent every 100ms, and servers can check after 1 second; this gives enough time for multiple heartbeats to be sent and avoids false negatives.
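The relationship between these values can be sanity-checked with simple arithmetic. The constants below are the example values from the text (20ms round trip, 100ms heartbeat interval, 1 second timeout); the class and method names are illustrative only:

```java
public class HeartbeatIntervals {
    // Example values from the text; tune these for your own network.
    static final long NETWORK_RTT_MS = 20;
    static final long HEARTBEAT_INTERVAL_MS = 100;
    static final long TIMEOUT_MS = 1000;

    // Heartbeats that fit in one timeout window: several heartbeats can be
    // missed before a server is marked down, reducing false negatives.
    static long heartbeatsPerTimeout() {
        return TIMEOUT_MS / HEARTBEAT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        // Timeout Interval > Request Interval > Network round trip time
        System.out.println(TIMEOUT_MS > HEARTBEAT_INTERVAL_MS
                && HEARTBEAT_INTERVAL_MS > NETWORK_RTT_MS); // prints "true"
        System.out.println(heartbeatsPerTimeout()); // prints "10"
    }
}
```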

Both servers, the one sending the heartbeat and the one receiving it, run a scheduler that executes a task at a regular interval.

On the server side : class HeartBeatScheduler…

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HeartBeatScheduler implements Logging {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    private final Runnable action;
    private final Long heartBeatInterval;

    public HeartBeatScheduler(Runnable action, Long heartBeatIntervalMs) {
        this.action = action;
        this.heartBeatInterval = heartBeatIntervalMs;
    }

    private ScheduledFuture<?> scheduledTask;

    public void start() {
        scheduledTask = executor.scheduleWithFixedDelay(new HeartBeatTask(action),
                heartBeatInterval, heartBeatInterval, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        if (scheduledTask != null) {
            scheduledTask.cancel(true);
        }
    }
}
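A usage sketch of the scheduler might look like the following. This is a self-contained stand-in mirroring the class above (with a `stop()` for cleanup); the counter incrementing takes the place of a real network send, and the `runFor` helper is an assumption added for illustration:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal usage sketch; this HeartBeatScheduler mirrors the class shown above.
public class SchedulerDemo {

    static class HeartBeatScheduler {
        private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        private final Runnable action;
        private final long heartBeatIntervalMs;
        private ScheduledFuture<?> scheduledTask;

        HeartBeatScheduler(Runnable action, long heartBeatIntervalMs) {
            this.action = action;
            this.heartBeatIntervalMs = heartBeatIntervalMs;
        }

        void start() {
            scheduledTask = executor.scheduleWithFixedDelay(
                    action, heartBeatIntervalMs, heartBeatIntervalMs, TimeUnit.MILLISECONDS);
        }

        void stop() {
            if (scheduledTask != null) scheduledTask.cancel(true);
            executor.shutdown();
        }
    }

    // Runs the scheduler for the given duration and returns how many
    // heartbeats fired; the counter stands in for a real network send.
    static int runFor(long durationMs, long intervalMs) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        HeartBeatScheduler scheduler = new HeartBeatScheduler(sent::incrementAndGet, intervalMs);
        scheduler.start();
        Thread.sleep(durationMs);
        scheduler.stop();
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("heartbeats sent: " + runFor(550, 100));
    }
}
```

Note that `scheduleWithFixedDelay` measures the delay from the end of one execution to the start of the next, so a slow heartbeat action stretches the effective interval; this is usually the safer choice for heartbeats than `scheduleAtFixedRate`.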

On the sending server side, the scheduler executes a method to send heartbeat messages : class SendingServer…

private void sendHeartbeat() throws IOException {
      socketChannel.blockingSend(newHeartbeatRequest(serverId));
  }

On the receiving server, the failure detection mechanism has a similar scheduler started. At regular intervals, it checks if the heartbeat was received or not : class AbstractFailureDetector…

private HeartBeatScheduler heartbeatScheduler = new HeartBeatScheduler(this::heartBeatCheck, 100L);

abstract void heartBeatCheck();
abstract void heartBeatReceived(T serverId);

On the receiving server, the failure detector needs two methods :

  • A method to be called whenever the receiving server receives a heartbeat, to tell the failure detector that a heartbeat was received

class ReceivingServer…

  private void handleRequest(Message<RequestOrResponse> request) {
      RequestOrResponse clientRequest = request.getRequest();
      if (isHeartbeatRequest(clientRequest)) {
          HeartbeatRequest heartbeatRequest = JsonSerDes.deserialize(clientRequest.getMessageBodyJson(), HeartbeatRequest.class);
          failureDetector.heartBeatReceived(heartbeatRequest.getServerId());
          sendResponse(request);
      } else {
          //processes other requests
      }
  }

  • A method to periodically check the heartbeat status and detect possible failures.

In general, the smaller the heartbeat interval, the quicker failures are detected, but the higher the probability of false failure detection.

To keep false failure detection manageable, there are two categories of implementations :

  1. Small clusters - e.g. consensus-based systems like Raft, Zookeeper, etc.

  2. Large clusters - Gossip-based protocols

Small Clusters - Consensus Based Systems

In all the consensus implementations, heartbeats are sent from the leader server to all follower servers. Every time a heartbeat is received, the timestamp of its arrival is recorded.

class TimeoutBasedFailureDetector…

  @Override
  void heartBeatReceived(T serverId) {
      Long currentTime = System.nanoTime();
      heartbeatReceivedTimes.put(serverId, currentTime);
      markUp(serverId);
  }

If no heartbeat is received in a fixed time window, the leader is considered crashed, and a new server is elected as leader. There is a chance of false failure detection because of slow processes or networks; a Generation Clock (a monotonically increasing generation number) is used to detect the stale leader. This approach is suitable for smaller clusters, typically 3 to 5 node setups, which is what is observed in most consensus implementations like Zookeeper or Raft.

class TimeoutBasedFailureDetector…

  @Override
  void heartBeatCheck() {
      Long now = System.nanoTime();
      Set<T> serverIds = heartbeatReceivedTimes.keySet();
      for (T serverId : serverIds) {
          Long lastHeartbeatReceivedTime = heartbeatReceivedTimes.get(serverId);
          Long timeSinceLastHeartbeat = now - lastHeartbeatReceivedTime;
          if (timeSinceLastHeartbeat >= timeoutNanos) {
              markDown(serverId);
          }
      }
  }
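Putting the two methods together, a minimal self-contained detector in the spirit of the `TimeoutBasedFailureDetector` above could look like this. The injected clock (a `LongSupplier` of nanoseconds) is an assumption added here to make the timeout check deterministic; the real implementation would use `System.nanoTime()` directly, as in the snippets above:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Sketch of a timeout-based failure detector; class and method names beyond
// heartBeatReceived/heartBeatCheck are illustrative assumptions.
public class SimpleFailureDetector<T> {
    private final Map<T, Long> heartbeatReceivedTimes = new HashMap<>();
    private final Map<T, Boolean> up = new HashMap<>();
    private final long timeoutNanos;
    private final LongSupplier clock; // injected so tests can control time

    public SimpleFailureDetector(long timeoutNanos, LongSupplier clock) {
        this.timeoutNanos = timeoutNanos;
        this.clock = clock;
    }

    // Called whenever a heartbeat arrives from serverId.
    public void heartBeatReceived(T serverId) {
        heartbeatReceivedTimes.put(serverId, clock.getAsLong());
        up.put(serverId, true); // markUp
    }

    // Called periodically; marks servers down whose last heartbeat is too old.
    public void heartBeatCheck() {
        long now = clock.getAsLong();
        for (Map.Entry<T, Long> entry : heartbeatReceivedTimes.entrySet()) {
            if (now - entry.getValue() >= timeoutNanos) {
                up.put(entry.getKey(), false); // markDown
            }
        }
    }

    public boolean isUp(T serverId) {
        return up.getOrDefault(serverId, false);
    }

    public static void main(String[] args) {
        long[] now = {0};
        SimpleFailureDetector<String> detector =
                new SimpleFailureDetector<>(1_000_000_000L, () -> now[0]); // 1s timeout
        detector.heartBeatReceived("server-1");
        detector.heartBeatCheck();
        System.out.println(detector.isUp("server-1")); // prints "true": heartbeat is fresh
        now[0] = 2_000_000_000L; // advance the fake clock past the timeout
        detector.heartBeatCheck();
        System.out.println(detector.isUp("server-1")); // prints "false": marked down
    }
}
```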

Large Clusters - Gossip Based Protocols

In large clusters, two things need to be considered:

  • Fixed limit on the number of messages generated per server.

  • The total bandwidth consumed by the heartbeat messages. It should not consume a lot of network bandwidth; there should be an upper bound of a few hundred kilobytes, making sure that heartbeat traffic does not affect actual data transfer across the cluster.

Failure detectors, along with gossip protocols for propagating failure information across the cluster, are typically used in these situations.
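A back-of-the-envelope check on heartbeat bandwidth helps validate the bound above. All numbers here (1000 nodes, 200-byte message, one heartbeat per second per node) are illustrative assumptions, not values from the text, and gossip fanout is ignored:

```java
public class HeartbeatBandwidth {
    // Aggregate heartbeat traffic: nodes * message size * send rate.
    static long bytesPerSecond(int nodes, int messageBytes, double heartbeatsPerSecond) {
        return (long) (nodes * messageBytes * heartbeatsPerSecond);
    }

    public static void main(String[] args) {
        // Assumed: 1000 nodes, 200-byte heartbeats, 1 heartbeat/sec per node.
        long bps = bytesPerSecond(1000, 200, 1.0);
        System.out.println(bps / 1024 + " KiB/s"); // prints "195 KiB/s"
    }
}
```

Under these assumptions the cluster-wide heartbeat traffic stays around 200 KB/s, within the "few hundred kilobytes" bound, which is why gossip protocols cap per-node message counts rather than having every node heartbeat every other node.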
