After latency times

Blog posts are not my thing, at least not my usual thing. One per month is not a good rate. Having said that, let's continue with this one.

I have been looking at how to measure the latency of asynchronous operations, that is, the serialization and saving of the state of a Processing Element (PE) in the S4 platform.

At first I used YCSB for this. A simple interface layer allowed me to connect to S4 using an S4 driver and, by sending events to an adapter (100,000 of them), measure their insertion latency. Yes… I know, YCSB is not the best tool for this, as it is better suited for measuring the latency of "cloud serving systems" such as HBase, Voldemort and Redis, but I found that its configuration parameters let me do some useful stuff.

So, having written my S4 interface for YCSB, I started throwing events at S4. Latency times were amazingly good: an average of 236 microseconds per operation. That seemed perfect. BUT, wait: I was not interested in the time it takes for an event to get inserted into the adapter, or even into the S4 cluster. I was interested in the time it takes for each of these events to be safely serialized and stored in reliable storage, call it a file system, a key/value store, etc.

Design choices

There comes a moment when you have to start thinking about how to do it. There were two options.

  1. Log the start time of the serialization+checkpointing operation right before the corresponding StateStorage save operation is called and, independently, log the finish time using a modified callback class. Getting the results would then be a matter of going through the logs and reading the different lines, one by one, and counting.
    • What if events are dropped by the platform? This would be ugly; the count would need to identify each of the elements.
    • What if an event is not checkpointed because the disk cache is full at that moment?
    • How can I do this for 100,000 events per second using only one disk? Bash scripting + error hunting MADNESS!
  2. Create an intermediate component in charge of collecting the start time and the end time of each event. Sweet, this sounds good.
    The catch: many threads will access this component (which will have static methods, something discouraged by everyone), and these concurrent accesses must have the lowest possible latency, i.e., no contention. How can that be done?

HashMap with a list

So I went after this: a common HashMap for all the checkpoints, where each Processing Element (PE) is stored under its id. However, one PE will have many checkpoints, so for each PE id there is a list of checkpoints (MD5 of the state, startTime, endTime).

A plain list has concurrency issues if it is not accessed through synchronized methods, so we can use the Collections.synchronizedList wrapper. Its individual methods are synchronized, but compound accesses (iterating over it, or checking and then adding) must still be done inside a synchronized (list) {} block.

The process goes pretty much like this:

  • Whenever the state of the PE is about to be saved to storage we identify the PE in the HashMap using its ID
  • We look at the size of the list: if it is 0, create a new one; otherwise check that the state ain't the same, dog!
  • Store the state in the list as an object with its startTime; endTime is null for now.
  • Whenever the callback component is triggered, call the setEndTime static method to save the endTime, identifying the element inside the list by the MD5 of the state, and the list inside the HashMap by the PE id.
  • Return the difference in time.
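The steps above can be sketched in Java roughly as follows. This is a minimal sketch, not the code I actually ran: the class name CheckpointLatencyTracker and the method startCheckpoint are hypothetical, setEndTime follows the name used above, and the outer map is a ConcurrentHashMap (an assumption on my part, so that the map itself is also safe when many threads insert PE ids).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class CheckpointLatencyTracker {

    /** One checkpoint of one PE: MD5 of the serialized state plus timings in nanoseconds. */
    static final class Checkpoint {
        final String md5;
        final long startTime;
        Long endTime; // null until the storage callback fires

        Checkpoint(String md5, long startTime) {
            this.md5 = md5;
            this.startTime = startTime;
        }
    }

    // PE id -> list of its checkpoints. Each inner list is a Collections.synchronizedList.
    private static final Map<String, List<Checkpoint>> CHECKPOINTS = new ConcurrentHashMap<>();

    private CheckpointLatencyTracker() {}

    /** Called right before the save operation. Returns false if this state is already recorded. */
    public static boolean startCheckpoint(String peId, String md5) {
        // "If 0, create a new one": the list is created the first time a PE id shows up.
        List<Checkpoint> list = CHECKPOINTS.computeIfAbsent(
                peId, id -> Collections.synchronizedList(new ArrayList<Checkpoint>()));
        synchronized (list) { // check-then-add must be atomic
            for (Checkpoint c : list) {
                if (c.md5.equals(md5)) {
                    return false; // same state, dog!
                }
            }
            list.add(new Checkpoint(md5, System.nanoTime())); // endTime stays null
            return true;
        }
    }

    /** Called from the storage callback. Returns the latency in ns, or -1 if not found. */
    public static long setEndTime(String peId, String md5) {
        List<Checkpoint> list = CHECKPOINTS.get(peId);
        if (list == null) {
            return -1;
        }
        synchronized (list) { // iterating a synchronizedList requires holding its lock
            for (Checkpoint c : list) {
                if (c.md5.equals(md5) && c.endTime == null) {
                    c.endTime = System.nanoTime();
                    return c.endTime - c.startTime;
                }
            }
        }
        return -1;
    }
}
```

Note that in this sketch both methods scan the PE's list while holding its lock, so with a single PE id every call walks a list that can grow to 100,000 entries, and all threads contend for that one lock.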

This sounds reasonable, and when using the YCSB-to-S4 interface it actually works pretty well. I have also created a script where I can specify the number of different PE ids the S4 platform will handle, and I am throwing 100,000 events at a rate of 1,500 events per second.
Now, another problem arises …

When using disk storage, the latency is around 2 ms, with a maximum of 50 ms (disk cache full), and no events are lost: they are all successfully checkpointed.

However, when using a key/value store like Redis, 85% of the checkpoints are discarded when there is only 1 PE id, but the same does not happen with 10 or even 100 different PE ids.

StackOverFlow

Why is this happening? Time for some ASCII art interpretation:

1 PE

 ____________________
|      PE id = 1     |    HashMap( [PE id = 1, List( [state, start, end], ... )] )   // only 1 PE id
|____________________|
|state | start | end |
|state | start | end |
|        ...         |
|____________________|    n = 100,000

====================

PEs > 1

 ____________________     ____________________
|      PE id = 1     |   |      PE id = 2     |   ...
|____________________|   |____________________|
|state | start | end |   |state | start | end |
|state | start | end |   |state | start | end |
|        ...         |   |        ...         |
|____________________|   |____________________|
 n = 100,000 / k in each list

HashMap( [PE id = 1, List( [state, start, end], ... )], ..., [PE id = k, List( ... )] )   // k = number of PEs

So what's happening is that when the list grows to 100,000 entries, the memory of my desktop computer (4 GB) is exhausted, swapping begins, and all that. But when the entries are evenly split across 10 or 100 PE ids in the HashMap, each list stays smaller and no events are lost.

What can I do?

Well, one option would be to replace the synchronized list with a synchronized Map and check whether that helps. Another idea is to have only one Map and build a composite key from both the PE id and the MD5 of the state.
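The second idea (one Map, composite key) could look something like this. It is a hedged sketch, assuming a ConcurrentHashMap, a key of the form "peId|md5" (which assumes '|' never appears in a PE id), and that finished entries are removed and folded into running statistics instead of being kept; CompositeKeyTracker and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public final class CompositeKeyTracker {
    // Single map keyed by "peId|md5"; value = start time (ns) of an in-flight checkpoint.
    private static final Map<String, Long> START_TIMES = new ConcurrentHashMap<>();
    // Running statistics instead of keeping every finished checkpoint in memory.
    private static final LongAdder COMPLETED = new LongAdder();
    private static final LongAdder TOTAL_NANOS = new LongAdder();
    private static final AtomicLong MAX_NANOS = new AtomicLong();

    private CompositeKeyTracker() {}

    // Assumes '|' never appears inside a PE id.
    private static String key(String peId, String md5) {
        return peId + "|" + md5;
    }

    /** Before the save call. Returns false if this (PE id, state) pair is already in flight. */
    public static boolean startCheckpoint(String peId, String md5) {
        return START_TIMES.putIfAbsent(key(peId, md5), System.nanoTime()) == null;
    }

    /** From the storage callback. Returns the latency in ns, or -1 if no start was recorded. */
    public static long setEndTime(String peId, String md5) {
        Long start = START_TIMES.remove(key(peId, md5));
        if (start == null) {
            return -1;
        }
        long latency = System.nanoTime() - start;
        COMPLETED.increment();
        TOTAL_NANOS.add(latency);
        MAX_NANOS.accumulateAndGet(latency, Math::max);
        return latency;
    }

    public static long completed() {
        return COMPLETED.sum();
    }

    public static int pending() {
        return START_TIMES.size();
    }

    public static double averageMillis() {
        long n = COMPLETED.sum();
        return n == 0 ? 0.0 : TOTAL_NANOS.sum() / (n * 1_000_000.0);
    }

    public static double maxMillis() {
        return MAX_NANOS.get() / 1_000_000.0;
    }
}
```

Each lookup becomes a hash lookup instead of a scan through a per-PE list, and removing finished entries keeps memory proportional to the checkpoints still in flight rather than to the total number of events sent.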

Suggestions, questions and comments are more than welcome; this is one of the main reasons for writing this post.


Tools of the trade?

It seems that depending on other projects will be my thing for the next few months.
Right now I'm looking at, and trying to grasp concepts from:

  • Gradle [1]: as a build automation tool
  • Guice [2]: as a dependency injection framework
  • s4-meter [3]: as the stress tool that's currently being used
  • JMeter [4, 5]: to try to change how the S4 platform is stress tested
  • JMeter plugins [6]: as suggested by Marcus (log.scalethis.se)
  • Spring Framework [7]: as another dependency injection framework
  • and some others, including:
    • Kryo [8]: serializer/deserializer
    • ZooKeeper [9]: coordination service

[1] http://gradle.org/docs/current/userguide/tutorial_java_projects.html
[2] http://code.google.com/p/google-guice/wiki/GettingStarted
[3] https://github.com/matthieumorel/s4-meter/
[4] http://jmeter.apache.org/usermanual/boss.html
[5] http://jmeter.apache.org/usermanual/build-web-test-plan.html
[6] http://code.google.com/p/jmeter-plugins/
[7] http://www.springsource.org/
[8] http://code.google.com/p/kryo/wiki/BenchmarksAndComparisons
[9] http://dl.acm.org/citation.cfm?id=1855851


Agreement in Faulty Processes, another reason to toast Leslie

Matters get complicated when we demand that a group of processes reach an agreement. This is needed for electing a coordinator, deciding whether or not to commit a transaction, and synchronizing. It is not that simple when communication and processes are not perfect. The general goal is for the non-faulty processes to reach consensus in a finite number of steps, even in the presence of faulty ones.

The two-army problem

A red army of 5,000 troops is in a valley, let's say Val d'Nuria.
Two blue armies, each 3,000 strong, are camped on the hills overlooking the valley.
If both blue armies attack at the same time, they will be victorious; however, if only one attacks, the math says:
5000 vs. 3000 = Slaughter!
The problem is that the communication channel is unreliable: the blue army's messenger can be captured by the red army on his way down the hill to the other side. It's all about ACKs! The generals can never know whether an ACK is the final one, or whether the messenger got captured.

Byzantine generals problem

Now the communication channels are reliable, but the processes are not. There are n blue armies around Val d'Nuria. Communication is done by cellphone, since all the generals like smartphones and such. However, there are m traitors (faulty processes) among the generals, and they keep feeding the line with contradictory information.
The goal is to exchange information about the size of each army. Each general shares his information, and by the end every general has a vector of length n covering all the armies. If general i is loyal, then element i is his troop strength; otherwise it is undefined.
An algorithm by good old Leslie Lamport et al. is shown below, with n = 4 and m = 1:
  • Step 1(a): general 1 reports 1K troops, general 2 reports 2K, general 3 lies to everyone, and general 4 reports 4K troops.
  • Step 2(b): each general collects the values received into a vector.
  • Step 3(c): every general passes his vector from (b) to every other general, so each general receives 3 vectors, one from each of the others. Here general 3 is still a filthy little liar and lies more than ever, with values a-l.
  • Step 4: each general examines the ith element of each received vector. If any value has a majority, that value is used; if no value has a majority, the position is marked UNKNOWN.
  • Result: (1, 2, UNKNOWN, 4)

In a system with m faulty processes, agreement can be achieved only if 2m + 1 correctly functioning processes are present, for a total of 3m + 1.
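To see the four steps in action, here is a small Java simulation of the n = 4, m = 1 example above. It is a sketch that reproduces this particular walkthrough, not a general implementation of Lamport's algorithm: general 3 is hard-coded as the traitor, and every lie is modeled as a fresh negative number so it never matches a loyal value or another lie (a real traitor could also lie with coinciding values, which this sketch does not explore). The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;

public final class ByzantineVectorSketch {
    static final int N = 4;       // number of generals (0-based indices 0..3)
    static final int TRAITOR = 2; // general 3 is the faulty one
    private static int nextLie = -1;

    // Every lie is a fresh negative number, distinct from loyal values and other lies.
    private static int lie() {
        return nextLie--;
    }

    /** Returns each loyal general's final vector (a null element means UNKNOWN). */
    public static Integer[][] run(int[] troops) {
        // Steps 1-2: vectors[g][i] = value general g received from general i.
        int[][] vectors = new int[N][N];
        for (int g = 0; g < N; g++) {
            for (int i = 0; i < N; i++) {
                boolean lied = (i == TRAITOR && g != TRAITOR);
                vectors[g][i] = lied ? lie() : troops[i];
            }
        }
        Integer[][] result = new Integer[N][];
        for (int g = 0; g < N; g++) {
            if (g == TRAITOR) {
                continue; // only the loyal generals' results matter
            }
            result[g] = new Integer[N];
            for (int i = 0; i < N; i++) {
                // Step 3: general g gets element i of every other general's vector.
                // Loyal generals forward what they received; the traitor makes up a value.
                Map<Integer, Integer> counts = new HashMap<>();
                for (int j = 0; j < N; j++) {
                    if (j == g) {
                        continue;
                    }
                    int v = (j == TRAITOR) ? lie() : vectors[j][i];
                    counts.merge(v, 1, Integer::sum);
                }
                // Step 4: keep a value only if it has a strict majority of the N - 1 vectors.
                for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
                    if (2 * e.getValue() > N - 1) {
                        result[g][i] = e.getKey();
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Integer[][] result = run(new int[] {1, 2, 3, 4}); // troop strengths in thousands
        for (int g = 0; g < N; g++) {
            if (g == TRAITOR) {
                continue;
            }
            StringJoiner sj = new StringJoiner(", ", "(", ")");
            for (Integer v : result[g]) {
                sj.add(v == null ? "UNKNOWN" : v.toString());
            }
            System.out.println("General " + (g + 1) + ": " + sj);
        }
    }
}
```

Running main prints (1, 2, UNKNOWN, 4) for generals 1, 2 and 4: the traitor's own slot gets three different values and therefore no majority.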

If you like this post and nerdy apparel, check this out!
http://www.spreadshirt.com/never-trust-byzantine-generals-bags-C3376A7439092

Back to basics. Today: Recovery

My thesis work heavily involves dealing with the recovery process of the system.
Here I have extracted some interesting points from Tanenbaum et al., "Distributed Systems".
Recovery
Recovery from an error (where an error is the part of the system's state that may lead to a failure) is fundamental to fault-tolerant systems.
There are two types of recovery mechanisms:
  • Backward recovery: take the system from its current erroneous state back to a previous correct state. In this scenario a checkpointing mechanism (recording the system state from time to time) is needed.
  • Forward recovery: when the system finds itself in an erroneous state, an attempt is made to bring it to a correct new state. Woo! This one is hard to implement, since the system needs to know in advance which errors can occur.

For example, in the case of a lost packet in a communication process, backward recovery would be to retransmit the packet, thus returning to the previous correct state. Erasure coding, on the other hand, would be the forward recovery approach: the lost packet is rebuilt from the ones that did arrive. (Lalith and I worked on an erasure coding toy application as part of our "Implementation of Distributed Systems || Advanced Topics in Distributed Systems" at KTH, Sweden. Code is here.)
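As a taste of forward recovery, here is the simplest possible erasure code: a single XOR parity packet, which can rebuild any one lost packet without retransmission. This is a hypothetical sketch (it is not the code from the KTH toy application), and it assumes all packets have the same length and that at most one packet is lost.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class XorParitySketch {
    /** Parity packet: byte-wise XOR of all data packets (all assumed to be the same length). */
    public static byte[] parity(byte[][] packets) {
        byte[] p = new byte[packets[0].length];
        for (byte[] packet : packets) {
            for (int i = 0; i < p.length; i++) {
                p[i] ^= packet[i];
            }
        }
        return p;
    }

    /** Rebuilds the single missing packet (index lost) from the surviving ones plus parity. */
    public static byte[] recover(byte[][] packets, int lost, byte[] parity) {
        byte[] r = parity.clone();
        for (int k = 0; k < packets.length; k++) {
            if (k == lost) {
                continue; // this one never arrived
            }
            for (int i = 0; i < r.length; i++) {
                r[i] ^= packets[k][i];
            }
        }
        return r;
    }

    public static void main(String[] args) {
        byte[][] packets = {
            "pkt0".getBytes(StandardCharsets.US_ASCII),
            "pkt1".getBytes(StandardCharsets.US_ASCII),
            "pkt2".getBytes(StandardCharsets.US_ASCII)
        };
        byte[] p = parity(packets);
        byte[][] received = {packets[0], null, packets[2]}; // packet 1 lost in transit
        byte[] rebuilt = recover(received, 1, p);
        System.out.println(new String(rebuilt, StandardCharsets.US_ASCII)); // pkt1
        System.out.println(Arrays.equals(rebuilt, packets[1]));             // true
    }
}
```

XOR-ing the parity with every surviving packet cancels them out and leaves exactly the missing one; real erasure codes (e.g. Reed-Solomon) generalize this to tolerate more than one loss.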

No problem? Yeah, right.

  • It is costly in terms of performance for a distributed system to return to a previous state.
  • As recovery mechanisms are independent of the distributed application they serve, it is hard to guarantee that the error(s) will not happen again.
  • Some states cannot be rolled back. Imagine an error leading to a 1000 EUR transfer out of an account.

To mitigate some of these problems, some systems combine checkpointing with message logging, since this allows the system to recover completely from process crashes and, in addition, lets checkpoints be taken less often. Two approaches can be taken regarding message logging:

  1. sender-based logging: after a checkpoint has been taken, a process logs its messages before sending them off.
  2. receiver-based logging: the receiving process first logs an incoming message before delivering it to the application it is executing.

An artistic ASCII art representation of the benefit of checkpointing + message logging is shown below:

----0---|-|-|-X
where:
0 : checkpointed state
| : logged messages
X : process crash.

It is easy to see that when the process crashes, it can not only be restored to the checkpointed state, but the logged messages can also be replayed on top of it to recover almost completely.
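The 0, | and X picture above can be sketched in a few lines of Java with receiver-based logging. This is a minimal, hypothetical sketch: an in-memory field and list stand in for stable storage, the "application" just sums integer messages, and replay only reproduces the pre-crash state because message handling is assumed to be deterministic.

```java
import java.util.ArrayList;
import java.util.List;

public final class CheckpointReplaySketch {
    // Simulated stable storage (survives crash()).
    private long checkpointedState;                          // 0 : last checkpoint
    private final List<Long> messageLog = new ArrayList<>(); // | : messages logged since then

    private long state; // in-memory state, lost on crash

    /** Receiver-based logging: log the message first, then deliver it to the application. */
    public void receive(long msg) {
        messageLog.add(msg);
        state += msg; // the "application" just sums incoming values
    }

    public void checkpoint() {
        checkpointedState = state;
        messageLog.clear(); // messages before the checkpoint are no longer needed
    }

    /** X : a crash wipes the in-memory state. */
    public void crash() {
        state = Long.MIN_VALUE;
    }

    public void recover() {
        state = checkpointedState;    // roll back to 0 ...
        for (long msg : messageLog) { // ... then replay every | logged after it
            state += msg;
        }
    }

    public long state() {
        return state;
    }

    public static void main(String[] args) {
        CheckpointReplaySketch p = new CheckpointReplaySketch();
        p.receive(5);
        p.receive(7);
        p.checkpoint();                 // 0
        p.receive(1);
        p.receive(2);
        p.receive(3);                   // | | |
        long before = p.state();
        p.crash();                      // X
        p.recover();
        System.out.println(before + " " + p.state()); // 18 18
    }
}
```

Without the log, recovery would stop at the checkpoint (12 here) and the three later messages would be lost.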

A survey (44 pages!) is available if you are interested in exploring this topic further.

To handle checkpointing there are two main techniques:

  • Independent checkpointing: each process/node handles its own checkpointing independently. This makes it hard to deal with inconsistent states when failures occur, since P1 may have checkpointed a state that is not consistent with the one saved by P2, and so on.
  • Coordinated checkpointing: all processes synchronize to jointly write their state to local stable storage. This yields a globally consistent state, in which no cascaded rollback (domino effect) can happen, at the cost of handling coordination in a distributed system, which gets its own chapter in the book.
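What "consistent" means for independent checkpointing can be made concrete with a small check: a set of local checkpoints (one per process) forms a valid recovery line only if no checkpoint records receiving a message that no checkpoint records sending (an "orphan" message). The Java sketch below is hypothetical; it simply labels messages with string ids.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class RecoveryLineSketch {
    /** What one process's local checkpoint recorded (message ids are illustrative labels). */
    public static final class LocalCheckpoint {
        final Set<String> sent;
        final Set<String> received;

        public LocalCheckpoint(Set<String> sent, Set<String> received) {
            this.sent = sent;
            this.received = received;
        }
    }

    /** True if every message recorded as received is also recorded as sent by some process. */
    public static boolean isConsistent(List<LocalCheckpoint> cut) {
        Set<String> allSent = new HashSet<>();
        for (LocalCheckpoint c : cut) {
            allSent.addAll(c.sent);
        }
        for (LocalCheckpoint c : cut) {
            for (String m : c.received) {
                if (!allSent.contains(m)) {
                    return false; // orphan message: received but never sent in this cut
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // P1 checkpointed before sending m1, P2 checkpointed after receiving it.
        LocalCheckpoint p1 = new LocalCheckpoint(Set.of(), Set.of());
        LocalCheckpoint p2 = new LocalCheckpoint(Set.of(), Set.of("m1"));
        System.out.println(isConsistent(List.of(p1, p2)));      // false: m1 is an orphan

        // A later P1 checkpoint that records sending m1 makes the pair consistent.
        LocalCheckpoint p1Later = new LocalCheckpoint(Set.of("m1"), Set.of());
        System.out.println(isConsistent(List.of(p1Later, p2))); // true
    }
}
```

When the check fails, some process has to roll back to an earlier checkpoint, which may in turn orphan other messages: that chain is the domino effect that coordinated checkpointing avoids.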

Hello!

I hope to get used to writing a bit every day, starting in February 2012.
Or at least, start posting interesting stuff.

Most of the inspiration for posting every now and then is from Lalith.

On the other hand, the idea of writing every day comes from Marcus.

So stay tuned, and I hope you enjoy it.

:D

