Transcript CCGrid10_WJ
A Map-Reduce System with an Alternate API
for Multi-Core Environments
Wei Jiang, Vignesh T. Ravi and Gagan Agrawal
Outline
Introduction
MapReduce
Generalized Reduction
System Design and Implementation
Experiments
Related Work
Conclusion
Motivation
Growing need for analysis of large-scale data
Scientific
Commercial
Data-intensive Supercomputing (DISC)
Map-Reduce has received a lot of attention
Database and Datamining communities
High performance computing community
E.g. this conference !!
Map-Reduce: Positives and Questions
Positives:
Simple API
Functional language based
Very easy to learn
Support for fault-tolerance
Important for very large-scale clusters
Questions
Performance?
Comparison with other approaches
Suitability for different classes of applications?
Class of Data-Intensive Applications
Many different types of applications
Data-center kind of applications: data scans, sorting, indexing
More "compute-intensive" data-intensive applications: machine learning, data mining, NLP
Map-reduce / Hadoop being widely used for this class
Standard Database Operations
A SIGMOD 2009 paper compares Hadoop with databases and OLAP systems
What is Map-reduce suitable for?
What are the alternatives?
MPI/OpenMP/Pthreads – too low level?
This Paper
Proposes MATE (a Map-Reduce system with an AlternaTE API) based on Generalized Reduction
Phoenix implemented Map-Reduce in shared-memory systems
MATE adopts Generalized Reduction, first proposed in FREERIDE, which was developed at Ohio State (2001-2003)
Observed API similarities and subtle differences between
MapReduce and Generalized Reduction
Comparison for
Data Mining Applications
Compare performance and API
Understand performance overheads
Will an alternative API be better for "Map-Reduce"?
Map-Reduce Execution
Phoenix Implementation
It is based on the same principles as MapReduce, but targets shared-memory systems
Consists of a simple API that is visible to application programmers
Users define functions like splitter, map, reduce, etc.
An efficient runtime that handles low-level details
Parallelization
Resource management
Fault detection and recovery
Phoenix Runtime
Comparing Processing Structures
• Reduction Object represents the intermediate state of the execution
• The Reduce function is commutative and associative
• Sorting and grouping overheads are eliminated with the reduction function/object
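As a minimal, self-contained illustration of this processing structure (not MATE code; the histogram task and all names here are illustrative), the following C sketch folds every input element directly into a reduction object, with no intermediate (key, value) pairs:

    #include <stdio.h>

    #define BUCKETS 4

    /* The reduction object: in MATE this state lives in the runtime and is
     * updated through accumulate(); here it is a plain array so the sketch
     * stands alone. */
    static int robj[BUCKETS];

    /* Commutative and associative update, so threads could work on private
     * copies and combine them afterwards. */
    static void reduce_into(int element)
    {
        robj[element % BUCKETS] += 1;
    }

    int main(void)
    {
        int split[] = {3, 7, 1, 4, 8, 2, 9, 5};   /* one input split          */
        int n = (int)(sizeof(split) / sizeof(split[0]));

        for (int i = 0; i < n; i++)               /* process each element and */
            reduce_into(split[i]);                /* fold it into the object  */

        for (int b = 0; b < BUCKETS; b++)         /* finalize: read results   */
            printf("bucket %d: %d\n", b, robj[b]);
        return 0;
    }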
Observations on Processing Structure
Map-Reduce is based on a functional idea
The map and reduce functions do not maintain state
This can lead to overheads of managing intermediate
results between map and reduce
Map could generate intermediate results of very large size
MATE API is based on a programmer managed reduction
object
Not as 'clean'
But avoids sorting of intermediate results
Can also help shared-memory parallelization
Enables better fault recovery
An Example
Apriori pseudo-code using MATE
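A hedged sketch of how one Apriori counting pass might look with the MATE API listed later in the talk (reduction_object_alloc, accumulate, mate_scheduler, ...); this is not the code from the slide, and the reduction_args_t fields, the transaction type, the candidate table, the contains() helper, and counts_id are illustrative assumptions:

    /* Sketch only: assumes args->data / args->num holds this split's
     * transactions and that counts_id identifies a reduction object with
     * one integer counter per candidate itemset. */
    void apriori_reduction(reduction_args_t *args)
    {
        transaction_t *trans = (transaction_t *) args->data;
        for (int i = 0; i < args->num; i++) {
            for (int c = 0; c < num_candidates; c++) {
                if (contains(&trans[i], &candidates[c])) {
                    int one = 1;
                    /* Add 1 to candidate c's counter inside the reduction
                     * object; no (key, value) pair is emitted. */
                    accumulate(counts_id, c, &one);
                }
            }
        }
    }

    /* Driver fragment (sketch): one counting pass per candidate size k. */
    reduction_object_pre_init();
    counts_id = reduction_object_alloc(num_candidates * (int) sizeof(int));
    reduction_object_post_init();
    mate_scheduler(&sched_args);   /* runtime splits the data and invokes
                                      apriori_reduction on each split     */
    /* finalize: read counts, keep candidates meeting the support threshold */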
Example – Now with Phoenix
Apriori pseudo-code using MapReduce
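In generic map/reduce style (a sketch, not the exact Phoenix signatures; emit_intermediate(), emit(), the value iterator, and the same illustrative helpers as the MATE sketch above are placeholders), the same counting pass emits one (candidate, 1) pair per occurrence and sums them in reduce:

    /* Sketch only: runtime-provided emit and iterator primitives are
     * placeholders for whatever the map-reduce system actually offers. */
    void apriori_map(void *split, int num_transactions)
    {
        transaction_t *trans = (transaction_t *) split;
        for (int i = 0; i < num_transactions; i++)
            for (int c = 0; c < num_candidates; c++)
                if (contains(&trans[i], &candidates[c]))
                    emit_intermediate(c, 1);   /* one pair per occurrence */
    }

    void apriori_reduce(int candidate, value_iter_t values)
    {
        int count = 0;
        while (has_next(&values))
            count += next_value(&values);
        emit(candidate, count);                /* support count           */
    }

The intermediate (candidate, 1) pairs must be stored, grouped, and sorted by key before reduce runs, which is exactly the overhead the reduction object in the MATE version avoids.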
System Design and Implementation
Basic dataflow of MATE (MapReduce with AlternaTE API) in Full Replication scheme
Data structures in MATE scheduler
Three sets of functions in MATE, used to communicate between the user code and the runtime:
Internally used functions
MATE API provided by the runtime
MATE API to be defined by the users
Implementation considerations
MATE Runtime Dataflow
Basic one-stage dataflow (Full Replication scheme)
Data Structures-(I)
scheduler_args_t: Basic fields

    Field        Description
    Input_data   Input data pointer
    Data_size    Input dataset size
    Data_type    Input data type
    Stage_num    Computation-stage number (iteration number)
    Splitter     Pointer to Splitter function
    Reduction    Pointer to Reduction function
    Finalize     Pointer to Finalize function
Data Structures-(II)
scheduler_args_t: Optional fields for performance tuning

    Field                   Description
    Unit_size               # of bytes for one element
    L1_cache_size           # of bytes for L1 data cache size
    Model                   Shared-memory parallelization model
    Num_reduction_workers   Max # of reduction worker threads
    Num_procs               Max # of processor cores used
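Combining the basic and optional fields above, a hedged C sketch of the structure might look like the following; only the field names and meanings come from the slides, the concrete types are assumptions (the function-pointer types are the user-defined signatures listed under Functions-(III) below):

    /* Hedged sketch of scheduler_args_t; types are illustrative. */
    typedef struct {
        /* Basic fields */
        void        *input_data;            /* input data pointer                   */
        size_t       data_size;             /* input dataset size                   */
        int          data_type;             /* input data type                      */
        int          stage_num;             /* computation-stage (iteration) number */
        splitter_t   splitter;              /* pointer to splitter function         */
        reduction_t  reduction;             /* pointer to reduction function        */
        finalize_t   finalize;              /* pointer to finalize function         */

        /* Optional fields for performance tuning */
        int          unit_size;             /* # of bytes for one element           */
        int          L1_cache_size;         /* # of bytes of L1 data cache          */
        int          model;                 /* shared-memory parallelization model  */
        int          num_reduction_workers; /* max # of reduction worker threads    */
        int          num_procs;             /* max # of processor cores used        */
    } scheduler_args_t;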
Functions-(I)
Transparent to users (R = required, O = optional)

    Function                                                       R/O
    static inline void * schedule_tasks(thread_wrapper_arg_t *)    R
    static void * combination_worker(void *)                       R
    static int array_splitter(void *, int, reduction_args_t *)     R
    void clone_reduction_object(int num)                           R
    static inline int isCpuAvailable(unsigned long, int)           R
Functions-(II)
APIs provided by the runtime

    Function                                                         R/O
    int mate_init(scheduler_args_t * args)                           R
    int mate_scheduler(void * args)                                  R
    int mate_finalize(void * args)                                   O
    void reduction_object_pre_init()                                 R
    int reduction_object_alloc(int size)  (returns the object id)    R
    void reduction_object_post_init()                                R
    void accumulate(int id, int offset, void * value)                O
    void reuse_reduction_object()                                    O
    void * get_intermediate_result(int iter, int id, int offset)     O
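A hedged sketch of the order in which an application might call these runtime functions, based only on the names and descriptions above; the iteration loop, the reduction-object size, and how results are read back are assumptions:

    /* Sketch only: fills a scheduler_args_t, allocates one reduction
     * object, and runs several passes of the reduction. */
    scheduler_args_t args;
    /* ... set input_data, data_size, unit_size, splitter, reduction, ... */

    mate_init(&args);                          /* initialize the runtime      */

    reduction_object_pre_init();
    int robj = reduction_object_alloc(obj_size);   /* returns the object id   */
    reduction_object_post_init();

    for (int iter = 0; iter < max_iters; iter++) {
        mate_scheduler(&args);                 /* split data, run reduction   */
        void *result = get_intermediate_result(iter, robj, 0);
        /* ... use result to decide whether to continue ...                   */
        reuse_reduction_object();              /* reset state for next pass   */
    }

    mate_finalize(&args);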
Functions-(III)
APIs defined by the user

    Function                                                R/O
    int (*splitter_t)(void *, int, reduction_args_t *)     O
    void (*reduction_t)(reduction_args_t *)                R
    void (*combination_t)(void *)                          O
    void (*finalize_t)(void *)                             O
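As an illustration of the user side, here is a hedged sketch of a reduction_t for k-means on 3-dimensional points (as in the experiments later). The reduction_args_t fields, the closest_centroid() helper, the global robj_id, the reduction-object layout (three coordinate sums plus a count per cluster), and treating accumulate()'s offset as an element index are all assumptions:

    /* Sketch only: accumulate the coordinates and the count of every point
     * into the slot of its nearest centroid. */
    void kmeans_reduction(reduction_args_t *args)
    {
        double *points = (double *) args->data;        /* this split's points  */
        for (int i = 0; i < args->num_points; i++) {
            double *p = &points[i * 3];
            int c = closest_centroid(p);               /* nearest of k centers */
            for (int d = 0; d < 3; d++)
                accumulate(robj_id, c * 4 + d, &p[d]); /* coordinate sums      */
            double one = 1.0;
            accumulate(robj_id, c * 4 + 3, &one);      /* point count          */
        }
    }

A finalize function would then divide each cluster's coordinate sums by its count to produce the new centroids.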
Implementation Considerations
Focus on the API differences in programming
models
Data partitioning: dynamically assigns splits
to worker threads, same as Phoenix
Buffer management: two temporary buffers
one for reduction objects
the other for combination results
Fault tolerance: re-executes failed tasks
Checkpointing may be a better solution, since the reduction object maintains the computation state
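Because the reduction object captures the full computation state, a checkpoint could in principle be little more than a dump of that object plus a progress marker. The sketch below is purely illustrative, not part of the MATE implementation described in this talk; the file format and the use of get_intermediate_result() to read the object's contents are assumptions:

    #include <stdio.h>

    /* Sketch only: persist the reduction object every few splits so a
     * failed run could resume instead of recomputing from scratch. */
    static void checkpoint_reduction_object(int robj_id, size_t obj_size,
                                            int splits_done)
    {
        FILE *f = fopen("mate.ckpt", "wb");
        if (!f)
            return;
        fwrite(&splits_done, sizeof splits_done, 1, f);   /* progress marker */
        fwrite(get_intermediate_result(0, robj_id, 0),    /* object contents */
               1, obj_size, f);
        fclose(f);
    }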
Experiments Design
For comparison, we used three applications
Data Mining: KMeans, PCA, Apriori
Also evaluated the single-node performance of Hadoop on KMeans and Apriori
Experiments on two multi-core platforms
8 cores on one WCI node (Intel CPU)
16 cores on one AMD node (AMD CPU)
Results: Data Mining (I)
K-Means: 400MB dataset, 3-dim points, k = 100, on one WCI node with 8 cores
[Chart: avg. time per iteration (sec) vs. # of threads (1, 2, 4, 8) for Phoenix, MATE, and Hadoop; the slide notes a 2.0 speedup]
Results: Data Mining (II)
K-Means: 400MB dataset, 3-dim points, k = 100, on one AMD node with 16 cores
[Chart: avg. time per iteration (sec) vs. # of threads (1, 2, 4, 8, 16) for Phoenix, MATE, and Hadoop; the slide notes a 3.0 speedup]
Results: Data Mining (III)
PCA: 8000 x 1024 matrix, on one WCI node with 8 cores
[Chart: total time (sec) vs. # of threads (1, 2, 4, 8) for Phoenix and MATE; the slide notes a 2.0 speedup]
Results: Data Mining (IV)
PCA: 8000 x 1024 matrix, on one AMD node with 16 cores
[Chart: total time (sec) vs. # of threads (1, 2, 4, 8, 16) for Phoenix and MATE; the slide notes a 2.0 speedup]
Results: Data Mining (V)
Apriori: 1,000,000 transactions, 3% support, on one WCI node with 8 cores
[Chart: avg. time per iteration (sec) vs. # of threads (1, 2, 4, 8) for Phoenix, MATE, and Hadoop]
Results: Data Mining (VI)
Apriori: 1,000,000 transactions, 3% support, on one AMD node with 16 cores
[Chart: avg. time per iteration (sec) vs. # of threads (1, 2, 4, 8, 16) for Phoenix, MATE, and Hadoop]
Observations
The MATE system achieves reasonable to significant speedups for all three data mining applications.
In most cases it outperforms Phoenix and Hadoop significantly; in only one case is it merely slightly better.
The reduction object helps reduce the memory overhead associated with managing the large set of intermediate results between map and reduce.
Related Work
Lots of work on improving and generalizing
MapReduce’s API…
Industry: Dryad/DryadLINQ from Microsoft,
Sawzall from Google, Pig/Map-Reduce-Merge from
Yahoo!
Academia: CGL-MapReduce, Mars, MITHRA,
Phoenix, Disco, Hive…
Address MapReduce limitations
One input, two-stage data flow is extremely rigid
Only two high-level primitives
Conclusions
MapReduce is simple and robust in expressing parallelism
Two-stage computation style may cause performance losses for some subclasses of applications in data-intensive computing
MATE provides an alternate API that is based on generalized reduction
This variation can reduce overheads of data management and communication between Map and Reduce