Elasticsearch Distributed Consistency Principles Analysis (3) — Data

  1. Current issues
  2. Data write process
  3. PacificA algorithm
  4. SequenceNumber, Checkpoint, and failure recovery
  5. Comparing ES and PacificA
  6. Summary

Current Issues

Anyone who has ever used ES knows that each ES Index is divided into multiple Shards. Shards are distributed on different nodes to enable distributed storage and queries and support large-scale datasets. Each Shard has multiple copies, one of which is the Primary node, and the others are Replica nodes. Data is written to the Primary node first then synchronized with Replica nodes from the Primary node. When reading data, to improve read capability, both Primary node and Replica nodes accept read requests.

  1. High data reliability: The data has multiple copies.
  2. High service availability: If the Primary node crashes, a new Primary node can be chosen from the Replica nodes to continue offering services.
  3. Extended read capability: The Primary node and Replica nodes can take read requests.
  4. Failure recovery capability: If the Primary node or Replica nodes crash, there are not enough copies. New copies can be generated by copying the data from the new Primary node.
  1. How is data copied from Primary node to Replica nodes?
  2. Does it need to write to all copies to be successful?
  3. Do Primary node crashes cause data loss?
  4. Is the latest data always read when reading from Replica nodes?
  5. Do I need to copy all Shard data when performing failure recovery?

Data Write Process

First, let us take a look at the data write process.

From the Replication Perspective: Primary -> Replica

From the macro perspective, the ES write process involves writing data to the Primary node first, then concurrently writing it to Replica nodes and finally returning it to the Client. The process is as follows:

String activeShardCountFailure = checkActiveShardCount();
primaryResult = primary.perform(request);
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
private void decPendingAndFinishIfNeeded() {
assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
if (pendingActions.decrementAndGet() == 0) {
public void execute() throws Exception {
final String activeShardCountFailure = checkActiveShardCount();
final ShardRouting primaryRouting = primary.routingEntry();
final ShardId primaryId = primaryRouting.shardId();
if (activeShardCountFailure ! = null) {
finishAsFailed(new UnavailableShardsException(primaryId,
"{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
primaryResult = primary.perform(request);
primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest ! = null) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
// We must obtain the replication group after successfully indexing into the primary to follow recovery semantics.
// We must make sure that every operation indexed into the primary after recovery start is also replicated
// to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
// We also must make sure to obtain the global checkpoint before the replication group to ensure that the global checkpoint
// is valid for this replication group. If we sampled in the reverse direction, the global checkpoint might be based on a subset
// of the sampled replication group and advanced further than what the given replication group would allow.
// This would mean that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
successfulShards.incrementAndGet(); // mark primary as successful
"_shards" : {
"total" : 2,
"failed" : 0,
"successful" : 2
ReplicationOperation.java, OnFailure function for failure to write to Replica nodes:            public void onFailure(Exception replicaException) {
(org.apache.logging.log4j.util.Supplier<? >) () -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
if (TransportActions.isShardNotAvailableException(replicaException)) {
} else {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
call failShardIfNeeded: public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
logger.warn((org.apache.logging.log4j.util.Supplier<? >)
() -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
shardStateAction.remoteShardFailed sends the request to the Master, executes the ShardFailed logic of the Replica, and removes the Shard from InSyncAllocation. public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
if (failedShard.active() && unassignedInfo.getReason() ! = UnassignedInfo.Reason.NODE_LEFT) {
if (failedShard.primary()) {
Updates updates = changes(failedShard.shardId());
if (updates.firstFailedPrimary == null) {
// more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
updates.firstFailedPrimary = failedShard;
if (failedShard.active() && failedShard.primary()) {

From the Perspective of the Primary

From the perspective of Primary, a write request is written to Lucene before it is written to translog.

PacificA Algorithm

Proposed by Microsoft Research Asia, PacificA is a distributed consistency algorithm used for log replication systems. The paper defining it was published in 2008 (PacificA paper). ES has officially stated that its Replication model is based on this algorithm.

  1. Has strong consistency.
  2. Synchronizes the data from a single Primary node with multiple Secondary nodes.
  3. Uses additional consistency components for Configuration maintenance.
  4. Supports writes even when a minority of Replica nodes are available.

Glossary Terms

First, let us take a look at some terms used by this algorithm:

  1. Replica Group: A dataset in which each piece of data is a copy of another, and each copy is a Replica node. Only one copy in a Replica Group is the Primary node; the rest are Secondary nodes.
  2. Configuration: Configuration of a Replica Group describes which copies are included in the Replica Group and which one is the Primary.
  3. Configuration Version: The version number of the Configuration. The version number increments by 1 whenever Configuration changes occur.
  4. Configuration Manager: This manages global Configuration components, which ensures the consistency of Configuration data. Configuration change requests are initiated by a Replica node and are then sent to Configuration Manager along with the Version. Configuration Manager verifies that the Version is correct. If not, the change request is rejected.
  5. Query & Update: There are two types of Replica Group operations, Query and Update. Query does not change the data, while Update does.
  6. Serial Number(sn): This represents the order of each Update operation execution. It increments by 1 for every Update operation, and it is a consecutive number.
  7. Prepared List: This is the preparation sequence for Update operations.
  8. Committed List: This is the commit sequence for Update operations. The operations in the commit sequence definitely take effect (unless all copies crash). On the same Replica node, Committed List must come before the Prepared List.

Primary Invariant

With the PacificA algorithm, an error detection mechanism is required to satisfy the following invariant.


The Query process is relatively simple. Queries can only be sent to the Primary node, and the Primary node returns the corresponding values based on the latest committed data. Since this algorithm requires the Primary Invariant condition to be met, Queries always read the latest committed data.


The update process is as follows:

  1. Primary node assigns a Serial Number (sn) to an UpdateRequest.
  2. The Primary node adds this UpdateRequest to its own Prepared List. Meanwhile, it sends the Prepare request to all Secondary nodes, requiring them to add this UpdateRequest to their Prepared Lists.
  3. When all Replica nodes complete Prepare, that is, when the Prepared Lists of all Replica nodes contain the Update request, the Primary node starts to commit the request, adding the UpdateRequest to Committed List and applying the Update. Note that, on the same Replica node, Committed List always comes before the Prepared List, so the Primary node increases the Committed Point when including the Update Request.
  4. The result is returned to the Client, and the Update operation is successful.

Reconfiguration: Secondary Failure, Primary Failure, Newly Added Node

1. Secondary failure

PacificA Algorithm Summary

PacificA is an algorithm with strong consistency that meets both read and write requirements. It separates data consistency from Configuration consistency and uses additional consistency components (Configuration Manager) to maintain configuration consistency. This way, when less than half of the copies of data are available, new data can still be written and strong consistency can be ensured.

SequenceNumber, Checkpoint, and Failure Discovery

PacificA, a consistency algorithm model used by ES, is described above. It is important to note that each PacificA Update operation has a corresponding Serial Number, which indicates the order of execution. In the previous versions of ES, some functionality was limited because each write operation lacked a Serial Number or similar mechanism. In 2015, ES officials began planning to add SequenceNumber for each write operation and assumed there would be many application scenarios.

Term and SequenceNumber

Each write operation is assigned two values: Term and SequenceNumber. Term increments by 1 whenever the Primary changes, which is similar to Configuration Version in the PacificA paper. SequenceNumber increments by 1 after each operation, which is similar to Serial Number in the PacificA paper.

LocalCheckpoint and GlobalCheckpoint

LocalCheckpoint indicates that all requests in this Shard with values less than this value have been processed.

Fast Failure Rcovery

When a Replica node fails, ES removes it. When the failure exceeds a specific period, ES assigns a new Replica node to the new Node. At this point, full data synchronization is needed. But, if the previously failed Replica node returns, simply repopulating the data after the failure recovery and adding the node back once catching up with the records result in fast failure recovery. There are two conditions that must be met to enable fast failure recovery: First, all the operations and their orders during the failure can be saved; second, the node that started data synchronization must be determined. The first condition can be met by saving Translog for a specific amount of time; the second condition can be met using Checkpoint, ultimately achieving fast failure recovery. This is the first important application scenario using SequenceNumber and Checkpoint.

Comparison between Elasticsearch and PacificA


  1. Meta consistency and Data consistency are handled separately: In PacificA, Configuration consistency is maintained through Configuration Manager; in ES, Meta consistency is maintained through Master.
  2. Maintain the copies collection in synchronization: In PacificA, Replica Group is maintained; in ES, InSyncAllocationIds is maintained.
  3. SequenceNumber: In both PacificA and ES, write operations use SequenceNumber to record the operation order.


The main difference is that ES complies with PacificA; however, its implementation still does not meet all the requirements of the algorithm, meaning strict strong consistency is not guaranteed. The key points are as follows:

  1. Meta consistency: We analyzed the Meta consistency issue in ES in the previous section, and we can see that ES cannot guarantee Meta consistency, so it certainly cannot strictly guarantee Data consistency.
  2. Prepare phase: PacificA has the Prepare phase, which ensure that the data is not committed until it is prepared successfully on all nodes and that the committed data is not lost. In ES, the data is written directly, as it lacks this phase.
  3. Read consistency: In ES, all InSync Replica nodes can be read, which improves data readability; however, legacy data may also be read. On the other hand, even if only the Primary node can be read, ES also needs a mechanism like Lease, so that the Old Primary is not read. Given that ES is a near real-time system, the requirement for read consistency may not be very strict.


This article analyzed the consistency issues of the data flow in Elasticsearch. While ES has made substantial progress addressing these issues recently, many issues remain. This article is the last of the Elasticsearch Distributed Consistency Principles Analysis series. This series covers the research, analysis, and summary for ES, with step-by-step details covering node discovery, Master election, Meta consistency, Data consistency, and other aspects.


  1. Index API | Elasticsearch Reference 6.2
  2. Reading and Writing documents | Elasticsearch Reference 6.2
  3. PacificA: Replication in Log-Based Distributed Storage Systems
  4. Add Sequence Numbers to write operations #10708
  5. Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You\



Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website:https://www.alibabacloud.com