Slides - UCLA Computer Science
CS239-Lecture 7
UDF Optimizations
Madan Musuvathi
Visiting Professor, UCLA
Principal Researcher, Microsoft Research
Schedule update
Approximation
May 2 Guest Lecture Andy Konwinski of Databricks
May 4 a)
BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data
b)
Quickr: Lazily Approximating Complex AdHoc Queries in BigData Clusters
Graph Analytics
May 9 a)
Distributed GraphLab: A Framework for Machine Learning and Data Mining in the Cloud
b)
Scalability! But at what COST?
May 11 a)
Latency-Tolerant Software Distributed Shared Memory
b)
Arabesque: A System for Distributed Graph Mining
Curation & Transactions
May 16 a)
Wrangler: Interactive Visual Specification of Data Transformation Scripts
b)
Data Curation at Scale: The Data Tamer System
May 18 a)
No compromises: distributed transactions with consistency, availability, and performance
b)
Type-Aware Transactions for Faster Concurrent Code
Large Scale Machine Learning + Project Presentations
May 23 a)
Large Scale Distributed Deep Networks
b)
Scaling Distributed Machine Learning with the Parameter Server
May 25 Project Presentations
Zhaoxing Bu
Qi Zhao
Tushar Sudhakar Jee
Zhouyihai Ding
David Hong
Yushu Qin
Sakshi Goplani
Saswat Padhi
Lun Liu
Nisarg Shah
Liqiang Yu
Hao Chen
Paper presentations
Read your paper well before the presentation (ideally, at least a week before)
- Consult with me on the difficult parts
Assume everyone has read the paper
- Your goal is to quickly summarize the paper
- Bring out the key points for discussion in class
- Cull insights about the topic by reading related work
Plan on a 15-20 minute talk with 25 minutes of discussion
Pop Quiz
The State of California recently decided to charge sales taxes on Amazon shipments to California residents.
The following query calculates the additional taxes incurred by this change:
purchases: a set of (user_id, purchased_product, price_before_taxes)
addresses: a set of (user_id, address, zipcode)
A = join(purchases, addresses, purchases.user_id == addresses.user_id)
B = reduce(A, user_id, sum(price_before_taxes))
C = filter(B, if_in_california(zipcode))
Output C
Name two optimizations that you can perform on this query. Justify why these optimizations will produce a faster plan.
Opening the Black Boxes
in Data Flow Optimization
Yi Ding
Overview
What is a Black Box?
User-Defined Functions!
What does "open" mean?
Static Code Analysis!
No Knowledge of Algebraic Properties!
Background
Stratosphere:
Nephele engine + PACT compiler
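PACT vs. MapReduce
• Operators: PACT has Map, Reduce, Cross, Match, CoGroup; MapReduce has only Map and Reduce
• Data flow: PACT supports arbitrary acyclic data flow graphs; MapReduce is fixed to Map→Reduce
Operator = second-order function + first-order user-defined function (UDF).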
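To make the operator model concrete, here is a minimal Python sketch, assuming records are plain tuples. Each second-order function (the hypothetical map_op, reduce_op, and match_op below, not Stratosphere's actual API) decides which records the UDF sees together: one record, one key group, or one pair with equal keys. The UDF itself stays a black box that returns a list of output records, so it can also drop or duplicate records.

```python
from collections import defaultdict

# Hypothetical sketch: records are tuples; every UDF returns a list of output
# records (empty list = record filtered out, several = record expanded).

def map_op(udf, records):
    """Second-order Map: apply the UDF independently to each record."""
    return [out for r in records for out in udf(r)]

def reduce_op(key, udf, records):
    """Second-order Reduce: apply the UDF once per group of records sharing a key."""
    groups = defaultdict(list)
    for r in records:
        groups[key(r)].append(r)
    return [out for group in groups.values() for out in udf(group)]

def match_op(lkey, rkey, udf, left, right):
    """Second-order Match: apply the UDF to every pair of records with equal keys."""
    return [out for l in left for r in right if lkey(l) == rkey(r)
            for out in udf(l, r)]

# Usage: word count, where only the lambdas are user code.
words = [("a",), ("b",), ("a",)]
pairs = map_op(lambda r: [(r[0], 1)], words)
counts = reduce_op(lambda r: r[0],
                   lambda g: [(g[0][0], sum(c for _, c in g))],
                   pairs)
print(counts)  # [('a', 2), ('b', 1)]
```

The optimizer only ever sees the second-order functions; what happens inside the lambdas is exactly the "black box" this talk is about.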
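Example 1
Records have two attributes: $(A, B)$
$P: I \rightarrow \mathrm{Map}_1 \rightarrow \mathrm{Map}_2 \rightarrow \mathrm{Map}_3 \rightarrow O$
$\mathrm{Map}_1 \leftarrow f_1: B \leftarrow |B|$
$\mathrm{Map}_2 \leftarrow f_2: A \mapsto \begin{cases} A, & A \ge 0 \\ \bot, & A < 0 \end{cases}$ (where $\bot$ means the record is filtered out)
$\mathrm{Map}_3 \leftarrow f_3: A \leftarrow A + B$
$(2, -3) \xrightarrow{f_1} (2, 3) \xrightarrow{f_2} (2, 3) \xrightarrow{f_3} (5, 3)$
$(-2, -3) \xrightarrow{f_1} (-2, 3) \xrightarrow{f_2} \bot \xrightarrow{f_3} \bot$
Reorder!
$P': I \rightarrow \mathrm{Map}_2 \rightarrow \mathrm{Map}_1 \rightarrow \mathrm{Map}_3 \rightarrow O$
$(2, -3) \xrightarrow{f_2} (2, -3) \xrightarrow{f_1} (2, 3) \xrightarrow{f_3} (5, 3)$
$(-2, -3) \xrightarrow{f_2} \bot \xrightarrow{f_1} \bot \xrightarrow{f_3} \bot$
$P'$ is better! The filter $f_2$ runs first, so $f_1$ and $f_3$ process fewer records.
$P'': I \rightarrow \mathrm{Map}_2 \rightarrow \mathrm{Map}_3 \rightarrow \mathrm{Map}_1 \rightarrow O$
$(2, -3) \xrightarrow{f_2} (2, -3) \xrightarrow{f_3} (-1, -3) \xrightarrow{f_1} (-1, 3)$
Wrong answer! ($P$ produces $(5, 3)$ for this record.)
We don't have any knowledge of $f_1$, $f_2$, and $f_3$!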
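The traces above can be replayed with a short Python sketch. It models each Map UDF as a function that returns the new record, or None for ⊥, and it assumes f1 takes the absolute value of B, which is what the traces indicate. The helper run and the result names P1 (for P') and P2 (for P'') are illustrative.

```python
# Records are (A, B) tuples; None plays the role of ⊥ (record filtered out).

def f1(rec):
    a, b = rec
    return (a, abs(b))               # B <- |B|

def f2(rec):
    a, b = rec
    return rec if a >= 0 else None   # keep the record only if A >= 0

def f3(rec):
    a, b = rec
    return (a + b, b)                # A <- A + B

def run(plan, records):
    """Push each record through the chain of Map UDFs; drop it on ⊥."""
    out = []
    for rec in records:
        for f in plan:
            rec = f(rec)
            if rec is None:
                break
        else:                        # no UDF filtered the record
            out.append(rec)
    return out

inputs = [(2, -3), (-2, -3)]
P = run([f1, f2, f3], inputs)    # original order
P1 = run([f2, f1, f3], inputs)   # P': filter first, same result
P2 = run([f2, f3, f1], inputs)   # P'': f3 now reads the unmodified B
print(P, P1, P2)  # [(5, 3)] [(5, 3)] [(-1, 3)]
```

Syntactically, all three plans are valid chains of Map operators; only knowledge of what each UDF reads and writes separates P' from P''.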
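Review Example
$P: I \rightarrow \mathrm{Map}_1 \rightarrow \mathrm{Map}_2 \rightarrow \mathrm{Map}_3 \rightarrow O$   Problem!
$\mathrm{Map}_1 \leftarrow f_1: B \leftarrow |B|$
$\mathrm{Map}_2 \leftarrow f_2: A \mapsto \begin{cases} A, & A \ge 0 \\ \bot, & A < 0 \end{cases}$
$\mathrm{Map}_3 \leftarrow f_3: A \leftarrow A + B$
$(2, -3) \xrightarrow{f_1} (2, 3) \xrightarrow{f_2} (2, 3) \xrightarrow{f_3} (5, 3)$
$P'': I \rightarrow \mathrm{Map}_2 \rightarrow \mathrm{Map}_3 \rightarrow \mathrm{Map}_1 \rightarrow O$
$(2, -3) \xrightarrow{f_2} (2, -3) \xrightarrow{f_3} (-1, -3) \xrightarrow{f_1} (-1, 3)$
Wrong answer!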
Conditions
Global record: $(A, B)$
Read set $R_f$: all attributes that might influence the operator's output.
Write set $W_f$: all attributes whose values the operator changes.
$R_{f_1} = \{B\}$, $W_{f_1} = \{B\}$
$R_{f_2} = \{A\}$, $W_{f_2} = \emptyset$
$R_{f_3} = \{A, B\}$, $W_{f_3} = \{A\}$
Conflict: an attribute appears in both operators' write sets, or in one operator's read set and the other operator's write set.
$f_i$ & $f_j$ conflict!
Condition
ROC Condition
$R_{f_1} \cap W_{f_2} = R_{f_2} \cap W_{f_1} = W_{f_1} \cap W_{f_2} = \emptyset$
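Since the ROC condition is a pure set test, it can be written down directly. In this Python sketch the read and write sets of Example 1 are entered by hand, whereas the approach described here derives them by static code analysis; roc_holds and can_swap are hypothetical helper names.

```python
def roc_holds(r1, w1, r2, w2):
    """True iff two Map UDFs have no read/write or write/write conflict."""
    return not (r1 & w2) and not (r2 & w1) and not (w1 & w2)

# Read and write sets from Example 1 (global record (A, B)).
sets = {
    "f1": ({"B"}, {"B"}),        # B <- |B|
    "f2": ({"A"}, set()),        # filter on A, writes nothing
    "f3": ({"A", "B"}, {"A"}),   # A <- A + B
}

def can_swap(x, y):
    (rx, wx), (ry, wy) = sets[x], sets[y]
    return roc_holds(rx, wx, ry, wy)

print(can_swap("f1", "f2"))  # True : P' is a legal reordering
print(can_swap("f1", "f3"))  # False: B is written by f1 and read by f3
print(can_swap("f2", "f3"))  # False: A is read by f2 and written by f3
```

The failing pair f1/f3 is what breaks P'': moving f1 past f3 changes the B that f3 reads.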
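Example 2
$P: I \rightarrow \mathrm{Map} \rightarrow \mathrm{Reduce} \rightarrow O$, with $\mathrm{Map} \leftarrow f$ and $\mathrm{Reduce} \leftarrow g$
$f$: emits all values of $A$ and $B$ unchanged
$g$: calculates the sum of $B$ using $A$ as the key, and stores the sum as $C$
$R_f = W_f = \emptyset$, $R_g = \{A, B\}$, $W_g = \{B, C\}$
ROC holds!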
Condition
KGP condition
A function $f$, applied to a set of records $I_K$ that share the same value for key $K$, either emits all of these records or filters out all of them.
Together, ROC and KGP let us decide for almost all UDFs whether they can be reordered!
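The sketch below only illustrates what KGP means, assuming a UDF that returns a record or None (filtered). It groups sample records by the key and reports a key whose group is split, that is, partly emitted and partly filtered. Testing samples can show that a UDF violates KGP but can never prove that it satisfies it; violates_kgp and the two sample filters are hypothetical.

```python
from collections import defaultdict

def violates_kgp(f, records, key):
    """Return a key whose group f splits (some records kept, some dropped),
    or None if every sampled group is kept or dropped as a whole."""
    outcomes = defaultdict(set)
    for rec in records:
        outcomes[key(rec)].add(f(rec) is not None)
    for k, kept in outcomes.items():
        if len(kept) > 1:          # both True and False seen for this key
            return k
    return None

records = [(1, 5), (1, -2), (-3, 4)]   # (A, B) with key K = A
by_a = lambda r: r[0]
filter_on_key = lambda r: r if r[0] >= 0 else None    # decision uses only A
filter_on_other = lambda r: r if r[1] > 0 else None   # decision uses B

print(violates_kgp(filter_on_key, records, by_a))    # None
print(violates_kgp(filter_on_other, records, by_a))  # 1
```

A filter whose decision depends only on the grouping key keeps or drops whole key groups, which is the kind of UDF that can move across a Reduce on that key.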
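Rewrite rules
$\mathrm{Cross}_f(R, S) \equiv \mathrm{Map}_f(R \times S)$
$\mathrm{Match}_f(R, S) \equiv \mathrm{Map}_{f'}(R \times S)$
$\mathrm{Map}_f(R) \times S \equiv \mathrm{Map}_f(R \times S)$ if $(R_f \cup W_f) \cap S = \emptyset$
Reordering two Matches
$\mathrm{Match}_f(\mathrm{Match}_g(R, S), T)$
$\equiv \mathrm{Match}_f(\mathrm{Map}_{g'}(R \times S), T)$
$\equiv \mathrm{Map}_{f'}(\mathrm{Map}_{g'}(R \times S) \times T)$
$\equiv \mathrm{Map}_{f'}(\mathrm{Map}_{g'}(R \times S \times T))$
$\equiv \mathrm{Map}_{g'}(\mathrm{Map}_{f'}(R \times S \times T))$
$\equiv \mathrm{Map}_{g'}(R \times \mathrm{Map}_{f'}(S \times T))$
$\equiv \mathrm{Map}_{g'}(R \times \mathrm{Match}_f(S, T))$
$\equiv \mathrm{Match}_g(R, \mathrm{Match}_f(S, T))$
Conditions:
$(R_{g'} \cup W_{g'}) \cap T = \emptyset$
ROC condition holds
$(R_{f'} \cup W_{f'}) \cap R = \emptyset$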
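The derivation can be sanity-checked on a small instance. This Python sketch uses the rule that a Match is a Map over the cross product whose UDF keeps only pairs with equal keys. Records are dicts over named attributes (mirroring the global record), and the relations and attribute names are invented for illustration. The inner join keys (r_b, s_b) contain no attribute of T, and the outer join keys (s_c, t_c) contain no attribute of R, matching the two side conditions. One agreeing instance is of course not a proof.

```python
from itertools import product

# Records are dicts; a record of R x S carries the attributes of both sides.

def cross(left, right):
    return [{**l, **r} for l, r in product(left, right)]

def map_op(f, records):
    """Apply a UDF that returns a record, or None to filter it out."""
    out = []
    for rec in records:
        res = f(rec)
        if res is not None:
            out.append(res)
    return out

def match(lkey, rkey, left, right):
    """Match(L, R) written as Map_{f'}(L x R): f' keeps pairs whose keys agree."""
    return map_op(lambda rec: rec if rec[lkey] == rec[rkey] else None,
                  cross(left, right))

def canon(rows):
    """Order-independent form of a result, for comparing plans."""
    return sorted(tuple(sorted(r.items())) for r in rows)

R = [{"r_id": 1, "r_b": 10}, {"r_id": 2, "r_b": 20}]
S = [{"s_b": 10, "s_c": "x"}, {"s_b": 20, "s_c": "y"}, {"s_b": 10, "s_c": "y"}]
T = [{"t_c": "y", "t_d": 7}]

# Match_f(Match_g(R, S), T): join R and S on b first, then with T on c.
left_deep = match("s_c", "t_c", match("r_b", "s_b", R, S), T)
# Match_g(R, Match_f(S, T)): join S and T on c first, then R on b.
right_deep = match("r_b", "s_b", R, match("s_c", "t_c", S, T))

print(len(left_deep), canon(left_deep) == canon(right_deep))  # 2 True
```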
Implementation
Read set: fields read via $t ← getField($ir, n)
Write set: fields set via setField($or, n, $t) on an output record $or ← new OutputRecord($ir)
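A toy version of this analysis can be sketched in Python, assuming the UDF has already been lowered to three-address instructions with constant field indices. The tuple encoding of instructions is hypothetical, not the IR used in the paper. A real analysis must also stay conservative when a field index is not a constant or when an access happens on only some control-flow paths; this sketch simply counts every access it sees.

```python
# Hypothetical instruction encoding: (opcode, *operands).
#   ("getField", dst, record, field)   ->  dst := getField(record, field)
#   ("setField", record, field, src)   ->  setField(record, field, src)
#   ("new", dst, src_record)           ->  dst := new OutputRecord(src_record)
#   ("add", dst, x, y)                 ->  arithmetic, no field access

def read_write_sets(instrs, input_record="$ir"):
    """Collect field indices read from the input record and written to outputs."""
    reads, writes = set(), set()
    for op, *args in instrs:
        if op == "getField":
            _, rec, field = args
            if rec == input_record:
                reads.add(field)
        elif op == "setField":
            _, field, _ = args
            writes.add(field)
    return reads, writes

# f3 from Example 1 (A <- A + B), with A = field 0 and B = field 1.
f3 = [
    ("getField", "$a", "$ir", 0),
    ("getField", "$b", "$ir", 1),
    ("add", "$t", "$a", "$b"),
    ("new", "$or", "$ir"),          # output starts as a copy of the input
    ("setField", "$or", 0, "$t"),
]
print(read_write_sets(f3))  # ({0, 1}, {0})
```

Because the output record is created as a copy of the input, untouched fields pass through unchanged and stay out of the write set.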
Enumeration
Evaluation
• Relational Online Analytical Processing (OLAP)
• Text mining
• Non-relational clickstream processing
New model: static code analysis, enumeration
Cost model: network I/O, disk I/O, CPU cost
Future Work
• Enumeration is time-consuming
• Identify beneficial reorderings
• Support a wider range of optimizations
• Exploit semantic information
Spotting Code Optimizations in Data-Parallel Pipelines through PeriSCOPE
Muhammad Ali Gulzar
Outline
• Introduction
• Running Example
• Optimizations using PeriSCOPE
  • Column Reduction
  • Early Filtering
  • Smart Cut
• Evaluation
Data Shuffling is Expensive
Disconnect between Pipeline and Code Optimizers
Pipeline Optimizers
• Logical optimizers work on the high-level pipeline topology
• They cannot look inside procedural code (a black box) while optimizing
Compiler Optimizations
• Work only within user-defined functions
• Unaware of the pipeline -> no inter-stage optimizations
Manual Optimizations
• Tedious
• Error-prone
Spark, MapReduce, etc., being unaware of what UDFs do, apply only logical optimizations to the workflow.
Example
If the filter GetLength(query) > 4 were written inside the procedural code of PScoreReducer, this optimization could not be performed.
PeriSCOPE -- Overview
• Optimizes the procedural code of SCOPE programs
• Connects the data flow of the program to the high-level pipeline
• Searches for code that is amenable to optimization
• Three core optimizations:
  • Column reduction
  • Early filtering
  • Smart cut
Running Example
Column Reduction
Problem
• Unused columns in data-parallel programs consume significant network I/O
• A well-established concept (early projection), also already applied to procedural code (UDFs) [Manimal '11]
Solution by PeriSCOPE
• Analyze the dependencies between the input and output columns of all operators
• Remove unused columns and the computations associated with them
Limitation of PeriSCOPE's column reduction
• No column reduction can be applied if a column name cannot be resolved statically
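The dependency analysis behind column reduction is essentially backward liveness over columns. A minimal Python sketch, assuming each stage is already summarized by which input columns each of its output columns is computed from (the pipeline, stage names, and column names are made up; the tool described here derives such dependencies from the UDF code). A column whose name is only known at run time could not be entered into this table, which is the limitation noted above.

```python
# Stages from upstream to downstream; each maps output column -> input columns.
pipeline = [
    ("extract", {"query": {"line"}, "clicks": {"line"}, "ua": {"line"}}),
    ("score",   {"query": {"query"}, "score": {"clicks", "query"}, "ua": {"ua"}}),
    ("output",  {"query": {"query"}, "score": {"score"}}),
]

def live_columns(pipeline):
    """Walk backwards from the last stage and keep, per stage, only the output
    columns some downstream column depends on. Anything else (and the code that
    computes it) can be removed."""
    needed = set(pipeline[-1][1])        # all final outputs are used
    live = {}
    for name, deps in reversed(pipeline):
        live[name] = {c for c in deps if c in needed}
        needed = set().union(*(deps[c] for c in live[name]))
    return live

live = live_columns(pipeline)
for name, deps in pipeline:
    print(name, sorted(set(deps) - live[name]))  # columns that can be dropped
# extract ['ua']
# score ['ua']
# output []
```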
Column Reduction in Example
Code Motion
• Moving code from user-defined functions across stages
• The goal is to reduce the number and size of inter-stage rows
• Program correctness must be maintained: safe code motion
Safety Rules
• Rule 1: Must not move a statement that generates a shuffle-key column
• Rule 2: Must not move a stateful statement across a shuffle
• Rule 3: Must not move a filtering statement if it is, or is reachable from, a stateful statement
• Loop dependency analysis handles dependencies across different iterations of a loop
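The three rules can be sketched as a check over a straight-line UDF body, assuming each statement is summarized by the variables it defines and uses plus two flags. Stmt, can_move_across_shuffle, and the example statements are hypothetical, and loops (the last bullet) are not modeled.

```python
from dataclasses import dataclass

@dataclass
class Stmt:
    name: str
    defs: set                  # variables this statement writes
    uses: set                  # variables this statement reads
    stateful: bool = False     # keeps state across rows (e.g. a running counter)
    is_filter: bool = False    # decides whether the current row is dropped

def reachable_from_stateful(stmts):
    """Statements that are stateful or (transitively) use a value a stateful
    statement produced. Assumes straight-line code in program order."""
    tainted_vars, tainted = set(), set()
    for s in stmts:
        if s.stateful or s.uses & tainted_vars:
            tainted.add(s.name)
            tainted_vars |= s.defs
    return tainted

def can_move_across_shuffle(s, stmts, shuffle_keys):
    if s.defs & shuffle_keys:      # Rule 1: generates a shuffle-key column
        return False
    if s.stateful:                 # Rule 2: stateful statement
        return False
    if s.is_filter and s.name in reachable_from_stateful(stmts):  # Rule 3
        return False
    return True

stmts = [
    Stmt("s1", {"len"}, {"query"}),                    # len = GetLength(query)
    Stmt("s2", set(), {"len"}, is_filter=True),        # drop row if len <= 4
    Stmt("s3", {"count"}, {"count"}, stateful=True),   # count = count + 1
    Stmt("s4", set(), {"count"}, is_filter=True),      # drop row if count > 100
    Stmt("s5", {"key"}, {"query"}),                    # shuffle key from query
]
movable = [s.name for s in stmts if can_move_across_shuffle(s, stmts, {"key"})]
print(movable)  # ['s1', 's2']
```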
Early Filtering
• Filtering earlier reduces the I/O burden of shuffling
• Similar to early selection
• Related work (e.g., Manimal) performs the same optimization but cannot rewrite UDFs
How?
• The first UDF after the shuffle is analyzed for filtering statements
• Stateful filtering statements are excluded from the candidates
• A backward slice collects the filter and the statements that can affect it
• The slice is removed from its UDF and inserted at an appropriate earlier location (rewriting UDFs)
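A minimal Python sketch of the effect, modeled on the running example's filter GetLength(query) > 4 (written here with Python's len). The rows, the summing reducer, and the helper names are made up. Moving the filter ahead of the shuffle is safe here because it reads only the row's own query column and no state, so it obeys the safety rules; the sketch shows that the output is unchanged while fewer rows are shuffled.

```python
from collections import defaultdict

# (query, score) rows produced upstream of the shuffle; values are made up.
rows = [("spark", 3), ("db", 1), ("scope", 2), ("spark", 4), ("ml", 5)]

def long_query(query):
    return len(query) > 4              # stands in for GetLength(query) > 4

def reducer_original(query, scores):
    # Filter inside the UDF that runs after the shuffle.
    return [(query, sum(scores))] if long_query(query) else []

def reducer_rewritten(query, scores):
    # Same UDF with the filter slice removed.
    return [(query, sum(scores))]

def run(rows, reducer, early_filter=None):
    """Optionally filter before the shuffle, group by query, then reduce.
    Returns (sorted output, number of rows that went through the shuffle)."""
    if early_filter is not None:
        rows = [r for r in rows if early_filter(r)]
    groups = defaultdict(list)
    for query, score in rows:
        groups[query].append(score)
    out = sorted(o for q, scores in groups.items() for o in reducer(q, scores))
    return out, len(rows)

before = run(rows, reducer_original)
after = run(rows, reducer_rewritten, early_filter=lambda r: long_query(r[0]))
print(before)  # ([('scope', 2), ('spark', 7)], 5)
print(after)   # ([('scope', 2), ('spark', 7)], 3)
```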
Early Filtering -- Example
Smart Cut
• Minimize cross-stage data flow
• Similar to combiners, but finer-grained (instruction level)
How?
• Move row-local computations upstream to leverage parallelism
• Build a data dependency graph: vertices are instructions, edges are read-after-write (RAW) dependencies
• Each edge is labeled with the name and size of the variable it carries
• Apply a minimum cut algorithm (the cut is one-directional, and a variable crossing the cut on several edges is counted only once)
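The cut can be sketched as a search over the dependency graph. The Python sketch below uses exhaustive enumeration instead of a real min-cut algorithm, which only works for tiny graphs; the instructions, variables, and per-row byte sizes are hypothetical. The instruction "read" must stay upstream of the shuffle and "emit" downstream.

```python
from itertools import combinations

# Hypothetical RAW graph: (u, v, var) means instruction v reads var written by u.
sizes = {"body": 1000, "url": 50, "n": 4, "h": 20, "w": 2000}
edges = [
    ("read", "len", "body"),    # n = Length(body)
    ("read", "words", "body"),  # w = Split(body), larger than its input
    ("read", "host", "url"),    # h = Host(url)
    ("len", "emit", "n"),
    ("words", "emit", "w"),
    ("host", "emit", "h"),
]
nodes = {u for u, _, _ in edges} | {v for _, v, _ in edges}

def is_one_directional(up):
    """No RAW edge may go from the downstream side back to the upstream side."""
    return all(u in up for u, v, _ in edges if v in up)

def cut_cost(up):
    """Bytes per row crossing the cut; a variable sent on several edges counts once."""
    crossing = {var for u, v, var in edges if u in up and v not in up}
    return sum(sizes[var] for var in crossing)

def smart_cut(source="read", sink="emit"):
    """Try every upstream set that contains source and not sink."""
    free = sorted(nodes - {source, sink})
    best = None
    for k in range(len(free) + 1):
        for extra in combinations(free, k):
            up = {source, *extra}
            if is_one_directional(up):
                cost = cut_cost(up)
                if best is None or cost < best[0]:
                    best = (cost, sorted(up))
    return best

print(smart_cut())  # (1020, ['host', 'read'])
```

Here body must cross the shuffle anyway because Split enlarges the data, so computing Length upstream saves nothing (body is counted once), while Host shrinks url from 50 to 20 bytes and is worth moving upstream.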
Smart Cut -- Example
Evaluation
Optimization coverage evaluated on 28K jobs
The nature and types of these jobs are not clear
Case Study
Performance improvements
I/O reduction in the shuffling phase
Thoughts
• Intelligent tool to perform optimizations from within UDFs
• Not platform-specific: can be generalized to other frameworks
• Early filtering and column reduction are already well-established optimizations in databases
• Column reduction can be applied only to columnar data engines
• In the context of Spark, users are expected to use dedicated filter APIs, so early filtering is not needed