Transcript 2 - BigData

MapReduce Programming
MapReduce: Recap
• Programmers must specify:
map (k, v) → list(<k’, v’>)
reduce (k’, list(v’)) → <k’’, v’’>
– All values with the same key are reduced together
• Optionally, also:
partition (k’, number of partitions) → partition for k’
– Often a simple hash of the key, e.g., hash(k’) mod n
– Divides up key space for parallel reduce operations
combine (k’, v’) → <k’, v’>*
– Mini-reducers that run in memory after the map phase
– Used as an optimization to reduce network traffic
• The execution framework handles everything else…
k1 v1
k2 v2
map
a 1
k4 v4
map
b 2
c 3
combine
a 1
k3 v3
c 6
a 5
map
b 7
c 2
combine
c 9
partition
k6 v6
map
combine
b 2
k5 v5
a 5
partition
combine
c 2
b 7
partition
1 5
b
2 7
c 8
partition
Shuffle and Sort: aggregate values by keys
a
c 8
c
2 9 8
reduce
reduce
reduce
r1 s1
r2 s2
r3 s3
“Everything Else”
• The execution framework handles everything else…
– Scheduling: assigns workers to map and reduce tasks
– “Data distribution”: moves processes to data
– Synchronization: gathers, sorts, and shuffles intermediate data
– Errors and faults: detects worker failures and restarts
• Limited control over data and execution flow
– All algorithms must expressed in m, r, c, p
• You don’t know:
– Where mappers and reducers run
– When a mapper or reducer begins or finishes
– Which input a particular mapper is processing
– Which intermediate key a particular reducer is processing
Tools for Synchronization
• Cleverly-constructed data structures
– Bring partial results together
• Sort order of intermediate keys
– Control order in which reducers process keys
• Partitioner
– Control which reducer processes which keys
• Preserving state in mappers and reducers
– Capture dependencies across multiple keys and
values
Basic Hadoop API
• Mapper
– void map(K1 key, V1 value, OutputCollector<K2, V2> output,
Reporter reporter)
– void configure(JobConf job)
– void close() throws IOException
• Reducer/Combiner
– void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3,V3> output, Reporter reporter)
– void configure(JobConf job)
– void close() throws IOException
• Partitioner
– void getPartition(K2 key, V2 value, int numPartitions)
*Note: forthcoming API changes…
Data Types in Hadoop
Writable
Defines a de/serialization protocol.
Every data type in Hadoop is a Writable.
WritableComparable Defines a sort order. All keys must be
of this type (but not values).
IntWritable
LongWritable
Text
…
SequenceFiles
Concrete classes for different data types.
Binary encoded of a sequence of
key/value pairs
Hadoop Map Reduce Example
• See the word count example from Hadoop
Tutorial
Word Count in Java
public class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> out,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
out.collect(new text(itr.nextToken()), ONE);
}
}
}
Word Count in Java
public class ReduceClass extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> out,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
out.collect(key, new IntWritable(sum));
}
}
Word Count in Java
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(ReduceClass.class);
conf.setReducerClass(ReduceClass.class);
FileInputFormat.setInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class); // out keys are words (strings)
conf.setOutputValueClass(IntWritable.class); // values are counts
JobClient.runJob(conf);
}
Word Count in Python with Hadoop Streaming
Mapper.py:
Reducer.py:
import sys
for line in sys.stdin:
for word in line.split():
print(word.lower() + "\t" + 1)
import sys
counts = {}
for line in sys.stdin:
word, count = line.split("\t”)
dict[word] = dict.get(word, 0) + int(count)
for word, count in counts:
print(word.lower() + "\t" + 1)
Basic Cluster Components
• One of each:
– Namenode (NN)
– Jobtracker (JT)
• Set of each per slave machine:
– Tasktracker (TT)
– Datanode (DN)
Putting everything together…
namenode
job submission node
namenode daemon
jobtracker
tasktracker
tasktracker
tasktracker
datanode daemon
datanode daemon
datanode daemon
Linux file system
Linux file system
Linux file system
…
slave node
…
slave node
…
slave node
Anatomy of a Job
• MapReduce program in Hadoop = Hadoop job
– Jobs are divided into map and reduce tasks
– An instance of running a task is called a task attempt
– Multiple jobs can be composed into a workflow
• Job submission process
– Client (i.e., driver program) creates a job, configures it, and
submits it to job tracker
– JobClient computes input splits (on client end)
– Job data (jar, configuration XML) are sent to JobTracker
– JobTracker puts job data in shared location, enqueues tasks
– TaskTrackers poll for tasks
– Off to the races…
Mapper
Mapper
Mapper
Mapper
Mapper
Intermediates
Intermediates
Intermediates
Intermediates
Intermediates
Partitioner
Partitioner
Partitioner
Partitioner
Partitioner
(combiners omitted here)
Intermediates
Intermediates
Intermediates
Reducer
Reducer
Reduce
Source: redrawn from a slide by Cloduera, cc-licensed
OutputFormat
Reducer
Reducer
Reduce
RecordWriter
RecordWriter
RecordWriter
Output File
Output File
Output File
Source: redrawn from a slide by Cloduera, cc-licensed
Input and Output
• InputFormat:
–
–
–
–
TextInputFormat
KeyValueTextInputFormat
SequenceFileInputFormat
…
• OutputFormat:
– TextOutputFormat
– SequenceFileOutputFormat
–…
Shuffle and Sort in Hadoop
• Probably the most complex aspect of MapReduce!
• Map side
– Map outputs are buffered in memory in a circular buffer
– When buffer reaches threshold, contents are “spilled” to disk
– Spills merged in a single, partitioned file (sorted within each partition):
combiner runs here
• Reduce side
– First, map outputs are copied over to appropriate reducer machine
– “Sort” is a multi-pass merge of map outputs (happens in memory and on
disk): partitioner runs here
– Final merge pass goes directly into reducer
Shuffle and Sort
intermediate files
(on disk)
Mapper
merged spills
(on disk)
Partitioner
circular buffer
(in memory)
Combiner
spills (on disk)
other mappers
other reducers
Reducer
Graph Algorithms in MapReduce
• G = (V,E), where
– V represents the set of vertices (nodes)
– E represents the set of edges (links)
– Both vertices and edges may contain additional
information
Graphs and MapReduce
• Graph algorithms typically involve:
– Performing computations at each node: based
on node features, edge features, and local link
structure
– Propagating computations: “traversing” the
graph
• Key questions:
– How do you represent graph data in
MapReduce?
– How do you traverse a graph in MapReduce?
Representing Graphs
• G = (V, E)
• Two common representations
– Adjacency matrix
– Adjacency list
Adjacency Matrices
Represent a graph as an n x n square matrix M
– n = |V|
– Mij = 1 means a link from node i to j
1
2
3
4
1
0
1
1
1
2
1
0
0
0
3
0
1
0
1
4
1
1
0
0
2
1
3
4
Adjacency Matrices: Critique
• Advantages:
– Amenable to mathematical manipulation
– Iteration over rows and columns corresponds to
computations on outlinks and inlinks
• Disadvantages:
– Lots of zeros for sparse matrices
– Lots of wasted space
Adjacency Lists
Take adjacency matrices… and throw away
all the zeros
1
2
3
4
1
0
1
1
1
2
1
0
0
0
3
0
1
0
1
4
1
1
0
0
1: 2, 4
2: 1, 3, 4
3: 1
4: 1, 3
Adjacency Lists: Critique
• Advantages:
– Much more compact representation
– Easy to compute over outlinks
• Disadvantages:
– Much more difficult to compute over inlinks
Single Source Shortest Path
• Problem: find shortest path from a source
node to one or more target nodes
– Shortest might also mean lowest weight or cost
• First, a refresher: Dijkstra’s Algorithm
Dijkstra’s Algorithm Example
1


10
2
0
3
9
6
7
5


2
Example from CLR
4
Dijkstra’s Algorithm Example
1
10

10
2
0
3
9
6
7
5

5
2
Example from CLR
4
Dijkstra’s Algorithm Example
1
8
14
10
2
0
3
9
6
7
5
5
7
2
Example from CLR
4
Dijkstra’s Algorithm Example
1
8
13
10
2
0
3
9
6
7
5
5
7
2
Example from CLR
4
Dijkstra’s Algorithm Example
1
1
8
9
10
2
0
3
9
6
7
5
5
7
2
Example from CLR
4
Dijkstra’s Algorithm Example
1
8
9
10
2
0
3
9
6
7
5
5
7
2
Example from CLR
4
Single Source Shortest Path
• Problem: find shortest path from a source
node to one or more target nodes
– Shortest might also mean lowest weight or cost
• Single processor machine: Dijkstra’s
Algorithm
• MapReduce: parallel Breadth-First Search
(BFS)
Source: Wikipedia (Wave)
Finding the Shortest Path
• Consider simple case of equal edge weights
• Solution to the problem can be defined inductively
• Here’s the intuition:
– Define: b is reachable from a if b is on adjacency list of a
– DISTANCETO(s) = 0
– For all nodes p reachable from s,
DISTANCETO(p) = 1
– For all nodes n reachable from some other set of nodes M,
DISTANCETO(n) = 1 + min(DISTANCETO(m), m  M)
d1 m1
…
…
s
…
d2
n
m2
d3
m3
Visualizing Parallel BFS
n7
n0
n1
n2
n3
n6
n5
n4
n8
n9
From Intuition to Algorithm
• Data representation:
– Key: node n
– Value: d (distance from start), adjacency list (list of nodes
reachable from n)
– Initialization: for all nodes except for start node, d = 
• Mapper:
– m  adjacency list: emit (m, d + 1)
• Sort/Shuffle
– Groups distances by reachable nodes
• Reducer:
– Selects minimum distance path for each reachable node
– Additional bookkeeping needed to keep track of actual path
Multiple Iterations Needed
• Each MapReduce iteration advances the
“known frontier” by one hop
– Subsequent iterations include more and more
reachable nodes as frontier expands
– Multiple iterations are needed to explore entire
graph
• Preserving graph structure:
– Problem: Where did the adjacency list go?
– Solution: mapper emits (n, adjacency list) as
well
BFS Pseudo-Code
Stopping Criterion
• How many iterations are needed in parallel
BFS (equal edge weight case)?
• Convince yourself: when a node is first
“discovered”, we’ve found the shortest path
• Now answer the question...
– Six degrees of separation?
• Practicalities of implementation in
MapReduce
Comparison to Dijkstra
• Dijkstra’s algorithm is more efficient
– At any step it only pursues edges from the
minimum-cost path inside the frontier
• MapReduce explores all paths in parallel
– Lots of “waste”
– Useful work is only done at the “frontier”
• Why can’t we do better using MapReduce?
Weighted Edges
• Now add positive weights to the edges
– Why can’t edge weights be negative?
• Simple change: adjacency list now includes
a weight w for each edge
– In mapper, emit (m, d + wp) instead of (m, d +
1) for each node m
• That’s it?
Stopping Criterion
• How many iterations are needed in parallel
BFS (positive edge weight case)?
• Convince yourself: when a node is first
“discovered”, we’ve found the shortest path
Additional Complexities
1
search frontier
1
n6
1
n7
n8
10
r
1
n1
1
s
p
n9
n5
1
q
1
n2
1
n3
n4
Stopping Criterion
• How many iterations are needed in parallel
BFS (positive edge weight case)?
• Practicalities of implementation in
MapReduce
Graphs and MapReduce
• Graph algorithms typically involve:
– Performing computations at each node: based on node features,
edge features, and local link structure
– Propagating computations: “traversing” the graph
• Generic recipe:
– Represent graphs as adjacency lists
– Perform local computations in mapper
– Pass along partial results via outlinks, keyed by destination node
– Perform aggregation in reducer on inlinks to a node
– Iterate until convergence: controlled by external “driver”
– Don’t forget to pass the graph structure between iterations
PageRank: Defined
Given page x with inlinks t1…tn, where
– C(t) is the out-degree of t
–  is probability of random jump
– N is the total number of nodes in the graph
n
PR (ti )
1
PR ( x)      (1   )
N
i 1 C (ti )
t1
X
t2
…
tn
Example
y a
y 1/2 1/2
a 1/2 0
m 0 1/2
Yahoo
Amazon
M’soft
m
0
1
0
Simulating a Random Walk
• Start with the vector v = [1,1,…,1]
representing the idea that each Web page
is given one unit of importance.
• Repeatedly apply the matrix M to v,
allowing the importance to flow like a
random walk.
• Limit exists, but about 50 iterations is
sufficient to estimate final distribution.
Example
• Equations v = M v :
y = y /2 + a /2
a = y /2 + m
m = a /2
y
a =
m
1
1
1
1
3/2
1/2
5/4
1
3/4
9/8
11/8
1/2
...
6/5
6/5
3/5
Solving The Equations
• Because there are no constant terms, these
3 equations in 3 unknowns do not have a
unique solution.
• Add in the fact that y +a +m = 3 to solve.
• In Web-sized examples, we cannot solve by
Gaussian elimination; we need to use
relaxation (= iterative solution).
Real-World Problems
• Some pages are “dead ends” (have no links
out).
– Such a page causes importance to leak out.
• Other (groups of) pages are spider traps (all
out-links are within the group).
– Eventually spider traps absorb all importance.
Microsoft Becomes Dead End
y a
y 1/2 1/2
a 1/2 0
m 0 1/2
Yahoo
Amazon
M’soft
m
0
0
0
Example
• Equations v = M v :
y = y /2 + a /2
a = y /2
m = a /2
y
a =
m
1
1
1
1
1/2
1/2
3/4
1/2
1/4
5/8
3/8
1/4
...
0
0
0
M’soft Becomes Spider Trap
y a
y 1/2 1/2
a 1/2 0
m 0 1/2
Yahoo
Amazon
M’soft
m
0
0
1
Example
• Equations v = M v :
y = y /2 + a /2
a = y /2
m = a /2 + m
y
a =
m
1
1
1
1
1/2
3/2
3/4
1/2
7/4
5/8
3/8
2
...
0
0
3
Google Solution to Traps, Etc.
• “Tax” each page a fixed percentage at each
interation.
• Add the same constant to all pages.
• Models a random walk with a fixed
probability of going to a random place next.
Example: Previous with 20% Tax
• Equations v = 0.8(M v ) + 0.2:
y = 0.8(y /2 + a/2) + 0.2
a = 0.8(y /2) + 0.2
m = 0.8(a /2 + m) + 0.2
y
a =
m
1
1
1
1.00 0.84
0.60 0.60
1.40 1.56
0.776
0.536 . . .
1.688
7/11
5/11
21/11
Computing PageRank
• Properties of PageRank
– Can be computed iteratively
– Effects at each iteration are local
• Sketch of algorithm:
– Start with seed PRi values
– Each page distributes PRi “credit” to all pages it
links to
– Each target page adds up “credit” from multiple inbound links to compute PRi+1
– Iterate until values converge
Sample PageRank Iteration (1)
Iteration 1
n2 (0.2)
n1 (0.2) 0.1
0.1
n2 (0.166)
0.1
n1 (0.066)
0.1
0.066
0.2
n4 (0.2)
0.066
0.066
n5 (0.2)
0.2
n5 (0.3)
n3 (0.2)
n4 (0.3)
n3 (0.166)
Sample PageRank Iteration (2)
Iteration 2
n2 (0.166)
n1 (0.066) 0.033
0.083
n2 (0.133)
0.083
n1 (0.1)
0.033
0.1
0.3
n4 (0.3)
0.1
0.1
n5 (0.3)
n5 (0.383)
n3 (0.166)
0.166
n4 (0.2)
n3 (0.183)
PageRank in MapReduce
n1 [n2, n4]
n2 [n3, n5]
n2
n3
n3 [n4]
n4 [n5]
n4
n5
n5 [n1, n2, n3]
Map
n1
n4
n2
n2
n5
n3
n3
n4
n4
n1
n2
n5
Reduce
n1 [n2, n4] n2 [n3, n5]
n3 [n4]
n4 [n5]
n5 [n1, n2, n3]
n3
n5
PageRank Pseudo-Code
PageRank Convergence
• Alternative convergence criteria
– Iterate until PageRank values don’t change
– Iterate until PageRank rankings don’t change
– Fixed number of iterations
• Convergence for web graphs?
Beyond PageRank
• Link structure is important for web search
– PageRank is one of many link-based features:
HITS, SALSA, etc.
– One of many thousands of features used in
ranking…
• Adversarial nature of web search
–
–
–
–
Link spamming
Spider traps
Keyword stuffing
…
Mapreduce and Databases
Relational Databases vs. MapReduce
• Relational databases:
– Multipurpose: analysis and transactions; batch and interactive
– Data integrity via ACID transactions
– Lots of tools in software ecosystem (for ingesting, reporting, etc.)
– Supports SQL (and SQL integration, e.g., JDBC)
– Automatic SQL query optimization
• MapReduce (Hadoop):
– Designed for large clusters, fault tolerant
– Data is accessed in “native format”
– Supports many query languages
– Programmers retain control over performance
– Open source
Source: O’Reilly Blog post by Joseph Hellerstein (11/19/2008)
Database Workloads
• OLTP (online transaction processing)
– Typical applications: e-commerce, banking, airline reservations
– User facing: real-time, low latency, highly-concurrent
– Tasks: relatively small set of “standard” transactional queries
– Data access pattern: random reads, updates, writes (involving
relatively small amounts of data)
• OLAP (online analytical processing)
– Typical applications: business intelligence, data mining
– Back-end processing: batch workloads, less concurrency
– Tasks: complex analytical queries, often ad hoc
– Data access pattern: table scans, large amounts of data involved
per query
OLTP/OLAP Architecture
ETL
(Extract, Transform, and Load)
OLTP
OLAP
OLTP/OLAP Integration
• OLTP database for user-facing transactions
– Retain records of all activity
– Periodic ETL (e.g., nightly)
• Extract-Transform-Load (ETL)
– Extract records from source
– Transform: clean data, check integrity, aggregate, etc.
– Load into OLAP database
• OLAP database for data warehousing
– Business intelligence: reporting, ad hoc queries, data mining, etc.
– Feedback to improve OLTP services
OLTP/OLAP/Hadoop
Architecture
ETL
(Extract, Transform, and Load)
OLTP
Hadoop
OLAP
ETL Bottleneck
• Reporting is often a nightly task:
– ETL is often slow: why?
– What happens if processing 24 hours of data takes longer than 24
hours?
• Hadoop is perfect:
– Most likely, you already have some data warehousing solution
– Ingest is limited by speed of HDFS
– Scales out with more nodes
– Massively parallel
– Ability to use any processing tool
– Much cheaper than parallel databases
– ETL is a batch process anyway!
MapReduce algorithms
for processing relational data
Relational Algebra
• Primitives
– Projection ()
– Selection ()
– Cartesian product ()
– Set union ()
– Set difference ()
– Rename ()
• Other operations
– Join (⋈)
– Group by… aggregation
– …
Projection
R1
R1
R2
R2
R3
R4
R5

R3
R4
R5
Projection in MapReduce
• Easy!
– Map over tuples, emit new tuples with appropriate attributes
– Reduce: take tuples that appear many times and emit only one
version (duplicate elimination)
• Tuple t in R: Map(t, t) -> (t’,t’)
•
Reduce (t’, [t’, …,t’]) -> [t’,t’]
• Basically limited by HDFS streaming speeds
– Speed of encoding/decoding tuples becomes important
– Relational databases take advantage of compression
– Semistructured data? No problem!
Selection
R1
R2
R3
R4
R5

R1
R3
Selection in MapReduce
• Easy!
– Map over tuples, emit only tuples that meet criteria
– No reducers, unless for regrouping or resorting tuples (reducers are
the identity function)
– Alternatively: perform in reducer, after some other processing
• But very expensive!!! Has to scan the database
– Better approaches?
Union, Set Intersection and Set
Difference
• Similar ideas: each map outputs the tuple
pair (t,t). For union, we output it once, for
intersection only when in the reduce we
have (t, [t,t])
• For Set difference?
Set Difference
- Map Function: For a tuple t in R, produce keyvalue pair (t, R), and for a tuple t in S, produce
key-value pair (t, S).
- Reduce Function: For each key t, do the following.
1. If the associated value list is [R], then
produce (t, t).
2. If the associated value list is anything else,
which could only be [R, S], [S, R], or [S], produce
(t, NULL).
Group by… Aggregation
• Example: What is the average time spent per URL?
• In SQL:
– SELECT url, AVG(time) FROM visits GROUP BY url
• In MapReduce:
– Map over tuples, emit time, keyed by url
– Framework automatically groups values by keys
– Compute average in reducer
– Optimize with combiners
Relational Joins
R1
S1
R2
S2
R3
S3
R4
S4
R1
S2
R2
S4
R3
S1
R4
S3
Join Algorithms in MapReduce
• Reduce-side join
• Map-side join
• In-memory join
– Striped variant
– Memcached variant
Reduce-side Join
• Basic idea: group by join key
– Map over both sets of tuples
– Emit tuple as value with join key as the intermediate key
– Execution framework brings together tuples sharing the same key
– Perform actual join in reducer
– Similar to a “sort-merge join” in database terminology
Map-Reduce Join
• A Map process turns:
– Each input tuple R(a,b) into key-value pair (b,(a,R))
– Each input tuple S(b,c) into (b,(c,S))
• Map processes send each key-value pair with
key b to Reduce process h(b)
– Hadoop does this automatically; just tell it what k is.
• Each Reduce process matches all the pairs
(b,(a,R)) with all (b,(c,S)) and outputs (a,b,c).
87
Map-side Join: Parallel Scans
• If datasets are sorted by join key, join can be accomplished by a scan
over both datasets
• How can we accomplish this in parallel?
– Partition and sort both datasets in the same manner
• In MapReduce:
– Map over one dataset, read from other corresponding partition
– No reducers necessary (unless to repartition or resort)
• Consistently partitioned datasets: realistic to expect?
In-Memory Join
• Basic idea: load one dataset into memory, stream over other dataset
– Works if R << S and R fits into memory
– Called a “hash join” in database terminology
• MapReduce implementation
– Distribute R to all nodes
– Map over S, each mapper loads R in memory, hashed by join key
– For every tuple in S, look up join key in R
– No reducers, unless for regrouping or resorting tuples
In-Memory Join: Variants
• Striped variant:
– R too big to fit into memory?
– Divide R into R1, R2, R3, … s.t. each Rn fits
into memory
– Perform in-memory join: n, Rn ⋈ S
– Take the union of all join results
• Memcached join:
– Load R into memcached
– Replace in-memory hash lookup with
memcached lookup
Memcached Join
• Memcached join:
– Load R into memcached
– Replace in-memory hash lookup with memcached lookup
• Capacity and scalability?
– Memcached capacity >> RAM of individual node
– Memcached scales out with cluster
• Latency?
– Memcached is fast (basically, speed of network)
– Batch requests to amortize latency costs
Source: See tech report by Lin et al. (2009)
Which join to use?
• In-memory join > map-side join > reduceside join
– Why?
• Limitations of each?
– In-memory join: memory
– Map-side join: sort order and partitioning
– Reduce-side join: general purpose
Processing Relational Data:
Summary
• MapReduce algorithms for processing relational data:
– Group by, sorting, partitioning are handled automatically by
shuffle/sort in MapReduce
– Selection, projection, and other computations (e.g., aggregation),
are performed either in mapper or reducer
– Multiple strategies for relational joins
• Complex operations require multiple MapReduce jobs
– Example: top ten URLs in terms of average time spent
– Opportunities for automatic optimization