After latency times
Posted: March 28, 2012 Filed under: Thesis work
Blog posts are not my thing, at least not my usual thing. One per month is not a good number. Having said that, I can continue with this one.
I have been looking at how to measure the latency of asynchronous operations, that is, the serialization and saving of the state of a Processing Element in the S4 platform.
At first I used YCSB to do this. A simple interface layer let me connect to S4 through an S4 driver, so that by sending events to an adapter (100,000 of them) I could measure their latency on insertion. Yes… I know, YCSB is not the best tool for this, as it is better suited to measuring latency times of “Cloud serving systems” such as HBase, Voldemort and Redis, but I found that its configurable parameters let me do some useful stuff.
So, having written my S4 interface for YCSB, I started throwing events at S4. Latency times were amazingly good: an average of 236 microseconds per operation. That seemed perfect. BUT, wait: I was not interested in the time it takes for an event to be inserted into the adapter or sent to the S4 cluster. I was interested in the time it takes for each of these events to be safely serialized and stored in reliable storage, be it a file system, a key/value store, etc.
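The gap between those two measurements is easy to reproduce. The sketch below is hypothetical: `saveAsync` is a stand-in for an asynchronous state save (it is not an S4 or StateStorage method), and a sleeping daemon thread pretends to be the storage backend. Timing only the call gives an insertion-style number; timing until the completion callback fires gives the number that matters for checkpointing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncLatency {
    // Daemon thread standing in for the storage backend, so the JVM can still exit.
    private static final ExecutorService storage = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-storage");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical asynchronous save: the caller gets a future back immediately.
    static CompletableFuture<Void> saveAsync(byte[] state, long writeMillis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(writeMillis); // pretend the write to disk or Redis takes this long
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, storage);
    }

    /** Returns {submit latency, completion latency} in nanoseconds for one save. */
    static long[] measure(long writeMillis) {
        long start = System.nanoTime();
        CompletableFuture<Long> done = saveAsync(new byte[16], writeMillis)
                .thenApply(v -> System.nanoTime() - start); // callback: runs when the save completes
        long submit = System.nanoTime() - start;           // what an insertion benchmark sees
        long completion = done.join();                     // what the checkpoint actually costs
        return new long[] {submit, completion};
    }

    public static void main(String[] args) {
        long[] t = measure(20);
        System.out.printf("submit: %d us, completion: %d us%n", t[0] / 1_000, t[1] / 1_000);
    }
}
```

With a 20 ms simulated write, the submit figure stays tiny while the completion figure includes the whole write, which is the same trap as the 236 microsecond average.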
Design choices
There comes a moment when you have to start thinking about how to do it. There were two options:
- Log the start of the serialization+checkpointing operation right before the corresponding StateStorage save operation is called and, independently, log the finish time using a modified callback class. Getting the results would be a matter of going through the logs and reading the different lines, one by one, and counting.
  - What if events are dropped by the platform? This would be ugly: the count would have to identify each of the elements.
  - What if an event is not checkpointed because the disk cache is full at that moment?
  - How can I do this for 100,000 events per second using only one disk? Bash scripting + error hunting MADNESS!
- Create an intermediate component in charge of collecting the start time and the end time of each event. Sweet, this sounds good.
But how can many threads access this component (which will have static methods, discouraged by everyone) when all those concurrent accesses should happen with the lowest latency, i.e. no contention?
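For comparison, the first option boils down to pairing log lines. A minimal sketch, assuming a hypothetical log format of `START <eventId> <nanos>` and `END <eventId> <nanos>`, with each END line appearing after its START line: any START left without an END is exactly the dropped or never-checkpointed event that makes this approach painful.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class LogPairing {
    static final class Result {
        final Map<String, Long> latencies = new HashMap<>(); // eventId -> end - start
        final Set<String> unmatched = new TreeSet<>();        // START seen, END never seen
    }

    // Hypothetical log format: "START <eventId> <nanos>" or "END <eventId> <nanos>".
    static Result pair(List<String> lines) {
        Map<String, Long> open = new HashMap<>();
        Result r = new Result();
        for (String line : lines) {
            String[] p = line.trim().split("\\s+");
            if (p.length != 3) continue;                 // skip lines without exactly three fields
            long t = Long.parseLong(p[2]);
            if (p[0].equals("START")) {
                open.put(p[1], t);
            } else if (p[0].equals("END")) {
                Long start = open.remove(p[1]);
                if (start != null) r.latencies.put(p[1], t - start);
            }
        }
        r.unmatched.addAll(open.keySet());               // dropped or never checkpointed
        return r;
    }

    public static void main(String[] args) {
        Result r = pair(List.of("START e1 100", "START e2 150", "END e1 400"));
        System.out.println(r.latencies + " unmatched=" + r.unmatched);
    }
}
```

Even in this tidy form, every event id has to be unique and logged twice, which is the bookkeeping the second option tries to avoid.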
Map of hashes with a list, a.k.a. HashMap with a list
So I went after this: a common HashMap for all the checkpoints could be used, so each Processing Element (PE) could be stored under its id. However, one PE will have many checkpoints, so for each PE id there will be a list of checkpoints (MD5 of the state, startTime, endTime).
A plain list will have concurrency issues if it’s not accessed through synchronized methods, so for this reason we can use the Collections.synchronizedList wrapper. Its individual calls are synchronized, but every compound access (iterating over it, or checking and then adding) must be done inside a synchronized(list){} block.
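A minimal sketch of that idiom, with a hypothetical class name: single calls on a `Collections.synchronizedList` already lock on the wrapper, but "add this state only if it is not already there" spans several calls, so it has to hold the list's own lock (the Javadoc asks for the same when iterating).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SyncListIdiom {
    // Every individual call (add, size, get...) locks on the wrapper itself.
    static final List<String> states = Collections.synchronizedList(new ArrayList<>());

    // Compound operation: iterate, then maybe add. Without the synchronized block,
    // two threads could both miss the same state and both add it.
    static boolean addIfAbsent(String md5) {
        synchronized (states) {
            for (String s : states) {
                if (s.equals(md5)) return false;
            }
            states.add(md5);
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[4];
        for (int i = 0; i < ts.length; i++) {
            ts[i] = new Thread(() -> {
                for (int k = 0; k < 1000; k++) addIfAbsent("md5-" + k);
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join();
        System.out.println(states.size()); // 1000: each state added exactly once
    }
}
```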
The process goes pretty much like this:
- Whenever the state of the PE is about to be saved to storage, we look up the PE in the HashMap using its id.
- We check the size of the list: if it is 0, create a new one; otherwise check that the state ain’t the same, dog!
- Store the state into the list as an object with its startTime; endTime is null for now.
- Whenever the callback component is triggered, call the static setEndTime method to save the endTime, locating the element inside the HashMap by the PE id and inside the list by the MD5 of the state.
- Return the time difference.
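These five steps can be sketched as follows. This is a hedged reconstruction, not the actual thesis code: all names apart from `setEndTime` (which the steps mention) are hypothetical, and the outer map is assumed to be a `ConcurrentHashMap`, since a plain `HashMap` shared between threads is not safe when new PE ids are added concurrently.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CheckpointTimes {
    // One checkpoint: MD5 of the state, start time, end time (null until the callback fires).
    static final class Entry {
        final String md5;
        final long start;
        Long end;
        Entry(String md5, long start) { this.md5 = md5; this.start = start; }
    }

    // PE id -> list of checkpoints for that PE.
    private static final Map<String, List<Entry>> byPe = new ConcurrentHashMap<>();

    static String md5(byte[] state) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(state);
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // wrap the checked exception
        }
    }

    // Steps 1-3: called right before the state is handed to storage.
    static String setStartTime(String peId, byte[] state) {
        String hash = md5(state);
        List<Entry> list = byPe.computeIfAbsent(peId,
                id -> Collections.synchronizedList(new ArrayList<>()));
        synchronized (list) {
            for (Entry e : list) {
                if (e.md5.equals(hash)) return hash; // same state already recorded
            }
            list.add(new Entry(hash, System.nanoTime()));
        }
        return hash;
    }

    // Steps 4-5: called from the save callback; returns the latency in ns, or -1 if not found.
    static long setEndTime(String peId, String hash) {
        List<Entry> list = byPe.get(peId);
        if (list == null) return -1;
        synchronized (list) {
            for (Entry e : list) {                    // linear scan: cost grows with list length
                if (e.md5.equals(hash) && e.end == null) {
                    e.end = System.nanoTime();
                    return e.end - e.start;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String h = setStartTime("pe-1", "some state".getBytes(StandardCharsets.UTF_8));
        System.out.println(setEndTime("pe-1", h) + " ns");
    }
}
```

Note that `setEndTime` walks the PE's list while holding its lock, so with a single PE id every callback scans one ever-growing list.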
This sounds reasonable, and it actually works pretty well with the YCSB-to-S4 interface. I have created a script where I can specify the number of different PE ids the S4 platform will handle, and I am throwing 100,000 events at a rate of 1,500 events per second.
Now, another problem arises …
When using disk storage, the latency is around 2 ms, with a maximum of 50 ms (disk cache full), and no events are lost: they are all successfully checkpointed.
However, when using a key/value store like Redis, 85% of the checkpoints are discarded when there is only 1 PE id, but the same does not happen with 10 or even 100 different PE ids.
StackOverFlow
Why is this happening? Time for some ASCII art interpretation:
1 PE
 _____________________
| PE id = 1           |   HashMap( [PE id=1, List( [state,start,end], ... )] )   // only 1 PE id
|_____________________|
| state | start | end |
| state | start | end |
|         ...         |
|_____________________|   n = 100,000
=======================
PEs > 1 (N = number of PEs)
 _____________________          _____________________
| PE id = 1           |   ...  | PE id = N           |
|_____________________|        |_____________________|
| state | start | end |        | state | start | end |
| state | start | end |        | state | start | end |
|         ...         |        |         ...         |
|_____________________|        |_____________________|   n = 100,000 / N per PE
HashMap( [PE id=1, List( [state,start,end], ... )], ..., [PE id=N, List( [state,start,end], ... )] )
So what’s happening is that when the list grows to 100,000 entries, the memory of my desktop computer (4 GB) is exhausted, swapping begins, and all that. But when the entries are evenly split among 10 or 100 PE ids in the HashMap, each list becomes smaller and no events are lost.
What can I do?
Well, one way would be to replace the synchronized list with a synchronized Map and check whether that helps. Another idea is to have only 1 Map and build a common key from both the PE id and the MD5 of the state.
Suggestions, questions and comments are more than welcome; this is one of the main reasons for writing this post.
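The single-map idea can be sketched with one `ConcurrentHashMap` keyed by PE id plus MD5. This is an assumption-laden sketch with hypothetical names (and a `':'` separator that assumes PE ids never contain a colon), not a verified fix for the Redis behaviour: lookups no longer scan a list, and each entry is removed as soon as its checkpoint completes, so the map only holds in-flight checkpoints while running totals keep the statistics.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

final class FlatCheckpointTimes {
    // "peId:md5" -> start time of an in-flight checkpoint
    private static final ConcurrentHashMap<String, Long> inFlight = new ConcurrentHashMap<>();
    // Aggregates instead of keeping every finished entry in memory.
    static final LongAdder completed = new LongAdder();
    static final LongAdder totalNanos = new LongAdder();
    static final LongAccumulator maxNanos = new LongAccumulator(Long::max, 0L);

    private static String key(String peId, String md5) {
        return peId + ':' + md5; // assumes PE ids contain no ':'
    }

    // Returns false if the same state of the same PE is already in flight.
    static boolean start(String peId, String md5) {
        return inFlight.putIfAbsent(key(peId, md5), System.nanoTime()) == null;
    }

    // Called from the save callback; returns the latency in ns, or -1 if there was no start.
    static long end(String peId, String md5) {
        Long t0 = inFlight.remove(key(peId, md5));
        if (t0 == null) return -1;
        long latency = System.nanoTime() - t0;
        completed.increment();
        totalNanos.add(latency);
        maxNanos.accumulate(latency);
        return latency;
    }

    static int inFlightCount() { return inFlight.size(); }

    public static void main(String[] args) {
        start("pe-1", "abc");
        end("pe-1", "abc");
        System.out.printf("done=%d avg=%d ns max=%d ns in-flight=%d%n",
                completed.sum(), totalNanos.sum() / completed.sum(), maxNanos.get(), inFlightCount());
    }
}
```

`putIfAbsent` and `remove` are single atomic map operations, so no explicit `synchronized` block is needed here.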
Tools of the trade?
Posted: February 23, 2012 Filed under: Thesis work
It seems that dependencies on other projects will be my thing for the next few months.
Right now I’m looking at, and trying to grasp concepts from:
- Gradle [1]: as a build automation tool
- Guice [2]: as a dependency injection framework
- s4-meter [3]: as the stress tool that’s currently being used
- JMeter [4,5]: to try to change the stress testing of the S4 platform
- JMeter plugins [6]: as suggested by Marcus (log.scalethis.se)
- Spring Framework [7]: as another dependency injection platform
- and some others, including:
  - Kryo [8]: serializer/deserializer
  - ZooKeeper [9]: coordination tool
[1] http://gradle.org/docs/current/userguide/tutorial_java_projects.html
[2] http://code.google.com/p/google-guice/wiki/GettingStarted
[3] https://github.com/matthieumorel/s4-meter/
[4] http://jmeter.apache.org/usermanual/boss.html
[5] http://jmeter.apache.org/usermanual/build-web-test-plan.html
[6] http://code.google.com/p/jmeter-plugins/
[7] http://www.springsource.org/
[8] http://code.google.com/p/kryo/wiki/BenchmarksAndComparisons
[9] http://dl.acm.org/citation.cfm?id=1855851
Agreement in Faulty Processes, another reason to toast to Leslie
Posted: February 20, 2012 Filed under: Thesis work | Tags: basics, distributed systems, lamport
Matters get complicated if we demand that a process group reaches an agreement. Agreement is needed for electing a coordinator, deciding whether or not to commit a transaction, and synchronizing. This is not that simple when communication and processes are not perfect. The general goal is for the non-faulty processes to reach consensus in a finite number of steps, even in the presence of faulty ones.
The two-army problem
Byzantine generals problem
- Step 1 (a): general 1 reports 1K troops, general 2 reports 2K, general 3 lies to everyone, and general 4 reports 4K troops.
- Step 2 (b): the results are collected as a vector.
- Step 3 (c): every general passes his vector from (b) to every other general, so every general gets 3 vectors, one from each of the other generals. Here general 3 is still a filthy little liar and lies more than ever, with values a to l.
- Step 4: each general examines the ith element of each received vector. If any value has a majority, that value is used; if no value has a majority, the position is marked UNKNOWN.
- Result: (1, 2, UNKNOWN, 4)
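Step 4 is a per-position majority vote. A minimal sketch with a hypothetical class name, keeping values as strings so the liar's letters fit alongside the troop counts; the letters y, c and z in the usage are illustrative placeholders for general 3's lies as seen by general 1, which receives one vector from each of generals 2, 3 and 4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ByzantineVote {
    static final String UNKNOWN = "UNKNOWN";

    // received: the vectors one general got from the other generals in step 3.
    static List<String> decide(List<List<String>> received) {
        int width = received.get(0).size();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < width; i++) {
            Map<String, Integer> counts = new HashMap<>();
            for (List<String> vector : received) {
                counts.merge(vector.get(i), 1, Integer::sum);
            }
            String decided = UNKNOWN;
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                if (2 * e.getValue() > received.size()) decided = e.getKey(); // strict majority
            }
            result.add(decided);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> atGeneral1 = List.of(
                List.of("1", "2", "y", "4"),   // from general 2
                List.of("a", "b", "c", "d"),   // from general 3, the liar
                List.of("1", "2", "z", "4"));  // from general 4
        System.out.println(decide(atGeneral1)); // [1, 2, UNKNOWN, 4]
    }
}
```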
In a system with m faulty processes, agreement can be achieved only if 2m+1 correctly functioning processes are present, for a total of 3m+1.
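Plugging the example into this bound as a quick sanity check, with m the number of traitors and n the total number of generals:

$$
n \ge 3m + 1 \quad\Longleftrightarrow\quad n - m \ge 2m + 1
$$

$$
m = 1:\quad n \ge 4, \text{ with at least } 3 \text{ loyal generals}
$$

So one traitor among four generals, as in the example above, is exactly the smallest setting where agreement is possible; with three generals and one traitor (n = 3 < 4), agreement cannot be guaranteed.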

Back to basics. Today: Recovery
Posted: February 20, 2012 Filed under: Papers and bibliography | Tags: basics, distributed systems
- Backward recovery: take the system from its current erroneous state back to a previously correct state. This requires a checkpointing mechanism (recording the system state from time to time).
- Forward recovery: when the system finds itself in an erroneous state, an attempt is made to bring it to a correct new state. Woo! This one is hard to implement, since the system needs to know in advance which errors can occur.
For example, in case of a lost packet in a communication process, backward recovery would be to retransmit the packet, thus returning to the previous correct state. On the other hand, erasure coding would be the forward recovery approach. (Lalith and I worked on an erasure coding toy application as part of our “Implementation of Distributed Systems || Advanced Topics in Distributed Systems” at KTH, Sweden. Code is here.)
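To make the forward-recovery idea concrete, here is the simplest kind of erasure code, a single XOR parity packet. This is an illustrative sketch with hypothetical names, not the toy application mentioned above: the sender adds one packet that is the XOR of all data packets, and the receiver can rebuild any one lost packet without asking for a retransmission.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class XorParity {
    // parity[i] = p0[i] ^ p1[i] ^ ... ; all packets are assumed to have the same length.
    static byte[] parity(byte[][] packets) {
        byte[] p = new byte[packets[0].length];
        for (byte[] pkt : packets) {
            for (int i = 0; i < p.length; i++) p[i] ^= pkt[i];
        }
        return p;
    }

    // Rebuild packet `lost` by XOR-ing the parity with every packet that did arrive.
    static byte[] recover(byte[][] received, int lost, byte[] parity) {
        byte[] r = parity.clone();
        for (int k = 0; k < received.length; k++) {
            if (k == lost) continue;             // this slot may be null: it never arrived
            for (int i = 0; i < r.length; i++) r[i] ^= received[k][i];
        }
        return r;
    }

    public static void main(String[] args) {
        byte[][] sent = {
            "abcd".getBytes(StandardCharsets.US_ASCII),
            "efgh".getBytes(StandardCharsets.US_ASCII),
            "ijkl".getBytes(StandardCharsets.US_ASCII)};
        byte[] par = parity(sent);
        byte[][] received = {sent[0], null, sent[2]}; // packet 1 was lost
        byte[] rebuilt = recover(received, 1, par);
        System.out.println(new String(rebuilt, StandardCharsets.US_ASCII)); // efgh
        System.out.println(Arrays.equals(rebuilt, sent[1]));               // true
    }
}
```

A single parity packet only survives one loss per group; codes such as Reed-Solomon trade more redundancy for tolerating more losses.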
No problem? Yeah, right.
- It is costly in terms of performance for a distributed system to return to a previous state.
- As recovery mechanisms are independent of the systems, it is hard to guarantee that the error(s) will not happen again.
- Some states cannot be rolled back. Imagine an error leading to a 1000 EUR transfer out of an account.
To balance some of these problems, some systems combine checkpointing with message logging, since this allows the system to recover completely from process crashes and also allows checkpoints to be taken less frequently. Two approaches can be taken regarding message logging:
- sender-based logging: after a checkpoint has been taken, a process logs its messages before sending them off.
- receiver-based logging: the receiving process first logs an incoming message before delivering it to the app it’s executing.
An artistic ASCII representation of the benefit of checkpointing + message logging is shown below:
---0---|-|-|-X--
where:
0 : checkpointed state
| : logged messages
X : process crash.
It is easy to see that when the system crashes, it can be restored not only to the checkpointed state: the logged messages and actions can also be replayed to recover almost completely.
A survey (44 pages!) is available if you are interested in exploring this topic.
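The timeline above can be simulated in a few lines. A toy sketch, assuming a hypothetical process whose whole state is a running sum of the integer messages it receives, and assuming delivery is deterministic (replaying the same messages in the same order rebuilds the same state). It uses receiver-based logging: log first, then deliver.

```java
import java.util.ArrayList;
import java.util.List;

final class LoggedProcess {
    private long state;                               // in memory: lost on a crash
    private long checkpoint;                          // "stable storage": last checkpointed state (0)
    private final List<Long> log = new ArrayList<>(); // "stable storage": messages since checkpoint (|)

    // Receiver-based logging: write the message to the log before delivering it.
    void receive(long msg) {
        log.add(msg);
        deliver(msg);
    }

    private void deliver(long msg) { state += msg; }

    // Taking a checkpoint makes the older log entries unnecessary.
    void takeCheckpoint() {
        checkpoint = state;
        log.clear();
    }

    void crash() { state = 0; }                       // X: the in-memory state is gone

    // Restore the checkpoint, then replay the logged messages in their original order.
    void recover() {
        state = checkpoint;
        for (long m : log) deliver(m);
    }

    long state() { return state; }

    public static void main(String[] args) {
        LoggedProcess p = new LoggedProcess();
        p.receive(5);
        p.takeCheckpoint();                           // 0
        p.receive(1); p.receive(2); p.receive(3);     // | | |
        p.crash();                                    // X
        p.recover();
        System.out.println(p.state());                // 11, not just the checkpointed 5
    }
}
```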
To handle checkpointing there are two main techniques:
- Independent checkpointing: each process/node handles its own independent checkpointing mechanism. This has the problem of handling inconsistent states when failures occur, since P1 may have checkpointed a state that is not consistent with the one saved by P2, and so on.
- Coordinated checkpointing: all processes synchronize to jointly write their state to local stable storage. This gives a globally consistent state, where no cascading rollback (domino effect) can happen, at the cost of handling coordination in a distributed system, which gets its own chapter in the book.
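The coordinated variant can be illustrated inside a single JVM, with threads standing in for processes and a `java.util.concurrent.CyclicBarrier` standing in for the coordination protocol. This is only an analogy with hypothetical names, not a distributed algorithm (there are no in-flight messages to worry about): every worker pauses at the barrier, the barrier action records everyone's state at once, and only then does anyone continue, so each recorded snapshot is consistent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CoordinatedCheckpoint {
    // Each worker adds (id + 1) to its own counter per round; after every round all
    // workers stop at the barrier and one global snapshot of all counters is taken.
    static int[][] run(int workers, int rounds) {
        int[] live = new int[workers];                  // each worker's local state
        int[][] snapshots = new int[rounds][];          // one consistent snapshot per round
        int[] next = {0};                               // index of the next snapshot
        CyclicBarrier barrier = new CyclicBarrier(workers,
                () -> snapshots[next[0]++] = live.clone()); // runs once all workers have arrived
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                final int id = w;
                futures.add(pool.submit(() -> {
                    for (int r = 0; r < rounds; r++) {
                        live[id] += id + 1;             // do some "work"
                        barrier.await();                // pause until the checkpoint is taken
                    }
                    return null;
                }));
            }
            for (Future<?> f : futures) f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return snapshots;
    }

    public static void main(String[] args) {
        for (int[] s : run(3, 2)) System.out.println(Arrays.toString(s)); // [1, 2, 3] then [2, 4, 6]
    }
}
```

The price is visible even here: every worker sits idle while the snapshot is taken, which is the coordination cost the text mentions.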
Hola!
Posted: January 17, 2012 Filed under: Papers and bibliography | Tags: first post, hola, start
I hope to get used to writing a bit every day, starting in February 2012.
Or at least, start posting interesting stuff.
Most of the inspiration for posting every now and then is from Lalith.
On the other hand, the idea of writing every day comes from Marcus.
So stay tuned, and I hope you enjoy.