Slides - UCLA Computer Science

CS239-Lecture 7
UDF Optimizations
Madan Musuvathi
Visiting Professor, UCLA
Principal Researcher, Microsoft Research
Schedule update
Approximation
May 2: Guest Lecture, Andy Konwinski of Databricks
May 4
a) BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data
b) Quickr: Lazily Approximating Complex AdHoc Queries in BigData Clusters
Graph Analytics
May 9
a) Distributed GraphLab: A Framework for Machine Learning and Data Mining in the Cloud
b) Scalability! But at what COST?
May 11
a) Latency-Tolerant Software Distributed Shared Memory
b) Arabesque: A System for Distributed Graph Mining
Curation & Transactions
May 16
a) Wrangler: Interactive Visual Specification of Data Transformation Scripts
b) Data Curation at Scale: The Data Tamer System
May 18
a) No compromises: distributed transactions with consistency, availability, and performance
b) Type-Aware Transactions for Faster Concurrent Code
Large Scale Machine Learning + Project Presentations
May 23
a) Large Scale Distributed Deep Networks
b) Scaling Distributed Machine Learning with the Parameter Server
May 25: Project Presentations
Zhaoxing Bu
Qi Zhao
Tushar Sudhakar Jee
Zhouyihai Ding
David Hong
Yushu Qin
Sakshi Goplani
Saswat Padhi
Lun Liu
Nisarg Shah
Liqiang Yu
Hao Chen
Paper presentations
read your paper well before the presentation (ideally, at least a week before)
- consult with me on the difficult parts
assume everyone has read the paper
- your goal is to quickly summarize the paper
- bring out the key points for discussion in the class
- cull insights about the topic by reading related work
plan on a 15-20 minute talk with 25 minutes of discussion
Pop Quiz
The State of California recently decided to charge sales taxes on Amazon shipments to California residents.
The following query calculates the additional taxes incurred by this change:
purchases: a set of (user_id, purchased_product, price_before_taxes)
addresses: a set of (user_id, address, zipcode)
A = join(purchases, addresses, purchases.user_id == addresses.user_id)
B = reduce(A, user_id, sum(price_before_taxes))
C = filter(B, if_in_california(zipcode))
Output C
Name two optimizations that you can perform on this query. Justify why these optimizations will produce a faster plan.
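For discussion, here is a minimal Python sketch of one possible pair of rewrites (filter push-down and early projection), checked against the plan as written. The sample data, the ZIP-code range in is_in_california, and the one-address-per-user assumption are all illustrative and not part of the quiz.

```python
from collections import defaultdict

def is_in_california(zipcode):
    # Assumption for illustration only: treat 90000-96199 as California ZIP codes.
    return 90000 <= zipcode <= 96199

def plan_as_written(purchases, addresses):
    # join -> reduce -> filter, in the order given in the quiz.
    zip_of = {uid: zc for uid, _addr, zc in addresses}  # assumes one address per user
    joined = [(uid, price, zip_of[uid]) for uid, _prod, price in purchases if uid in zip_of]
    totals, zips = defaultdict(float), {}
    for uid, price, zc in joined:
        totals[uid] += price
        zips[uid] = zc
    return {uid: t for uid, t in totals.items() if is_in_california(zips[uid])}

def plan_pushed_down(purchases, addresses):
    # Filter addresses first and keep only user_id (projection),
    # so non-California rows and unused columns never reach the join.
    ca_users = {uid for uid, _addr, zc in addresses if is_in_california(zc)}
    totals = defaultdict(float)
    for uid, _prod, price in purchases:
        if uid in ca_users:
            totals[uid] += price
    return dict(totals)

PURCHASES = [(1, "book", 10.0), (1, "pen", 2.5), (2, "lamp", 30.0), (3, "desk", 100.0)]
ADDRESSES = [(1, "Los Angeles", 90095), (2, "Seattle", 98101)]
```

Both plans return the same totals on this data; the second one joins and aggregates fewer, narrower rows.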
Opening the Black Boxes
in Data Flow Optimization
Yi Ding
Overview
What is a Black Box?
User-Defined Functions!
What does "open" mean?
Static Code Analysis!
No Knowledge of Algebraic Properties!
Background
Stratosphere: Nephele engine + PACT compiler
PACT: Map, Reduce, Cross, Match, CoGroup; arbitrary acyclic data flow graphs
Map-Reduce: Map, Reduce; Map→Reduce
Operator: Second-Order Function + First-Order User-Defined Function (UDF).
Background
Example 1
Record: $(A, B)$
$P: I \to \mathit{Map}_1 \to \mathit{Map}_2 \to \mathit{Map}_3 \to O$
$\mathit{Map}_1 - f_1: B \to |B|$
$\mathit{Map}_2 - f_2: A \to \begin{cases} A, & A \ge 0 \\ \bot, & A < 0 \end{cases}$
$\mathit{Map}_3 - f_3: A \to A + B$
$(2, -3) \to f_1 \to (2, 3) \to f_2 \to (2, 3) \to f_3 \to (5, 3)$
$(-2, -3) \to f_1 \to (-2, 3) \to f_2 \to \bot \to f_3 \to \bot$
Reorder!
$P': I \to \mathit{Map}_2 \to \mathit{Map}_1 \to \mathit{Map}_3 \to O$
$(2, -3) \to f_2 \to (2, -3) \to f_1 \to (2, 3) \to f_3 \to (5, 3)$
$(-2, -3) \to f_2 \to \bot \to f_1 \to \bot \to f_3 \to \bot$
$P'$ is better!
$P'': I \to \mathit{Map}_2 \to \mathit{Map}_3 \to \mathit{Map}_1 \to O$
$(2, -3) \to f_2 \to (2, -3) \to f_3 \to (-1, -3) \to f_1 \to (-1, 3)$
Wrong answer!
We don't have any knowledge of $f_1$, $f_2$ and $f_3$!
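The traces above can be replayed with a small Python sketch. It assumes, as the worked traces indicate, that f1 replaces B with its absolute value, f2 drops records with negative A, and f3 sets A to A + B; None stands in for the filtered-out result.

```python
def f1(rec):
    a, b = rec
    return (a, abs(b))          # B := |B|

def f2(rec):
    a, b = rec
    return rec if a >= 0 else None   # filter: keep only A >= 0

def f3(rec):
    a, b = rec
    return (a + b, b)           # A := A + B

def run(plan, rec):
    """Apply the Map UDFs in order; stop as soon as a record is filtered out."""
    for udf in plan:
        if rec is None:
            return None
        rec = udf(rec)
    return rec

P = [f1, f2, f3]    # original plan
P1 = [f2, f1, f3]   # P': filter first, same result, less work
P2 = [f2, f3, f1]   # P'': f3 now reads B before f1 has rewritten it
```

run(P, (2, -3)) and run(P1, (2, -3)) both give (5, 3), while run(P2, (2, -3)) gives (-1, 3).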
Review Example
$P: I \to \mathit{Map}_1 \to \mathit{Map}_2 \to \mathit{Map}_3 \to O$
Problem!!!!!
$\mathit{Map}_2 - f_2: A \to \begin{cases} A, & A \ge 0 \\ \bot, & A < 0 \end{cases}$
$\mathit{Map}_1 - f_1: B \to |B|$
$\mathit{Map}_3 - f_3: A \to A + B$
$(2, -3) \to f_1 \to (2, 3) \to f_2 \to (2, 3) \to f_3 \to (5, 3)$
$P'': I \to \mathit{Map}_2 \to \mathit{Map}_3 \to \mathit{Map}_1 \to O$
$(2, -3) \to f_2 \to (2, -3) \to f_3 \to (-1, -3) \to f_1 \to (-1, 3)$
Wrong answer!
Conditions
Global record: $(A, B)$
Read set: $R$, all attributes that might influence the operator's output.
Write set: $W$, all attributes whose values change after the operator.
$R_{f_1} = \{B\}$, $W_{f_1} = \{B\}$, $R_{f_2} = \{A\}$, $W_{f_2} = \emptyset$, $R_{f_3} = \{A, B\}$, $W_{f_3} = \{A\}$
Conflict: one attribute appears in both operators' write sets, or in one operator's read set and in the other's write set.
$f_1$ & $f_3$ conflict!!!
Condition
ROC Condition
$R_{f_1} \cap W_{f_2} = R_{f_2} \cap W_{f_1} = W_{f_1} \cap W_{f_2} = \emptyset$
Example 2
$P: I \to \mathit{Map}_f \to S \to \mathit{Reduce}_g \to O$
$f$: emit all odd values of $A$ and $B$
$g$: calculate the sums of $B$ using $A$ as key, and append $C$ as the sum
$R_f = W_f = \emptyset$, $R_g = \{A, B\}$, $W_g = \{B, C\}$
ROC holds!
Condition
KGP condition
Function $f$, when applied to a set of records $I_k$ with the same value for $K$, either emits or filters all these records.
With ROC and KGP together, we can decide for almost all UDFs whether they can be reordered!
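To make the KGP condition concrete, here is a small Python sketch that tests it empirically on sample data. This is only a test on given records, not the static analysis the paper relies on; the helper names and the sample records are hypothetical.

```python
from collections import defaultdict

def kgp_holds_on(records, key, udf):
    """Check that, within every key group, udf either keeps all records or drops all.
    udf(record) returns a record or None (filtered)."""
    kept_by_group = defaultdict(list)
    for rec in records:
        kept_by_group[key(rec)].append(udf(rec) is not None)
    return all(all(kept) or not any(kept) for kept in kept_by_group.values())

def key_a(rec):
    return rec[0]

def keep_nonnegative_a(rec):
    # Decision depends only on the key A, so whole groups survive or vanish.
    return rec if rec[0] >= 0 else None

def keep_odd_b(rec):
    # Decision depends on B, so a group keyed by A can be split.
    return rec if rec[1] % 2 == 1 else None

RECORDS = [(1, 1), (1, 2), (-2, 3)]
```

On RECORDS, keep_nonnegative_a preserves key groups while keep_odd_b splits the group A = 1, so moving keep_odd_b past a Reduce keyed on A would change the sums.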
$\mathit{Cross}_f(R, S) \equiv \mathit{Map}_f(R \times S)$
$\mathit{Match}_f(R, S) \equiv \mathit{Map}_{f'}(R \times S)$
$\mathit{Map}_f(R) \times S \equiv \mathit{Map}_f(R \times S)$ iff $(R_f \cup W_f) \cap S = \emptyset$
$\mathit{Match}_g(\mathit{Match}_f(R, S), T)$
$\to \mathit{Match}_g(\mathit{Map}_{f'}(R \times S), T)$
$\to \mathit{Map}_{g'}(\mathit{Map}_{f'}(R \times S) \times T)$
$\to \mathit{Map}_{g'}(\mathit{Map}_{f'}(R \times S \times T))$, requires $(R_{f'} \cup W_f) \cap T = \emptyset$
$\to \mathit{Map}_{f'}(\mathit{Map}_{g'}(R \times S \times T))$, requires the ROC condition
$\to \mathit{Map}_{f'}(R \times \mathit{Map}_{g'}(S \times T))$, requires $(R_{g'} \cup W_g) \cap R = \emptyset$
$\to \mathit{Map}_{f'}(R \times \mathit{Match}_g(S, T))$
$\to \mathit{Match}_f(R, \mathit{Match}_g(S, T))$
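The Match reordering can be tried out on toy data. The sketch below models Match as a Map over the cross product that keeps pairs agreeing on a key; the relations, attribute names, and UDFs are made up so that f touches only an attribute of R and g only an attribute of T, which satisfies the three side conditions.

```python
from itertools import product

def match(left, right, key, udf):
    """Match as a Map over the cross product: keep pairs that agree on `key`, then apply the UDF."""
    return [udf({**l, **r}) for l, r in product(left, right) if l[key] == r[key]]

def f(rec):
    # Reads and writes only x, an attribute of R.
    return {**rec, "x": rec["x"] * 2}

def g(rec):
    # Reads and writes only y, an attribute of T.
    return {**rec, "y": rec["y"] + 1}

R = [{"a": 1, "x": 10}, {"a": 2, "x": 20}]
S = [{"a": 1, "b": 7}, {"a": 1, "b": 8}]
T = [{"b": 7, "y": 0}, {"b": 9, "y": 5}]

def canonical(records):
    """Order-independent form for comparing result bags."""
    return sorted(sorted(r.items()) for r in records)

left_deep = match(match(R, S, "a", f), T, "b", g)    # Match_g(Match_f(R, S), T)
right_deep = match(R, match(S, T, "b", g), "a", f)   # Match_f(R, Match_g(S, T))
```

Both join orders produce the single record with a = 1, x = 20, b = 7, y = 1.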
Implementation
Read set: $t := getField($ir, n)
Write set: $or = new OutputRecord($ir), setField($or, n, $t)
Enumeration
Evaluation
• Relational Online Analytical Processing
• Text mining
• Non-relational Clickstream Processing
New Model: Static Code Analysis, Enumeration
Cost Model: Network IO, Disk IO, CPU Cost
Evaluation
Future Work
• Enumeration costs time
• To identify beneficial reorderings
• Wider range of optimizations
• Semantic information
Spotting Code Optimizations in Data-Parallel Pipelines through PeriSCOPE
MUHAMMAD ALI GULZAR
Outline
◦ Introduction
◦ Running Example
◦ Optimizations using PeriSCOPE
  ◦ Column Reduction
  ◦ Early Filtering
  ◦ Smart Cut
◦ Evaluation
Data Shuffling is Expensive
Disconnect between Pipeline and Code Optimizers
Pipeline Optimizers
◦ Logical optimizers work on the high-level pipeline topology
◦ Unable to consider procedural code (black box) while optimizing
Compiler Optimizations
◦ Work only on user-defined functions
◦ Unaware of the pipeline -> no inter-stage optimizations
Manual Optimizations
◦ Tedious
◦ Error-prone
Spark, MapReduce, etc., being unaware of the UDFs, apply only logical optimizations to the workflow
Example
If the filter GetLength(query) > 4 were written inside the procedural code of PScoreReducer, this optimization could not be performed
PeriSCOPE -- Overview
◦ Optimizes procedural code of SCOPE programs
◦ Connects the data flow of the program to the high-level pipeline
◦ Searches for code candidates amenable to optimization
◦ Three core optimizations:
  ◦ Column reduction
  ◦ Early filtering
  ◦ Smart cut
Running Example
Column Reduction
Problem
◦ Unused columns in data-parallel programs consume substantial network I/O
◦ Well-established concept (early projection), also applied to procedural code (UDFs) [Manimal '11]
Solution by PeriSCOPE
◦ Analyze dependencies between input and output columns of all operators
◦ Remove unused columns and associated computations
Limitation in PeriSCOPE's column reduction
◦ No column reduction can be applied if the column name is unresolved
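The dependency analysis can be pictured as a backward pass over per-stage column dependencies. The following Python sketch assumes each stage is described by a map from every output column to the input columns it is computed from; the stage descriptions and column names are invented for illustration.

```python
def required_inputs(stages, final_columns):
    """stages: list of {output_column: set(input_columns)} in pipeline order.
    Returns the input columns of the first stage that the final output depends on."""
    needed = set(final_columns)
    for deps in reversed(stages):
        # Replace each needed output column by the inputs it is derived from.
        needed = set().union(*(deps[col] for col in needed))
    return needed

# Hypothetical two-stage pipeline.
STAGE_1 = {"query": {"query"}, "score": {"clicks", "views"}, "ts": {"timestamp"}}
STAGE_2 = {"query": {"query"}, "pscore": {"score"}}
ALL_INPUTS = {"query", "clicks", "views", "timestamp"}

needed = required_inputs([STAGE_1, STAGE_2], {"query", "pscore"})
removable = ALL_INPUTS - needed
```

Here timestamp never reaches the final output, so the column and the code computing ts can be dropped before any data is shuffled.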
Column Reduction in Example
Code Motion
◦ Moving code from user-defined functions across stages
◦ Goal is to reduce the number and size of inter-stage rows
◦ Program correctness has to be maintained: Safe Code Motion
Safety Rules
◦ Rule 1: Must not move a statement if it generates a shuffle-key column
◦ Rule 2: Must not move a stateful statement across the shuffle
◦ Rule 3: Must not move a filtering statement if it is, or is reachable from, a stateful statement
◦ Perform loop dependency analysis to handle dependencies across loop iterations
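The three rules can be read as a predicate over statements. Below is a minimal Python sketch under the assumption that an earlier analysis has already tagged each statement with the listed properties; the Stmt class and its field names are hypothetical.

```python
from dataclasses import dataclass

@dataclass
class Stmt:
    # Properties assumed to come from a prior dataflow analysis.
    writes_shuffle_key: bool = False
    stateful: bool = False
    is_filter: bool = False
    reachable_from_stateful: bool = False

def may_move_across_shuffle(stmt):
    if stmt.writes_shuffle_key:
        return False   # Rule 1: would change how rows are partitioned
    if stmt.stateful:
        return False   # Rule 2: state depends on which rows it sees, and in what order
    if stmt.is_filter and stmt.reachable_from_stateful:
        return False   # Rule 3: the filter decision depends on state
    return True
```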
Early Filtering
◦ Filtering earlier reduces the I/O burden on shuffling
◦ Similar to Early Selection
◦ Related work [Manimal etc.] performs the same optimization but cannot rewrite UDFs
How?
◦ The first UDF after shuffling is analyzed for filtering statements
◦ Stateful filtering statements are excluded as candidates
◦ A backward slice collects the filter and the statements that can affect it
◦ The code slice is removed and re-inserted at an appropriate earlier location (rewriting UDFs)
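The effect of the rewrite can be imitated in a few lines of Python: a stateless, row-local filter is taken out of the post-shuffle UDF and applied before the shuffle. The row layout, the grouping key, and the use of len(query) > 4 as the predicate are assumptions chosen to echo the GetLength(query) > 4 example; this is not PeriSCOPE's actual rewriting machinery.

```python
from collections import defaultdict

def shuffle(rows):
    """Group rows by user, standing in for the shuffling phase."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["user"]].append(row)
    return groups

def keep(row):
    # Stateless, row-local predicate, so it is safe to evaluate before the shuffle.
    return len(row["query"]) > 4

def original(rows):
    groups = shuffle(rows)                      # every row is shuffled
    out = [(user, r["clicks"]) for user, rs in groups.items() for r in rs if keep(r)]
    return sorted(out), len(rows)

def rewritten(rows):
    survivors = [r for r in rows if keep(r)]    # filter moved upstream
    groups = shuffle(survivors)                 # only survivors are shuffled
    out = [(user, r["clicks"]) for user, rs in groups.items() for r in rs]
    return sorted(out), len(survivors)

ROWS = [
    {"user": "u1", "query": "cat", "clicks": 1},
    {"user": "u1", "query": "weather", "clicks": 2},
    {"user": "u2", "query": "news today", "clicks": 3},
    {"user": "u2", "query": "map", "clicks": 4},
]
```

Both versions emit the same rows; the second value in each result is the number of rows that crossed the shuffle (4 versus 2 here).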
Early Filtering -- Example
Smart Cut
◦ Minimize cross-stage data flow
◦ Similar to combiners but more fine-grained (instruction level)
How?
◦ Move row-local computations upstream to leverage parallelism
◦ Builds a data dependency graph: vertices are instructions, edges are read-after-write (RAW) dependencies
◦ Each edge carries the name and size of the dependent variable
◦ Applies a minimum cut algorithm (one-directional; the same variable is counted once)
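To illustrate what the cut optimizes, here is a brute-force Python sketch on a tiny dependency graph. It enumerates every one-directional split instead of running a real minimum cut algorithm, and the instruction names and byte sizes are invented; it is meant only to show the cost being minimized.

```python
from itertools import combinations

def best_cut(size, reads, pinned_up, pinned_down):
    """size: {var: bytes}, one variable per instruction.
    reads: {var: set of variables its instruction reads} (RAW edges).
    Returns (bytes crossing the shuffle, set of instructions placed upstream)."""
    free = [v for v in size if v not in pinned_up and v not in pinned_down]
    best = None
    for k in range(len(free) + 1):
        for extra in combinations(free, k):
            up = set(pinned_up) | set(extra)
            # One-directional: an upstream instruction may only read upstream values.
            if any(not reads[v] <= up for v in up):
                continue
            # Each variable that a downstream instruction reads is shipped once.
            crossing = {src for v in size if v not in up for src in reads[v] if src in up}
            cost = sum(size[src] for src in crossing)
            if best is None or cost < best[0]:
                best = (cost, up)
    return best

# Hypothetical row-local chain feeding an aggregation that must run after the shuffle.
SIZE = {"line": 100, "url": 40, "domain": 10, "flag": 1, "agg": 8}
READS = {"line": set(), "url": {"line"}, "domain": {"url"},
         "flag": {"domain"}, "agg": {"domain", "flag"}}

cost, upstream = best_cut(SIZE, READS, pinned_up={"line"}, pinned_down={"agg"})
```

The cheapest split ships only domain (10 bytes, counted once although two downstream instructions read it), which beats both shipping the raw line (100 bytes) and moving every row-local instruction upstream (11 bytes).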
Smart Cut -- Example
Evaluation
Optimization coverage evaluation on 28K jobs
Nature and type of jobs are not clear
Case Study
Performance improvements
I/O reduction at shuffling phase
Thoughts
◦ Intelligent tool to perform optimizations from within UDFs.
◦ Not platform specific: can be generalized to other frameworks
◦ Early filtering and column reduction are already well-established optimizations in databases
◦ Column reduction can be applied only to columnar data engines
◦ In the context of Spark, users are bound to use specific APIs designed for filters, so early filtering is not needed.