Tuesday, September 29, 2015

SWIM Distributed Group Membership Protocol


Many distributed applications require reliable and scalable implementation of membership protocol. Membership protocol provides each process (“member”) of the group with a locally-maintained list of other non-faulty processes in the group. The protocol ensures that the membership list is updated with changes resulting from new members joining the group, or dropping out either voluntarily or through a failure. The SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol) is motivated by unscalability of traditional heart-beating protocols, which either impose network loads that grow quadratically with group size, or compromise response times or false positive frequency of failure detection. It is aiming to implement a membership protocol that provides stable failure detection time, stable rate of false positives and low message load per group member, thus allowing distributed applications that use it to scale well.

Applications which rely on reliability and scalability of the distributed membership maintenance protocol:
  • Reliable multicast
  • Epidemic-style information dissemination (gossip)
  • Distributed databases
  • Publish-subscribe systems
  • Large scale peer-to-peer systems
  • Large scale cooperative gaming
  • Other collaborative distributed applications

Requirements to protocol:
  • Low probability of false positive detection of process fail
  • Do not rely on central server
  • Low network and computational load
  • Low time to detect failure
  • Weakly consistent
Heartbeating-based protocol do not fit well under such requirements since it suffers from scalability limitations. The unscalability of the popular class of all-to-all heartbeating protocols arises from the implicit decision to join two principal functions of the membership problem:
  • Membership update dissemination
  • Failure detection

SWIM address non-scalability problems of traditional protocols for membership maintenance by:
  • Designing the failure detection and membership update dissemination components separately
  • Using a non-heartbeat based strategy for failure detection. It uses random-probing based failure detector protocol.

Properties of SWIM protocol:
  1. Imposes a constant message load per group member;
  2. Detects a process failure in an expected constant time at some non-faulty process in the group;
  3. Provides a deterministic bound (as a function of group size) on the local time that a non-faulty process takes to detect failure of another process;
  4. Propagates membership updates, including information about failures, in gossip-style; the dissemination latency in the group grows slowly (logarithmically) with the number of members;
  5. Provides a mechanism to reduce the rate of false positives by “suspecting” a process before “declaring” it as failed within the group.
While 1-2 are properties of the used failure detection protocol, 3-5 represents properties introduced in membership protocol.

SWIM Protocol Design

SWIM protocol consists of two main components:
  • Failure Detector Component
  • Dissemination Component

Failure Detector Component

Quality of service properties of failure detection:
  • Strong completeness
  • Speed of failure detection
  • Accuracy
  • Network message load
It is proved that strong completeness and strong accuracy is impossible to achieve at the same time over asynchronous unreliable network. However, since a typical distributed application relies on Strong Completeness (in order to maintain up to date information in dynamic groups), most failure detectors, including heartbeating-based solutions, guarantee this property while attempting to maintain a low rate of false positives. SWIM takes the same approach.

SWIM uses Failure detection protocol described at [2] (see Fig. 1).

Fig. 1 SWIM failure detection: Example protocol period at Mi. This shows all the possible messages that a protocol period may initiate. 

Dissemination Component

Basic implementation of Membership Dissemination component consist of:
  • On detecting failed member by failure detector process multicast it to the group. Other members delete failed member.
  • Joined or voluntarily leaving member multicast it in the same way. But on join need to know at least one contact member of the group. 
In order for a new member to join the group it can use one of the following approaches:
  • If the group is associated with a well known server or IP multicast address, all joins could be directed to the associated address.
  • In the absence of such infrastructure, join messages could be broadcasted. Group members will reply with some probability to this broadcast request.

The basic implementation described above shows the general idea of the protocol, but in order to make protocol robust and fault tolerant following improvements may be applied:
  • Do not use multicast to disseminate membership updates, but spread membership information by piggybacking it on top of failure detection protocol (ping, ping-req and ack messages). This approach results in an infection-style of membership updates dissemination, with the associated benefits of robustness to packet losses, and of low latency.
  • Use suspicion mechanism. Process that unresponsive to ping marked as “suspected”. Suspected member treated as normal for ping selection, but if respond then spread alive information. Only after predefined timeout suspected member declared as faulty. This timeout effectively trades off an increase in failure detection time for a reduction in frequency of false failure detections. Suspicion mechanism is applicable to any system with the distinct Failure Detector and Membership Dissemination components.
  • Basic implementation of SWIM protocol guarantee only eventual detection of the failure. Round-Robin probe target selection will guarantee Time Bounded Completeness property; the time interval between the occurrence of a failure and its detection at member Mj is no more than two times the group size in number of protocol periods. This can be solved by the following modification to the protocol. The failure detection protocol at member Mi selecting ping targets not randomly from the list of members, but in a round robin fashion. A newly joining member is inserted in the membership list at a position that is chosen uniformly at random. On completing a traversal of the entire list, Mi rearranges the membership list to a random reordering.

In order to implement gossip-style membership update dissemination each group member Mi maintains a buffer of recent membership updates with count of each buffered element how much it was piggybacked (sent) so far by Mi and is used to choose which element to piggyback next. Each membership update is piggybacked at most some a*log(n) [e.g. 3 * |log(n + 1)|] in order to ensure reliable arrival of gossip to each member of the group.

SWIM can be extended to a Wide-area network (WAN) or a virtual private network (VPN), by weighing ping target choices with topological information, thus reducing bandwidth usage on core network elements inside the network.

SWIM Implementations


Consul is a distributed, highly available, and datacenter-aware service discovery and configuration management system. It uses SWIM as a basis for implementing Gossip Protocol and for managing membership information.


ScaleCube (disclaimer: I am one of the authors) is an open source microservices framework for a rapid development of a distributed, resilient, reactive applications that scales. It uses a variant of SWIM protocol implementation in Java and uses it as a basis for managing the cluster of connected microservices. It uses suspicion mechanism over the failure detector events and also separate membership updates dissemination component. But it introduces separate Gossip Protocol component instead of piggybacking membership updates on top of failure detector messages. It is done in order to reuse gossip component for other platform events and have more fine grained control over time intervals used for gossiping and failure detection pinging. New members to the cluster joins via the configuration provided seed members addresses (it is an analogous of well known servers described above). And also it extends SWIM protocol with the introduction of periodic SYNC messages in order to improve recovery from network partitioning and message losses.

  1. SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol (2002)
  2. Scalable and Efficient Distributed Failure Detectors
  3. Quality of Service of Failure Detectors
  4. Consul
  5. ScaleCube

Thursday, September 10, 2015

Time, Clocks and Ordering in a Distributed System

A distributed system consist of a collection of distinct processes which are spatially separated, and which communicate via messaging. Here considered the concept of events ordering in a distributed systems. First, discussed the partial ordering defined by "happened before" relation and give a distributed algorithm for extending it to a consistent total ordering of all events.

Partial Ordering

The relation "happened before" (->) on the set of events of a system is the smallest relation satisfying the following three conditions:
  • If a and b are events in the same process, and a comes before b, then a -> b.
  • If a is the sending of a message by one process and b is the receipt of the same message by another process, then a -> b.
  • If a -> b and b -> c then a -> c.
Two distinct events a and b are said to be concurrent if a !-> b and b !-> a.

The "happens before" relation describes partial ordering of the events in the system.

Logical Clock can be represented by the following condition (it doesn't specifically means physical clock):

      Clock Condition. For any events a, b: if a -> b then C(a) < C(b). 

Note that converse condition doesn't hold so if C(a) < C(b) then we cannot expect that a -> b and two cases are possible either a -> b or a and b are concurrent events. Which means that by knowing just the clock values you cannot distinguish concurrent events.

From the definition of "happens before", Clock Condition is satisfied if the following two conditions hold:
  • C1. If a and b are events in process Pi, and a comes before b, then Ci(a) < Ci(b).
  • C2. If a is the sending of a message by process Pi and b is the receipt of that message by process Pj, then Ci(a) < Cj(b).

Implementation rules:
  • IR1. Each process Pi increments Ci between any two successive events.
  • IR2. (a) If event a is the sending of a message m by process Pi, then the message m contains a timestamp Tm = Ci(a). (b) Upon receiving a message m, process Pj sets Cj greater than or equal to its present value and greater than Tm.

Java implementation of Clock Condition as an illustration to this post can be found at Logical Clocks repository. The base classes are LogicalTimestamp which represent specific moment of time and LogicalClock class which provides thread-safe methods to store and update time according to implementation rules and conditions described above during events at each process of distributed system. 

Total Ordering

We can use a system of clocks that satisfying Clock Condition to define a total ordering (=>).
If a is an event in process Pi and b is an event in process Pj, then a => b if and only if either:
  • (i) Ci(a) < Cj(b) or 
  • (ii) Ci(a) = Cj(b) and Pi < Pj. 

We can use use the concept of total ordering to solve simple synchronization problem. We wish to find the algorithm for granting access to shared resource to the process which satisfy following conditions:
  • (I) A process which has been granted resource must release it before it can be granted to another process;
  • (II) Different requests for the resource must be granted in the order in which they are made;
  • (III) If every process which is granted the resource eventually release it, then every request is eventually granted.

Algorithm which satisfy conditions I-III and uses concept of total ordering can be described with the following 5 rules:
  1. To request the resource, process Pi sends the request resource message Tm:Pi to every other process, and puts that message on its request queue, where Tm is the timestamp of the message. 
  2. When process Pj receives the request resource message Tm:Pi, it places it on its request queue and sends a (timestamped) acknowledgment message to Pi. 
  3. To release the resource, process Pi removes any Tm:Pi request resource message from its request queue and sends a (timestamped) Pi release resource message to every other process.
  4. When process Pj receives a Pi release resource message, it removes any Tm:Pi request resource message from its request queue.
  5. Process Pi is granted the resource when the following two conditions are satisfied: (i) There is a Tm:Pi request resource message in its request queue which is ordered before any other request in its queue by the total ordering relation (=>); (ii) Pi has received a message from every other process timestamped later than Tm.
It is a distributed algorithm. Each process independently follows these rules to perform an emergent function (in this case granting synchronized access to the shared resource) , and there is no central coordinating process or central storage.

Such approach and principles can be generalized and allow to implement any desired form of multiprocess synchronization in a distributed system. But it depends from operating of all processes and failure of one process will halt other processes. The problem of failure not considered here. 

Anomalous Behavior

There can be an anomalous behaviour if some events which satisfy condition a -> b happens outside the system then system can assign resource access in the wrong order since from the point of view of the system a !-> b which can contradict to expected behaviour by the users. This can be prevented by the use of properly synchronized physical clocks. It can be formally shown how closely the clocks can be synchronized.

  1. Time, Clocks and the Ordering of Events in a Distributed System (1978)
  2. Logical Clocks github repository

Monday, September 7, 2015

Monolithic vs. Microservices Architecture

Monolithic Architecture

When developing a server-side application you can start it with a modular hexagonal or layered architecture which consists of different types of components:
  • Presentation - responsible for handling HTTP requests and responding with either HTML or JSON/XML (for web services APIs).
  • Business logic - the application’s business logic.
  • Database access - data access objects responsible for access the database.
  • Application integration - integration with other services (e.g. via messaging or REST API).
Despite having a logically modular architecture, the application is packaged and deployed as a monolith.

Benefits of Monolithic Architecture
  • Simple to develop.
  • Simple to test. For example you can implement end-to-end testing by simply launching the application and testing the UI with Selenium.
  • Simple to deploy. You just have to copy the packaged application to a server.
  • Simple to scale horizontally by running multiple copies behind a load balancer.
In the early stages of the project it works well and basically most of the big and successful applications which exist today were started as a monolith.

Drawbacks of Monolithic Architecture
  • This simple approach has a limitation in size and complexity. 
  • Application is too large and complex to fully understand and made changes fast and correctly. 
  • The size of the application can slow down the start-up time.
  • You must redeploy the entire application on each update.
  • Impact of a change is usually not very well understood which leads to do extensive manual testing.
  • Continuous deployment is difficult.
  • Monolithic applications can also be difficult to scale when different modules have conflicting resource requirements.
  • Another problem with monolithic applications is reliability. Bug in any module (e.g. memory leak) can potentially bring down the entire process. Moreover, since all instances of the application are identical, that bug will impact the availability of the entire application.
  • Monolithic applications has a barrier to adopting new technologies. Since changes in frameworks or languages will affect an entire application it is extremely expensive in both time and cost.

Microservices Architecture

The idea is to split your application into a set of smaller, interconnected services instead of building a single monolithic application. Each microservice is a small application that has its own hexagonal architecture consisting of business logic along with various adapters. Some microservices would expose a REST, RPC or message-based API and most services consume APIs provided by other services. Other microservices might implement a web UI.

The Microservice architecture pattern significantly impacts the relationship between the application and the database. Instead of sharing a single database schema with other services, each service has its own database schema. On the one hand, this approach is at odds with the idea of an enterprise-wide data model. Also, it often results in duplication of some data. However, having a database schema per service is essential if you want to benefit from microservices, because it ensures loose coupling. Each of the services has its own database. Moreover, a service can use a type of database that is best suited to its needs, the so-called polyglot persistence architecture.

Some APIs are also exposed to the mobile, desktop, web apps. The apps don’t, however, have direct access to the back-end services. Instead, communication is mediated by an intermediary known as an API Gateway. The API Gateway is responsible for tasks such as load balancing, caching, access control, API metering, and monitoring.

The Microservice architecture pattern corresponds to the Y-axis scaling of the Scale Cube model of scalability.

Benefits of Microservices Architecture
  • It tackles the problem of complexity by decomposing application into a set of manageable services which are much faster to develop, and much easier to understand and maintain.
  • It enables each service to be developed independently by a team that is focused on that service.
  • It reduces barrier of adopting new technologies since the developers are free to choose whatever technologies make sense for their service and not bounded to the choices made at the start of the project.
  • Microservice architecture enables each microservice to be deployed independently. As a result, it makes continuous deployment possible for complex applications.
  • Microservice architecture enables each service to be scaled independently.

Drawbacks of Microservices Architecture
  • Microservices architecture adding a complexity to the project just by the fact that a microservices application is a distributed system. You need to choose and implement an inter-process communication mechanism based on either messaging or RPC and write code to handle partial failure and take into account other fallacies of distributed computing.
  • Microservices has the partitioned database architecture. Business transactions that update multiple business entities in a microservices-based application need to update multiple databases owned by different services. Using distributed transactions is usually not an option and you end up having to use an eventual consistency based approach, which is more challenging for developers.
  • Testing a microservices application is also much more complex then in case of monolithic web application. For a similar test for a service you would need to launch that service and any services that it depends upon (or at least configure stubs for those services).
  • It is more difficult to implement changes that span multiple services. In a monolithic application you could simply change the corresponding modules, integrate the changes, and deploy them in one go. In a Microservice architecture you need to carefully plan and coordinate the rollout of changes to each of the services.
  • Deploying a microservices-based application is also more complex. A monolithic application is simply deployed on a set of identical servers behind a load balancer. In contrast, a microservice application typically consists of a large number of services. Each service will have multiple runtime instances. And each instance need to be configured, deployed, scaled, and monitored. In addition, you will also need to implement a service discovery mechanism. Manual approaches to operations cannot scale to this level of complexity and successful deployment a microservices application requires a high level of automation.


Building complex applications is inherently difficult. A Monolithic architecture better suits simple, lightweight applications. There are opinions which suggest to start from the monolith first and others which recommend not to start with monolith when your goal is a microservices architecture. But anyway it is important to understand Monolithic architecture since it is the basis for microservices architecture where each service by itself is implemented according to monolithic architecture. The Microservices architecture pattern is the better choice for complex, evolving applications. Actually the microservices approach is all about handling a complex system, but in order to do so the approach introduces its own set of complexities and implementation challenges.

  1. Introduction to Microservices
  2. The Scale Cube
  3. Microservices Architecture Pattern
  4. Monolithic Architecture Pattern
  5. Microservices Guide
  6. Monolith First
  7. Don't start with a monolith
  8. The Evolving Panorama of Data
  9. API Gateway Pattern
  10. Testing Strategies in a Microservice Architecture
  11. Microservice Premium 
  12. Distributed vs. Non-Distributed Computing
  13. Fallacies of Distributed Computing