Friday, August 21, 2015

Netty Best Practices Distilled

  • Pipeline optimizations. Writes do not make syscalls. A flush makes syscalls to push all previous writes out to the socket. Limit flushes as much as possible, but also limit writes, since each one needs to traverse the pipeline.
  • GC pressure. Use VoidChannelPromise to reduce object creation if you are not interested in the future's result and no outbound handler in the pipeline needs to add a listener.
  • Write correctly with respect to slow receivers. Check Channel.isWritable() before writing to prevent an out-of-memory error.
  • Configure low and high write watermarks.
  • Pass custom events through the pipeline. This is a good fit for handshake notifications and more.
  • Prefer ByteToMessageDecoder over ReplayingDecoder. ReplayingDecoder is slower because its methods carry more overhead and it needs to handle ReplayingError. Use ByteToMessageDecoder whenever that is possible without making things complicated.
  • Use pooled direct buffers.
  • Write direct buffers… always.
  • Use ByteBufProcessor when you need to find a pattern in a ByteBuf. It is faster because it can eliminate range checks, can be created once and shared, and is easier for the JIT to inline.
  • Prefer alloc() over Unpooled.
  • Prefer slice() and duplicate() over copy(), since they do not create an extra copy of the buffer.
  • Prefer bulk operations over loops, because otherwise a range check is needed on each get.
  • Use DefaultByteBufHolder for messages with a payload. You get reference counting and resource release for free.
  • File transfer. Use zero-copy transfer of raw file content with DefaultFileRegion.
  • Never block or perform computationally intensive operations on the EventLoop.
  • EventLoop extends ScheduledExecutorService, so use it! Schedule and execute tasks in the EventLoop.
  • Reuse the EventLoopGroup if you can. Sharing the same EventLoopGroup keeps resource usage (such as thread usage) to a minimum.
  • Share the EventLoop in proxy-like applications to reduce context switching.
  • Combine operations when calling from outside the EventLoop, to reduce the overhead of wakeups and object creation.
  • Operations from inside a ChannelHandler. Use the shortest path through the pipeline if possible.
  • Share ChannelHandlers if stateless.
  • Remove a ChannelHandler once it is no longer needed. This keeps the pipeline as short as possible and so minimizes the overhead of traversing it.
  • Use the proper buffer type in MessageToByteEncoder. This saves extra byte copies.
  • Use the auto-read flag to control flow. This can also be quite useful when writing proxy-like applications.
  • Don't use the JDK's SSLEngine if performance matters. Use Twitter's OpenSSL-based SSLEngine. Netty will ship it on its own.
  • Prefer static nested classes over anonymous classes for channel future listeners. You never know when the ChannelFuture will be notified, so an anonymous class can keep the objects it references from being garbage collected.
  • Native transport (epoll) for less GC and lower latency. It only works on Linux, as only epoll is supported at the moment.


  1. Netty Best Practices a.k.a. Faster == Better (slides) 
  2. Netty Best Practices with Norman Maurer (video)

Wednesday, August 19, 2015

Scalable and Efficient Distributed Failure Detectors

Failure detectors are a central component of fault-tolerant distributed systems running over unreliable asynchronous networks, e.g. group membership protocols, supercomputers and computer clusters. The ability of the failure detector to detect process failures completely and efficiently, in the presence of unreliable messaging as well as arbitrary process crashes and recoveries, can have a major impact on the performance of these systems.

Properties of a failure detector:
  • {Strong/Weak} Completeness: the guarantee that the failure of a group member will eventually be detected by {all/some} non-faulty members.
  • Strong Accuracy: no non-faulty group member is declared failed by any other non-faulty member.
It has been proved that achieving both strong completeness and strong accuracy is impossible on fault-prone networks. So if strong completeness is required, weak accuracy (some possibility of false-positive failure detection) has to be accepted, while trying to reduce that possibility to a minimum.

Requirements for a failure detector:
  1. Completeness: satisfy eventual Strong Completeness.
  2. Efficiency:
     • Speed: quick detection (within some given time T) of a member failure by some (not all) non-faulty member.
     • Accuracy: low probability of failure detection mistakes (below a given PM(T), which is much lower than the probability of message loss Pml).
  3. Scalability: equal expected worst-case network load per member.

Theorem. Any distributed failure detector algorithm for a group of size n (>> 1) that deterministically satisfies the Completeness, Speed and Accuracy requirements above, for given values of T and PM(T) (<< Pml), imposes a minimal worst-case network load (messages per time unit, as defined above) of:

L* = n * [log(PM(T)) / log(Pml)] * 1/T

Furthermore, there is a failure detector that achieves this minimal worst-case bound while satisfying the Completeness, Speed and Accuracy requirements. L* is thus the optimal worst-case network load required to satisfy these requirements.
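
To get a feel for the bound, here is a small numeric sketch. It assumes the bound from the cited paper has the form L* = n * [log(PM(T)) / log(Pml)] * 1/T; the function name and the numbers plugged in are hypothetical and chosen only for illustration.

```python
import math

def optimal_load(n, pm_t, p_ml, t):
    """Worst-case load L* in messages per time unit, under the assumed formula."""
    if not (0 < pm_t < 1 and 0 < p_ml < 1):
        raise ValueError("probabilities must lie strictly between 0 and 1")
    return n * (math.log(pm_t) / math.log(p_ml)) / t

# Hypothetical values: 1000 members, 5% message loss,
# mistake probability of at most 1e-6, detection within 10 time units.
n = 1000
load = optimal_load(n=n, pm_t=1e-6, p_ml=0.05, t=10.0)
print(f"L* = {load:.1f} messages per time unit")
print(f"per member: {load / n:.3f} messages per time unit")
```

Under this formula the bound grows linearly with n, so the per-member share depends only on T, PM(T) and Pml.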

Heartbeat-based approaches provide completeness, but have shortcomings:
  • Centralized: create hot spots, which prevents them from scaling.
  • Distributed: inherently not very efficient or scalable.

Randomized Distributed Failure Detector Algorithm:
It assumes that the list of members is the same on all nodes and already known to them. A member recovers from failure in a distinguishable new incarnation, and each message also contains the current incarnation number of the sender.

At each member Mi:

Integer pr; /* local period number */

Every T' time units at Mi:
0. pr := pr + 1
1. Select random member Mj from view.
     Send a ping (Mi, Mj, pr) message to Mj
     Wait for the worst case message round trip time for an ack(Mi, Mj, pr) message.
2. If not received ack yet:
     Select k members randomly from view.
     Send each of them a ping-req(Mi,Mj, pr) message
     Wait for an ack (Mi, Mj, pr) until the end of period pr.
3. If not received ack(Mi, Mj, pr) message yet then declare Mj as failed.

Anytime at Mi:
4. On receipt of a ping-req(Mm,Mj, pr) (Mj != Mi)
     Send a ping(Mi, Mj, Mm, pr) message to Mj
     On receipt of an ack(Mi, Mj, Mm, pr) message from Mj
     Send an ack(Mm, Mj, pr) message to Mm

Anytime at Mi:
5. On receipt of a ping(Mm, Mi, Ml, pr) message from member Mm
     Reply with an ack(Mm, Mi, Ml, pr) message to Mm

Anytime at Mi:
6. On receipt of a ping(Mm, Mi, pr) message from member Mm
    Reply with an ack(Mm, Mi, pr) message to Mm

Fig. 1 Example of failure detection protocol period at Mi. This shows all the possible messages that a protocol period may initiate.
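
Steps 1-3 above can be sketched as a small single-process simulation. This is a minimal illustration rather than the paper's implementation: timeouts are replaced by an immediate success-or-failure check per message hop, message loss is modeled as an independent probability `loss`, and the names `reaches` and `protocol_period` are hypothetical.

```python
import random

def reaches(src, dst, alive, loss, rng):
    """One message hop: delivered if dst is alive and the message is not lost."""
    return dst in alive and rng.random() >= loss

def protocol_period(mi, view, alive, k, loss, rng):
    """Run steps 1-3 at member mi (assumed alive); return (mj, declared_failed)."""
    others = [m for m in view if m != mi]
    mj = rng.choice(others)                      # step 1: random ping target
    # The direct ping and its ack are two hops that must both succeed.
    if reaches(mi, mj, alive, loss, rng) and reaches(mj, mi, alive, loss, rng):
        return mj, False
    # Step 2: ask up to k other members to ping mj on our behalf.
    candidates = [m for m in others if m != mj]
    helpers = rng.sample(candidates, min(k, len(candidates)))
    for mh in helpers:
        # ping-req, relayed ping, ack to the helper, ack back to mi: four hops.
        path = [(mi, mh), (mh, mj), (mj, mh), (mh, mi)]
        if all(reaches(s, d, alive, loss, rng) for s, d in path):
            return mj, False
    return mj, True                              # step 3: no ack in this period

# Hypothetical run: 10 members, member 3 has crashed, 10% message loss.
rng = random.Random(42)
view = list(range(10))
alive = set(view) - {3}
false_positives = 0
for _ in range(10_000):
    mj, failed = protocol_period(0, view, alive, k=3, loss=0.1, rng=rng)
    if failed and mj in alive:
        false_positives += 1
print("false positives in 10000 periods:", false_positives)
```

Raising k in this sketch lowers the false-positive count at the cost of more messages per period, which is the trade-off the parameters T' and k control.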

The parameters T' and k of the algorithm can be formally calculated from the required quality-of-service parameters: the average speed of failure detection T and the accuracy PM(T).

This algorithm has uniform expected network load at all members. The worst-case network load occurs when, every T' time units, each member initiates steps (1-6) in the algorithm. Steps (1, 6) involve at most 2 messages, while steps (2-5) involve at most 4 messages per ping-req target member. Therefore, the worst-case network load imposed by this protocol (in messages/time unit) is:

L = n * [2 + 4 * k] * 1/T'

This means that the overall network load produced by running the algorithm on all nodes is linear, O(n), while the network load at any one member is constant and independent of the group size n.
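
A quick numeric check of the formula L = n * [2 + 4 * k] * 1/T' makes the scaling visible. The function name and the sample values of n, k and T' are hypothetical.

```python
def worst_case_load(n, k, t_prime):
    """Total worst-case load in messages per time unit for n members."""
    return n * (2 + 4 * k) / t_prime

# The total grows linearly with n while the per-member share stays fixed.
for n in (10, 100, 1000):
    total = worst_case_load(n, k=3, t_prime=1.0)
    print(f"n={n}: total={total}, per member={total / n}")
```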

  1. On Scalable and Efficient Distributed Failure Detectors (2001)

Monday, August 10, 2015

A Gossip-Style Failure Detector

A failure detector is a valuable component for system management, replication, load balancing, group communication services, various consensus algorithms and many other distributed services that rely on reasonably accurate detection of failed members.

  1. Each member maintains a list with each member's address and heartbeat counter. Every Tgossip time units it increments its own heartbeat counter and sends its list to one random member. The communication delay (the time needed to send a message) is typically much smaller than the interval between gossiping rounds (Tgossip).
  2. On receipt of such a gossip message, a member merges the list in the message with its own list, keeping the maximum heartbeat counter for each member.
  3. Each member occasionally broadcasts its list in order to be located initially and also to recover from network partitions. It decides whether to broadcast probabilistically, with the probability increasing the longer it has been since the last received broadcast. Instead of broadcasting, a few gossip servers with well-known addresses can be used.
  4. For each other member in the list, each member also maintains the last time that member's heartbeat counter increased.
  5. If the heartbeat counter has not increased for time Tfail, the member is considered failed. Failed members are not removed from the list immediately, however.
  6. A member is removed from the membership lists after time Tcleanup.
Using epidemic theory, it is possible to analyse the probabilities of gossiping and to calculate the parameters Tgossip and Tfail for a given quality-of-service requirement. The proposed algorithm may be optimized to exploit the IP address structure in order to reduce the amount of traffic between subnetworks.
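
The list handling in steps 1, 2, 4, 5 and 6 can be sketched as follows. This is a minimal in-memory illustration: the class and method names are hypothetical, time is passed in explicitly as `now` instead of being read from a clock, and networking, random peer selection and the broadcast of step 3 are left out.

```python
class GossipMember:
    def __init__(self, address, t_fail, t_cleanup):
        self.address = address
        self.t_fail = t_fail
        self.t_cleanup = t_cleanup
        # address -> [heartbeat counter, local time the counter last increased]
        self.table = {address: [0, 0.0]}

    def tick(self, now):
        """Step 1: bump our own heartbeat and return the list to gossip."""
        entry = self.table[self.address]
        entry[0] += 1
        entry[1] = now
        return {addr: hb for addr, (hb, _) in self.table.items()}

    def merge(self, remote, now):
        """Steps 2 and 4: keep the maximum heartbeat and note when it grew."""
        for addr, hb in remote.items():
            local = self.table.get(addr)
            if local is None or hb > local[0]:
                self.table[addr] = [hb, now]

    def failed(self, now):
        """Step 5: members whose heartbeat has not increased for t_fail."""
        return {a for a, (_, seen) in self.table.items()
                if a != self.address and now - seen >= self.t_fail}

    def cleanup(self, now):
        """Step 6: drop members that have been silent for t_cleanup."""
        stale = [a for a, (_, seen) in self.table.items()
                 if a != self.address and now - seen >= self.t_cleanup]
        for a in stale:
            del self.table[a]

# Hypothetical usage: "a" gossips once to "b" and then goes silent.
a = GossipMember("a", t_fail=5.0, t_cleanup=10.0)
b = GossipMember("b", t_fail=5.0, t_cleanup=10.0)
b.merge(a.tick(now=1.0), now=1.0)
print(b.failed(now=3.0), b.failed(now=6.0))
```

Keeping a failed entry until Tcleanup, as in steps 5 and 6, matters in this sketch because `merge` would otherwise re-add a dead member from any stale list that still mentions it.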