Transcript Summingbird

CSCI5570 Large Scale Data Processing Systems
Distributed Stream Processing Systems
James Cheng
CSE, CUHK
Slide Ack.: modified based on the slides from Oscar Boykin and Sam Ritchie
Summingbird: A Framework for Integrating Batch
and Online MapReduce Computations
Oscar Boykin, Sam Ritchie, Ian O’Connell, and Jimmy Lin
Twitter, Inc.
VLDB 2014
Scene: Internet company in Silicon Valley
• Standard data science task (circa 2010):
• What have people been clicking on?
• This question can be simply answered by a MapReduce job
Scene: Internet company in Silicon Valley
• Standard data science task (circa 2013):
• What have people been clicking on right now?
• Lack of a standardized processing framework that supports both batch processing and stream processing
• Having to write everything twice (once for Hadoop on historical data and again for Storm on real-time data)
Introduction
• Summingbird proposes a domain-specific language (in Scala)
to integrate batch and online MapReduce computations
• Hadoop for batch processing
• Storm for stream processing
• The same code (instead of two versions) works in both scenarios
• Summingbird also uses probabilistic data structures, maintained in a streaming manner, to give approximate results efficiently
Design Goal
• Primary goal is developer productivity. Optimizations can come
later…
• Scope of “the easy problems”:
• Counting
• Min
• Max
• Mean
• Set membership
• Histograms
Batch and Online MapReduce
“map”
flatMap[T, U](fn: T => List[U]): List[U]
map[T, U](fn: T => U): List[U]
filter[T](fn: T => Boolean): List[T]
“reduce”
sumByKey
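As a rough illustration of these signatures on plain Scala collections (not Summingbird's actual Producer API), the “map” primitives correspond to ordinary collection operations, and sumByKey to a group-then-combine:

// Minimal sketch on plain Scala collections; it only illustrates the shape of the
// primitives above, not Summingbird's actual API.
object Primitives {
  // "map" phase: each element may produce zero or more key-value outputs
  def tokenize(lines: List[String]): List[(String, Long)] =
    lines.flatMap(line => line.split("\\s+").toList.map(word => (word, 1L)))

  // "reduce" phase: sumByKey = group by key, then combine the values with ⊕ (here: +)
  def sumByKey(pairs: List[(String, Long)]): Map[String, Long] =
    pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(sumByKey(tokenize(List("just setting up my twttr", "up in my room"))))
}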
Integration of batch and online processing
• Algebraic structures provide the basis for seamless integration of batch and online processing
• Semigroup = ( M , ⊕ )
  ⊕ : M × M → M, s.t. ∀ m1, m2, m3 ∈ M, (m1 ⊕ m2) ⊕ m3 = m1 ⊕ (m2 ⊕ m3)
• Monoid = Semigroup + identity
  ∃ ε s.t. ε ⊕ m = m ⊕ ε = m, ∀ m ∈ M
• Commutative Monoid = Monoid + commutativity
  ∀ m1, m2 ∈ M, m1 ⊕ m2 = m2 ⊕ m1
• Simplest example: integers with + (addition)
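A minimal Scala sketch of these structures (illustrative only; Summingbird actually relies on Algebird's Semigroup/Monoid type classes, whose traits differ in detail):

trait Semigroup[M] { def plus(a: M, b: M): M }         // ⊕, required to be associative
trait Monoid[M] extends Semigroup[M] { def zero: M }   // adds the identity ε

object IntAddition extends Monoid[Int] {               // the simplest example: (Int, +, 0)
  def plus(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

// e.g. plus(plus(1, 2), 3) == plus(1, plus(2, 3)) and plus(zero, 42) == 42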
Integration of batch and online processing
• Summingbird values must be at least semigroups (most are commutative monoids in practice)
• Power of associativity = you can put the parentheses anywhere and the results are exactly the same
  ( a ⊕ b ⊕ c ⊕ d ⊕ e ⊕ f )
  ((((( a ⊕ b ) ⊕ c ) ⊕ d ) ⊕ e ) ⊕ f )
  (( a ⊕ b ⊕ c ) ⊕ ( d ⊕ e ⊕ f ))
• Batch = Hadoop: fold large chunks of historical data in one go
• Online = Storm: fold mini-batches as data arrives; associativity guarantees both groupings give the same result
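A quick check of this claim, using integer addition as a stand-in for any Summingbird value type (illustrative code, not part of the framework):

object AssociativityDemo {
  val events: List[Int] = List(1, 2, 3, 4, 5, 6)                 // a ⊕ b ⊕ ... ⊕ f

  val batchStyle: Int = events.reduce(_ + _)                     // one big fold (Hadoop-style)
  val miniBatchStyle: Int =                                      // fold each mini-batch, then merge the partials (Storm-style)
    events.grouped(3).map(_.reduce(_ + _)).reduce(_ + _)

  def main(args: Array[String]): Unit =
    assert(batchStyle == miniBatchStyle)                         // same answer either way
}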
Word Count
def wordCount[P <: Platform[P]](
    source: Producer[P, Tweet],
    store: P#Store[String, Long]) =
  source
    .flatMap { tweet => tweet.getText.split("\\s+").map(_ -> 1L) }
    .sumByKey(store)
Example data flow:
Producer[P, Tweet]: "Just setting up my twttr", "Bears up in my room"
After flatMap: ("just" -> 1), ("setting" -> 1), ("up" -> 1), ("my" -> 1), ("twttr" -> 1), ("Bears" -> 1), ("up" -> 1), ("in" -> 1), ("my" -> 1), ("room" -> 1)
After grouping by key: ("Bears" -> (1)), ("just" -> (1)), ("setting" -> (1)), ("up" -> (1, 1)), ("in" -> (1)), ("my" -> (1, 1)), ("room" -> (1)), ("twttr" -> (1))
P#Store[String, Long] after sumByKey: ("Bears" -> 1), ("just" -> 1), ("setting" -> 1), ("up" -> 2), ("in" -> 1), ("my" -> 2), ("room" -> 1), ("twttr" -> 1)
Summingbird WordCount
def wordCount[P <: Platform[P]](
    source: Producer[P, String],       // input source
    store: P#Store[String, Long]) =    // output destination
  source.flatMap {
    sentence => toWords(sentence).map(_ -> 1L)
  }.sumByKey(store)
Run on Scalding (Cascading/Hadoop): tweets stored on HDFS as input source, HDFS as output destination
Scalding.run {
  wordCount[Scalding](
    Scalding.source[Tweet]("source_data"),
    Scalding.store[String, Long]("count_out")
  )}
Run on Storm: streaming tweets as input source, key/value store (Memcache) as output destination
Storm.run {
  wordCount[Storm](
    new TweetSpout(),
    new MemcacheStore[String, Long]
  )}
[Diagram: logical Summingbird dataflow. Producers feed flatMap stages (T => List[(K, V)]); a Reduce stage (sumByKey, (K, List[V]) => (K, V)) writes the aggregates to a Store[K, V], which is read by the Client.]
[Diagram: the same dataflow executed on both platforms. On Hadoop, hourly batch jobs (Producer → flatMap → Reduce → Store) process large volumes of log data. On Storm, the identical topology computes real-time counts for the latest hour to identify “hot content” with minimal latency. The Client reads from both Stores.]
[Diagram: physical execution. On Hadoop, the job runs as Input → Map → Reduce → Output stages. On Storm, it runs as a topology of a Spout feeding Bolts, with the resulting aggregates written to memcached.]
Batch Processing Delay
• If the Hadoop cluster is operating beyond capacity, jobs may not generate results in a timely fashion
• The client continues gathering results from Storm and waits for Hadoop to catch up
• Summingbird transparently handles these issues to provide an integrated view of the data to querying clients
[Chart: counts of “ape” by hour from Hadoop, from Storm, and merged. When Hadoop is operating beyond capacity and falls behind, the client continues gathering results from Storm for the hours Hadoop has not yet covered.]
[Chart: the merged counts of “ape” by hour, presented to the client as a single integrated view over the batch and online results.]
Monoids and Semigroups
• Addition (e.g., counting)
• Multiplication (e.g., computing probabilities)
• Max
• Min
• Moments (Mean, Variance, etc.)
• Sets (e.g., keeping track of set membership)
• HashMaps with monoid values (e.g., computing histograms)
• Monoid minhash (a fast probabilistic algorithm for computing similarity between objects)
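As one concrete case from the list above, maps whose values are themselves monoid values form a monoid under key-wise ⊕. A minimal sketch (Algebird provides this map monoid for real; the code below is only illustrative):

// Merging two histograms (Map[String, Long]) key-wise with +.
object HistogramMonoid {
  val zero: Map[String, Long] = Map.empty                        // ε: the empty histogram

  def plus(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  def main(args: Array[String]): Unit =
    // e.g. {up -> 2, my -> 1} ⊕ {my -> 1, room -> 1} == {up -> 2, my -> 2, room -> 1}
    println(plus(Map("up" -> 2L, "my" -> 1L), Map("my" -> 1L, "room" -> 1L)))
}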
Interesting Monoids
• For many tasks, close enough is good enough!
• Bloom filters (set membership)
• HyperLogLog counters (cardinality estimation)
• Count-min sketches (event counts)
• Common features
• Variations on hashing
• Bounded error
Interesting Monoids: Bloom filters
• Compact probabilistic data structure for keeping track of set membership
• False positive matches are possible, but false negatives are not possible
• App: keep track of users who have been exposed to a certain event (e.g., a Tweet, a recommendation) in order to avoid duplicate impressions – a challenging task for Twitter, as this involves keeping track of O(10^8) objects/day for O(10^8) users
• Using Bloom filters: a user will never be exposed to the same treatment twice (false positives only mean that a few users may not receive a treatment at all)
• Provides an accuracy/space tradeoff: given a desired error rate and a given capacity (determined a priori by the developer based on the application scenario), we can appropriately set the size of the filter
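A minimal sketch of a Bloom filter viewed as a monoid: the value is the bit set, ⊕ is bitwise union, and the empty filter is the identity. The sizes and hashing scheme below are illustrative assumptions; the real implementation is provided by Algebird's Bloom filter monoid:

import scala.collection.immutable.BitSet

case class Bloom(bits: BitSet, width: Int, numHashes: Int) {
  private def positions(item: String): Seq[Int] =                // k hash positions for an item
    (0 until numHashes).map(i => ((item, i).hashCode & Int.MaxValue) % width)

  def add(item: String): Bloom = copy(bits = bits ++ positions(item))
  def mightContain(item: String): Boolean = positions(item).forall(bits.contains)  // no false negatives
  def plus(other: Bloom): Bloom =                                 // ⊕ = union of bits (assumes same width/numHashes)
    copy(bits = bits | other.bits)
}

object Bloom {
  def empty(width: Int = 1 << 16, numHashes: Int = 4): Bloom =    // ε = empty filter
    Bloom(BitSet.empty, width, numHashes)
}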
Interesting Monoids: HyperLogLog counters
• Compact probabilistic data structures for cardinality estimation (i.e., the size of a set)
• App:
• keeping track of the number of unique users who have performed a certain action, e.g., retweeted or favorited a Tweet, clicked on a link, etc. – a challenging task for Twitter, as a naive exact solution based on sets would be impractical for O(10^8) users, particularly for events with high cardinalities (e.g., retweets of celebrities' Tweets or Tweet impressions)
• computing graph statistics such as the size of a node's second-degree neighborhood (e.g., followers of followers)
• A HyperLogLog counter occupies O(log log n) space for cardinalities of up to n, tunable within the (ε, δ) framework, i.e., achieving a (1 ± ε)-approximation with probability at least 1 − δ
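A simplified sketch of why HyperLogLog counters merge as a (commutative) monoid; the estimator itself and its bias corrections (as in Algebird's HLL) are omitted, and the register count is an arbitrary illustrative choice:

case class HLL(registers: Vector[Int]) {
  def plus(other: HLL): HLL =                          // ⊕ = element-wise max: associative, commutative, idempotent
    HLL(registers.zip(other.registers).map { case (a, b) => a max b })
}

object HLL {
  val numRegisters = 1024                              // illustrative size; controls the error ε
  val zero: HLL = HLL(Vector.fill(numRegisters)(0))    // ε: merging with it changes nothing
  // Observing an item hashes it to one register and records the "rank" of its hash
  // (leading-zero count + 1); the cardinality estimate is then derived from the registers.
}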
Interesting Monoids: Count-min sketches
• Compact probabilistic data structures for keeping track of frequencies (i.e., counts) associated with events
• Based on hashing objects into a two-dimensional array of counts using a series of hash functions
• Uses only sub-linear space, at the expense of overcounting some events due to collisions
• App:
• keeping track of the number of times a query was issued to Twitter search within a span of time
• building histograms of events
• Given a desired error bound in the (ε, δ) model based on the application scenario, we can compute the size of the data structure that underlies the count-min sketch
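A minimal count-min sketch as a monoid: the state is a depth × width table of counts, ⊕ is element-wise addition, and a point query takes the minimum over the rows. The dimensions and hashing below are illustrative; Algebird's CMS/SketchMap is the production version used with Summingbird:

case class CMS(counts: Vector[Vector[Long]]) {
  private val depth = counts.length                               // number of hash functions (rows)
  private val width = counts.head.length                          // counters per row
  private def col(item: String, row: Int): Int =
    ((item, row).hashCode & Int.MaxValue) % width

  def add(item: String, count: Long = 1L): CMS =                  // record `count` occurrences of item
    CMS(Vector.tabulate(depth, width)((r, c) =>
      counts(r)(c) + (if (c == col(item, r)) count else 0L)))

  def plus(other: CMS): CMS =                                     // ⊕ = element-wise sum
    CMS(Vector.tabulate(depth, width)((r, c) => counts(r)(c) + other.counts(r)(c)))

  def estimate(item: String): Long =                              // may overcount (collisions), never undercounts
    (0 until depth).map(r => counts(r)(col(item, r))).min
}

object CMS {
  def zero(depth: Int = 5, width: Int = 1000): CMS =              // ε = all-zero table
    CMS(Vector.fill(depth, width)(0L))
}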
Cheat sheet
• For many tasks, exact answers are impractical and approximate
answers are good enough
Task                     Exact       Approximate
Set membership           set         Bloom filter
Cardinality estimation   set         HyperLogLog counter
Frequency count          hashmap     Count-min sketch
Hourly counts of search queries by hashmap and CMS (same logic)
• Exact with hashmaps (can’t scale due to linear memory space)
def wordCount[P <: Platform[P]](
    source: Producer[P, Query],        // element type assumed to provide getHour/getQuery
    store: P#Store[Long, Map[String, Long]]) =
  source.flatMap {
    query => (query.getHour, Map(query.getQuery -> 1L))
  }.sumByKey(store)
• Approximate with count-min sketches (sub-linear space)
def wordCount[P <: Platform[P]](
    source: Producer[P, Query],
    store: P#Store[Long, SketchMap[String, Long]])
    (implicit SMM: SketchMapMonoid[String, Long]) =
  source.flatMap {
    query => (query.getHour, SMM.create(query.getQuery, 1L))
  }.sumByKey(store)
Summingbird
• Hybrid online/batch wordcount
Events with Batch ids
• Each event is tagged with a batch id
• Each event is first annotated with a timestamp, which is then mapped to a batch id
• E.g., a log event is annotated with the timestamp at which it was generated, which is then mapped to the hour it falls in
• Each event is sent to both a message queue (for stream processing) and HDFS (for batch processing)
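A minimal sketch of this timestamp-to-batch mapping, assuming hourly batches (the batch size is configurable in Summingbird; the names here are illustrative, not the actual batcher API):

object HourlyBatcher {
  val batchSizeMillis: Long = 60L * 60L * 1000L        // one hour per batch

  def batchIdOf(timestampMillis: Long): Long =         // events in the same hour share a batch id
    timestampMillis / batchSizeMillis
}

// e.g., each event is annotated as (batchIdOf(eventTimestampMillis), event) before being
// sent to both the message queue (Storm) and HDFS (Hadoop).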
Batch processing
• A Hadoop job is triggered periodically once all events from the latest batch (e.g., one hour) have been deposited in HDFS
• Problem: some events generated near the end of a batch may be assigned to the next batch due to network latencies
• Solution: run the Hadoop job over a moving window of configurable size (e.g., two batch windows), but discard events not belonging to the relevant batch
• The batch-results key-value store polls HDFS for new results corresponding to the latest batch
• The key-value store maintains (K, (batchId, V)) pairs
• New results are folded into these pairs with the monoid’s ⊕ (e.g., sum, min, or max), and the batchId is advanced
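A sketch of how a newly completed batch might be folded into the batch-results store, assuming the store is a map from K to (batchId, V) and V has a monoid ⊕ (illustrative types and names, not Summingbird's client-store API):

object BatchStoreMerge {
  def mergeBatch[K, V](
      store: Map[K, (Long, V)],            // K -> (latest merged batchId, aggregate up to that batch)
      batchId: Long,                       // the batch that just completed on Hadoop
      results: Map[K, V]                   // per-key aggregates produced by that Hadoop job
  )(plus: (V, V) => V): Map[K, (Long, V)] =
    results.foldLeft(store) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some((_, prev)) => acc.updated(k, (batchId, plus(prev, v)))   // fold in with ⊕, advance batchId
        case None            => acc.updated(k, (batchId, v))
      }
    }
}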
Stream processing
• In parallel with the batch jobs, the same Summingbird program is continuously executed in a Storm topology
• The online-results key-value store maintains partial results for the latest batch in the form of ((K, batchId), V) pairs
Query processing
• Queries first go to the batch-results key-value store to obtain the aggregated results up to the last completed batch
• Queries then go to the online-results key-value store to obtain the partial results for the batches not yet covered, and combine the two with ⊕ to obtain the final aggregated results
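A sketch of this client-side read path (illustrative names and types; the real Summingbird client library performs this merge transparently):

object ClientQuery {
  def lookup[K, V](
      batchStore: Map[K, (Long, V)],        // K -> (last merged batchId, aggregate up to that batch)
      onlineStore: Map[(K, Long), V],       // (K, batchId) -> partial aggregate for that batch
      key: K,
      currentBatchId: Long
  )(plus: (V, V) => V): Option[V] = {
    val (mergedUpTo, batchPart) = batchStore.get(key) match {
      case Some((b, v)) => (b, Some(v))
      case None         => (-1L, None)
    }
    // gather online partials for the batches the batch results do not yet cover
    val onlinePart = ((mergedUpTo + 1) to currentBatchId).flatMap(b => onlineStore.get((key, b)))
    (batchPart.toSeq ++ onlinePart).reduceOption(plus)   // combine everything with ⊕
  }
}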