After latency times

Blog posts are not my thing, at least not my usual thing, and one per month is not a good pace. Having said that, here goes another one.

I have been looking at how to measure the latency of asynchronous operations, specifically the serialization and saving of the state of a Processing Element (PE) in the S4 platform.

At first I used YCSB for this. A simple interface layer let me connect to S4 through an S4 driver and send events (100,000 of them) to an adapter, measuring their latency on insertion. Yes… I know, YCSB is not the best tool for this, as it is better suited to measuring latency of “cloud serving systems” such as HBase, Voldemort and Redis, but I found that its parameter configuration lets me do some useful things.

So, having built my S4 interface for YCSB, I started throwing events at S4. Latency times were amazingly good: an average of 236 microseconds per operation. That seemed to be perfect. BUT, wait, I was not interested in measuring the time it takes for an event to be inserted into the adapter, or even into the S4 cluster. I was interested in the time it takes for each of these events to be safely serialized and stored in reliable storage, call it a filesystem, a key/value store, etc.

Design choices

There is this moment where you have to start thinking about how to do it. There were two options.

  1. Log the start of the serialization + checkpointing operation right before the corresponding StateStorage component's save operation is called, and, independently, log the finish time using a modified callback class. Getting the results would be a matter of going through the logs and reading the lines one by one, counting.
    • What if events are dropped by the platform? This would be ugly; the count would have to identify each element.
    • What if an event is not checkpointed because the disk cache is full at that moment?
    • How can I do this for 100,000 events per second, using only one disk? Bash scripting + error-hunting MADNESS!
  2. Create an intermediate component, in charge of collecting the start time and the end time of each event. Sweet, this sounds good.
    • How can many threads access this component (which will have static methods, discouraged by everyone)? Many concurrent accesses should be handled with the lowest possible latency, i.e. no contention.

A HashMap with a list

So I went with this: a common HashMap for all the checkpoints. Each Processing Element (PE) could be stored using its id. However, one PE will have many checkpoints, so for each PE id there will be a list of checkpoints (MD5 of the state, startTime, endTime).

A list will have concurrency issues if it is not handled through synchronized methods, so we can use the Collections.synchronizedList wrapper, and every time this list is iterated, it should be done inside a synchronized block.
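A minimal sketch of that wrapper in action (the class and method names here are my own, purely for illustration). Single calls like add() are made atomic by the wrapper, but compound operations such as iteration still require holding the list's own lock explicitly, as the Collections.synchronizedList Javadoc points out:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SyncListExample {
    // The wrapper makes individual calls (add, get, size) thread-safe.
    static final List<String> checkpoints =
            Collections.synchronizedList(new ArrayList<String>());

    // Compound operations (iterate + compare) must synchronize on the
    // list itself, otherwise a concurrent add() can break the iteration.
    static boolean containsState(String md5) {
        synchronized (checkpoints) {
            for (String s : checkpoints) {
                if (s.equals(md5)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```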

The process goes pretty much like this:

  • Whenever the state of a PE is about to be saved to storage, we find the PE in the HashMap using its id.
  • We check the size of its list: if 0, create a new one; else check the state ain’t the same, dog!
  • Store the state in the list as an object with its startTime; endTime is null for now.
  • Whenever the callback component is triggered, call the setEndTime static method to save the endTime, identifying the element inside the list by the MD5 of the state and inside the HashMap by the PE id.
  • Return the difference in time.
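The steps above can be sketched roughly as follows. This is only my reconstruction of the component described in the post, under assumed names (CheckpointTimer, Entry, setStartTime, setEndTime are not S4 API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointTimer {

    static class Entry {
        final String stateMd5;
        final long startTime;
        volatile long endTime; // 0 until the storage callback fires

        Entry(String stateMd5, long startTime) {
            this.stateMd5 = stateMd5;
            this.startTime = startTime;
        }
    }

    // One list of in-flight checkpoints per PE id.
    private static final Map<String, List<Entry>> checkpoints =
            new HashMap<String, List<Entry>>();

    // Called right before the StateStorage save operation.
    public static synchronized void setStartTime(String peId, String stateMd5, long now) {
        List<Entry> list = checkpoints.get(peId);
        if (list == null) {
            list = Collections.synchronizedList(new ArrayList<Entry>());
            checkpoints.put(peId, list);
        }
        list.add(new Entry(stateMd5, now));
    }

    // Called from the storage callback; returns the latency, or -1 if unknown.
    public static long setEndTime(String peId, String stateMd5, long now) {
        List<Entry> list;
        synchronized (CheckpointTimer.class) {
            list = checkpoints.get(peId);
        }
        if (list == null) {
            return -1;
        }
        // Linear scan under the list's lock: this is the cost of using a list.
        synchronized (list) {
            for (Entry e : list) {
                if (e.stateMd5.equals(stateMd5) && e.endTime == 0) {
                    e.endTime = now;
                    return e.endTime - e.startTime;
                }
            }
        }
        return -1;
    }
}
```

Note that setEndTime scans the whole list under its lock, so contention and scan time both grow with the number of in-flight checkpoints per PE, which matters later in this post.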

This sounds reasonable, and it actually works pretty well with the YCSB-to-S4 interface. I created a script where I can specify the number of different PE ids the S4 platform will handle, and I am throwing 100,000 events at 1,500 events per second.
Now, another problem arises…

When using disk storage, the latency is around 2 ms, with a max value of 50 ms (disk cache full), and no events are lost; they are all successfully checkpointed.

However, when using a key/value store like Redis, 85% of the checkpoints are discarded when there is only 1 PE id, but the same does not happen when there are 10 or even 100 different PE ids.


Why is this happening? Time for some ASCII art interpretation:

1 PE
 ___________________
|     PE id = 1      |   HashMap( [PE id=1, List( [state,start,end], … )] )   // only 1 PE id
|____________________|
| state | start | end |
| state | start | end |
|         ...         |
|_____________________|   n = 100,000


PEs > 1
 ___________________       ___________________
|     PE id = 1      |    |     PE id = 2      |   HashMap( [PE id=1, List( [state,start,end], … )], …, [PE id=N, List( [state,start,end], … )] )
|____________________|    |____________________|
| state | start | end |   | state | start | end |
| state | start | end |   | state | start | end |
|         ...         |   |         ...         |
|_____________________|   |_____________________|   n = 100,000 / Number of PEs

So what’s happening is that when the size of the list grows to 100,000, the memory of my desktop computer (4 GB) is exhausted, swapping begins, and all that. But when the entries are evenly split between 10 or 100 HashMap ids, each list stays smaller and no events are lost.

What can I do?

Well, one way would be to change the synchronized list for a synchronized map and check if that helps. Another idea is to have only one map and build a composite identifier from both the PE id and the MD5 of the state.
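The second idea might look something like this sketch, assuming a ConcurrentHashMap keyed by a composite "peId:stateMd5" string (the class name and the remove-on-completion policy are my own assumptions, not something tried in the experiments above):

```java
import java.util.concurrent.ConcurrentHashMap;

public class FlatCheckpointTimer {

    // One flat map: key = peId + ":" + stateMd5, value = start timestamp.
    // Lookup is O(1) instead of a linear scan over a growing list.
    private static final ConcurrentHashMap<String, Long> startTimes =
            new ConcurrentHashMap<String, Long>();

    public static void setStartTime(String peId, String stateMd5, long now) {
        startTimes.put(peId + ":" + stateMd5, now);
    }

    // Removing the entry on completion keeps the map from growing to
    // 100,000 entries, which might also help with the memory exhaustion.
    public static long setEndTime(String peId, String stateMd5, long now) {
        Long start = startTimes.remove(peId + ":" + stateMd5);
        return (start == null) ? -1 : now - start;
    }
}
```

ConcurrentHashMap also avoids the single global lock, so concurrent callbacks for different PEs would not contend with each other.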

Suggestions, questions and comments are more than welcome; this is one of the main reasons for writing this post.

