Spark Streaming


Spark, Shark and Spark Streaming Introduction, Part 2
Tushar Kale
[email protected]
June 2015
This Talk
 Introduction to Shark, Spark and Spark Streaming
 Architecture
 Deployment Methodology
 Performance
 References
Data Processing Stack
Three layers: a Data Processing Layer, a Resource Management Layer, and a Storage Layer.
Hadoop Stack
- Data Processing Layer: Hive, Pig, HBase, Hadoop MR, Storm, …
- Resource Management Layer: Hadoop Yarn
- Storage Layer: HDFS, S3, …
BDAS Stack
- Data Processing Layer: Spark, with Spark Streaming, BlinkDB, GraphX, MLlib (MLBase), and Shark (SQL) on top
- Resource Management Layer: Mesos
- Storage Layer: Tachyon over HDFS, S3, …
How do BDAS & Hadoop fit together?
- Data Processing Layer: the BDAS components (Spark Streaming, BlinkDB, GraphX, MLbase/MLlib, Shark SQL on Spark) run alongside the Hadoop components (Hive, Pig, HBase, Storm, Hadoop MR)
- Resource Management Layer: Mesos and Hadoop Yarn
- Storage Layer: Tachyon over HDFS, S3, …
Apache Mesos
 Enable multiple frameworks to share same cluster resources
(e.g., Hadoop, Storm, Spark)
 Twitter’s large scale deployment
- 6,000+ servers,
- 500+ engineers running jobs on Mesos
 Third party Mesos schedulers
- Airbnb's Chronos
- Twitter's Aurora
 Mesosphere: startup to commercialize Mesos
Apache Spark
 Distributed Execution Engine
- Fault-tolerant, efficient in-memory storage (RDDs)
- Powerful programming model and APIs (Scala, Python, Java)
 Fast: up to 100x faster than Hadoop
 Easy to use: 5-10x less code than Hadoop
 General: support interactive & iterative apps
 Two major releases since last AMPCamp
Spark Streaming
 Large scale streaming computation
 Implement streaming as a sequence of <1s jobs
- Fault tolerant
- Handle stragglers
- Ensure exactly-once semantics
 Integrated with Spark: unifies batch, interactive, and streaming computations (a minimal streaming sketch follows this list)
 Alpha release (Spring, 2013)
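The bullets above describe streaming computation as a sequence of small batch jobs. As a minimal sketch (not from the slides), here is a micro-batch word count written against the Spark Streaming API as it shipped in later releases; the socket host and port are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingWordCount")
    // Each batch covers 1 second of data: streaming as a sequence of <1s jobs
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder source: text lines arriving on a TCP socket
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}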
Shark
 Hive over Spark: full support for HQL and UDFs
 Up to 100x faster when input is in memory
 Up to 5-10x faster when input is on disk
 Running on hundreds of nodes at Yahoo!
 Two major releases along Spark
Unified Programming Models
 Unified system for
SQL, graph processing,
machine learning
 All share the same set
of workers and caches
def logRegress(points: RDD[Point]): Vector = {
  // Initialize weights randomly in [-1, 1)
  var w = Vector(D, _ => 2 * rand.nextDouble - 1)
  for (i <- 1 to ITERATIONS) {
    val gradient = points.map { p =>
      val denom = 1 + exp(-p.y * (w dot p.x))
      (1 / denom - 1) * p.y * p.x
    }.reduce(_ + _)
    w -= gradient
  }
  w
}

val users = sql2rdd("SELECT * FROM user u JOIN comment c ON c.uid=u.uid")
val features = users.mapRows { row =>
  new Vector(extractFeature1(row.getInt("age")),
             extractFeature2(row.getStr("country")),
             ...)
}
val trainedVector = logRegress(features.cache())
BlinkDB
 Trade between query performance and accuracy using sampling
 Why?
- In-memory processing doesn't guarantee interactive processing
- E.g., roughly 10 seconds just to scan 512 GB of RAM at 40-60 GB/s on a 16-core node
- Gap between memory capacity and transfer rate is increasing: capacity doubles every 18 months, transfer rate only every 36 months
Key Insights
 Input often noisy: exact computations do not guarantee exact
answers
 Error often acceptable if small and bounded
 Main challenge: estimate errors for arbitrary computations
 Alpha release (August, 2013)
- Allow users to build uniform and stratified samples
- Provide error bounds for simple aggregate queries
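BlinkDB's query interface is not shown on these slides; as a rough illustration of the uniform-sampling idea in plain Spark (this is not BlinkDB's API, and the input file and 1% sampling rate are placeholders), an aggregate can be estimated from a small sample:

// Plain-Spark illustration of answering an aggregate from a uniform sample
val values = sc.textFile("hdfs:/data/events.txt").map(_.toDouble)
val fraction = 0.01
val sampled = values.sample(withReplacement = false, fraction)
val approxSum = sampled.sum() / fraction   // scale the sampled sum back up
println(s"Approximate sum: $approxSum")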
GraphX
 Combine data-parallel and graph-parallel computations
 Provide powerful abstractions:
- PowerGraph, Pregel implemented in less than 20 LOC!
 Leverage Spark’s fault tolerance
 Alpha release: expected this fall
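The slide does not include code; as a sketch of what a small GraphX job looks like (using the GraphX API as it later shipped, with a placeholder edge-list path):

import org.apache.spark.graphx.GraphLoader

// Load a graph from an edge-list file of "srcId dstId" pairs (placeholder path)
val graph = GraphLoader.edgeListFile(sc, "hdfs:/data/followers.txt")
// Run PageRank to a convergence tolerance of 0.0001 and list the top-ranked vertices
val ranks = graph.pageRank(0.0001).vertices
ranks.sortBy(_._2, ascending = false).take(5).foreach(println)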
MLlib and MLbase
 MLlib: high quality library for ML algorithms
- Will be released with Spark 0.8 (September, 2013)
 MLbase: make ML accessible to non-experts
- Declarative interface: allow users to say what they want
- E.g., classify(data)
- Automatically pick best algorithm for given data, time
- Allow developers to easily add and test new algorithms
- Alpha release of MLI, first component of MLbase, in September, 2013
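As a sketch of what calling MLlib looks like (using the API as it stabilized in Spark 1.x; the input path and number of clusters are placeholders), k-means clustering takes one call once the data is vectorized:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Each input line holds space-separated numeric features (placeholder path)
val data = sc.textFile("hdfs:/data/points.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .cache()
val model = KMeans.train(data, k = 3, maxIterations = 20)
println(s"Cluster centers: ${model.clusterCenters.mkString(", ")}")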
Tachyon
 In-memory, fault-tolerant storage system
 Flexible API, including HDFS API
 Allow multiple frameworks (including Hadoop) to share in-memory
data
 Alpha release (June, 2013)
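Because Tachyon exposes an HDFS-compatible API, a Spark job can read and write it through the ordinary file methods. A sketch, assuming a Tachyon master at a placeholder host and port:

// Host, port and paths below are placeholders
val events = sc.textFile("tachyon://tachyon-master:19998/data/events.txt")
events.filter(_.contains("ERROR"))
      .saveAsTextFile("tachyon://tachyon-master:19998/out/errors")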
Compatibility to Existing Ecosystem
- Spark Streaming accepts inputs from Kafka, Flume, Twitter, TCP sockets, …
- GraphX exposes a GraphLab-style API
- Shark SQL exposes the Hive API
- Mesos (Resource Management Layer) also supports Hadoop, Storm, and MPI
- Tachyon (Storage Layer) exposes the HDFS API over HDFS, S3, …
Summary
 BDAS: address next Big Data challenges
 Unify batch, interactive, and streaming computations
 Easy to develop sophisticated applications
- Support graph & ML algorithms, approximate queries
 Witnessed significant adoption
- 20+ companies, 70+ individuals contributing code
 Exciting ongoing work
- MLbase, GraphX, BlinkDB, …
This Talk
 Introduction to Shark, Spark and Spark Streaming
 Architecture
 Deployment Methodology
 Performance
 References
RDDs
 Three methods for creation
- Parallelizing an existing collection
- Referencing a dataset
- From another RDD
 Dataset from any storage supported by Hadoop
- HDFS
- Cassandra
- HBase
- Amazon S3
- Others
 File types supported
- Text files
- SequenceFiles
- Hadoop InputFormat
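A brief sketch of the three creation methods (the collection contents and HDFS path are placeholders):

// 1. Parallelizing an existing collection
val fromCollection = sc.parallelize(Seq(1, 2, 3, 4, 5))
// 2. Referencing a dataset in external storage (here HDFS; placeholder path)
val fromStorage = sc.textFile("hdfs:/sparkdata/sparkQuotes.txt")
// 3. Transforming another RDD
val fromOtherRDD = fromStorage.map(_.toUpperCase)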
Scala and Python
 Spark comes with two shells
- Scala
- Python
 APIs available for Scala, Python and Java
 Appropriate versions for each Spark release
 Spark's native language is Scala, so it is more natural to write Spark applications in Scala
 This presentation will focus on code examples in Scala
Spark’s Scala and Python Shell
 Powerful tool to analyze data interactively
 The Scala shell runs on the Java VM
- Can leverage existing Java libraries
 Scala:
- To launch the Scala shell (from Spark home directory):
./bin/spark-shell
- To read in a text file:
scala> val textFile = sc.textFile("README.txt")
 Python:
- To launch the Python shell (from Spark home directory):
./bin/pyspark
- To read in a text file:
>>> textFile = sc.textFile("README.txt")
Scala
 ‘Scalable Language’
 Object oriented, functional programming language
 Runs in a JVM
- Java Interoperability
 Functions are passable objects
 Two approaches
- Anonymous function syntax
x => x + 1
- Static methods in a global singleton object
object MyFunctions {
  def func1(s: String): String = {…}
}
myRdd.map(MyFunctions.func1)
Code Execution (1)
 ‘spark-shell’ provides Spark context as ‘sc’
// Create RDD
val quotes = sc.textFile("hdfs:/sparkdata/sparkQuotes.txt")
// Transformations
val danQuotes = quotes.filter(_.startsWith("DAN"))
val danSpark = danQuotes.map(_.split(" ")).map(x => x(1))
// Action
danSpark.filter(_.contains("Spark")).count()
File: sparkQuotes.txt
DAN Spark is cool
BOB Spark is fun
BRIAN Spark is great
DAN Scala is awesome
BOB Scala is flexible
Code Execution (2)
 Same code as above. After the first line, the RDD quotes refers to the contents of sparkQuotes.txt (all five lines).
Code Execution (3)
 The first transformation defines danQuotes, the lines that start with "DAN":
DAN Spark is cool
DAN Scala is awesome
Code Execution (4)
 The second transformation defines danSpark, the second word of each of those lines: "Spark", "Scala".
Code Execution (5)
 The action filters danSpark for elements containing "Spark" and counts them, returning 1. Because transformations are lazy, the whole lineage (HadoopRDD → filter → map) is only evaluated when the action runs.
RDD Transformations
 Transformations are lazy evaluations
 Returns a pointer to the transformed RDD
map(func): Return a new dataset formed by passing each element of the source through a function func.
filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items, so func should return a Seq rather than a single item.
join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
reduceByKey(func): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func.
sortByKey([ascending], [numTasks]): When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order.
Full documentation at http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.package
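A short sketch chaining several of these transformations (the input path is a placeholder):

// Count words by length: flatMap, map, reduceByKey and sortByKey chained together
val words  = sc.textFile("hdfs:/sparkdata/words.txt").flatMap(_.split(" "))
val pairs  = words.map(w => (w.length, 1))        // (K, V) pairs
val counts = pairs.reduceByKey(_ + _)             // aggregate values per key
val sorted = counts.sortByKey(ascending = true)   // sort by word length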
RDD Actions
 Actions return values

collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or another operation that returns a sufficiently small subset of the data.
count(): Return the number of elements in the dataset.
first(): Return the first element of the dataset.
take(n): Return an array with the first n elements of the dataset.
foreach(func): Run a function func on each element of the dataset.
Full documentation at http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.package
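A few of these actions on a small, parallelized dataset (the values are only illustrative):

val nums = sc.parallelize(Seq(5, 3, 8, 1, 9))
nums.count()                    // 5
nums.first()                    // 5
nums.take(3)                    // Array(5, 3, 8)
nums.collect().foreach(println) // bring everything back to the driver and print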
RDD Persistence
 Each node stores in memory any partitions of the dataset that it computes
 Those partitions are reused in other actions on that dataset (or on datasets derived from it)
- Future actions are much faster (often by more than 10x)
 Two methods for RDD persistence: persist() and cache()

MEMORY_ONLY: Store as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed as needed. This is the default; the cache() method uses this level.
MEMORY_AND_DISK: Same, except partitions that don't fit in memory are also stored on disk and read from there when needed.
MEMORY_ONLY_SER: Store as serialized Java objects (one byte array per partition). Space efficient, but more CPU intensive to read.
MEMORY_AND_DISK_SER: Similar to MEMORY_AND_DISK, but stored as serialized objects.
DISK_ONLY: Store only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Store the RDD in serialized format in Tachyon.
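A sketch of persisting an RDD with an explicit storage level, reusing the quotes file from the earlier example:

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
val quotes = sc.textFile("hdfs:/sparkdata/sparkQuotes.txt")
quotes.persist(StorageLevel.MEMORY_AND_DISK)
quotes.filter(_.startsWith("DAN")).count()   // first action computes and caches
quotes.filter(_.contains("Spark")).count()   // later actions reuse the cached data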
Quick Introduction to Data Frames
 Experimental API introduced in Spark 1.3
 Distributed Collection of Data organized in Columns
 Targeted at Python ecosystem
 Equivalent to tables in databases or data frames in R/Python
 Much richer optimization than other DataFrame implementations
 Can be constructed from a wide variety of sources and APIs
Create a DataFrame
val df = sqlContext.jsonFile("/home/ned/attendees.json")
df.show()
df.printSchema()
df.select("First Name").show()
df.select("First Name","Age").show()
df.filter(df("age")>40).show()
df.groupBy("age").count().show()
Create a DataFrame from an RDD
case class attendees_class(first_name: String, last_name: String, age: Int)

// toDF() may require: import sqlContext.implicits._
val attendees = sc.textFile("/home/ned/attendees.csv")
  .map(_.split(","))
  .map(p => attendees_class(p(0), p(1), p(2).trim.toInt))
  .toDF()

attendees.registerTempTable("attendees")
val youngppl = sqlContext.sql("select first_name, last_name from attendees where age < 35")
youngppl.map(t => "Name: " + t(0) + " " + t(1)).collect().foreach(println)
SparkContext in Applications
 The main entry point for Spark functionality
 Represents the connection to a Spark cluster
 Create RDDs, accumulators, and broadcast variables on that cluster
 In the Spark shell, the SparkContext, sc, is automatically initialized for you to
use
 In a Spark program, import some classes and implicit conversions into your
program:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
A Spark Standalone Application in Scala
[The slide shows a complete standalone application annotated with three parts: the import statements, the SparkConf and SparkContext setup, and the transformations and actions.]
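The application code itself is not reproduced in the transcript; the following is a minimal sketch along the lines of the standard SimpleApp example (the input path and the word being counted are placeholders):

/* Import statements */
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    /* SparkConf and SparkContext */
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    /* Transformations and Actions */
    val logData = sc.textFile("hdfs:/sparkdata/README.md").cache()
    val numSparks = logData.filter(_.contains("Spark")).count()
    println(s"Lines with Spark: $numSparks")

    sc.stop()
  }
}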
Running Standalone Applications
 Define the dependencies
- Scala - simple.sbt
 Create the typical directory structure with the files
Scala:
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
 Create a JAR package containing the application’s code.
- Scala: sbt package
 Use spark-submit to run the program
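A sketch of what simple.sbt might contain (the Scala and Spark versions below are assumptions, chosen to match the Spark 1.2.1 documentation referenced earlier):

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1"

After sbt package, the application can then be launched with something like ./bin/spark-submit --class "SimpleApp" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar (the jar path depends on the project name and versions).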
Fault Recovery
RDDs track lineage information that can be used to efficiently
recompute lost data
Ex:
msgs = textFile.filter(lambda s: s.startswith("ERROR")) \
               .map(lambda s: s.split("\t")[2])

[Diagram: lineage HDFS File → filter (func = _.contains(...)) → Filtered RDD → map (func = _.split(...)) → Mapped RDD]
Which Language Should I Use?
 Standalone programs can be written in any, but interactive
shell is only Python & Scala
 Python users: can do Python for both
 Java users: consider learning Scala for shell
Performance: Java & Scala are faster due to static typing, but
Python is often fine
Scala Cheat Sheet
 Variables:
var x: Int = 7
var x = 7          // type inferred
val y = "hi"       // read-only

 Functions:
def square(x: Int): Int = x*x
def square(x: Int): Int = {
  x*x              // last line returned
}

 Collections and closures:
val nums = Array(1, 2, 3)
nums.map((x: Int) => x + 2)   // {3,4,5}
nums.map(x => x + 2)          // same
nums.map(_ + 2)               // same
nums.reduce((x, y) => x + y)  // 6
nums.reduce(_ + _)            // same

 Java interop:
import java.net.URL
new URL("http://cnn.com").openStream()

More details: scala-lang.org
Spark in Scala and Java
// Scala:
val lines = sc.textFile(...)
lines.filter(x => x.contains("ERROR")).count()

// Java:
JavaRDD<String> lines = sc.textFile(...);
lines.filter(new Function<String, Boolean>() {
  public Boolean call(String s) {
    return s.contains("ERROR");
  }
}).count();
This Talk
 Introduction to Shark, Spark and Spark Streaming
 Architecture
 Deployment Methodology
 Performance
 References
Behavior with Less RAM
[Chart: execution time (s) vs. % of working set in cache. Roughly 69 s with the cache disabled, 58 s at 25%, 41 s at 50%, 30 s at 75%, and 12 s when the working set is fully cached.]
Performance
Can process 6 GB/sec (60M records/sec) of data on 100 nodes at
sub-second latency
- Tested with 100 text streams on 100 EC2 instances with 4 cores each
[Charts: cluster throughput (GB/s) vs. number of nodes for Grep and WordCount, each at 1 sec and 2 sec batch intervals. Grep reaches roughly 6 GB/s and WordCount roughly 3 GB/s at 100 nodes.]
Performance and Generality
(Unified Computation Models)
[Charts comparing systems on three workloads:
- Interactive (SQL): response time (s) for Shark vs. Impala vs. Hive
- Batch (ML): time per iteration (s) for Spark vs. Hadoop
- Streaming: throughput (MB/s/node) for Spark Streaming vs. Storm]
Example: Video Quality Diagnosis
 Exact query over the full 17 TB input: latency 772.34 sec
 Approximate query over 1.7 GB of input: latency 1.78 sec, about 440x faster!
 Top 10 worst performers identical!
This Talk
 Introduction to Shark, Spark and Spark Streaming
 Architecture
 Deployment Methodology
 Implementation Next Steps
 References
References
 https://amplab.cs.berkeley.edu/software/
 www.bigdatauniversity.com/bdu-wp/bdu-course/sparkfundamentals/
 www.ibm.com/analytics/us/en/technology/spark/
THANK YOU