RESILIENT DISTRIBUTED DATASETS:
A FAULT-TOLERANT ABSTRACTION FOR
IN-MEMORY CLUSTER COMPUTING
Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave,
Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica
Presented by Peifeng Yu
EECS 582 – F16
TABLE OF CONTENTS
▪ Background
▪ RDD in a nutshell
▪ Spark the implementation
▪ Evaluation
▪ What’s new in Spark
Yet Another Cluster Computing Framework?
RELATED WORKS
• General purpose
  • MapReduce, Dryad
• Specialized
  • Pregel: iterative graph compute
  • HaLoop: loop of MapReduce steps
• In-memory Storage
  • Distributed shared memory
  • Key Value Storage / Databases
  • Piccolo
PROBLEM
• External storage needed for data reuse across computations
  • Iterative algorithm
  • Ad-hoc query
• Don’t generalize
• Hard to implement efficient fault tolerance
RDD IN A NUTSHELL
▪ Resilient Distributed Dataset
▪ General purpose distributed memory abstraction
▪ In-memory
▪ Immutable
▪ Can only be created through deterministic operations (Transformations)
▪ Atomic piece of data: partition
▪ Fault-tolerant
RDD - OPERATIONS
▪ Transformations
  ▪ map, filter, union, join, etc.
▪ Actions
  ▪ count, collect, reduce, lookup, save
[Figure: External Source -> (Transformation) -> RDD1 -> (Transformation) -> RDD2 -> (Action) -> External Result]
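The split between transformations and actions can be illustrated with a minimal, single-process sketch. MiniRDD and traced_square are hypothetical names invented for this illustration and are not part of Spark's API; the point is only that transformations record work while actions trigger it.

```python
class MiniRDD:
    """Toy, single-process stand-in for an RDD (hypothetical, not Spark's API)."""

    def __init__(self, compute):
        # compute: zero-argument function that yields the elements on demand
        self._compute = compute

    @classmethod
    def from_source(cls, data):
        return cls(lambda: iter(data))

    # Transformations: describe a new dataset, run nothing yet
    def map(self, f):
        return MiniRDD(lambda: (f(x) for x in self._compute()))

    def filter(self, pred):
        return MiniRDD(lambda: (x for x in self._compute() if pred(x)))

    # Actions: trigger the computation and return a result
    def collect(self):
        return list(self._compute())

    def count(self):
        return sum(1 for _ in self._compute())


calls = []  # records every element the map function actually touches


def traced_square(x):
    calls.append(x)
    return x * x


source = MiniRDD.from_source([1, 2, 3, 4])
rdd1 = source.map(traced_square)             # transformation
rdd2 = rdd1.filter(lambda x: x % 2 == 0)     # transformation
calls_before_action = list(calls)            # nothing has been computed so far
result = rdd2.collect()                      # action: the whole chain runs now
```

In this sketch nothing is computed until collect() is called, which mirrors the Source -> RDD1 -> RDD2 -> Result flow on the slide.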
RDD - FAULT TOLERANCE
• Instead of storing the actual data, store the lineage: how the partitions were derived from other datasets
• Checkpoint if the lineage is too long
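A rough sketch of lineage-based recovery, assuming each derived partition depends only on the same-numbered parent partition. The Dataset class and its fields are hypothetical and greatly simplified; the root's data stands in for stable external storage.

```python
class Dataset:
    """Toy partitioned dataset that remembers its lineage (hypothetical names)."""

    def __init__(self, num_partitions, source=None, parent=None, fn=None):
        self.num_partitions = num_partitions
        self.source = source   # root only: partitions held on stable storage
        self.parent = parent   # lineage: dataset this one was derived from
        self.fn = fn           # lineage: deterministic per-partition function
        self.cache = {}        # partition index -> in-memory data (can be lost)
        self.computed = []     # log of partition indices that were (re)computed

    def map_partitions(self, fn):
        return Dataset(self.num_partitions, parent=self, fn=fn)

    def get(self, i):
        if i in self.cache:
            return self.cache[i]
        if self.parent is None:
            data = list(self.source[i])
        else:
            # Replay the lineage for partition i only
            data = self.fn(self.parent.get(i))
        self.computed.append(i)
        self.cache[i] = data
        return data


root = Dataset(2, source=[[1, 2], [3, 4]])
doubled = root.map_partitions(lambda part: [2 * x for x in part])
plus_one = doubled.map_partitions(lambda part: [x + 1 for x in part])

before = [plus_one.get(i) for i in range(2)]

# Simulate a worker failure that loses partition 1 of both derived datasets
del doubled.cache[1]
del plus_one.cache[1]
recovered = plus_one.get(1)   # rebuilt from lineage; partition 0 is untouched
```

Only the lost partition is recomputed here. A checkpoint would correspond to promoting an intermediate dataset's cache to stable storage so that the replay can start from there instead of from the root.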
RDD - PERSISTENCE & PARTITIONING
▪ Persistence Level
  ▪ In-memory
  ▪ Disk backed
  ▪ Replica
▪ Partitioning
  ▪ Default hashing
  ▪ User defined
Advantages
- Persistence: pays off when re-computation is more costly than storage I/O
- Partitioning: helps improve data locality
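To illustrate why partitioning helps data locality, here is a small pure-Python sketch (not Spark code). The function names and the sample data are made up for illustration, and zlib.crc32 is used only to get a hash that is stable across runs. When two keyed datasets use the same partitioner, matching keys land in the same partition index, so a join can proceed partition by partition without moving data.

```python
import zlib


def hash_partitioner(key, num_partitions):
    """Default-style partitioner: stable hash of the key modulo partition count."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def first_letter_partitioner(key, num_partitions):
    """User-defined partitioner (hypothetical): route by the key's first letter."""
    return ord(str(key)[0].lower()) % num_partitions


def partition_by(pairs, num_partitions, partitioner=hash_partitioner):
    """Distribute (key, value) pairs into num_partitions lists."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partitioner(key, num_partitions)].append((key, value))
    return partitions


def local_join(left_parts, right_parts):
    """Join co-partitioned datasets one partition pair at a time."""
    joined = []
    for left, right in zip(left_parts, right_parts):
        right_index = {}
        for key, value in right:
            right_index.setdefault(key, []).append(value)
        for key, value in left:
            for other in right_index.get(key, []):
                joined.append((key, (value, other)))
    return joined


pages = [("spark", "cluster computing"), ("mesos", "resource manager"),
         ("yarn", "resource manager")]
visits = [("spark", 10), ("yarn", 3), ("spark", 7)]

page_parts = partition_by(pages, 4)
visit_parts = partition_by(visits, 4)
joined = sorted(local_join(page_parts, visit_parts))
custom_parts = partition_by(pages, 2, first_letter_partitioner)
```

The join never looks outside a partition pair, which is the locality benefit the slide refers to; a user-defined partitioner only changes where keys land, not how the join works.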
RDD VS. DSM
Aspect | RDDs | Distributed Shared Memory
Reads | Coarse- or fine-grained | Fine-grained
Writes | Coarse-grained | Fine-grained
Consistency | Trivial (immutable) | Up to app / runtime
Fault recovery | Fine-grained and low-overhead using lineage | Requires checkpoints and program rollback
Straggler mitigation | Possible using backup tasks | Difficult
Work placement | Automatic based on data locality | Up to app (runtimes aim for transparency)
Behavior if not enough RAM | Similar to existing data flow systems | Poor performance (swapping)
SPARK THE IMPLEMENTATION
▪ Job scheduler
▪ Memory manager
▪ Interactive interpreter (shipping closures around)
▪ Not a cluster manager; runs on
  ▪ Mesos
  ▪ YARN
  ▪ Standalone (added later)
SPARK THE SCHEDULER
▪ Mapping high-level logical representation to low-level tasks
▪ Build a DAG of stages to execute based on the RDD’s lineage
▪ Transformations are lazily computed
▪ Factors
  ▪ Data locality
  ▪ Pipeline
  ▪ Worker fault tolerance
REVISIT RDD DEPENDENCIES
• Narrow
  • Pipeline execution
  • Partition-wise
  • Easy recovery
• Wide
  • All parents must be present to compute any partition
  • Full re-computation needed for recovery
SPARK THE SCHEDULER
• Stage: pipelined operations with narrow dependencies
• Boundaries
  • Shuffle operations required by wide dependencies
  • Already computed partitions
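A simplified sketch of how stage boundaries fall at wide dependencies, assuming a purely linear chain of operations. WIDE_OPS and build_stages are hypothetical names; the real scheduler works on a DAG and also stops at already computed partitions, which this sketch does not model.

```python
# Operations assumed to need a shuffle (wide dependency) in this sketch
WIDE_OPS = {"groupByKey", "reduceByKey", "sortByKey"}


def build_stages(ops):
    """Split a linear chain of operations into stages at wide dependencies."""
    stages = []
    current = []
    for op in ops:
        if op in WIDE_OPS and current:
            # A shuffle is required: close the current stage before this op
            stages.append(current)
            current = []
        # Narrow operations are pipelined into the stage that is open
        current.append(op)
    if current:
        stages.append(current)
    return stages


lineage = ["map", "filter", "groupByKey", "map", "sortByKey", "map"]
stages = build_stages(lineage)
```

Each inner list is a run of operations that can be pipelined on a single partition; every new list starts where a shuffle forces all parent partitions to be available first.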
SPARK THE SCHEDULER
Fault tolerance
▪ Re-run on another node in case a task fails
▪ Resubmit tasks for missing partitions in parallel
▪ Only worker failures are tolerated
  ▪ Scheduler (master) failure can be recovered by using an additional service like ZooKeeper or a simple local-filesystem-based checkpoint
▪ Optimization for long lineage: checkpointing
  ▪ Left to the user to decide which RDD to checkpoint
SPARK THE MEMORY MANAGER
▪ Options for storage of persistent RDDs
  ▪ In-memory vs. on-disk
  ▪ Deserialized vs. serialized
  ▪ Single copy vs. replica
▪ Insufficient memory
  ▪ LRU (skipping the RDD currently being operated on)
  ▪ User defined priority
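A minimal sketch of the eviction rule for insufficient memory. It assumes capacity is counted in partitions rather than bytes and tracks recency per partition rather than per RDD; PartitionCache and its methods are hypothetical names, not Spark's memory manager.

```python
from collections import OrderedDict


class PartitionCache:
    """Toy LRU cache of RDD partitions (hypothetical, not Spark's API)."""

    def __init__(self, capacity):
        self.capacity = capacity          # assumption: measured in partitions
        self.entries = OrderedDict()      # (rdd_id, partition) -> data, LRU first

    def get(self, rdd_id, partition):
        key = (rdd_id, partition)
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)     # mark as most recently used
        return self.entries[key]

    def put(self, rdd_id, partition, data):
        key = (rdd_id, partition)
        if key in self.entries:
            self.entries[key] = data
            self.entries.move_to_end(key)
            return True
        while len(self.entries) >= self.capacity:
            # Least recently used entry that belongs to a different RDD
            victim = next((k for k in self.entries if k[0] != rdd_id), None)
            if victim is None:
                # Only partitions of the same RDD remain: do not cycle them out
                return False
            del self.entries[victim]
        self.entries[key] = data
        return True


cache = PartitionCache(capacity=2)
cache.put("a", 0, [1])
cache.put("b", 0, [2])
cache.get("a", 0)                      # ("b", 0) is now least recently used
cache.put("c", 0, [3])                 # evicts ("b", 0)
after_c = list(cache.entries)
cache.put("a", 1, [4])                 # ("a", 0) is LRU but skipped: same RDD
after_a1 = list(cache.entries)
stored = cache.put("a", 2, [5])        # cache holds only RDD "a": not stored
```

Skipping partitions of the RDD being computed avoids cycling the same dataset in and out of memory while it is still in use.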
EVALUATION
▪ Iterative machine learning
▪ Limited memory
▪ Interactive data mining
▪ Others, refer to paper for details
EVALUATION
Iterative machine learning
▪ Spark: first vs. later iterations
▪ Hadoop, Spark: first iteration
▪ HadoopBinMem, Spark: later iterations
EVALUATION
Limited memory
EVALUATION
Interactive data mining
TAKE AWAY
▪ RDD
  ▪ Immutable in-memory data partitions
  ▪ Fault tolerance using lineage, with optional checkpointing
  ▪ Lazily computed until the user requests a result
  ▪ Limited set of operations, but still quite expressive
▪ Spark
  ▪ Schedules computation tasks
  ▪ Moves data and code around in the cluster
  ▪ Interactive interpreter
WHAT’S NEW IN SPARK
▪ Language bindings
  ▪ Java, Scala, Python, R
▪ Libraries built on top of Spark
  ▪ Spark SQL: working with structured data, mix SQL queries with Spark programs
  ▪ Spark Streaming: build scalable fault-tolerant streaming applications
  ▪ MLlib: scalable machine learning library
  ▪ GraphX: API for graphs and graph-parallel computation
  ▪ SparkNet: distributed neural networks for Spark (paper)
CODE EXAMPLE
EXAMPLE - INVERTED INDEX
▪ Spark version (Python)
"""InvertedIndex.py"""
from pyspark import SparkContext

sc = SparkContext("local", "Inverted Index")
docFile = "path/to/input/file"  # Should be some file on your system

# each record is <docId, docContent>
# (assumption: one document per line, formatted as "docId<TAB>docContent")
docData = sc.textFile(docFile).map(lambda line: tuple(line.split("\t", 1)))
# split words, of type <word, docId>
docWords = docData.flatMap(lambda kv: [(wd, kv[0]) for wd in kv[1].split()])
# group by key and sort each list of docIds, invIndex is of type <word, list<docId> >
invIndex = docWords.groupByKey().mapValues(sorted).sortByKey()
# persist
invIndex.saveAsTextFile("path/to/output/file")
EXAMPLE - INVERTED INDEX
▪ MapReduce version (pseudo code)
map(String key, String value):
    // key: document id
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, key);
reduce(String key, Iterator values):
    // key: a word
    // values: a list of document ids
    sort(values)
    Emit(key, values)
WHAT IF
▪ What if we want to do streaming data analytics? What’s the best way given a batch processing system like Spark?
▪ Optimal partition function? Application specific?