Redis HyperLogLog and KMinHash performance

The last few blog posts explored the topic of counting unique items efficiently using two specific sketching techniques: HyperLogLogs (HLLs) and KMinHash. The underlying motivation for these techniques is to use probabilistic data structures to count high cardinality data sets efficiently in both time and space, trading off some accuracy in the counts. For high cardinality data sets, this is a reasonable tradeoff in some domains. We saw how HLLs in Redis provided unique counts with an error of about 0.18% with a bounded 12KB memory size per key. We also saw how well additional operations like unions and intersections fared with HLLs, and how KMinHash gave more accurate intersection counts than HLLs.

One of the strong advantages of sketching techniques is their efficiency in time and space. So, while we concluded that KMinHash gives more accurate intersection results than HLLs, that conclusion should be set alongside a performance comparison so that accuracy can be traded off against performance. The purpose of this blog post is to cover the system performance measures of the two methods, using Redis as the store for the counts.

Context for the performance tests

All tests involved two sets: Set A: 175,000 elements, Set B: 10,000 elements, and their intersection Set A n B: 7,500 elements. The elements were added to Redis data structures using Python client code.

The test code operated in two phases. The first added the elements to keys representing the HLL and KMinHash sets. Once all additions were completed, the second phase computed the intersection cardinality using the HLL Inclusion/Exclusion principle and the KMinHash method, respectively. The HLL-based implementation is covered in the first and second posts, and the KMinHash algorithm in the third. Readers can review those posts to familiarise themselves with the details.

The tests were performed on a MacBook Pro 1.6 GHz Intel Core i5 processor, 4 GB 1600 MHz DDR3 RAM. Redis version was 3.0.3 compiled from source, and started with default configuration (at least, as far as the performance related configuration goes). The test code used Python Redis client 2.10.3.

Implementation details

Counting with HLLs

For HLLs, the add phase used PFADD with the Redis pipeline mechanism and a pipeline batch size of 10,000. The compute phase merged the HLL keys using PFMERGE to get A u B and then computed the intersection count using the Inclusion/Exclusion principle.

Here’s what the add phase looks like:
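A minimal sketch of both HLL phases follows. It is a reconstruction under stated assumptions rather than the original code: it assumes redis-py 3.x or later, one PFADD per item queued on a non-transactional pipeline that is flushed every 10,000 items, and the helper names (batches, hll_add, hll_intersection, inclusion_exclusion) are hypothetical.

```python
# Sketch only: helper names and per-item PFADD batching are assumptions.
from itertools import islice

BATCH_SIZE = 10_000  # pipeline batch size used in the tests above


def batches(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def hll_add(client, key, items, batch_size=BATCH_SIZE):
    """Add phase: queue one PFADD per item, send each batch in one round trip."""
    pipe = client.pipeline(transaction=False)
    for chunk in batches(items, batch_size):
        for item in chunk:
            pipe.pfadd(key, item)
        pipe.execute()  # flushes the batch; redis-py resets the pipeline for reuse


def inclusion_exclusion(count_a, count_b, count_union):
    """|A n B| = |A| + |B| - |A u B|."""
    return count_a + count_b - count_union


def hll_intersection(client, key_a, key_b, union_key):
    """Compute phase: PFMERGE the two HLLs, then apply Inclusion/Exclusion."""
    client.pfmerge(union_key, key_a, key_b)
    return inclusion_exclusion(
        client.pfcount(key_a), client.pfcount(key_b), client.pfcount(union_key)
    )
```

Here `client` would be something like `redis.Redis()`. With exact counts for the set sizes used in these tests, the compute phase reduces to 175,000 + 10,000 - 177,500 = 7,500; with HLL estimates each term carries its own error.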

Counting with KMinHash

For KMinHash, recall that the algorithm was implemented using Redis sorted sets storing the IDs as items in the set sorted according to their hashes (which acted as scores). The add phase added/updated elements in the Redis sorted sets. The compute phase computed the Jaccard coefficient estimate using the algorithm described in post 3, and from there computed A n B cardinality.

At the time of computing the Jaccard coefficient, we should only consider the ‘k’ minimum values of the MinHash sets. However, the sets may hold more than ‘k’ elements during the add phase. This gives us a knob to trade off time against memory. On one hand, we could keep exactly ‘k’ elements at all times, which keeps memory bounded. This requires more operations in the add phase to maintain the cardinality (via a ZREM operation, for instance). Bounded memory matters if we need to maintain a lot of such MinHash sets (say, for different dimensions being measured), because cumulatively the memory could grow very large. On the other hand, we could let a set temporarily grow beyond ‘k’ elements and make the add operation very fast. This could be a valid strategy if additions arrive at a very high rate and saving time is crucial.

Based on the above choices, I tried three different approaches for implementing KMinHash.

  • Optimise for time (time-optimised): Add multiple elements as a batch using the Redis pipeline mechanism, and truncate the MinHash set to ‘k’ elements once the batch has been added. Note that in this approach, the MinHash set’s size can temporarily grow beyond ‘k’ (by up to the batch size).

Here’s how the add phase looks using batch addition. Note the cardinality adjustment at the end of the batch.
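A sketch of a batched add with the cardinality adjustment at the end of each batch, assuming redis-py 3.x or later (mapping form of `zadd`). The 48-bit SHA-1 prefix used as the score is an assumption, chosen because Redis scores are doubles and integers below 2^53 are stored exactly; the function names are hypothetical.

```python
# Sketch only: hash choice and function names are assumptions, not the original code.
import hashlib
from itertools import islice


def min_hash_score(item):
    """48-bit SHA-1 prefix: fits exactly in a double, which Redis uses for scores."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:6], "big")


def kminhash_add_batched(client, key, items, k, batch_size=10_000):
    """Time-optimised add: ZADD a whole batch, then trim the set to k members."""
    pipe = client.pipeline(transaction=False)
    it = iter(items)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            break
        # Member = item, score = its hash; the set may exceed k until the trim below.
        pipe.zadd(key, {item: min_hash_score(item) for item in chunk})
        # Cardinality adjustment: remove ranks k..-1, i.e. everything but the k smallest hashes.
        pipe.zremrangebyrank(key, k, -1)
        pipe.execute()
```

If the set holds k or fewer members, `ZREMRANGEBYRANK key k -1` removes nothing, so the same two commands work from the first batch onwards.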

  • Optimise for memory (mem-optimised): Bound the cardinality of the KMinHash sets to ‘k’ at add time itself. We do this by truncating the MinHash set to ‘k’ elements after any addition that potentially increases the set’s size. We maintain some state on the client – the current cardinality of the sorted set and the current max MinHash value. This state acts as a cache to help avoid some calls to the Redis server.

This is how the mem-optimised version looks. Note the cardinality adjustment after every addition once the set holds ‘k’ elements. The local state is maintained in variables like elements_added and max_min_hash.
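A sketch of the mem-optimised add, assuming redis-py 3.x or later. The cached `elements_added` and `max_min_hash` mirror the variables named above, but the class, the `may_enter` helper and the exact caching rules are hypothetical. The cache check is a pure function so the skip logic can be read (and tested) without a server.

```python
# Sketch only: class, helper names and caching rules are assumptions.
import hashlib


def min_hash_score(item):
    """48-bit SHA-1 prefix used as the sorted-set score (exact in a double)."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:6], "big")


def may_enter(score, elements_added, max_min_hash, k):
    """Client-side cache check: can this score change the k smallest hashes?"""
    if elements_added < k:
        return True  # set not full yet, every new item is kept
    return score < max_min_hash  # full: only a smaller hash displaces the current max


class MemOptimisedKMinHash:
    """Keeps the sorted set at `key` bounded to k members after every addition."""

    def __init__(self, client, key, k):
        self.client, self.key, self.k = client, key, k
        self.elements_added = 0   # cached ZCARD of the sorted set
        self.max_min_hash = None  # cached highest score currently kept

    def add(self, item):
        score = min_hash_score(item)
        if not may_enter(score, self.elements_added, self.max_min_hash, self.k):
            return  # skipped without a round trip to Redis
        if not self.client.zadd(self.key, {item: score}):
            return  # member already present: nothing changed
        if self.elements_added < self.k:
            self.elements_added += 1
            if self.max_min_hash is None or score > self.max_min_hash:
                self.max_min_hash = score
            return
        # The set now holds k + 1 members: evict the largest hash, then refresh the cache.
        self.client.zremrangebyrank(self.key, self.k, -1)
        top = self.client.zrange(self.key, -1, -1, withscores=True)
        self.max_min_hash = top[0][1]
```

The extra ZRANGE after each eviction is the price paid for keeping the set at exactly k members; the cached maximum lets most items with large hashes be dropped on the client.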

  • Server side scripting (scripting): Redis can execute something akin to the stored procedures of a database, written in the Lua scripting language. In this method, we define a Lua script that updates the KMinHash set while keeping its cardinality bounded to ‘k’, and call it in pipeline mode during the add phase using the Redis command EVAL or EVALSHA.

Here’s the Lua script that is loaded and executed in the Redis server process. Note how the cardinality is adjusted after every addition once the set holds ‘k’ elements. The difference from the mem-optimised approach is that all state is maintained in Redis itself.
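A sketch of such a script and its pipelined invocation, assuming redis-py 3.x or later. Its `register_script` helper calls the script with EVALSHA and, when the call is queued on a pipeline, loads the script first if the server does not have it. The script body, the KEYS/ARGV layout and the function names are hypothetical, not the original listing.

```python
# Sketch only: script body, argument layout and names are assumptions.
import hashlib

# KEYS[1] = sorted set, ARGV[1] = k, ARGV[2] = score (hash), ARGV[3] = member.
BOUNDED_ZADD_LUA = """
local added = redis.call('ZADD', KEYS[1], ARGV[2], ARGV[3])
if added == 1 and redis.call('ZCARD', KEYS[1]) > tonumber(ARGV[1]) then
    -- Evict the member with the largest hash so that at most k remain.
    redis.call('ZREMRANGEBYRANK', KEYS[1], tonumber(ARGV[1]), -1)
end
return added
"""


def min_hash_score(item):
    """48-bit SHA-1 prefix used as the sorted-set score (exact in a double)."""
    digest = hashlib.sha1(str(item).encode("utf-8")).digest()
    return int.from_bytes(digest[:6], "big")


def kminhash_add_scripted(client, key, items, k, batch_size=10_000):
    """Run the bounded ZADD script for every item, pipelined in batches."""
    script = client.register_script(BOUNDED_ZADD_LUA)
    pipe = client.pipeline(transaction=False)
    for i, item in enumerate(items, start=1):
        script(keys=[key], args=[k, min_hash_score(item), item], client=pipe)
        if i % batch_size == 0:
            pipe.execute()
    pipe.execute()  # flush the final partial batch
```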

Redis’s native support for HLLs is an advantage, and it should be intuitively clear that HLLs score better than KMinHash overall. So, the aim is not to show whether KMinHash is better than HLL (it is not), but to illustrate the comparative system measures of the two approaches for similar cardinality sets, as well as among the various KMinHash implementation strategies.

Time comparison

The measurements here compare time between the HLL and KMinHash implementations, and also across the various KMinHash implementations. To compare the KMinHash implementations, the high-level ‘time’ command was used. The tests were run multiple times to check that the time measures were stable across different data sets. The results are as follows:

  • KMinHash – time-optimised: 8.5 seconds (average real time)
  • KMinHash – mem-optimised: 13.85 seconds (average real time)
  • KMinHash – scripting: 11.2 seconds (average real time)

Note that this time includes both the add phase and the compute phase. However, since the HLL addition and cardinality computation are the same in every run, the differences in time are due only to the KMinHash strategy used.

To compare times between HLL and KMinHash specifically, the Python profiler cProfile was used and the cumulative time measured across individual calls. The results are as below:

  • HLL addition: 6.1 seconds
  • KMinHash addition – time-optimised: 7.2 seconds
  • KMinHash addition – mem-optimised: 12.9 seconds
  • KMinHash addition – scripting: 10.2 seconds
  • HLL intersection: 0 seconds (negligible)
  • KMinHash intersection – time-optimised: 0.03 seconds
  • KMinHash intersection – mem-optimised: 0.04 seconds
  • KMinHash intersection – scripting: 0.02 seconds

Note that the times in the profiled runs don’t add up exactly to the measurements using the ‘time’ command. I suspect this could be due to the profiler overhead.

From the above, we can draw the following conclusions:

  • As expected, HLL performance is the best among all approaches in terms of time measures.
  • The best performance among KMinHash approaches is from the time-optimised approach, followed by the Lua scripting approach and finally by the mem-optimised approach. This is as expected.
  • The time-optimised approach is slower than the HLL approach by about 18%. In comparison, the slowest KMinHash approach (mem-optimised) is about 111% slower, i.e. more than twice as slow.
  • The intersection computation times are negligible for all approaches, so the add phase is what should drive the choice of approach.
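The percentages in these conclusions come straight from the profiled add-phase times. The short calculation below makes them reproducible; the dictionary layout and function name are only illustrative.

```python
# Profiled add-phase times (seconds) from the cProfile results above.
ADD_TIMES = {
    "HLL": 6.1,
    "KMinHash time-optimised": 7.2,
    "KMinHash scripting": 10.2,
    "KMinHash mem-optimised": 12.9,
}


def slowdown_vs_hll(times):
    """Whole-percent slowdown of each approach relative to the HLL add phase."""
    base = times["HLL"]
    return {name: round(100 * (t - base) / base) for name, t in times.items()}


print(slowdown_vs_hll(ADD_TIMES))
```

This gives 18% for the time-optimised approach, 67% for scripting and 111% for mem-optimised.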

Memory comparison

In terms of memory, HLL is a very efficient data structure compared to sorted sets. There are probably parameters that can be tuned to optimise sorted set memory as well, but these would likely cost some additional processing time. I did not consider this in my tests.

One thing to note is that memory is bounded in both cases after all elements have been added: 12KB for HLL, and the memory for at most ‘k’ elements for KMinHash. The serialized length of a key can be inspected with the Redis command DEBUG OBJECT <key-name>. The results after adding elements from a representative data set to both HLL and KMinHash keys are as follows:

  • HLL key (175,000 elements): serialized length 10,491 bytes
  • HLL key (10,000 elements): serialized length 8,526 bytes
  • KMinHash key 1: serialized length 187,530 bytes
  • KMinHash key 2: serialized length 179,048 bytes
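The 12KB bound follows from the dense HLL layout Redis uses: 16384 registers of 6 bits each. The calculation below shows that, and also the smallest ratio between the KMinHash and HLL serialized lengths listed above. It compares the smallest KMinHash key with the largest HLL key so that no assumption is needed about which keys pair up.

```python
REGISTERS = 16384      # number of HLL registers (buckets) in Redis
BITS_PER_REGISTER = 6  # register width in the dense representation

dense_bytes = REGISTERS * BITS_PER_REGISTER // 8  # 12288 bytes, i.e. 12 KB

# Serialized lengths (bytes) reported by DEBUG OBJECT in the list above.
hll_sizes = [10491, 8526]
kminhash_sizes = [187530, 179048]

# Worst case for KMinHash: its smallest key against the largest HLL key.
smallest_ratio = min(kminhash_sizes) / max(hll_sizes)
print(dense_bytes, round(smallest_ratio, 1))
```

Even this most favourable comparison puts the KMinHash keys at roughly 17 times the size of the HLL keys.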

Another relevant factor is how memory grows as elements are added to the sets, since spikes can put pressure on Redis server memory when there are a lot of such sets. Among the KMinHash approaches, the time-optimised approach can hold more than ‘k’ elements because elements are added in batches. To measure this, I ran the Redis command INFO periodically and monitored the used_memory_peak_human value while the add phase was in progress. The results are as follows:

  • KMinHash – time-optimised: 67.2 MB
  • KMinHash – mem-optimised: 62.2 MB
  • KMinHash – scripting: 62.1 MB

With the Lua scripting technique, there is also an increase in the Lua memory used by Redis (used_memory_lua) to about 50176 bytes compared to the default of 36864 bytes.

My first implementation of the time-optimised technique adjusted the cardinality of the KMinHash set to ‘k’ only when the intersection cardinality was computed (sort of a lazy approach), instead of adjusting it after every batch addition. With this approach, the used_memory_peak_human value rose as high as 125.86 MB.

From the above, we can draw the following conclusions:

  • Memory used by KMinHash is an order of magnitude more than that used by HLL.
  • The mem-optimised approach is only marginally better in used_memory_peak compared to the time-optimised approach.
  • A lazy time-optimised approach that trims the sets only when the intersection is computed significantly increases memory consumption: its peak is roughly double that of the other approaches (about 87% to 103% higher).
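The peak-memory comparisons in these conclusions can be reproduced from the used_memory_peak_human values above. The helper name is only illustrative.

```python
# used_memory_peak_human values (MB) reported above.
PEAK_MB = {"time-optimised": 67.2, "mem-optimised": 62.2, "scripting": 62.1}
LAZY_PEAK_MB = 125.86  # first, lazy time-optimised implementation


def pct_more(value, baseline):
    """How much larger `value` is than `baseline`, in whole percent."""
    return round(100 * (value - baseline) / baseline)


lazy_increase = {name: pct_more(LAZY_PEAK_MB, mb) for name, mb in PEAK_MB.items()}
time_vs_mem = pct_more(PEAK_MB["time-optimised"], PEAK_MB["mem-optimised"])
print(lazy_increase, time_vs_mem)
```

The lazy variant peaks 87% to 103% above the others, while the time-optimised approach peaks only about 8% above the mem-optimised one.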

Conclusion

HLL beats the KMinHash-based implementations by a reasonable margin on performance, which is expected given that it is a highly optimised implementation inside the Redis server. However, given the accuracy gains of KMinHash, the penalty does not seem too high. Since the time-optimised approach gives the best time performance with only marginally weaker memory performance, it is possibly the best overall implementation of KMinHash. So, it could well be implemented alongside HLLs to provide an efficient and accurate unique value counting solution in a BigData analytics system.

While I have tried to optimise the code as much as I could, I may not have got everything right, as my Redis knowledge isn’t very deep. If anyone has suggestions to improve this implementation, or alternative ideas, please post them in the comments for the benefit of all.


Counting unique items fast – Better intersections with MinHash

This is the third post in a series that is exploring sketching techniques to count unique items. In the first post, I explored the HyperLogLog (HLL) data structure and its implementation in Redis. In the second post, I expanded on the topic of unions and intersections of sets using HyperLogLogs.

Regarding intersections, I showed how the Inclusion/Exclusion principle could be used to compute intersection cardinalities. However, in studies conducted by others, the method has been found to produce inaccurate results in some conditions.

In this post, I explore a different sketching technique that claims to improve the accuracy of these results, and present the results of tests I ran comparing the accuracy of the two methods. Since I will be using terminology introduced in the last post, I request readers to familiarise themselves with it first before continuing here.

Intersection counts using the MinHash sketch

In my research on improving the accuracy of intersection cardinalities, I found an effort by AdRoll, who approached this problem with a different sketching method called k-MinHash.

The MinHash (MH) sketch is a way to estimate a quantity called the Jaccard coefficient of sets, which measures how similar two sets are. Mathematically, given two sets A and B, the Jaccard coefficient is defined as | A n B | / | A u B |. So, MH(A, B) is approximately | A n B | / | A u B |. Hence,

| A n B | is approximately equal to MH(A, B) x | A u B |.

Note that we can compute | A u B | by merging individual HLLs, as unions are lossless.
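A small worked example with exact sets shows the identity at work. It uses the set sizes from the performance tests (| A | = 175,000, | B | = 10,000, | A n B | = 7,500); the particular ranges are illustrative. In practice MH(A, B) is only an estimate and | A u B | would come from merged HLLs rather than exact sets.

```python
def jaccard(a, b):
    """Exact Jaccard coefficient |A n B| / |A u B|."""
    return len(a & b) / len(a | b)


A = set(range(0, 175_000))
B = set(range(167_500, 177_500))  # 10,000 elements, 7,500 of them shared with A

j = jaccard(A, B)                 # 7500 / 177500
union = len(A | B)                # 177,500
estimated_intersection = round(j * union)
print(round(j, 4), union, estimated_intersection)
```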

The MinHash sketch

I found this blog to be an excellent introduction to MinHash, including a proof of how it approximates the Jaccard coefficient. The MinHash sketch involves computing the hash of every element in the set and keeping the k elements with the smallest hashes, from which the Jaccard coefficient is derived. Since k will be very small compared to the set’s cardinality in our cases, this is also a space-efficient sketching technique.

Understanding MinHash

The intuitive understanding for MinHash is as follows, summarised from the blog above:

  • Define hmin(S) as the element with smallest hash in S.
  • Assuming no two distinct elements share a hash, given two sets A and B, if hmin(A) = hmin(B) (say, an element ‘x’), then x = hmin(A u B) and x is in A n B. The first part can be proved by contradiction. Suppose x is not hmin(A u B). Then there must be an element ‘y’ = hmin(A u B) with a smaller hash than x. But y belongs to A or to B, so it would have been hmin(A) or hmin(B) instead of x, which is a contradiction. Hence x = hmin(A u B). The second part follows because x = hmin(A) is in A and x = hmin(B) is in B, so x is in A n B.
  • If h is a good random hashing function, x can be assumed to be a random element in A u B.
  • Probability(hmin(A) = hmin(B)) = the probability that a random element of A u B is also present in A n B. The latter quantity is | A n B | / | A u B |, which is the Jaccard coefficient. So the probability that the element with the smallest hash is shared by both sets is linked to the Jaccard coefficient.
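The probability argument can be checked empirically. The sketch below assumes SHA-1 salted with a seed as a stand-in for a family of good random hash functions (an assumption, as are the function names), and counts how often hmin(A) = hmin(B) across many such functions. For A = {0..299} and B = {100..399} the Jaccard coefficient is 200 / 400 = 0.5, and the match rate should land near it. This corresponds to the "k different hash functions" variant, not the single-hash variant used in the algorithm.

```python
# Sketch only: seeded SHA-1 stands in for a family of random hash functions.
import hashlib


def salted_hash(item, seed):
    """One member of a family of hash functions, selected by `seed`."""
    data = f"{seed}:{item}".encode("utf-8")
    return int.from_bytes(hashlib.sha1(data).digest()[:8], "big")


def hmin(s, seed):
    """Element of s with the smallest hash under hash function `seed`."""
    return min(s, key=lambda x: salted_hash(x, seed))


def match_rate(a, b, trials=200):
    """Fraction of hash functions for which hmin(A) == hmin(B)."""
    return sum(hmin(a, seed) == hmin(b, seed) for seed in range(trials)) / trials


A = set(range(0, 300))
B = set(range(100, 400))
print(match_rate(A, B))  # should be close to 0.5
```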

To make this argument more robust and avoid a freak hashing accident, we look not for 1, but for k minimum values in the MinHash sets that could belong to both A and B. I find this approach somewhat similar to how we used stochastic averaging in the HLL case. There are two ways of getting k values. The first is to use k different hash functions and take the element with the smallest hash under each. However, since it is difficult to find so many good hash functions, we instead use a single hash function and compare not 1 but the k smallest elements of the two MinHash sets. The blog above describes how the proof extends to k values and how to compute the probabilities in that case; I leave that out here, and interested readers can find it there.

Although the blog mentions a value of k as small as 400, in my experiments I have found good results only with k being 4096 or 8192 (which are closer to the numbers mentioned in the AdRoll blog).

MinHash Algorithm

The algorithm for computing MinHash is as follows:

Given 2 sets A & B, and a fixed ‘k’:

  • Define a hash function that maps A & B elements to hashed integer values.
  • Define hmin(S, k) = elements of S with smallest k hashes.
  • As elements are seen from A and B, maintain hmin(A, k) and hmin(B, k) by keeping the k smallest hashes in each set.
  • Compute hmin(hmin(A, k) u hmin(B, k), k). This is the same as hmin(A u B, k), which can be deduced using logic similar to the proof shown above for k = 1. Let hmin(A u B, k) = X.
  • Compute X n hmin(A, k) n hmin(B, k) = Y. These are elements with smallest hashes that belong to A u B and A n B.
  • Jaccard coefficient (A, B) = approximately | Y | / k
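The steps above can be sketched in plain Python, with one hash function (a 64-bit SHA-1 prefix, an assumption) and in-memory sets instead of Redis. Here hmin(S, k) is computed over a whole set at once, which gives the same result as maintaining the k smallest hashes while elements stream in. The function names are illustrative.

```python
# Sketch only: single SHA-1-based hash and in-memory sets, no Redis.
import hashlib
import heapq


def h(item):
    """Single hash function for all elements (64-bit SHA-1 prefix)."""
    return int.from_bytes(hashlib.sha1(str(item).encode("utf-8")).digest()[:8], "big")


def hmin_k(items, k):
    """hmin(S, k): the k elements of S with the smallest hashes."""
    return set(heapq.nsmallest(k, set(items), key=h))


def kminhash_jaccard(a, b, k):
    """Estimate Jaccard(A, B) as |Y| / k, following the steps above."""
    ma, mb = hmin_k(a, k), hmin_k(b, k)
    x = hmin_k(ma | mb, k)  # X = hmin(A u B, k)
    y = x & ma & mb         # Y = X n hmin(A, k) n hmin(B, k)
    return len(y) / len(x)  # len(x) == k whenever |A u B| >= k


A = set(range(0, 20_000))
B = set(range(10_000, 30_000))  # true Jaccard = 10000 / 30000 = 1/3
print(kminhash_jaccard(A, B, 1024))
```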

Implementing the MinHash sketch using Redis

To implement the MinHash algorithm, we can use a state store that:

  • can store a set of the actual items and their hash values together
  • is able to sort these items on the hash values so as to maintain the smallest k values
  • can intersect and merge these sets

As it happens, Redis has a very suitable data structure that supports these operations – the sorted set. Each MinHash structure can be a sorted set with the item as the member and the hash as its score on which Redis sorts and maintains the order of the set. This feels ideal because given the high throughput, low latency characteristics of Redis, we can use it as a shared state store and manage the MinHash sets as part of a streaming application.

Using a Redis sorted set, we can do the following:

  • Add the first k items along with their hashes to the set, using ZADD
  • From then on, update the set by replacing the member with the highest rank (the largest hash) with the incoming item, provided the incoming item’s hash is smaller. The ZRANGE and ZREM families of commands provide these capabilities.
  • We can use ZUNIONSTORE and ZINTERSTORE to get the required intermediate sets X and Y mentioned above.
  • ZCARD gets the cardinality of the sets required for computing the Jaccard coefficient.
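A sketch of the compute phase on two such sorted sets, assuming redis-py 3.x or later. The function names, temporary key names and the choice of aggregate="MIN" are assumptions rather than the original code. Because every member’s score is its own hash, a member has the same score in every set, so MIN simply preserves it; the default SUM would double the scores of members present in both sets and break the ordering.

```python
# Sketch only: function names, temp keys and the MIN aggregate are assumptions.


def redis_jaccard(client, key_a, key_b, k, tmp_prefix="kmh:tmp"):
    """Estimate Jaccard(A, B) from two sorted sets whose scores are item hashes."""
    x_key, y_key = f"{tmp_prefix}:x", f"{tmp_prefix}:y"
    # X = hmin(A u B, k): union, then keep only the k smallest scores.
    client.zunionstore(x_key, [key_a, key_b], aggregate="MIN")
    client.zremrangebyrank(x_key, k, -1)
    # Y = X n hmin(A, k) n hmin(B, k)
    client.zinterstore(y_key, [x_key, key_a, key_b], aggregate="MIN")
    x_card, y_card = client.zcard(x_key), client.zcard(y_key)
    client.delete(x_key, y_key)
    return y_card / x_card if x_card else 0.0


def intersection_estimate(jaccard, union_count):
    """|A n B| is approximately MH(A, B) x |A u B|."""
    return round(jaccard * union_count)


def kminhash_intersection(client, mh_a, mh_b, hll_a, hll_b, k):
    """Combine the MinHash estimate with |A u B| from the HLLs."""
    # PFCOUNT over several keys returns the cardinality of their union.
    union_count = client.pfcount(hll_a, hll_b)
    return intersection_estimate(redis_jaccard(client, mh_a, mh_b, k), union_count)
```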

Since my goal for the time being is to compare accuracy, I have not considered performance characteristics of this algorithm too much, and improvements could be possible.

Comparing Inclusion/Exclusion principle and MinHash for intersection accuracy

Test Setup

To evaluate how the two methods fare against each other in terms of accuracy, I used the same test design that Neustar followed, which I spoke about in the last post (although I have certainly not been as exhaustive as them). Specifically, the test parameters were driven by two measures:

  • overlap(A, B) = | A n B | / | B |, where B is the smaller set.
  • cardinality_ratio(A, B) = | A | / | B |, where B is the smaller set.

I kept the value of | B | fixed and then varied | A | and | A n B |. Once the cardinalities of A, B and A n B were decided, I generated random numbers between 1 and 1B such that the required cardinality constraints were met. For each combination of A, B and A n B, I generated 100 such sets and ran the tests. The results were averaged over the 100 tests. Each run for one such combination of A, B and A n B did the following:

  • Added elements of A and B to respective HyperLogLog keys in Redis
  • Added elements of A and B to respective MinHash keys (backed by Redis sorted sets as described above)
  • Computed A n B using HLLs and the Inclusion / Exclusion method.
  • Computed A n B using KMinHash
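One way to generate such data is sketched below; it is an assumption about the procedure, not the original test code. It draws | A u B | distinct values between 1 and 1B in one go, then splits them into the shared part and the parts unique to A and to B. The function name and seed parameter are illustrative.

```python
# Sketch only: one possible generator, not the original test harness.
import random


def make_sets(card_a, card_b, card_ab, upper=10**9, seed=None):
    """Random sets A, B of integers in [1, upper] with |A|, |B|, |A n B| fixed."""
    rng = random.Random(seed)
    total = card_a + card_b - card_ab               # |A u B|
    pool = rng.sample(range(1, upper + 1), total)   # distinct values
    common = pool[:card_ab]                         # A n B
    only_a = pool[card_ab:card_a]                   # card_a - card_ab values
    only_b = pool[card_a:]                          # card_b - card_ab values
    return set(common) | set(only_a), set(common) | set(only_b)


a, b = make_sets(17_500, 1_000, 50, seed=1)
print(len(a), len(b), len(a & b))  # 17500 1000 50
```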

The results for each run were logged, then compared for accuracy.

Recall that Neustar found Inclusion/Exclusion satisfactory within certain thresholds for overlap and cardinality ratio. I divided the test data into two categories: one which satisfied the thresholds (the good case) and one which didn’t (the bad case). For the number of HLL registers in Redis, the good case was when:

  • overlap(A, B) >= 0.05, AND
  • cardinality_ratio(A, B) < 20

For the good case, I fixed | B | as 10000 and chose values for | A | and | A n B | that satisfied the above criteria. For example, I took values of 0.05, 0.1, 0.2, 0.5 and 0.75 for overlap, which give the required | A n B | values. Similarly, I took values of 2, 5, 10 and 17.5 for cardinality ratio, which give the required values for | A |.
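Turning the chosen overlap and cardinality ratio values into set sizes is a direct inversion of the two definitions. The grid below is an illustrative sketch of that arithmetic.

```python
B_SIZE = 10_000
OVERLAPS = [0.05, 0.1, 0.2, 0.5, 0.75]
CARDINALITY_RATIOS = [2, 5, 10, 17.5]

# overlap = |A n B| / |B|          ->  |A n B| = overlap x |B|
# cardinality_ratio = |A| / |B|    ->  |A| = ratio x |B|
grid = [
    (round(r * B_SIZE), B_SIZE, round(o * B_SIZE))  # (|A|, |B|, |A n B|)
    for r in CARDINALITY_RATIOS
    for o in OVERLAPS
]
print(len(grid), grid[-1])  # 20 (175000, 10000, 7500)
```

The largest combination (| A | = 175,000, | A n B | = 7,500) matches the set sizes used in the performance tests.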

For the bad case, I fixed | B | as 1000 (a smaller value). The values for | A | and | A n B | were chosen such that they violated either both criteria or just the cardinality_ratio. I was motivated by use cases where one of the sets is very small compared to the other. Recall the use case of a publisher looking to see which people from a particular locality visit a particular web page. Here, the number of users visiting a popular web page would be very large, but the number of people in a locality could be very small.

Given | A |, | B | and | A n B |, I ran the tests for all combinations of these. In each case, I computed the % error of | A n B | when computed using both Inclusion / Exclusion and KMinHash.

Test Results

The results below are aggregated and averaged by cardinality ratio and by overlap separately. For example, when averaged by cardinality ratio, each value is the average of the error percentages across all chosen overlap values.

The following are the results from the runs with good threshold parameters:

[Figure: error % by overlap, good cases]

[Figure: error % by cardinality ratio, good cases]

The following points can be made from the results:

  • The error % of intersection counts from both methods is significantly higher than the error % of the individual HLLs. Not shown here, but observed in my tests, is that the error % of the individual HLLs is always quite small (< 0.5%).
  • The accuracy of both KMinHash and Inclusion/Exclusion improves as the overlap increases (i.e. the intersection count is larger than any individual error terms) or when cardinality ratio decreases (i.e. the set sizes are comparable to each other, thereby one set doesn’t contribute a huge error term compared to the intersection count). This is as expected.
  • KMinHash performs better than Inclusion / Exclusion for almost all cases except where the overlap is quite high, when Inclusion / Exclusion seems to be marginally better.
  • KMinHash errors seem to be more linear in nature compared to Inclusion/Exclusion errors.

The following are the test results from runs with bad threshold parameters:

[Figure: error % by overlap, bad cases]

[Figure: error % by cardinality ratio, bad cases]

Note: The y-axis uses a log scale to show the results clearly.

The following points can be made from the test results:

  • The error percentages for the pathological cases are very high for Inclusion / Exclusion (in the 1000s), whereas KMinHash performs much better.
  • Even otherwise, KMinHash is performing significantly better than Inclusion / Exclusion in most cases.
  • As seen for the good cases, with increasing overlap and decreasing cardinality ratio, the error percentages improve as usual.
  • The very poor performance of the Inclusion / Exclusion method for low overlaps (very small A n B) may not be as bad as it looks when taken in context. We are sometimes talking about intersection counts of 50 or less, and it may not be too bad to just predict these as 0. However, what is interesting to note is that KMinHash performs better even for such small cases and predicts good values.
  • Values of overlap higher than 0.05 are within the threshold for overlap; however, the cardinality ratio values are outside their threshold. The results show that the errors are high even when only one of the two measures is outside its threshold.
  • Again, Inclusion / Exclusion seems to perform slightly better than KMinHash for high overlap cases, although the difference is marginal.
  • Another point not shown here is that the standard deviation of the error percentages for KMinHash is much lower than for the Inclusion / Exclusion method in all cases, indicating more reliable performance.

Conclusion

In conclusion, getting intersection counts through sketching techniques does carry a reasonable error percentage that needs to be considered when exposing analytics built on them. KMinHash is an effective sketching technique that gives more accurate intersection counts than the Inclusion/Exclusion method with HLLs. The method I have discussed here is probably not as space or time efficient as the HLL Inclusion/Exclusion method. So, for now, picking an implementation is a tradeoff between accuracy and performance. In a future post, I will try to discuss the performance characteristics of the KMinHash method in more detail.


Counting unique items fast – Unions and Intersections

In the last post, I covered HyperLogLog (HLL) in Redis and spoke about how it can enable counting cardinalities of very large sets very efficiently. I concluded the post by broadening the scope to counting the results of set operations. In this post, I will expand on this scope.

To recap, some of the use cases of counting the results of set operations are as follows:

  • Imagine we are maintaining daily unique users in a set. Can I combine these sets to get weekly or monthly unique users? (akin to a rollup operation)
  • Imagine I have a set of users who have visited a specific web page. And another who are from a particular locality. Can I combine these two sets to see which users from that locality visited the web page? (akin to a slice operation)

The first one is a set union operation and the second an intersection operation. These are the two basic operations we can expect to perform given two sets. So, how does HLL fare with these?

Unions in HLL

Unions are said to be ‘loss-less’ in HLL. In other words, they work extremely well. So, to get the count of the union of two sets, we can ‘merge’ the HLL data structures representing the two sets into a new HLL data structure and get the count of the result. Merging two HLLs with the same number of buckets involves taking the maximum value of each pair of buckets and assigning that as the value of the same bucket in the resultant HLL.

To see the intuition behind this, remember that the HLL algorithm only keeps, for each bucket, the maximum number of consecutive zeros seen in the hashes of the items that fall into it. So, if two items hash to the same bucket, the one with more zeros determines the value stored in the bucket. Hence, the merge algorithm described above gives exactly the same buckets as replaying the combined stream of original items into a single HLL.
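The equivalence can be checked with a toy register layout. The sketch below is not Redis’s exact encoding or hash function: it assumes a 64-bit SHA-1 prefix, uses the low P bits to pick a bucket, and stores the position of the first 1-bit in the remaining bits (leading zeros + 1). Only 16 buckets are used to keep it small; Redis uses 16384.

```python
# Toy HLL registers: an illustration, not Redis's actual encoding.
import hashlib

P = 4       # 2**P buckets; Redis uses 2**14 = 16384
M = 1 << P


def _hash64(item):
    return int.from_bytes(hashlib.sha1(str(item).encode("utf-8")).digest()[:8], "big")


def registers(items):
    """Per bucket, the max position of the first 1-bit seen in the item hashes."""
    regs = [0] * M
    for item in items:
        x = _hash64(item)
        bucket = x & (M - 1)                      # low P bits pick the bucket
        rest = x >> P                             # remaining 64 - P bits
        rank = (64 - P) - rest.bit_length() + 1   # leading zeros + 1
        regs[bucket] = max(regs[bucket], rank)
    return regs


def merge(regs_a, regs_b):
    """Union of two HLLs with the same number of buckets: element-wise max."""
    return [max(x, y) for x, y in zip(regs_a, regs_b)]


A, B = range(0, 1000), range(500, 1500)
print(merge(registers(A), registers(B)) == registers(set(A) | set(B)))  # True
```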

In Redis, there is explicit support for this operation through a command called PFMERGE:

  • PFMERGE <result> <key1> <key2> … – Merges the HLLs stored at key1, key2, etc into result.

One can issue PFCOUNT <result> to get the cardinality of the union.

Some interesting points about unions:

  • The merge operation is associative – hence we can merge multiple keys together into one single result, as indeed the PFMERGE command allows in Redis.
  • The merge operation is parallelizable with respect to the buckets and hence can be implemented very fast.
  • The merge operation assumes that the number of buckets is the same for the sets being merged. When the numbers differ, there is a way to ‘fold’ the HLL data structures with more buckets into an HLL with the smallest number of buckets. This is described well here. I suppose the scenario of different numbers of buckets arises if we are using a library that lets us create an HLL with a specified number of buckets. Of course, this is not applicable to Redis.

Intersections in HLL

Intersections in HLL are not loss-less. Returning to our intuitive explanation for unions, imagine an HLL built by replaying only the items common to the two sets. The HLLs of A and B, however, also include the items unique to each set, and a bucket’s value in either of them can come from such an item with a larger value, masking the value contributed by the common items. So the buckets of the intersection cannot be recovered from the buckets of A and B.

One possible way to work around this limitation is to maintain a separate key just for the intersection. For example, to satisfy the use case above, we could maintain one HLL for users who visited the web page, say users:<page>, and another for each locality’s users who visited that page, like users:<page>:<locality>. A stream processing framework would update both keys for each incoming event.

The good parts about this approach are:

  • The number of updates is bounded by the number of combinations of dimensions we want to count for. This can be done in a streaming manner for a small number of combinations.
  • Reads of intersection counts will be fast and accurate too.

The issues with this approach are:

  • It is easy to see that this can become a combinatorial nightmare with many different combinations of dimensions to maintain.
  • Each intersection key needs more storage space, and hence causes more load, particularly for in-memory systems like Redis.
  • The method would only work if all the dimensional information came in the same event. For example, if we got information about users visiting pages from one source and user-locality information from another, there would be no way of updating the intersection key in a straightforward manner without doing a join.

Intersection using the Inclusion / Exclusion Principle

This is yet another approach talked about for computing intersections with HLLs. Dig into the set algebra, Venn diagrams and other such topics in your child’s math textbooks and you’ll find the Inclusion/Exclusion principle there. It just says:

| A u B | = | A | + | B | - | A n B |, where | A | denotes the cardinality of set A.

So, can we say | A n B | = HLL(A) + HLL(B) - HLL(A u B)?

Folks at Neustar conducted experiments on this method and were kind enough to publish the results in a very detailed blog post. Again, recommended reading – for understanding how to frame an experiment for answering such a question.

To paraphrase the results, they defined two measures of sets:

  • overlap(A, B) = | A n B | / | B |, where B is the smaller set.
  • cardinality_ratio(A, B) = | A | / | B |, where B is the smaller set.

Using these measures, they formulated some empirical rules that determine when this method gave somewhat acceptable results. The results are within a reasonable error range when:

  • overlap(A, B) >= overlap_cutoff, AND
  • cardinality_ratio(A, B) < cardinality_ratio_cutoff

If either of these conditions is violated, they found that the error % of the intersection could be in the thousands. In their results, the cutoffs depend on the number of buckets. For Redis’s 16384 buckets, they become:

  • overlap(A, B) >= 0.05
  • cardinality_ratio(A, B) < 20
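These rules translate into a simple check. The sketch below uses the cutoffs listed above for 16384 buckets; the function names are illustrative. Note that in practice the inputs would themselves be HLL estimates, including an Inclusion/Exclusion estimate for | A n B |.

```python
OVERLAP_CUTOFF = 0.05           # Neustar cutoffs for 16384 buckets, as listed above
CARDINALITY_RATIO_CUTOFF = 20


def overlap(card_a, card_b, card_ab):
    """|A n B| / |B|, where B is the smaller set."""
    return card_ab / min(card_a, card_b)


def cardinality_ratio(card_a, card_b):
    """|A| / |B|, where B is the smaller set."""
    return max(card_a, card_b) / min(card_a, card_b)


def inclusion_exclusion_ok(card_a, card_b, card_ab):
    """True if Inclusion/Exclusion is expected to give acceptable errors."""
    return (overlap(card_a, card_b, card_ab) >= OVERLAP_CUTOFF
            and cardinality_ratio(card_a, card_b) < CARDINALITY_RATIO_CUTOFF)


print(inclusion_exclusion_ok(175_000, 10_000, 7_500))  # overlap 0.75, ratio 17.5
print(inclusion_exclusion_ok(175_000, 1_000, 50))      # ratio 175 is too large
```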

I worked out the intuition behind these results from this diagram, based on the Neustar blog (hopefully, I am not too far off):

[Figure: hll-blog-2, diagram based on the Neustar blog]

A high error value of | A n B | results from both the sets contributing to the error almost equally or one of them dominating. In the case where both sets contribute the error almost equally, this can happen if the true value of intersection itself is very small compared to the size of the sets – measured by the overlap factor. In the cases where one of the sets contributes largely to the error, this can happen when the other set is very small compared to the size of the larger set – measured by the cardinality ratio.

Cases like these certainly happen in the real world. For example, in our own use case, the set of people who have visited a web page can be very large compared to the set of people who live in a certain locality, if the locality is small.

So, to summarise, using the Inclusion / Exclusion principle allows us to compute an estimate for the intersection cardinality with no additional state or storage. It can also be done reasonably fast due to the low latency of APIs like PFCOUNT and PFMERGE. However, for certain commonly occurring use cases, its error ratios are likely to be unusably large.

In the next post, I will explore an alternative approach to intersections using a different sketching algorithm that is said to provide better results. I will also discuss results of my own tests comparing the accuracy of the two algorithms.


Counting unique items fast

Analytics systems count. Trivial as this may sound, implementing one is far from easy. Indeed, Nathan Marz, creator of Apache Storm, tweeted thus:
90% of analytics startups: 1. Find something new to count that no one else is counting 2. Raise $10M

In this blog post and the next, I will try and summarise a couple of ways I have learnt to do this efficiently at scale for the specific use case of counting unique items.

Consider a standard use case for several ad-tech companies: counting the number of unique devices they get to see, commonly referred to as audience count. Or a web analytics use case: counting the number of users who have seen a particular web page. One can also slice and dice these counts to provide more context. For example, how many unique users are viewing an article from a browser vs. a mobile app, and so on. Typically, these counts are collected from events captured by an analytics system. Analytics products then expose them to customers, along with context, trends, etc., to form a basis for decision making. A recent example that illustrates this well is the Parse.ly blog on their new Analytics features for publishers.

Technically, the difficulty in counting uniques as opposed to counting occurrences in events is that while the latter is a simple counter, the former is, at the core, a set cardinality operation. A user visiting a web page twice should be counted as only one user, as opposed to two views. To get the set cardinality, one needs to manage the set. At today’s BigData scale:

  • Sets are getting larger. Hundreds of millions of unique users and upwards does not raise eyebrows that much anymore.
  • Sets are updated very fast. Sites are processing thousands of queries per second or more, each of which needs to update the set.
  • Users are demanding more. They expect to see the updates to these counts as quickly as possible.
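Exact, in-memory Python structures show the difference between counting occurrences and counting uniques. This naive sketch is exactly the approach that stops scaling, because the set must hold every identity seen. The event data is made up for illustration.

```python
from collections import Counter

# Hypothetical page-view events: (user_id, page)
events = [
    ("u1", "/home"),
    ("u2", "/home"),
    ("u1", "/home"),   # u1 views the page a second time
    ("u3", "/about"),
]

views = Counter(page for _, page in events)   # occurrences: a simple counter
visitors = {}                                 # uniques: one exact set per page
for user, page in events:
    visitors.setdefault(page, set()).add(user)

unique_visitors = {page: len(users) for page, users in visitors.items()}

print(views["/home"])            # 3 views
print(unique_visitors["/home"])  # 2 unique users
```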

Technical Approaches

In technical terms, the unique counts exposed by these analytics can be treated as views of the events that drive these analytics. Data warehousing or the batch mode BigData processing solutions powered by frameworks like Hadoop have traditionally separated the collection of event streams and the generation of these views. More contemporary approaches have been proposing a change to this approach, in which the processing of the event streams results in the creation of these ‘materialised views’ directly.

At the core of these more recent approaches are streaming solutions and techniques that operate on incoming event streams and update state for views that expose this information in near real time. There are established stream processing engines that provide the framework and API to consume large event streams in a scalable fashion, such as Apache Storm, Apache Spark, Apache Samza and, more recently, Apache Flink. Generating the view, though, is still a problem developers need to solve themselves. Their solution must also meet all the requirements that the stream processing frameworks themselves meet.

In order to solve our uniques problem, we need to maintain a set of ‘n’ identities, where n is very large and the set is updated very fast. Also, these n identities need to be counted with very low (sub-second) latency. It is easy to see that for large n, the time and/or space complexity of solving this problem conventionally is going to be large.

HyperLogLog

One novel approach to addressing these constraints has been available for a good amount of time, albeit not very well known: maintaining sketches. Informally, a ‘sketch’ is a data structure that summarises large volumes of data into very small amounts of space. It provides approximate answers to queries about the data with extremely low latency and well-defined error percentages. There are several different sketches, and one family of them deals with counting unique or distinct values.

The specific one I discuss here is a sketch called HyperLogLog (HLL). There are several great articles on the web that describe a HLL. The one I found most intuitive to follow from a layman's perspective was from Neustar, previously AggregateKnowledge.

Here is my attempt to summarise and paraphrase the black magic of HLL (although I strongly recommend reading the original article):

  • Say we have a good hash function that converts the item we want to add to a set, into a binary bit stream.
  • By tracking the longest run of consecutive zero bits at the start of any hash seen so far, we can *estimate* the size of the set. The intuition mentioned in the Neustar link above is that counting a run of consecutive zeros is somewhat like counting the number of consecutive heads we get when tossing a coin. The longer the longest run of heads, the more times we can guess the coin has been tossed.
  • We improve the estimate using a procedure called Stochastic averaging, in which we maintain not one, but many such estimates and take a harmonic mean of these. In order to maintain multiple estimates, we split the hash into two parts: a prefix that indexes into a bucket to hold an estimate and the suffix that is used to count the consecutive zero bits.
  • There are also some corrections to make the estimates more accurate in cases where the buckets are too empty or too full.
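The four steps above can be sketched in a few lines of Python. This is a toy for intuition only and is not Redis's implementation. Its assumptions:

- It uses SHA-1 truncated to 64 bits as the hash.
- It takes the top p bits of the hash as the bucket index.
- It applies only the small-range (linear counting) correction for mostly empty buckets.

```python
import hashlib
import math


class SimpleHLL:
    """Toy HyperLogLog for intuition only (not Redis's encoding or hash)."""

    def __init__(self, p: int = 14):
        self.p = p
        self.m = 1 << p                 # number of buckets: 16384 when p = 14
        self.registers = [0] * self.m   # per bucket: the largest "zero run + 1" seen

    @staticmethod
    def _hash64(item: str) -> int:
        # Assumption: the first 8 bytes of SHA-1 serve as a 64-bit hash.
        return int.from_bytes(hashlib.sha1(item.encode("utf-8")).digest()[:8], "big")

    def add(self, item: str) -> None:
        x = self._hash64(item)
        suffix_bits = 64 - self.p
        bucket = x >> suffix_bits                      # prefix: top p bits pick the bucket
        suffix = x & ((1 << suffix_bits) - 1)          # remaining bits hold the zero run
        rank = suffix_bits - suffix.bit_length() + 1   # leading zeros in suffix, plus one
        if rank > self.registers[bucket]:
            self.registers[bucket] = rank              # re-adding an item never changes state

    def count(self) -> int:
        m = self.m
        alpha = 0.7213 / (1 + 1.079 / m)               # bias-correction constant for m >= 128
        # Stochastic averaging: alpha * m * (harmonic mean of 2**register)
        raw = alpha * m * m / sum(2.0 ** -r for r in self.registers)
        empty = self.registers.count(0)
        if raw <= 2.5 * m and empty:                   # buckets too empty: linear counting
            return round(m * math.log(m / empty))
        return round(raw)
```

Adding 10,000 distinct strings to SimpleHLL() should give a count within a few percent of 10,000. Adding them all a second time leaves the count unchanged.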

HyperLogLog in Redis

There are several libraries and systems that implement a HLL algorithm (so we are spared from having to implement one ourselves). However, the one I have used is an implementation in the awesome in-memory data structure server – Redis. The blog on the Redis implementation of HyperLogLog is a classic in itself and is also a highly recommended read. The level of thought and work that went into an efficient implementation of HLL in Redis is great learning. In the blog, the Redis HLL standard error has been mentioned as 0.81%. I have actually seen lower in my tests.

Redis exposes the following APIs for manipulating the HLL set:

  • PFADD <key> <item> – Adds item to HLL represented by the key
  • PFCOUNT <key> – gives the estimate of the number of items added to the key.
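From Python, the redis-py client exposes these commands as pfadd(name, *values) and pfcount(*sources). The sketch below records page visitors. The key naming scheme and the function names are hypothetical.

```python
def record_visit(r, page: str, user_id: str) -> bool:
    """PFADD the user into the page's HLL; True if an internal register changed."""
    return bool(r.pfadd(f"visitors:{page}", user_id))


def unique_visitors(r, page: str) -> int:
    """PFCOUNT the page's HLL to get the approximate number of unique visitors."""
    return r.pfcount(f"visitors:{page}")
```

Here r would typically be redis.Redis(host="localhost", port=6379). A PFADD that returns 0 means the HLL's internal state did not change, for example because the same user was added again.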

The PF prefix stands for Philippe Flajolet who is credited with a lot of work on HLL.

Concerning the internals of HLLs in Redis, the following points are interesting:

  • The length of the prefix of the binary hash in Redis is 14 bits. That means it uses 2^14 = 16384 buckets.
  • The hash is 64-bit, hence the remaining 50 bits are where we look for the consecutive zero bits. This means we can represent the value of each counter using at most 6 bits (2^6 = 64 > 50). Hence the total space required for a HLL is bounded by 16384 x 6 bits = 96 Kbits = 12 KB, which is amazingly small for storing a very large cardinality number. (Note that it is possible to lay out a bit array and index into the specific offset in this array representing a bucket.)
  • Although 12KB is the maximum amount of memory required for a HLL, smaller sets take up much less. Specifically, like many other things in Redis, the underlying data structure has a dual representation. There is a memory-efficient one for smaller sets (called the ‘sparse’ representation) and the 12 KB one for larger sets (the ‘dense’ representation). This is particularly useful if you need to store lots of HLLs with low cardinality.
    • You can use the command PFDEBUG ENCODING <key> to see what representation Redis is currently using for the key.
    • The switch from sparse to dense encoding is controlled via a configuration parameter – hll-sparse-max-bytes – default 3000 bytes. The Redis documentation has more details on how to tune this parameter.
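The sizing arithmetic above can be checked in a few lines. Redis also stores a small fixed-size header alongside the dense registers, which this calculation leaves out.

```python
prefix_bits = 14
hash_bits = 64

buckets = 2 ** prefix_bits                 # 16384 registers
suffix_bits = hash_bits - prefix_bits      # 50 bits left to scan for zeros
max_rank = suffix_bits + 1                 # largest register value (all-zero suffix)
register_bits = max_rank.bit_length()      # 6 bits, since 51 < 2**6 = 64
dense_bytes = buckets * register_bits // 8 # 98304 bits -> 12288 bytes

print(buckets, suffix_bits, register_bits, dense_bytes)  # 16384 50 6 12288
```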

The Good Parts with HLLs in Redis

There are some obvious benefits we can see with HLLs in Redis:

  • The 12KB bounded size for a practically unbounded set (read billions of items) is extremely memory efficient.
  • The operation PFCOUNT is fast enough for real time queries. Reading directly from this for front end dashboards is totally possible.

There are some subtler benefits, too:

  • The operation PFADD is quite fast too, as can be expected from the low latency, high throughput performance of Redis in general. This means that updates to the set represented by the HLL can happen in a streaming fashion. I have built data pipelines using Storm that add IDs to Redis HLL keys and operate with sub-second latencies (while doing a lot of other work too).
  • Since Redis is single threaded, adding the same element to a HLL from different threads works correctly.
  • Adding an element to a HLL is idempotent. Hence, when your stream processing framework follows at least once semantics and replays can cause duplicate execution of the PFADD commands, we do not need to worry about consistency.
  • The value of a Redis HLL is an encoded String. Hence, it is possible to retrieve or dump the value as a set of bytes and load it into a different Redis server to get identical results. One can imagine that this would be staggeringly fast compared to having to re-add a billion items to another server.
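Because the value is a plain Redis string, copying an HLL between servers can be as simple as a GET on one and a SET on the other. This minimal sketch assumes two redis-py clients, and the function name is hypothetical.

```python
def copy_hll(src, dst, key: str) -> bool:
    """Copy the raw HLL bytes for `key` from `src` to `dst` (both redis-py clients).

    Returns False if the key does not exist on the source server.
    """
    raw = src.get(key)    # the encoded HLL (sparse or dense) as bytes
    if raw is None:
        return False
    dst.set(key, raw)     # PFCOUNT on dst now sees the identical HLL
    return True
```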

Set Operations in HLLs

As mentioned above, HLLs are sketches for sets. When they are modelled this way, one might naturally ask whether set operations are possible. From a use case perspective, this certainly makes sense.

  • Imagine we are maintaining daily unique users in a set. Can I combine these sets to get weekly or monthly unique users? (akin to a rollup operation)
  • Imagine I have a set of users who have visited a specific web page. And another who are from a particular locality. Can I combine these two sets to see which users from that locality visited the web page? (akin to a slice operation)

In set theoretic terms, the first of these would be a union of existing HLLs, while the second is an intersection. It turns out that unions of HLLs are possible, but intersections need more work. I will explore these operations in a following blog post.
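The union case maps directly onto Redis commands. Below is a sketch of the daily-to-weekly rollup, assuming redis-py and a hypothetical naming scheme with one HLL key per day. PFMERGE can materialise the union into a new key. Alternatively, PFCOUNT over several keys estimates the union without storing anything.

```python
from datetime import date, timedelta


def daily_key(day: date) -> str:
    return f"uniques:{day.isoformat()}"    # hypothetical naming scheme


def weekly_uniques(r, week_start: date, store: bool = False) -> int:
    """Estimate unique users over 7 days by unioning the daily HLLs."""
    keys = [daily_key(week_start + timedelta(days=i)) for i in range(7)]
    if store:
        dest = f"uniques:week:{week_start.isoformat()}"
        r.pfmerge(dest, *keys)             # union persisted as its own HLL
        return r.pfcount(dest)
    return r.pfcount(*keys)                # union estimated on the fly
```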


Taking memory dumps of Hadoop tasks

This post originally appeared in my older blog. I am carrying over a few of those posts which seem to be popular.


On the Hadoop users mailing list, I was recently working with a user on a recurring problem faced by users of Hadoop. The user’s tasks were running out of memory and he wanted to know why. The typical approach to debugging this problem is to use a diagnostic tool like jmap, or to hook up a profiler to the running task. However, for most Hadoop users, this is not feasible. In real applications, the tasks run on a cluster to which users do not have login access, so they cannot debug a task as it runs. I was aware of an option provided by the JVM, -XX:+HeapDumpOnOutOfMemoryError, that can be passed on the task’s Java command line. This option makes the JVM take a heap dump when the task runs out of memory. However, the dump is saved by default to the current working directory of the task. In Hadoop’s case, that is a temporary scratch space managed by the framework, which becomes inaccessible once the task completes.

At this point, when we were pretty much giving up on options, Koji Noguchi, an ex-colleague of mine and a brilliant systems engineer and Hadoop debugger, responded with a way out. You can read about it here. The few lines mentioned feel almost like magic, so I thought I would write an explanation about how it is working, for people who are interested.

The requirement is to save the dump to a location which is accessible by the user running the task. Such a location, in Hadoop’s case, is … HDFS. So, what we want is to be able to save the dump to HDFS when the scenario happens.

To get there, the first observation is that the JVM offers hooks that can be passed to the task’s command line, for us to act on an OutOfMemory scenario. The options Koji used are:

These options instruct the JVM to take a dump on OutOfMemory, save it with a name of our choice, and, importantly, run a custom script. It should be obvious now that the custom script can copy the generated dump to HDFS, using regular HDFS commands.

There are a couple of gaps to plug, though, as the devil is in the details. How does the script (copy-dump.sh, in the example above) get to the cluster nodes? For that, we use a Hadoop feature: the Distributed Cache. This feature allows arbitrary files to be packaged and distributed to the cluster nodes where tasks require them, using HDFS as an intermediate store. The cached files are usually available to tasks via a Java API. However, since we need the script outside the task’s context, we use a powerful option of the Distributed Cache: creating symlinks. This option not only makes the files available to the tasks via the API, but also creates a symbolic link to each file in the current working directory of the task. Hence, when we refer to the script in the task’s command line, we can refer to it relative to the current working directory of the task, i.e. ‘.’.

The specific options to set up all of the above are as follows:
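As an illustration only, the combination described above might be expressed as the following set of Hadoop 1.x (MRv1) job properties, built here as a Python dict. The dump file name, the HDFS location of copy-dump.sh and the helper name are hypothetical. mapred.child.java.opts, mapred.cache.files and mapred.create.symlink are the classic MRv1 property names, and the -XX flags are standard HotSpot options. In a real job, these flags would be appended to any existing -Xmx setting rather than replacing it.

```python
def oom_dump_job_conf(script_hdfs_path: str, dump_name: str = "myheapdump.hprof") -> dict:
    """Build MRv1 job properties that dump the heap on OOM and run a copy script.

    All file names here are illustrative placeholders.
    """
    java_opts = " ".join([
        "-XX:+HeapDumpOnOutOfMemoryError",         # take a heap dump on OutOfMemoryError
        f"-XX:HeapDumpPath=./{dump_name}",         # ...into the task's working directory
        "-XX:OnOutOfMemoryError=./copy-dump.sh",   # then run the symlinked script
    ])
    return {
        "mapred.child.java.opts": java_opts,
        # '#copy-dump.sh' names the symlink created in the task's working directory
        "mapred.cache.files": f"{script_hdfs_path}#copy-dump.sh",
        "mapred.create.symlink": "yes",
    }
```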

The last detail in the whole solution is how to save the dump on HDFS with a unique name. Multiple tasks run at the same time, and more than one of them could run out of memory. Koji’s scripting brilliance for solving this problem was to use the following script:

The expression ${PWD//\//_} takes the current working directory of the task (from the environment), and replaces every occurrence of ‘/’ with an ‘_’. Nice!
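For readers less familiar with bash parameter expansion, the same transformation in Python is a plain string replace. The function name and the sample path are hypothetical.

```python
def unique_dump_name(task_cwd: str, prefix: str = "myheapdump") -> str:
    r"""Mirror bash's ${PWD//\//_}: replace every '/' in the task's working directory with '_'."""
    return f"{prefix}{task_cwd.replace('/', '_')}.hprof"


print(unique_dump_name("/tmp/hadoop/attempt_01/work"))
# myheapdump_tmp_hadoop_attempt_01_work.hprof
```

Because every task attempt has its own working directory, the resulting names do not collide.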

So, using these options, features and diagnostics of Hadoop and the JVM, users can now get memory dumps of their tasks to locations that they can easily access and analyse. Thanks a lot to Koji for sharing this technique, and happy debugging!


Using native Hadoop shell and UI on Amazon EMR

This post is carried over from my earlier blog site here. I am migrating posts that seem to have gathered the most hits there.


Amazon’s Elastic MapReduce (EMR) is a popular Hadoop on the cloud service. Using EMR, users can provision a Hadoop cluster on Amazon AWS resources and run jobs on them. EMR defines an abstraction called the ‘jobflow’ to submit jobs to a provisioned cluster. A jobflow contains a set of ‘steps’. Each step runs a MapReduce job, a Hive script, a shell executable, and so on. Users can track the status of the jobs from the Amazon EMR console.

Users of a static Hadoop cluster are used to the Hadoop CLI for submitting jobs, and to the Hadoop JobTracker and NameNode user interfaces for tracking activity on the cluster. It is possible to use EMR in this mode, and this is documented in the extensive EMR documentation. This blog collects the information for using these interfaces in such a usage mode into one place, along with some experience notes. The examples given here have been tested on OS X Mountain Lion. The details will vary for other operating systems, but should be similar.

When a cluster is provisioned, a node called the ‘master’ node is created on the EMR cluster, that runs the JobTracker and the NameNode. In short, the mechanism to access the Hadoop CLI is to ssh into the master node and use the installed Hadoop software. Likewise, for accessing the UI, an SSH tunnel needs to be set up to the web interfaces that also run on the master node.

Before we start

  • This blog assumes you already have signed up for an Amazon account.
  • Next, you need the Amazon EMR CLI. The CLI is a useful way to access Amazon EMR that provides a good balance between completeness of functionality and ease of use, compared to the Amazon EMR console.

Note: The EMR CLI is a Ruby application requiring Ruby 1.8.x. If you have installed Ruby 1.9 on your system, you have two options:

    • Use RVM to install Ruby 1.8.x and switch the Ruby version.
    • Alternatively, you can use a fork of the Amazon CLI that works with Ruby 1.9 called elastic-mapreduce-ruby.
  • Configure the EMR client by creating a credentials.json file. The file should be created in the home directory of your elastic-mapreduce client. The contents of a sample file are shown here:

  • In the example above, access_id and private_key are security credentials connected with your AWS account. The keypair is a resource associated with your Amazon EC2 service. The private key of the keypair can be downloaded to a local PEM file, whose path is specified in key-pair-file. You can refer to this link for details on how to get these various security parameters.
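As a sketch, a credentials.json containing just the four fields named above could be generated as follows. The helper name and the placeholder values are hypothetical. The CLI may accept further fields that are not shown here.

```python
import json


def emr_credentials_json(access_id: str, private_key: str, keypair: str, key_pair_file: str) -> str:
    """Return credentials.json contents with the fields described above."""
    return json.dumps({
        "access_id": access_id,          # AWS access key ID
        "private_key": private_key,      # AWS secret access key
        "keypair": keypair,              # name of the EC2 keypair
        "key-pair-file": key_pair_file,  # local path to the keypair's PEM file
    }, indent=2)


print(emr_credentials_json("<access-key-id>", "<secret-key>", "my-keypair", "~/.ssh/my-keypair.pem"))
```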

Launching a cluster

Once set up, we are now ready to launch an EMR cluster and access it to submit Hadoop jobs.

Typical deployments of a Hadoop cluster comprise three types of nodes: the masters (JobTracker and NameNode), the slaves (TaskTrackers and DataNodes) and client nodes (typically called access nodes or gateways) from where users submit jobs. An EMR cluster also has three types of nodes: a single master node (that runs both the JobTracker and NameNode), core nodes (that run both TaskTrackers and DataNodes) and task nodes (that run only the TaskTrackers). As you can see, the categories of nodes are slightly different. However, we can double up the master node in EMR as a client node as well. Since the master node in an EMR cluster is used only by you, it is assumed it will have enough capacity to run the Hadoop CLI.

Start an EMR cluster with the CLI in this fashion:

The ‘elastic-mapreduce’ program will be in the home directory of the EMR client. The above command creates a cluster with 3 instances: 1 master and 2 slaves. The --alive flag ensures that the launched cluster stays alive until it is manually terminated by the user. This option is required to log in to the master node and submit Hadoop jobs to the cluster directly. The output of this command will be a ‘jobflow’ ID, which will look something like j-FTW6FLQ1P2G0. Make a note of this ID, as we will use it in other commands below.

Note: You will be charged according to the EMR rates for your usage of the cluster, depending on the type and number of instances chosen.

You can also get more details about the launched cluster using the following command:

Of specific interest in the output of the above command is the “State” attribute of the ExecutionStatusDetail node. You need to wait until the value of this attribute becomes “WAITING”. Once this state is reached, your cluster is ready for further action. Similarly, the “MasterPublicDnsName” attribute gives the DNS name of the machine running the Hadoop JobTracker and NameNode.
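Checking these two attributes can be scripted. This minimal sketch assumes the describe output has been saved as JSON in the DescribeJobFlows layout. In that layout, a JobFlows list has entries carrying ExecutionStatusDetail.State and Instances.MasterPublicDnsName. That layout, and the helper names, are assumptions to verify against real output.

```python
import json


def cluster_status(describe_output: str) -> tuple:
    """Return (State, MasterPublicDnsName) for the first job flow in the describe output."""
    flow = json.loads(describe_output)["JobFlows"][0]
    state = flow["ExecutionStatusDetail"]["State"]
    master_dns = flow["Instances"].get("MasterPublicDnsName")   # may be absent while starting
    return state, master_dns


def is_ready(state: str) -> bool:
    """The cluster accepts work once it reaches the WAITING state."""
    return state == "WAITING"


def ui_urls(master_dns: str) -> dict:
    """JobTracker and NameNode web UI URLs on the ports used in this post."""
    return {
        "jobtracker": f"http://{master_dns}:9100/",
        "namenode": f"http://{master_dns}:9101/",
    }
```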

Browsing Hadoop web UI on the cluster

At this point, it will be useful to check out what the cluster looks like from the familiar JobTracker and NameNode web UI.

The JobTracker web server runs on port 9100 on the master node. In order to browse this UI, you can set up an SSH tunnel that will work as a SOCKS proxy server from your machine to the master node that dynamically forwards HTTP requests. The Amazon CLI provides a command to set up this proxy:

Note: This command helpfully prints out the SSH command that is invoked to set up the SOCKS proxy server. When facing problems with connectivity, I have been able to take this SSH command, modify it (for instance to add the -v option to ssh for enabling debug output) and run it directly for debugging / resolving issues.

After the SOCKS proxy server is started, you can configure your browser’s or system’s proxy settings to use this SOCKS proxy for HTTP traffic. For Chrome, I did this by launching Settings > Change Proxy Settings > Select “SOCKS Proxy” > Enter the server address as 127.0.0.1 and the port as 8157.


If things are fine here, you should be able to hit the URL http://<MasterPublicDnsName>:9100/ on the browser and see the JobTracker UI page and http://<MasterPublicDnsName>:9101/ for the NameNode UI.

The method described here routes all HTTP traffic from your browser through the tunnel. A better option would be to set up a rule that routes only traffic to the EMR clusters through the tunnel. Such a method is described in the EMR documentation, using the FoxyProxy Firefox browser plugin.

Submitting Jobs

The next step is to submit jobs to the EMR cluster using the Hadoop CLI. As described above, the master node doubles up as a Hadoop client node as well. So, we should SSH into the master node using the following command:

This command should drop you into the home directory of the ‘hadoop’ user on the master node. A quick listing of the home directory will show a full Hadoop installation, including bin and conf directories and all the jars that are part of the Hadoop distribution.

You can execute your Hadoop commands as usual from here. For example:


Using the UI that we set up in the previous step, you can also browse the job pages as usual.

Terminating the cluster

Once you are done with the usage, you must remember to terminate the cluster, as otherwise you will continue to accrue cost on an hourly basis irrespective of usage. The command to terminate is:

Note: Terminating the cluster will result in loss of data stored in the HDFS of the cluster. If you need to store data permanently, you can do so by providing Amazon S3 paths as output directories of your jobs.

In conclusion, tools like the elastic mapreduce CLI make it easy to not only provision a Hadoop cluster and run jobs on it, but also to provide the familiar look and feel of the Hadoop client and the Web UI.


Comparing Apache Tez and Microsoft Dryad

This post is carried over from my earlier blog site here. I am migrating posts that seem to have gathered the most hits there.


Hortonworks has been blogging about a framework called Tez, a general purpose data processing framework. Reading through the posts, I was reminded of a similar framework that had come from Microsoft Research a while back called Dryad. This blog post is an attempt at comparing them.

In order to structure the comparison, I am trying to express the points under the following topics: historical perspective, features, concepts, and architecture.

Historical Perspective

Both Tez and Dryad define distributed, data parallel computing frameworks that lay an emphasis on modelling data flow. A data processing ‘job’ in either is defined as a graph. The vertices of the graph represent computational processes, with the edges connecting them describing input they receive and output they send out from / to other computational vertices or data sources / sinks. Both systems attempt to provide an efficient execution environment for running these jobs, abstracting users away from needing to handle common distributed computing requirements such as communication, fault tolerance, etc.

At the time of its introduction, Dryad was possibly Microsoft’s view on how to build such a framework from the ground up. In contrast to Hadoop, Dryad attempted even then to provide a framework that wasn’t restricted to just one model (MapReduce) of computation. Dryad was inspired by a variety of data processing systems, including MPP databases, data parallel programs on GPUs, and MapReduce. It attempted to build a system that could express all these kinds of computation.

Tez was introduced as a generalisation of the MapReduce paradigm that had dominated Hadoop computation for several years. However, it seems to be inspired more by data flow frameworks like Dryad. It was enabled immensely by the separation of concerns brought to the Hadoop MapReduce layer in the form of Apache YARN, that separated cluster resource management from distributed job management, enabling more models than just MapReduce. A direct motivation for Tez was the Stinger initiative, launched to build a faster version of Apache Hive. Specifically, the idea was to enable expressing a HQL query as a single Tez job, rather than multiple MapReduce jobs, thereby avoiding the overhead of launching multiple jobs and also incurring the I/O overhead of having to store data between jobs on HDFS.

Features

Tez and Dryad share several features, such as:

  • The DAG model being the specification choice for a job
  • A flexible / pluggable system where the framework tries to give the user control of the computation, nature of input and output, etc.
  • Supporting multiple inputs and outputs for a vertex (that enable SQL like joins to be expressed, and various forms of data partitioning like the shuffle sort phase of Hadoop MapReduce)
  • An ability to modify the DAG at runtime based on feedback from executing part of the graph. Both systems use runtime modification primarily to improve the efficiency of execution. For example, Dryad used it to introduce intermediate aggregator nodes (akin to the combiner concept in Hadoop MapReduce), while Tez uses it to optimise the number of reducers or when they are launched.

Dryad was built from the ground up without a supporting resource management or scheduling framework. Some of its ‘features’ are therefore present in, or shared by, other layers of the Hadoop stack like YARN. In addition to those, Dryad allowed one specific optimisation through which processing nodes can execute concurrently, co-located and connected via shared memory or pipes.

Tez, on the other hand, expands on learnings from the Hadoop MapReduce framework. For example, it expands on a feature available with MapReduce called JVM reuse, whereby ‘containers’ launched to run the vertex programs of Tez can be reused for multiple Tez tasks. It even allows sharing data between these tasks via an ‘Object Registry’ without needing to have them run concurrently.

Concepts

Naturally, the core concepts of a Graph are common between the systems.

In Tez:

  • A vertex is defined by the input, output and processor associated with it.
  • The logical and physical manifestations of a graph are explicitly separated. Specifically, the inputs and outputs are of two types – a physical type and a logical type. The logical type describes the connection between a vertex pair as per the DAG definition, while the physical type will represent the connection between a vertex pair at runtime. The Tez framework automatically determines the number of physical instances of a vertex in a logical graph.
  • Edges are augmented with properties that relate to data movement (for example, multicast output between connected vertices), scheduling (co-scheduled, or in sequence) and data source (persistence guarantees on the vertex’s output). Tez expects that, by combining these properties, one can replicate existing patterns of computation like MapReduce.
  • In addition to the graph concepts, there is also the concept of an ‘event’. Events are a means for the vertices and the framework to communicate amongst themselves. Events can be used to handle failures, learn about the runtime characteristics of the data or processing, or indicate the availability of data.
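A toy model of a MapReduce-style job as a two-vertex DAG makes the logical/physical split concrete. The class, field and processor names are hypothetical. They only loosely mirror the Tez concepts above and are not the Tez API. The edge properties use the three categories described: data movement, scheduling and data source. When expanding edges, the sketch assumes a scatter-gather or broadcast edge connects every producer instance to every consumer instance, while a one-to-one edge pairs instances up.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter   # Python 3.9+


@dataclass
class Vertex:
    name: str
    processor: str        # what runs in each task
    parallelism: int      # number of physical instances (Tez can decide this at runtime)


@dataclass
class Edge:
    src: str
    dst: str
    data_movement: str = "SCATTER_GATHER"   # or "ONE_TO_ONE", "BROADCAST"
    scheduling: str = "SEQUENTIAL"          # or "CONCURRENT" (co-scheduled)
    data_source: str = "PERSISTED"          # persistence guarantee on the producer's output


def physical_connections(vertices: dict, edge: Edge) -> list:
    """Expand one logical edge into (producer task, consumer task) pairs."""
    n_src, n_dst = vertices[edge.src].parallelism, vertices[edge.dst].parallelism
    if edge.data_movement == "ONE_TO_ONE":
        if n_src != n_dst:
            raise ValueError("one-to-one edges need equal parallelism")
        return [((edge.src, i), (edge.dst, i)) for i in range(n_src)]
    # SCATTER_GATHER and BROADCAST both wire every producer to every consumer;
    # they differ in what is sent (a partition vs. the whole output).
    return [((edge.src, i), (edge.dst, j)) for i in range(n_src) for j in range(n_dst)]


def execution_order(vertices: dict, edges: list) -> list:
    """Topological order of the logical DAG."""
    graph = {name: set() for name in vertices}
    for e in edges:
        graph[e.dst].add(e.src)   # dst depends on src
    return list(TopologicalSorter(graph).static_order())


# MapReduce expressed as a DAG: map -> reduce over a scatter-gather edge.
vertices = {
    "map": Vertex("map", "TokenizerProcessor", parallelism=3),
    "reduce": Vertex("reduce", "SumProcessor", parallelism=2),
}
edges = [Edge("map", "reduce")]
print(execution_order(vertices, edges))               # ['map', 'reduce']
print(len(physical_connections(vertices, edges[0])))  # 6
```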

In Dryad:

  • Inputs and outputs are considered vertices just like processing vertices.
  • Dryad represents the logical structure of the DAG as a set of ‘stages’. However, this does not seem to be a first-class concept for specifying the DAG at definition time. Specifically, Dryad expects the number of runtime instances of a vertex to be fixed at definition time.
  • A lot of operators are defined which help to build a graph. For instance:
    • Cloning: is an operation by which a given Vertex is replicated. Such a cloning operation is used to define a physical manifestation of a graph.
    • Composition: is used to define types of data movement patterns (akin to the edge property in Tez) like round robin data transfer, scatter-gather etc.
    • Merge: is used for defining operations like fork/join etc.
    • Encapsulation: is a way of collapsing a graph into a single vertex, which makes it execute on a single node – used to express concurrent, co-located execution.
  • It appears the idea behind the operators is again to try and define patterns of computation like MapReduce.
  • A ‘channel’ is an abstraction of how data is transferred along an edge. There is support for different types of channels like File, Shared Memory, Pipes etc. This is similar to the physical Input/Output types in Tez.

Architecture

Tez is a YARN application. A Tez job is coordinated by the Tez Application Master (AM) and consists of Tez tasks. Each task encapsulates a processor (vertex) of the DAG and all inputs and outputs connected to it. A Tez task is launched within a YARN container. However, in the interest of good performance, a single YARN container can be reused for multiple Tez tasks. This is managed by a ‘TezTask’ host. The host also manages a store of objects that can be shared between Tez tasks that run within the host.

The Tez Application Master has a Vertex Manager plugin (that can be customised by the developer) for every type of Vertex. In addition, the AM also maintains a Vertex State Machine. As the state of the DAG changes, the Application Master invokes the Vertex Manager, which can then act on the state machine to customise the graph execution.

Another point to note is that Tez relies on YARN’s resource manager and scheduler for initial assignment of containers, etc. However, it has the ability to make the scheduling a two level activity. For example, Tez does come with scheduling capabilities, which it uses for features like container reuse.

Dryad’s architecture includes components that do resource management as well as the job management. A Dryad job is coordinated by a component called the Job Manager. Tasks of a job are executed on cluster machines by a Daemon process. Communication with the tasks from the job manager happens through the Daemon, which acts like a proxy. In Dryad, the scheduling decisions are local to an instance of the Dryad Job Manager – i.e. it is decentralised.

The logical plan for a Dryad DAG results in each vertex being placed in a ‘Stage’. The stages are managed by a ‘Stage manager’ component that is part of the job manager, similar to the Vertex Manager in Tez. The Stage manager is used to detect state transitions and implement optimisations like Hadoop’s speculative execution.

Conclusion

Dryad was discontinued by Microsoft in late 2011. Microsoft has since been contributing to Hadoop. Given the similarities between the two systems, a natural question is how Tez’s prospects will differ from Dryad’s. A few points that seem to favour Tez, IMO:

  • Tez rides on years of learning from Hadoop MapReduce and other systems including Dryad. Microsoft recently posted that it contributes to Tez. The expectation then would be that the insights and learnings from systems (including what did not work) will help build a better system.
  • The separation of concerns brought about by YARN potentially helps Tez to focus on problems specific to the graph processing model, while delegating resource management and scheduling decisions to another layer – at least partially.
  • The API for graph construction in Tez appears a lot simpler and more intuitive than the corresponding one in Dryad. Hence, the model seems easier to adopt from a programmer’s perspective.
  • Given Tez was launched with a specific initiative of making Hive faster, there is a goal it is working towards, and there seems to already be evidence that Tez is enabling improvements in Hive as shown here.

Personally, I would like to see Tez succeed, so that the many people who have invested in Hive can see huge improvements in the performance of their existing applications.

Acknowledgements

Most of the information for this post has come from publicly available blog posts and published papers. If there is any omission or misrepresentation, please do let me know!

An initial draft of this post was reviewed by a few committers at Hortonworks: Siddharth Seth, Bikas Saha, Hitesh Shah and Vinod Kumar Vavilapalli. I am thankful to them for their feedback. While I have incorporated some of it, I felt some others are best explained from their end, possibly as comments. I will notify them once the blog is published.

Specifically calling out two points:

  • Both Sid and Hitesh have called out that there are going to be additional changes to the architecture and features in Tez that will soon be published. As this blog was being written, a new post came out from Hortonworks mentioning a new concept called Tez Sessions. So, be sure to watch out for Hortonworks blogs on Tez for more information.
  • Bikas provided feedback about Tez’s motivation being closer not just to systems like Dryad, but also other data flow systems like Hyracks and Nephele. It may be a good academic exercise to see these other systems as well from a perspective of learning.