Overview of Spark project
Presented by Yin Zhu ([email protected])
Materials from:
• http://spark-project.org/documentation/
• Hadoop in Practice by A. Holmes
• My demo code: https://bitbucket.org/blackswift/spark-example
25 March 2013
Outline
Review of MapReduce (15’)
Going Through Spark (NSDI’12) Slides (30’)
Demo (15’)
Review of MapReduce
Pagerank implemented in Scala and Hadoop
Why Pagerank?
» More complicated than the “hello world” WordCount
» Widely used in search engines, with very large-scale input (the whole web graph!)
» Iterative algorithm (the typical style of most numerical algorithms for data mining)
Example update: a page whose incoming contributions sum to 0.5 gets the new score 0.15 + 0.85 × 0.5 = 0.575
A functional implementation
def pagerank(links: Array[(UrlId, Array[UrlId])], numIters: Int): Map[UrlId, Double] = {
  // init: each node starts with rank 1.0
  var ranks = links.map { case (url, _) => (url, 1.0) }.toMap
  for (iter <- 1 to numIters) { // run a fixed number of iterations
    // Map: each page divides its rank evenly among its outgoing links
    val contribs =
      links
        .flatMap { case (out_url, in_urls) =>
          val score = ranks(out_url) / in_urls.size // the score each linked page receives
          in_urls.map(in_url => (in_url, score))
        }
        // Reduce: group the (url, score) pairs by url and sum the scores per unique url
        .groupBy(_._1)
        .map { case (url, url_scores) =>
          (url, url_scores.foldLeft(0.0)((sum, url_score) => sum + url_score._2))
        }
    ranks = ranks.map { case (url, rank) =>
      (url, if (contribs.contains(url)) 0.85 * contribs(url) + 0.15 else rank)
    }
  }
  ranks
}
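A quick sanity check on a tiny graph (UrlId is assumed here to be an integer alias; the snippet above leaves its definition implicit):

type UrlId = Int
// page 0 links to pages 1 and 2; pages 1 and 2 link back to page 0
val graph: Array[(UrlId, Array[UrlId])] =
  Array(0 -> Array(1, 2), 1 -> Array(0), 2 -> Array(0))
val ranks = pagerank(graph, 20)
// page 0 should converge to the highest rank, since both other pages link to it
ranks.toSeq.sortBy(_._1).foreach { case (u, r) => println(u + " -> " + r) }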
Hadoop implementation:
https://github.com/alexholmes/hadoopbook/tree/master/src/main/java/com/manning/hip/ch7/pagerank
Hadoop/MapReduce implementation
Fault tolerance
the result of each iteration is saved to disk
Speed/disk IO
disk IO is proportional to the number of iterations
ideally, the link graph should be loaded only once
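A pseudocode-level sketch of that cost pattern (runPageRankJob is a hypothetical stand-in for the book's chained MapReduce jobs, not actual code from the repo):

// each iteration is a separate MapReduce job whose input and output live on
// HDFS, so ranks are rewritten and the link graph reread on every pass
for (iter <- 1 to numIters) {
  runPageRankJob( // hypothetical job launcher
    "hdfs://.../ranks-" + (iter - 1), // input: previous iteration's output
    "hdfs://.../ranks-" + iter)       // output: persisted for fault tolerance
}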
Resilient Distributed Datasets
A Fault-Tolerant Abstraction for
In-Memory Cluster Computing
Matei Zaharia, Mosharaf Chowdhury, Tathagata Das,
Ankur Dave, Justin Ma, Murphy McCauley,
Michael Franklin, Scott Shenker, Ion Stoica
UC Berkeley
Motivation
MapReduce greatly simplified “big data” analysis
on large, unreliable clusters
But as soon as it got popular, users wanted more:
»More complex, multi-stage applications
(e.g. iterative machine learning & graph processing)
»More interactive ad-hoc queries
Response: specialized frameworks for some of
these apps (e.g. Pregel for graph processing)
Motivation
Complex apps and interactive queries both need
one thing that MapReduce lacks:
Efficient primitives for data sharing
In MapReduce, the only way to share data
across jobs is stable storage → slow!
Examples
[Figure: an iterative job writes its result to HDFS after each iteration and reads it back before the next (HDFS read → iter. 1 → HDFS write → HDFS read → iter. 2 → …); interactive use is similar: queries 1–3 each re-read the same Input from HDFS to produce results 1–3]
Slow due to replication and disk I/O,
but necessary for fault tolerance
Goal: In-Memory Data Sharing
[Figure: iterations pass data directly in memory (Input → iter. 1 → iter. 2 → …); for interactive use, one-time processing loads the Input into memory, after which queries 1–3 run against it]
10-100× faster than network/disk, but how to get FT?
Challenge
How to design a distributed memory abstraction
that is both fault-tolerant and efficient?
Solution: Resilient Distributed
Datasets (RDDs)
Restricted form of distributed shared memory
» Immutable, partitioned collections of records
» Can only be built through coarse-grained
deterministic transformations (map, filter, join, …)
Efficient fault recovery using lineage
» Log one operation to apply to many elements
» Recompute lost partitions on failure
» No cost if nothing fails
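A minimal single-machine sketch of the lineage idea (illustrative types only, not Spark's actual classes): each dataset records its parent and the coarse-grained operation that built it, so a lost copy is recomputed rather than restored from a replica.

sealed trait MiniRDD[T] { def compute(): Seq[T] } // recompute from lineage
case class Source[T](data: Seq[T]) extends MiniRDD[T] {
  def compute() = data // stable input, e.g. a file in HDFS
}
case class Mapped[A, B](parent: MiniRDD[A], f: A => B) extends MiniRDD[B] {
  def compute() = parent.compute().map(f) // log the operation, not the data
}
case class Filtered[A](parent: MiniRDD[A], p: A => Boolean) extends MiniRDD[A] {
  def compute() = parent.compute().filter(p)
}

// if a cached copy of `lengths` is lost, compute() simply replays the chain
val lines   = Source(Seq("ERROR a", "INFO b", "ERROR c"))
val errors  = Filtered(lines, (s: String) => s.startsWith("ERROR"))
val lengths = Mapped(errors, (s: String) => s.length)
println(lengths.compute()) // List(7, 7)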
Three core concepts of RDD
Transformations
define a new RDD from an existing RDD
Cache and Partitioner
put the dataset into memory (or other persistent media), and
specify the locations of the sub-datasets
Actions
carry out the actual computation
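A minimal sketch tying the three concepts together in the classic Spark API (assumes an existing SparkContext sc; the path and field index are placeholders):

import spark.SparkContext._ // enables pair-RDD operations such as partitionBy

val lines  = sc.textFile("hdfs://.../logs")         // base RDD
val errors = lines.filter(_.startsWith("ERROR"))    // transformation: lazy
errors.cache()                                      // cache: keep partitions in memory
val byKey  = errors.map(l => (l.split('\t')(1), l)) // pair RDD
                   .partitionBy(new spark.HashPartitioner(8)) // partitioner: fix layout
val n = errors.count()                              // action: triggers the computation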
RDD and its lazy transformations
RDD[T]: A sequence of objects of type T
Transformations are lazy:
https://github.com/mesos/spark/blob/master/core/src/main/scala/spark/PairRDDFunctions.scala
/**
 * Return an RDD of grouped items.
 */
def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
  def createCombiner(v: V) = ArrayBuffer(v)
  def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
  def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
  val bufs = combineByKey[ArrayBuffer[V]](
    createCombiner _, mergeValue _, mergeCombiners _, partitioner)
  bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
Actions: carry out the actual computation
https://github.com/mesos/spark/blob/master/core/src/main/scala/spark/SparkContext.scala
runJob is the entry point in SparkContext that every action eventually calls.
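A small illustration of this laziness (assumes a SparkContext sc; the println side effects run inside worker tasks, not on the driver):

val nums   = sc.parallelize(1 to 4)
val traced = nums.map { x => println("computing " + x); x * 2 } // still lazy, nothing printed
val total  = traced.reduce(_ + _) // action: runJob launches tasks and the map finally executes
println(total) // 20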
Example
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
messages = errors.map(_.split('\t')(2))
messages.persist() // or .cache()
messages.filter(_.contains("foo")).count
messages.filter(_.contains("bar")).count
Task Scheduler for actions
Dryad-like DAGs
Pipelines functions
within a stage
Locality & data
reuse aware
Partitioning-aware
to avoid shuffles
[Figure: example DAG: Stage 1 builds B from A via groupBy; Stage 2 maps C to D and unions D with E to form F; Stage 3 joins B and F into G. B's partitions are cached (black rectangles = cached data partition), so Stage 1 can be skipped.]
RDD Recovery
[Figure: the in-memory dataflow from the goal slide again (Input → iter. 1 → iter. 2 → …, and one-time processing feeding queries 1–3); a partition lost at any step is recomputed from the step before it rather than restored from a replica]
Fault Recovery
RDDs track the graph of transformations that
built them (their lineage) to rebuild lost data
E.g.:
messages = textFile(...).filter(_.contains("error"))
                        .map(_.split('\t')(2))
[Figure: lineage chain: HadoopRDD (path = hdfs://…) → FilteredRDD (func = _.contains(...)) → MappedRDD (func = _.split(…))]
Fault Recovery Results
[Figure: k-means iteration times (s) over 10 iterations: 119, 57, 56, 58, 58, 81, 57, 59, 57, 59; the failure happens at the start of iteration 6, which takes 81 s while lost partitions are rebuilt from lineage, versus ~57–59 s for a normal iteration]
Generality of RDDs
Despite their restrictions, RDDs can express
surprisingly many parallel algorithms
» These naturally apply the same operation to many items
Unify many current programming models
» Data flow models: MapReduce, Dryad, SQL, …
» Specialized models for iterative apps: BSP (Pregel),
iterative MapReduce (HaLoop), bulk incremental, …
Support new apps that these models don’t
Tradeoff Space
[Figure: granularity of updates (fine → coarse) plotted against write throughput (low → high): fine-grained systems (K-V stores, databases, RAMCloud) are best for transactional workloads and are bounded by network bandwidth (replication); coarse-grained systems are best for batch workloads, with HDFS at low write throughput and RDDs reaching memory bandwidth at high throughput]
Outline
Spark programming interface
Implementation
Demo
How people are using Spark
Spark Programming Interface
DryadLINQ-like API in the Scala language
Usable interactively from Scala interpreter
Provides:
» Resilient distributed datasets (RDDs)
» Operations on RDDs: transformations (build new RDDs),
actions (compute and output results)
» Control of each RDD’s partitioning (layout across nodes)
and persistence (storage in RAM, on disk, etc.)
Example: Log Mining
Load error messages from a log into memory, then
interactively search for various patterns
lines = spark.textFile("hdfs://...")             // Base RDD
errors = lines.filter(_.startsWith("ERROR"))     // Transformed RDD
messages = errors.map(_.split('\t')(2))
messages.persist()                               // or .cache()

messages.filter(_.contains("foo")).count         // Action
messages.filter(_.contains("bar")).count

[Figure: the driver (Master) ships tasks to Workers; each Worker builds and caches its partition of messages (Msgs. 1–3) from HDFS blocks 1–3 and returns results to the driver]

Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB data in 5–7 sec (vs 170 sec for on-disk data)
Example: PageRank
1. Start each page with a rank of 1
2. On each iteration, update each page’s rank to
rank(p) = Σ_{i ∈ neighbors(p)} rank_i / |neighbors_i|
links = // RDD of (url, neighbors) pairs
ranks = // RDD of (url, rank) pairs

for (i <- 1 to ITERATIONS) {
  ranks = links.join(ranks).flatMap {
    case (url, (neighbors, rank)) =>
      neighbors.map(dest => (dest, rank / neighbors.size))
  }.reduceByKey(_ + _)
}
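One way the two elided inputs could be built (a sketch with a hypothetical input format: one line per page, "url neighbor1 neighbor2 ..."; assumes a SparkContext sc and the pair-RDD imports):

val links = sc.textFile("hdfs://.../links.txt")
              .map { line =>
                val parts = line.split("\\s+")
                (parts.head, parts.tail.toSeq) // (url, neighbors)
              }
              .cache() // reused on every iteration, so keep it in memory
var ranks = links.mapValues(_ => 1.0) // step 1: every page starts with rank 1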
Optimizing Placement
[Figure: dataflow: Links (url, neighbors) and Ranks_0 (url, rank) feed a join producing Contribs_0, which is reduced into Ranks_1; the join/reduce pair then repeats to give Contribs_1, Ranks_2, and so on]
links & ranks are repeatedly joined
Can co-partition them (e.g. hash
both on URL) to avoid shuffles
Can also use app knowledge,
e.g., hash on DNS name
links = links.partitionBy(
new URLPartitioner())
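URLPartitioner is application code, not something Spark ships; a minimal sketch that hashes on the URL's host, so all pages of one site land in the same partition (unlike the no-arg constructor on the slide, this sketch takes an explicit partition count):

class URLPartitioner(val partitions: Int) extends spark.Partitioner {
  def numPartitions = partitions
  def getPartition(key: Any): Int = {
    val h = new java.net.URL(key.toString).getHost.hashCode % partitions
    if (h < 0) h + partitions else h // non-negative partition index
  }
  // equal partitioners are how Spark detects co-partitioned RDDs and skips the shuffle
  override def equals(other: Any) = other match {
    case p: URLPartitioner => p.partitions == partitions
    case _                 => false
  }
}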
PageRank Performance
[Figure: time per iteration (s): Hadoop 171, Basic Spark 72, Spark + Controlled Partitioning 23]
Implementation
Runs on Mesos [NSDI 11]
to share clusters w/ Hadoop
Can read from any Hadoop
input source (HDFS, S3, …)
[Figure: Spark, Hadoop, MPI, and other frameworks sharing the same cluster nodes on top of Mesos]
No changes to Scala language or compiler
» Reflection + bytecode analysis to correctly ship code
www.spark-project.org
Behavior with Insufficient RAM
[Figure: logistic regression iteration time (s) vs. percent of working set in memory: 68.8 at 0%, 58.1 at 25%, 40.7 at 50%, 29.7 at 75%, 11.5 at 100%]
Scalability
[Figure: iteration time (s) vs. number of machines (25 / 50 / 100):
Logistic Regression: Hadoop 184 / 111 / 76, HadoopBinMem 116 / 80 / 62, Spark 15 / 6 / 3;
K-Means: Hadoop 274 / 157 / 106, HadoopBinMem 197 / 121 / 87, Spark 143 / 61 / 33]
Breaking Down the Speedup
[Figure: logistic regression iteration time (s): text input: in-mem HDFS 15.4, in-mem local file 13.1, Spark RDD 2.9; binary input: in-mem HDFS 8.4, in-mem local file 6.9, Spark RDD 2.9]
Spark Operations
Transformations (define a new RDD):
map, filter, sample, groupByKey, reduceByKey, sortByKey,
flatMap, union, join, cogroup, cross, mapValues

Actions (return a result to driver program):
collect, reduce, count, save, lookupKey
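A few of these operations chained together (assumes a SparkContext sc and the pair-RDD imports):

val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums   = pairs.reduceByKey(_ + _)   // transformation: ("a", 4), ("b", 2)
val labels = sc.parallelize(Seq(("a", "x")))
val joined = sums.join(labels)          // transformation: ("a", (4, "x"))
println(joined.collect().toSeq)         // action: results come back to the driver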
Demo
Open Source Community
15 contributors, 5+ companies using Spark,
3+ application projects at Berkeley
User applications:
» Data mining 40x faster than Hadoop (Conviva)
» Exploratory log analysis (Foursquare)
» Traffic prediction via EM (Mobile Millennium)
» Twitter spam classification (Monarch)
» DNA sequence analysis (SNAP)
». . .
Related Work
RAMCloud, Piccolo, GraphLab, parallel DBs
» Fine-grained writes requiring replication for resilience
Pregel, iterative MapReduce
» Specialized models; can’t run arbitrary / ad-hoc queries
DryadLINQ, FlumeJava
» Language-integrated “distributed dataset” API, but cannot
share datasets efficiently across queries
Nectar [OSDI 10]
» Automatic expression caching, but over distributed FS
PacMan [NSDI 12]
» Memory cache for HDFS, but writes still go to network/disk
Conclusion
RDDs offer a simple and efficient programming
model for a broad range of applications
Leverage the coarse-grained nature of many
parallel algorithms for low-overhead recovery
Try it out at www.spark-project.org