
Hadoop Framework and Its
Applications
CSCI5570 Large Scale Data Processing Systems
Instructor: James Cheng, CSE, CUHK
Slide Ack.: modified based on the slides from Shumo Chu
Outline

 Background
 Hadoop Framework
◦ What is Hadoop
◦ Hadoop Distributed File System (HDFS)
◦ MapReduce
   Mapper
   Reducer
   Combiner
   Custom Partitioner
 Applications of Hadoop
◦ HBase
◦ TF-IDF
◦ SSSP
2
Parallel processing is the trend

 Moore's Law: roughly stated, processing power doubles every two years
 However, we will reach the physical limitation soon!
 Worse: data grow at a much faster rate!!!
3
Single Node Architecture

Load-Process-Dump:
• Load the data
• Process the data
• Dump the result to persistent storage

[Figure: a single node with CPU, Memory, and Disk, running Machine Learning, Statistics and "Classical" Data Mining entirely on one machine]

J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
4
Motivation: Google Example

 20+ billion web pages x 20KB = 400+ TB
 Load: 1 computer reads 30-35 MB/sec from disk
◦ ~4 months to read the web
 Process: huge memory (1 computer with 400 TB RAM is >1,000 times more expensive than 4,000 computers with 100 GB RAM)
 Dump: ~1,000 hard drives to store the web
 Today, a standard architecture for such problems is emerging:
◦ Cluster of commodity Linux nodes
◦ Commodity network (ethernet) to connect them

J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
5
Cluster Architecture

[Figure: racks of commodity nodes (each with CPU, memory, and disk) connected by switches; 1 Gbps between any pair of nodes in a rack, 2-10 Gbps backbone between racks; each rack contains 16-64 nodes]

J. Leskovec, A. Rajaraman, J. Ullman: Mining of Massive Datasets, http://www.mmds.org
6
Drawbacks of traditional distributed frameworks

 Traditional distributed frameworks
◦ MPI (Message Passing Interface)
◦ PVM (Parallel Virtual Machine)
◦ Condor
 Programming is complicated
◦ Data exchange requires synchronization
◦ Difficult to deal with partial system failure (not uncommon in a large cluster)
7
Why Hadoop?

 Reliability: the system tolerates partial failure
 Scalability: automatically scales to even thousands of computing nodes
 Programmability: applications are written in high-level code; developers do not need to worry about network programming, temporal dependencies, etc.
8
Outline

 Background
 Hadoop Framework
◦ What is Hadoop
◦ Hadoop Distributed File System (HDFS)
◦ MapReduce
   Mapper
   Reducer
 Applications of Hadoop
◦ HBase
◦ TF-IDF
◦ SSSP
9
What is Hadoop?

 Hadoop is an open-source project by the Apache Software Foundation
 The key concepts of Hadoop are based on papers published by Google in 2003 and 2004 (Google File System and MapReduce)
 Hadoop has matured and become popular due to the commitment of many organizations
◦ Google, Yahoo, Facebook, Cloudera, etc.
10
Hadoop Components

 Hadoop consists of two core components
◦ The Hadoop Distributed File System (HDFS)
◦ MapReduce
 There are many other projects based on core Hadoop
◦ Referred to as the Hadoop Ecosystem
◦ Pig, Hive, HBase, Flume, Oozie, Sqoop, etc.
11
Hadoop: System Configuration
12
Outline

 Background
 Hadoop Framework
◦ What is Hadoop
◦ Hadoop Distributed File System (HDFS)
◦ MapReduce
   Mapper
   Reducer
   Combiner
   Custom Partitioner
 Applications of Hadoop
◦ HBase
◦ TF-IDF
◦ SSSP
13
Hadoop Component: HDFS

 HDFS is responsible for storing data in the cluster
 Data is split into blocks and distributed across multiple nodes
◦ Each block is typically 64MB or 128MB in size
 Each block is replicated multiple times
◦ Default is to replicate each block 3 times
◦ Replicas are stored on different nodes
   For reliability and availability
14
Hadoop Component: HDFS

 NameNode
◦ The centerpiece of an HDFS file system
◦ Keeps the directory tree of all files, and tracks where in the cluster the data of each file are kept
◦ Does not store the data of these files itself
 The NameNode is a single point of failure, hence the Secondary NameNode
15
How files are stored: Example

 NameNode holds metadata
 DataNodes hold the actual data blocks
16
How does HDFS work?

 When a client application wants to read a file:
◦ It communicates with the NameNode to determine which blocks make up the file, and which DataNodes those blocks reside on
◦ It then communicates directly with the DataNodes to read the data
17
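For illustration only (not from the original slides), a minimal sketch of this read path using the HDFS Java client API is shown below; the file path is hypothetical, and the NameNode/DataNode interaction described above happens transparently inside the FileSystem client and its input stream.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // picks up core-site.xml / hdfs-site.xml if present
        FileSystem fs = FileSystem.get(conf);          // client handle to the (distributed) file system
        Path path = new Path("/user/demo/input.txt");  // hypothetical HDFS path
        // open() asks the NameNode for block locations; the stream then reads from DataNodes directly
        try (FSDataInputStream in = fs.open(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
          String line;
          while ((line = reader.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }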
Outline

 Background
 Hadoop Framework
◦ What is Hadoop
◦ Hadoop Distributed File System (HDFS)
◦ MapReduce
   Mapper
   Reducer
   Combiner
   Custom Partitioner
 Applications of Hadoop
◦ HBase
◦ TF-IDF
◦ SSSP
18
Hadoop Component: MapReduce

 MapReduce is a method for distributing a task across multiple nodes
 Each node processes data stored on that node (for locality)
◦ Where possible
 Consists of two phases:
◦ Map
◦ Reduce
 Features
◦ Automatic parallelization
◦ Fault-tolerance
◦ A clean abstraction for programmers (hiding the tedious and complicated work of distributed/parallel computing)
19
MapReduce: The Big Picture
20
MapReduce: The Mapper

 The mapper reads data in the form of key/value pairs
 It outputs zero or more key/value pairs
21
MapReduce: The Mapper (cont’d)

 The mapper may use or completely ignore the input key
◦ For example, a standard pattern is to read a line of a file at a time
   The key is the byte offset into the file at which the line starts
   The value is the contents of the line itself
   Typically the key is considered irrelevant
 The output must be in the form of key/value pairs
22
Example Mapper: Upper Case Mapper

 Turn input into upper case:
23
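The code from the original slide is not reproduced in this transcript; a minimal sketch of such a mapper (class name and key/value types chosen here for illustration, using the standard org.apache.hadoop.mapreduce API) could be:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: emit each input line converted to upper case, keeping the byte-offset key unchanged.
    public class UpperCaseMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
      private final Text upper = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        upper.set(value.toString().toUpperCase());
        context.write(key, upper);
      }
    }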
Example Mapper: Filter Mapper

 Only output key/value pairs where the input value is a prime number:
24
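Again, the slide's code is not reproduced here; a minimal sketch, assuming each input value is a line containing a single integer, might be:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: pass through only those key/value pairs whose value is a prime number.
    public class PrimeFilterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        try {
          long n = Long.parseLong(value.toString().trim());
          if (isPrime(n)) {
            context.write(key, value);
          }
        } catch (NumberFormatException e) {
          // silently skip values that are not integers
        }
      }

      // Simple trial-division primality test; fine for illustration.
      private static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long d = 2; d * d <= n; d++) {
          if (n % d == 0) return false;
        }
        return true;
      }
    }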
MapReduce: The Reducer

 After the map phase is over, all the intermediate values for a given intermediate key are combined together into a list
 The list is sent to a reducer
◦ There may be a single reducer, or multiple reducers
◦ The intermediate keys and their value lists are sent to reducers
◦ All values associated with a particular intermediate key are guaranteed to go to the same reducer
◦ This step is known as the “shuffle” step (done by the system, e.g., Hadoop, automatically, by external sorting)
 Each reducer computes on the intermediate keys and their value lists
◦ Results are written to HDFS
25
Example Reducer: Sum Reducer

 Sum up all the values associated with each intermediate key:
26
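The slide's code is not reproduced in this transcript; a minimal sketch of a sum reducer, assuming Text keys and IntWritable values, could be:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sketch: add up all integer values for each key and emit one total per key.
    public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable total = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
          sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
      }
    }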
Example Reducer: Identity Reducer

The Identity Reducer is very common:
27
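In the current Hadoop Java API, the base Reducer class already behaves as an identity reducer, so it is often used directly; written out explicitly (types chosen for illustration), a sketch looks like:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sketch: emit every (key, value) pair exactly as received.
    public class IdentityReducer extends Reducer<Text, Text, Text, Text> {
      @Override
      protected void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
        for (Text value : values) {
          context.write(key, value);
        }
      }
    }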
MapReduce Execution
28
MapReduce: Data Localization

 Whenever possible, Hadoop will attempt to ensure that a map task on a node is working on a block of data stored locally on the node via HDFS
 If this is not possible, the map task will have to transfer the data across the network as it processes that data
 Once the map task has finished, the data is then transferred across the network to the reducers
◦ Although a reducer may run on the same physical machines as the map tasks, there is no concept of data locality for reducers
   All mappers will, in general, have to communicate with all reducers
29
MapReduce: Is the shuffle step a bottleneck?

 It appears that the shuffle phase is a bottleneck
◦ No reducer can start until all mappers have finished?
 In practice, Hadoop will start to transfer data from mappers to reducers as the mappers finish work
◦ This avoids a huge data transfer that can only start after the last mapper finishes
30
MapReduce: Is a slow mapper a bottleneck?

 It is possible for one map task to run more slowly than others
◦ Perhaps due to faulty hardware, or just a very slow machine
 It would appear that this would create a bottleneck
◦ No reducer can start until every mapper has finished
 Hadoop uses speculative execution to mitigate this
◦ If a mapper appears to be running significantly more slowly than others, a new instance of the mapper will be started on another machine, operating on the same data
◦ The result of the first mapper to finish will be used
◦ Hadoop will kill off the mapper which is still running
 Similarly for a slow reducer
31
MapReduce: The Combiner

 Often, mappers produce a large amount of intermediate data
◦ That data must be passed to the reducers
◦ This can result in a lot of network traffic
 It is often possible to specify a combiner
◦ Like a “mini-reduce”
◦ Runs locally on a single mapper’s output
◦ Output from the combiner is sent to the reducers
 Combiner and reducer code are often identical
◦ Technically, this is possible if the operation performed is commutative and associative
32
MapReduce Example: Word Count

 Count the number of occurrences of each word in a large amount of input data
◦ This is the “hello world” of MapReduce programming
33
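The code for this example is not shown in the transcript; the sketch below is one common way to write word count with the org.apache.hadoop.mapreduce API (class names and argument conventions are illustrative). The combiner line is optional and reuses the reducer, as discussed on the combiner slide above.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

      // Mapper: for every word in the input line, emit (word, 1).
      public static class TokenizerMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          StringTokenizer tokenizer = new StringTokenizer(value.toString());
          while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
          }
        }
      }

      // Reducer: sum the counts for each word.
      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable value : values) {
            sum += value.get();
          }
          total.set(sum);
          context.write(key, total);
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);   // optional "mini-reduce" on the map side
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory in HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory (must not exist)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }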
MapReduce Example: Word Count (cont’d)

 Input to Mappers
 Output from Mappers
34
MapReduce Example: Word Count (cont’d)

 Intermediate data sent to Reducers
 Final Output from Reducers
35
Word Count with Combiner

 Combiners reduce the amount of data sent to reducers
◦ Intermediate data is sent to reducers only after a combiner (using the same code as the reducer) has run on each mapper’s output
 Combiners decrease the amount of network traffic required during the shuffle phase
◦ They often also decrease the amount of work needed to be done by the reducers
36
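As a purely illustrative example (the slide's own example data is shown in figures that are not reproduced here): suppose one mapper processes the lines "the cat sat" and "the cat ran". Without a combiner it emits (the, 1), (cat, 1), (sat, 1), (the, 1), (cat, 1), (ran, 1); with a sum-style combiner the same mapper ships only (the, 2), (cat, 2), (sat, 1), (ran, 1) to the reducers, which then produce the same final counts.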
MapReduce: Custom Partitioners

 Sometimes you will need to write your own partitioner
 Number of partitions = number of reducers
 Example:
◦ You may want all keys whose values fall in a certain range to go to the same reducer
◦ The default partitioner is not sufficient in this case
◦ Write your own partitioner and register it on the job:
job.setPartitionerClass(MyPartitioner.class);
37
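A minimal sketch of such a partitioner (the class name matches the slide; the key/value types and the range boundary are hypothetical): route all keys starting with 'a' to 'm' to one reducer and everything else to another.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Sketch: keys beginning with 'a'-'m' go to partition 0, all other keys to partition 1
    // (assuming the job is configured with two reducers).
    public class MyPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        String s = key.toString().toLowerCase();
        int partition = (!s.isEmpty() && s.charAt(0) <= 'm') ? 0 : 1;
        return partition % numPartitions;   // stay in range even with a single reducer
      }
    }

It is then registered with job.setPartitionerClass(MyPartitioner.class) as on the slide, together with job.setNumReduceTasks(2).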
Custom Partitioners (cont’d)

 Custom partitioners are needed when performing a secondary sort
 Custom partitioners are also useful to avoid potential performance issues
◦ To avoid one reducer having to deal with many large lists of values
38
Outline

 Background
 Hadoop Framework
◦ What is Hadoop
◦ Hadoop Distributed File System (HDFS)
◦ MapReduce
   Mapper
   Reducer
   Combiner
   Custom Partitioner
 Applications of Hadoop
◦ HBase
◦ TF-IDF
◦ SSSP
39
HBase vs RDBMS
40
HBase: Fast Single-Element Access

 Rapid access to a single (row, column) element
41
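For illustration only (the table, row key, column family, and qualifier names below are hypothetical), a single-cell read with the HBase Java client might look like:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBaseGetExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("users"))) {   // hypothetical table
          Get get = new Get(Bytes.toBytes("row-42"));                           // hypothetical row key
          Result result = table.get(get);
          byte[] cell = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"));
          System.out.println(cell == null ? "not found" : Bytes.toString(cell));
        }
      }
    }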
HBase Data as Input to MapReduce Jobs

 Rows from an HBase table can be used as input to a MapReduce job
◦ Each row is treated as a single record
◦ MapReduce jobs can sort/search/index/query the data in bulk
42
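A minimal sketch of using a table as MapReduce input via the HBase helper classes (the table name is hypothetical, and the job here is map-only for brevity); each Result handed to the mapper is one row:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class HBaseInputExample {

      // Sketch: emit (row key, 1) for every row scanned from the table.
      public static class RowCountMapper extends TableMapper<Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result row, Context context)
            throws IOException, InterruptedException {
          context.write(new Text(Bytes.toString(rowKey.get())), ONE);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "hbase input example");
        job.setJarByClass(HBaseInputExample.class);
        Scan scan = new Scan();                       // full-table scan; could be restricted
        TableMapReduceUtil.initTableMapperJob(
            "users", scan, RowCountMapper.class,      // hypothetical table name
            Text.class, IntWritable.class, job);
        job.setNumReduceTasks(0);                     // map-only job for this sketch
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }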
Data Mining – TF-IDF

 Term Frequency – Inverse Document Frequency (TF-IDF)
◦ Answers the question “How important is this term in a document?”
 Known as a term weighting function
◦ Assigns a score (weight) to each term (word) in a document
 Very commonly used in text processing and search
 Has many applications in data mining
43
TF-IDF: Motivation

 Merely counting the number of occurrences of a word in a document is not a good enough measure of its relevance
◦ If the word appears in many other documents, it is probably less relevant
◦ Some words appear too frequently in all documents to be relevant
   Known as ‘stopwords’
 TF-IDF considers both the frequency of a word in a given document and the number of documents which contain the word
44
TF-IDF: Definition

 Term Frequency (TF)
◦ Number of times a term appears in a document (i.e., the count)
 Inverse Document Frequency (IDF)
◦ IDF = log(N / n)
◦ N: total number of documents
◦ n: number of documents that contain the term
 TF-IDF
◦ TF × IDF
45
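A purely illustrative worked example (the numbers are mine, not from the slides): if a term appears 3 times in a document (TF = 3), the collection has N = 1,000 documents, and the term occurs in n = 10 of them, then IDF = log10(1000 / 10) = log10(100) = 2, so the TF-IDF score is 3 × 2 = 6. The base of the logarithm (and variants such as normalizing TF by document length) differ between formulations.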
Computing TF-IDF With MapReduce

 Overview of algorithm: 3 MapReduce jobs
◦ Job 1: compute term frequencies
◦ Job 2: compute the number of documents each word appears in
◦ Job 3: compute TF-IDF
 Notation:
◦ tf = term frequency
◦ n = number of documents a term appears in
◦ N = total number of documents
◦ docid = a unique id for each document
46
Computing TF-IDF: Job 1 – Compute tf

 Mapper
◦ Input: (docid, contents)
◦ For each term in the document, generate a (term, docid) pair
   i.e., we have seen this term in this document once
◦ Output: ((term, docid), 1)
 Reducer
◦ Sums the counts for each (term, docid) pair
◦ Output: ((term, docid), tf)
   i.e., the term frequency of term in docid is tf
 We can add a combiner, which will use the same code as the reducer
47
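A minimal sketch of Job 1 in Hadoop Java (my own rendering, not the slide's code): the composite (term, docid) key is encoded as a single tab-separated Text, and each input line is assumed to be "docid<TAB>document contents"; both choices are assumptions of this sketch.

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TermFrequencyJob {

      // Mapper: emits ((term, docid), 1), with the composite key encoded as "term \t docid".
      public static class TfMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text compositeKey = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
          String[] parts = line.toString().split("\t", 2);   // assumed "docid \t contents" layout
          if (parts.length < 2) {
            return;                                          // skip malformed lines
          }
          String docid = parts[0];
          StringTokenizer terms = new StringTokenizer(parts[1].toLowerCase());
          while (terms.hasMoreTokens()) {
            compositeKey.set(terms.nextToken() + "\t" + docid);
            context.write(compositeKey, ONE);
          }
        }
      }

      // Reducer (also usable as a combiner): sums the 1s to obtain tf for each (term, docid).
      public static class TfReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable tf = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable c : counts) {
            sum += c.get();
          }
          tf.set(sum);
          context.write(key, tf);                            // output: ((term, docid), tf)
        }
      }
    }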
Computing TF-IDF: Job 2 – Compute n

 Mapper
◦ Input: ((term, docid), tf)
◦ Output: (term, (docid, tf, 1))
 Reducer
◦ Sums the ‘1’s to compute n (the number of documents containing the term)
◦ Note: needs to buffer the (docid, tf) pairs while doing this (more later)
◦ For each (docid, tf) pair:
   Output: ((term, docid), (tf, n))
48
Computing TF-IDF: Job 3 – Compute TF-IDF

 Mapper
◦ Input: ((term, docid), (tf, n))
◦ Assume N is known (easy to find)
◦ Output: ((term, docid), TF × IDF)
 Reducer
◦ The identity function
49
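A minimal sketch of the Job 3 mapper (my own rendering): it assumes the input lines are formatted as "term \t docid \t tf \t n" by Job 2, that N is passed in through a job configuration property, and that a base-10 logarithm is used; all three are assumptions of this sketch.

    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: compute tf * log(N / n) for each (term, docid); the reducer is the identity.
    public class TfIdfMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
      private long totalDocs;                         // N
      private final Text outKey = new Text();
      private final DoubleWritable score = new DoubleWritable();

      @Override
      protected void setup(Context context) {
        // "total.documents" is a hypothetical configuration property set by the driver.
        totalDocs = context.getConfiguration().getLong("total.documents", 1L);
      }

      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        String[] f = line.toString().split("\t");
        if (f.length < 4) {
          return;                                     // skip malformed lines
        }
        double tf = Double.parseDouble(f[2]);
        double n = Double.parseDouble(f[3]);
        double tfidf = tf * Math.log10((double) totalDocs / n);
        outKey.set(f[0] + "\t" + f[1]);               // (term, docid)
        score.set(tfidf);
        context.write(outKey, score);
      }
    }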
Computing TF-IDF: Working At Scale

 For Job 2, we need to buffer (docid, tf) pairs while summing the ‘1’s (to compute n)
◦ Potential problem: the pairs may not fit in memory!
◦ How many documents does the word “the” appear in?
 Possible solutions
◦ Ignore very-high-frequency words
◦ Write out intermediate data to a file
◦ Use another MapReduce pass
50
TF-IDF: Final Thoughts

 Several small jobs add up to the full algorithm
◦ Thinking in MapReduce often means decomposing a complex algorithm into a sequence of smaller jobs
 Beware of memory usage for large amounts of data!
◦ Any time you need to buffer data, there is a potential scalability bottleneck
51
Graph Algorithm: SSSP

 Graphs are usually represented as adjacency lists
 Serial algorithm: Dijkstra’s algorithm
◦ Not suitable for parallelization
 MapReduce algorithm: parallel breadth-first search
52
Parallel Breadth-First Search

 The algorithm, intuitively:
◦ Distance from the source to itself = 0
◦ For all neighbors of the source: distance = 1
◦ For all nodes that are neighbors of some node v in the graph, distance from the source = 1 + min(distance from the source to v)
53
Parallel Breadth-First Search: Algorithm

 Mapper:
◦ Input key is a node id
◦ Input value is (d, adjacency list), where d is the distance from the source
◦ Processing: for each node in the adjacency list, emit (node id, d + 1)
   If the distance to this node is d, then the distance to any of its neighbors is d + 1
 Reducer:
◦ Receives a node id and a list of distance values
◦ Processing: selects the smallest distance value for that node
54
PBFS: Pseudo-Code
55
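The pseudo-code figure from the original slide is not reproduced in this transcript. The sketch below is my own minimal rendering of one PBFS iteration, assuming each input line encodes a node as "nodeid \t distance \t comma-separated-neighbors" and that Integer.MAX_VALUE stands for "unreached"; the adjacency list is carried through the reducer so the next iteration can use it.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class ParallelBfs {

      // Mapper: re-emit the node itself (so its adjacency list and current distance survive),
      // and emit (neighbor, d + 1) for every neighbor of a reached node.
      public static class BfsMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
          String[] f = line.toString().split("\t");
          if (f.length < 3) {
            return;                                       // skip malformed lines
          }
          String nodeId = f[0];
          int d = Integer.parseInt(f[1]);
          String neighbors = f[2];
          context.write(new Text(nodeId), new Text("NODE\t" + d + "\t" + neighbors));
          if (d < Integer.MAX_VALUE && !neighbors.isEmpty()) {
            for (String neighbor : neighbors.split(",")) {
              context.write(new Text(neighbor), new Text("DIST\t" + (d + 1)));
            }
          }
        }
      }

      // Reducer: keep the adjacency list and take the minimum of all proposed distances.
      public static class BfsReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text nodeId, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
          int best = Integer.MAX_VALUE;
          String neighbors = "";
          for (Text value : values) {
            String[] f = value.toString().split("\t");
            if (f[0].equals("NODE")) {
              best = Math.min(best, Integer.parseInt(f[1]));
              neighbors = f.length > 2 ? f[2] : "";
            } else {                                      // "DIST" message from a neighbor
              best = Math.min(best, Integer.parseInt(f[1]));
            }
          }
          // Output line has the same "nodeid \t distance \t neighbors" layout as the input,
          // so it can be fed directly into the next iteration.
          context.write(nodeId, new Text(best + "\t" + neighbors));
        }
      }
    }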
Iterations of PBFS

 A MapReduce job corresponds to one iteration of parallel breadth-first search
◦ Each iteration advances the ‘known frontier’ by one hop
◦ Iteration is accomplished by using the output from one job as the input to the next
 How many iterations are needed?
◦ Multiple iterations are needed to explore the entire graph
   As many as the diameter of the graph
◦ Graph diameters are surprisingly small, even for large graphs
   ‘Six degrees of separation’
 Controlling iterations in Hadoop
◦ Use counters; when you reach a node, ‘count’ it
◦ At the end of each iteration, check the counters
◦ When you’ve reached all the nodes, you finish
56
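A minimal sketch of such a driver loop, reusing the mapper and reducer classes from the PBFS sketch above (assumed to be in the same package). It further assumes that the reducer increments BfsCounters.REACHED_NODES once per node whose distance is no longer "unreached" and that the total number of nodes is known in advance; both are assumptions of this sketch, not part of the original slides.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PbfsDriver {

      public enum BfsCounters { REACHED_NODES }

      public static void main(String[] args) throws Exception {
        long totalNodes = Long.parseLong(args[0]);      // known in advance for this sketch
        String input = args[1];
        int iteration = 0;
        long reached = 0;

        while (reached < totalNodes) {
          String output = input + "-iter" + (++iteration);
          Job job = Job.getInstance(new Configuration(), "pbfs iteration " + iteration);
          job.setJarByClass(PbfsDriver.class);
          job.setMapperClass(ParallelBfs.BfsMapper.class);
          job.setReducerClass(ParallelBfs.BfsReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);
          FileInputFormat.addInputPath(job, new Path(input));
          FileOutputFormat.setOutputPath(job, new Path(output));
          if (!job.waitForCompletion(true)) {
            System.exit(1);
          }
          // Check the counter after the job finishes and decide whether to iterate again.
          reached = job.getCounters().findCounter(BfsCounters.REACHED_NODES).getValue();
          input = output;                               // output of this iteration feeds the next
        }
      }
    }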