Architectures for Stateful Data-Intensive Analytics
Analyzing big, continuous data
Dionysios Logothetis, Georgos Siganos (Telefonica Research)
In collaboration with: Ken Yocum (UC San Diego), Chris Olston and Ben Reed (Yahoo! Research)
Big data analytics
• Data abound
– Web content, e-mails, CDRs, financial, retail, scientific data
• Drive innovation: science, business, finance
– CDRs: detect calling patterns
– E-mails: targeted advertising
– Social media feeds: brand monitoring
Numerous others: fraud detection, product recommendation, content personalization, genomics, weather prediction, …
Big challenges
• Scale: data are growing too fast
– Facebook: 15 TB/day in 2009 → 130 TB/day in 2011
• Rich analytics: need more than relational
– Some operations not expressible in SQL
– e.g. graph mining, machine learning
• Efficiency: do it quickly and economically
– Data “freshness” is important
– Computation and network resources cost money
Data-intensive Scalable Computing (DISC)
Allow users to easily analyze big data
• MapReduce (Google), Hadoop (Yahoo!), Dryad (Microsoft)
• Flexible programming models
– Allow sophisticated analytics
• Scalable architectures
– Easy to parallelize analysis
– Use 1000s of cheap PCs
Scaling with massive data-parallelism
[Figure: data sources (e.g. web crawlers) feed input data to processing operators running on a compute cluster. Bigger data? Throw in more machines! Scales to virtually any data volume.]
Update-driven analytics
• Scalability is not enough
• Analytics constantly integrate new data
• Example: web analytics
– New pages crawled every hour
– Re-compute results from scratch
– As data grow, add more machines
• Inefficient: only a fraction of the pages is updated each hour
• Wastes CPU, network, energy, and money
Need a new way to program these analytics efficiently
Programming with state
Must be able to re-use computation
• Incremental: data change across time
– e.g. new web pages every hour
– Update result, do not re-compute
• Iterative: data change across iterations
– e.g. machine learning, graph mining
– Refine prior analysis result
State is fundamental for efficiency
Example: Incremental web analytics
• 90-node Hadoop cluster, real Yahoo! data set
• Incrementally maintain URL in-link counts
• Continuously:
– Crawl and process 30 GB of new pages: count URLs
– Read old URL counts and increment
– Save new URL counts to HDFS
Example state records: (“cnn.com”, 2), (“fox.com”, 3), …

[Plot: processing time (sec) vs. increment number, 0 through 8. Hadoop's running time is proportional to the state size; the ideal running time is proportional to the state updates.]
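To make this concrete, here is a minimal Python sketch of the incremental counting step described above; the function names and record layout are illustrative, not taken from the CBP implementation:

```python
from collections import Counter

def count_inlinks(new_pages):
    """Count in-links to each URL mentioned in newly crawled pages."""
    counts = Counter()
    for page in new_pages:
        for url in page["links"]:
            counts[url] += 1
    return counts

def apply_increment(state, delta):
    """Merge new counts into the persistent URL-count state.
    Ideally this touches only the URLs in `delta`, not the whole state."""
    for url, n in delta.items():
        state[url] = state.get(url, 0) + n
    return state

# One epoch: process the newly crawled pages, then update stored counts.
state = {"cnn.com": 2, "fox.com": 3}
new_pages = [{"url": "a.com", "links": ["cnn.com", "nbc.com"]}]
state = apply_increment(state, count_inlinks(new_pages))
print(state)  # {'cnn.com': 3, 'fox.com': 3, 'nbc.com': 1}
```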
The problem
State is outside the scope of the processing system
[Figure: the application sits on top of the processing system (e.g. Hadoop), but state management is pushed down to the storage layer (e.g. HDFS, DBMS, Bigtable).]
• Programmability
– Users forced to manage state manually
– Use HDFS or local files, Bigtable, DBMS
– Result: Custom and fragile code
• Efficiency
– System is not aware of state
– Or treats it like any other input
– Result: Wasted data movement and processing
The Continuous Bulk Processing model (CBP)
[Figure: in CBP, state management moves inside the processing system, between the application and the storage layer (e.g. HDFS, DBMS, Bigtable).]
• Idea: State as a first-class abstraction
• Abstract state management: easier programming
• State-aware system: allows optimizations
Continuous Bulk Processing (CBP)
• Scalable processing on bulk data
– Groupwise processing primitive
• Sophisticated incremental/iterative analytics
– Generic stateful operator
• Simplify dataflow programming
– Primitives for programmatic dataflow control
• Efficient processing
– Leverages model to optimize state management
Groupwise processing
• Core concept underlying relational operations, e.g. GROUP BY
• Users specify grouping key for input records
• User function applied per group, e.g. count
[Figure: input records “cnn.com”, “nbc.com”, “nbc.com”, “fox.com”, “cnn.com” are grouped by key (the URL), and count() is applied per group, producing (“cnn.com”, 2), (“nbc.com”, 2), (“fox.com”, 1).]
Allows each group to be processed in parallel
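A minimal sketch of this group-then-apply pattern in Python (illustrative names; real systems distribute the groups across machines):

```python
from itertools import groupby

def groupwise(records, key, func):
    """Group records by `key` and apply `func` to each group.
    Groups are independent, so they can be processed in parallel."""
    records = sorted(records, key=key)
    return {k: func(k, list(g)) for k, g in groupby(records, key=key)}

urls = ["cnn.com", "nbc.com", "nbc.com", "fox.com", "cnn.com"]
counts = groupwise(urls, key=lambda u: u, func=lambda k, g: len(g))
print(counts)  # {'cnn.com': 2, 'fox.com': 1, 'nbc.com': 2}
```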
Groupwise processing in CBP
• User-defined Translate() function
– Multiple input and output flows
• RouteBy() function extracts grouping key
– One RouteBy function per input flow
[Figure: a processing stage reads input flows Fin1, Fin2, … and calls T(k1, F1[], F2[]), T(k2, F1[], F2[]), …, one Translate() invocation per key, writing results to output flow Fout1.]
• Processing performed in a stage:
– Groups input records by key
– Calls Translate() for every key in parallel
– Runs repeatedly in epochs
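A rough single-process sketch of such a stage (the `Stage` class and in-memory flows are assumptions for illustration; the real system runs distributed over a cluster):

```python
from collections import defaultdict

class Stage:
    """Toy CBP stage: groups records from its input flows by key
    and calls the user's Translate() once per key, each epoch."""

    def __init__(self, translate, route_by):
        self.translate = translate  # user function T(key, F1[], F2[], ...)
        self.route_by = route_by    # one RouteBy() per input flow

    def run_epoch(self, flows):
        grouped = defaultdict(lambda: [[] for _ in flows])
        for i, flow in enumerate(flows):        # group by routing key
            for rec in flow:
                grouped[self.route_by[i](rec)][i].append(rec)
        out = []
        for key, per_flow in grouped.items():   # per-key calls: parallelizable
            out.extend(self.translate(key, *per_flow))
        return out

# Example: count records per URL over a single input flow.
stage = Stage(translate=lambda k, recs: [(k, len(recs))],
              route_by=[lambda rec: rec])
print(stage.run_epoch([["cnn.com", "cnn.com", "fox.com"]]))
# [('cnn.com', 2), ('fox.com', 1)]
```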
Maintaining state in CBP
• We model state as a loopback flow
• Writes to loopback flows appear in next epoch
• Stage groups input with state records
[Figure: a stage reads input flow Fin1 together with loopback flow FState and calls T(k1, F1[], FState[]), T(k2, F1[], FState[]), …, grouping new input records with the matching state records.]
• Translate uses loopback flows like any other flow
• Translate can:
– Add/modify state: writes new record
– Delete state: does not write record
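A sketch of a stateful Translate() written in this style, with state arriving as just another (loopback) flow; `count_translate` and the flow names are illustrative:

```python
def count_translate(key, new_records, state_records):
    """Stateful count for one key: read the old count from the loopback
    flow, write the new count back for the next epoch.
    Writing no state record at all would delete this key's state."""
    old_count = state_records[0] if state_records else 0
    new_count = old_count + len(new_records)
    return {"state":  [(key, new_count)],   # loopback flow (next epoch)
            "output": [(key, new_count)]}   # downstream output flow

print(count_translate("cnn.com", ["rec1", "rec2"], [2]))
# {'state': [('cnn.com', 4)], 'output': [('cnn.com', 4)]}
```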
Allowing selective state updates
• State increases in size
– e.g. by adding new URL counts
• New data affect only a fraction of the state
– e.g. a single state record out of many
[Figure: a stage reads input flow Fin and state flow FState, calling T(k1, F1[], FState[]), T(k2, F1[], FState[]), … only for the affected keys.]
• Current systems do “outer”-grouping
– Call user function for every key in input or state
• Forces system to access the whole state
• CBP allows “inner”-grouping of input with state
– Call Translate() only if key exists in the input
• Allows the system to access only the necessary state (contrast sketched below)
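An illustrative contrast of the two grouping modes, with state held in a plain dict:

```python
def outer_grouping(translate, inputs, state):
    """Call translate() for every key in the input OR the state:
    the whole state is accessed even for a tiny update."""
    for key in set(inputs) | set(state):
        translate(key, inputs.get(key, []), state.get(key))

def inner_grouping(translate, inputs, state):
    """Call translate() only for keys present in the input:
    untouched state records are never read."""
    for key in inputs:
        translate(key, inputs[key], state.get(key))
```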
Implementing the CBP model
• Naïve approach: use Hadoop/MapReduce as a “black box”
– Leverages its scalability and fault-tolerance
[Figure: each epoch runs as a Hadoop job: Input 1 plus the state feed one job, producing Output 1 and the updated state; the updated state and Input 2 feed the next job, producing Output 2.]
CBP emulation on MapReduce
[Figure: the emulation maps CBP onto a MapReduce job: map() applies RouteBy() to tag each record with its key, the shuffle groups records by key, and reduce() calls Translate() per group.]
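A toy single-process version of the emulation, showing the key inefficiency: the state flow is treated exactly like new input, so it is re-mapped and re-shuffled every epoch (names are illustrative):

```python
from collections import defaultdict

def emulate_epoch(new_records, state_records, route_by, translate):
    """One CBP epoch as a vanilla MapReduce job."""
    groups = defaultdict(lambda: {"new": [], "state": []})
    for rec in new_records:                 # map + shuffle the new input
        groups[route_by(rec)]["new"].append(rec)
    for rec in state_records:               # the state is re-shuffled too!
        groups[route_by(rec)]["state"].append(rec)
    output, next_state = [], []
    for key, g in groups.items():           # reduce: Translate() per key
        out, st = translate(key, g["new"], g["state"])
        output.extend(out)
        next_state.extend(st)
    return output, next_state
```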
Impedance mismatches
• Treats state like any other input
– Unnecessary state processing/movement
• Supports only “outer”-grouping
– Accesses the whole state even for small updates

The CBP prototype implements the model along with optimizations that address these mismatches
Incremental state shuffling
• Hadoop treats state like any other input
– Re-maps and re-shuffles the state just to group it
– But the state is already grouped from the previous epoch
• Result: unnecessary processing and data movement

[Figure: in plain Hadoop, state records pass through map and shuffle again on every epoch.]

• CBP separates state from the other data
– Stores state in side files
– Copies state directly to the reducers
• Avoids re-processing and re-shuffling

[Figure: with incremental shuffling, state bypasses the mappers and flows directly to the reducers.]
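Under the same toy assumptions, a sketch of the optimized epoch: state stays resident at the reducer side (here a key → records dict) and only new input passes through map and shuffle; it also performs inner grouping, calling Translate() only for input keys:

```python
from collections import defaultdict

def emulate_epoch_incremental(new_records, state, route_by, translate):
    """State stays at the 'reducers' (a key -> records dict);
    only the new input is mapped and shuffled."""
    groups = defaultdict(list)
    for rec in new_records:                 # shuffle new data only
        groups[route_by(rec)].append(rec)
    output = []
    for key, recs in groups.items():        # keys from the input only
        out, st = translate(key, recs, state.get(key, []))
        output.extend(out)
        if st:
            state[key] = st                 # update state in place
        else:
            state.pop(key, None)            # no record written => delete
    return output, state
```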
Impact of incremental shuffling
• 90-node Hadoop cluster, real Yahoo! data set
• Incrementally maintain URL in-link counts
• Continuously:
– Crawl and process 30 GB of new pages: count URLs
– Read old URL counts and increment
– Save new URL counts to HDFS
Example state records: (“cnn.com”, 2), (“fox.com”, 3), …

[Plot: processing time (sec) vs. increment number for Hadoop, CBP, and the ideal. Hadoop's running time is proportional to the state size; CBP takes 50% less time, with near-constant processing time approaching the ideal of time proportional to the state updates.]
Supporting inner-grouping
• Inner-grouping selects a fraction of the state
• Requires random access to state
• DISC systems: optimized for sequential access
Lesson from the DB community:
• Maintain an index on state records
Accessing state randomly
Idea: Put state in a table-based store
• Bigtable/Hypertable
– Scalable table-stores: allow random access
– More functionality than we need
• Implemented custom indexed files
– Each reducer maintains its own indexed file
• Reducers avoid scanning the whole state

[Figure: mappers shuffle input to the reducers; each reducer keeps its state partition in a local indexed file, a key → value table (keys “blue”, “green”, “purple”, …), and looks up only the keys it needs.]
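The custom indexed files are not detailed here; as a stand-in, this sketch uses sqlite3 to show the access pattern each reducer needs: point lookups and updates instead of a scan of the whole partition:

```python
import sqlite3

class ReducerState:
    """Per-reducer indexed state: random access to individual keys."""

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute("CREATE TABLE IF NOT EXISTS state "
                        "(key TEXT PRIMARY KEY, value INTEGER)")

    def get(self, key):
        row = self.db.execute("SELECT value FROM state WHERE key = ?",
                              (key,)).fetchone()
        return row[0] if row else None

    def put(self, key, value):
        self.db.execute("INSERT OR REPLACE INTO state VALUES (?, ?)",
                        (key, value))

state = ReducerState()
state.put("cnn.com", 2)
state.put("cnn.com", (state.get("cnn.com") or 0) + 1)  # indexed update
print(state.get("cnn.com"))  # 3
```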
Evaluation
• Validate benefit of explicit state modeling
– Impact of optimizations on efficiency
• Processing time
• Data moved across network
• Sophisticated applications, real-world data
– Web analytics
– PageRank
– Clustering coefficients
The PageRank algorithm
• Used by Google to deliver search results
– Given a web graph, assigns ranks to pages
[Figure: a small web graph whose pages carry example ranks 0.2, 0.5, 0.3.]
Algorithm overview:
• Assign initial rank
• Repeat until ranks converge
– Vertices distribute their ranks to neighbors
– Vertices update their own ranks
[Figure: vertices A, B, and C distribute fractions of their rank (1/3, 1/6, …) to their neighbors along out-edges.]
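A compact sketch of the loop above on an adjacency-list graph (the damping factor and tolerance are conventional choices, not from the slides; dangling vertices are ignored for brevity):

```python
def pagerank(graph, damping=0.85, tol=1e-6):
    """graph: dict mapping each vertex to its list of out-neighbors."""
    n = len(graph)
    ranks = {v: 1.0 / n for v in graph}          # assign initial rank
    while True:                                  # repeat until convergence
        new = {v: (1 - damping) / n for v in graph}
        for v, nbrs in graph.items():            # distribute rank to neighbors
            for u in nbrs:
                new[u] += damping * ranks[v] / len(nbrs)
        if sum(abs(new[v] - ranks[v]) for v in graph) < tol:
            return new
        ranks = new

print(pagerank({"A": ["B", "C"], "B": ["C"], "C": ["A"]}))
```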
Solving graph problems
How we usually do it:
• Vertices maintain state
– e.g. neighbors, current rank
• Local computation per vertex
– e.g. update rank
– Can be performed in parallel
• Message exchange between vertices
– e.g. distribute rank
– Synchronization among vertices

[Figure: vertices A, B, C, each with its own state, exchanging messages along the graph edges.]
Programming graphs in CBP
• A state record corresponds to a vertex
• The grouping key is the ID of the vertex, e.g. the page URL
• Stage calls Translate() for every vertex
– Updates the current state
– Outputs messages to a loopback flow
• Stage groups messages with state (i.e. with the vertex)
– Vertices “receive” their messages

[Figure: input, state, and message flows enter the stage; each Translate() call updates state and outputs messages.]

Leveraging CBP grouping primitives (see the sketch below):
• Inner-grouping: process only the vertices with messages, not the whole graph
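A sketch of a vertex-centric Translate() for PageRank in this style (names and record layout are illustrative; `n` is the total number of vertices):

```python
def pagerank_translate(vertex_id, messages, state, n, damping=0.85):
    """Translate() for one vertex: sum the incoming rank messages,
    update the vertex's own rank, emit messages to its neighbors."""
    neighbors, _old_rank = state
    rank = (1 - damping) / n + damping * sum(messages)
    return {"state":    [(vertex_id, (neighbors, rank))],        # loopback
            "messages": [(u, rank / len(neighbors)) for u in neighbors]}

print(pagerank_translate("A", [0.1, 0.2], (["B", "C"], 0.3), n=3))
```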
Incremental PageRank
• PageRank
– Iterative algorithm
– Computes site popularity based on the web graph
– Small changes affect the whole graph
• Incremental version [S. Chien et al.]
– Re-computes only the most affected subgraph
– Makes use of inner-grouping
• The experiment:
– Real web graph with 7.5M nodes, 109M edges
– We add 2800 new edges
– 16-node cluster
Results
[Plot: cumulative running time (min) over 16 epochs for the plain CBP emulation, with incremental shuffling, and with inner grouping; phases 1 and 2 marked.]

Incremental shuffling:
• Reduces running time by 23%
• Reduces network load by 46%

[Plot: cumulative data moved (GB) over 16 epochs for the CBP emulation vs. incremental shuffling.]

Inner grouping:
• Only 0.5% of the state needs to be accessed
• Reduces running time by 53%
Summary of CBP
• Integrates state with groupwise processing
– Scalable stateful analytics
• Expressive model for sophisticated stateful analytics
– Used to implement real applications
• Leverages model to optimize state management
– Reduces processing times
– Uses less compute and network resources
Mining large graphs in real-time
• A lot of intelligence is hidden inside relationships
• Social graphs
– Friend/article recommendation, targeted Ads
• Call graphs in Telcos
– Fraud detection, network monitoring
“Find what my community read in the last 5 minutes”
• Graphs may change rapidly
– Phone calls, posts every second
• Challenge: update graph analytics in real-time
“Detect suspicious calling patterns for every new call”
Incremental graph algorithms
• Real-time operation requires incremental updates
– Too costly to re-compute from scratch
• CBP: explicitly incremental programs
– Easy for some analytics, e.g. a sum
• But… graph algorithms are more complex!
– e.g. clustering, PageRank, pattern matching, etc.
– How do you update clustering when the graph changes?
• Designing incremental graph algorithms is hard
Real-time graph mining with GraphInc
Idea
• Same trick: computation re-use
But
• Users write algorithms as if on static data
– Much simpler, works as before
• System updates analysis transparently
– Automatically detects opportunities for re-use
Example: computing shortest paths
[Figure: a graph with source S and destination D; a new edge is added.]
• Identify computations that don’t change
• When changes happen, execute only the diff
• Remember previous state and messages sent
– If all remain the same, no need to execute
• Results in computation and communication savings
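A sketch of the re-use test (illustrative; GraphInc's actual mechanism memoizes per-vertex computations inside a graph processing engine):

```python
def run_vertex(vertex_id, messages, state, compute, memo):
    """Re-run a vertex's compute() only when its inputs changed;
    if state and messages match a remembered execution, replay it."""
    key = (vertex_id, tuple(sorted(messages)), state)
    if key in memo:
        return memo[key]            # identical inputs: reuse the old result
    result = compute(vertex_id, messages, state)
    memo[key] = result              # remember for future deltas
    return result
```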
Thank you