Transcript 2 - CWI

Distributed Query Processing
Agenda
• Recap of query optimization
• Transformation rules for P&D systems
• Memoization
• Queries in heterogeneous systems
• Query evaluation strategies
• Eddies
• Open-ended and stream-based queries
Introduction
• Alternative ways of evaluating a given query
– Equivalent expressions
– Different algorithms for each operation (Chapter 13)
• Cost difference between a good and a bad way of evaluating a query
can be enormous
– Example: performing $r \times s$ followed by a selection $\sigma_{r.A = s.B}$ is
much slower than performing a join on the same condition
• Need to estimate the cost of operations
– Depends critically on statistical information about relations which
the database must maintain
– Need to estimate statistics for intermediate results to compute cost
of complex expressions
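The cross-product example above can be made concrete with a small sketch. The relations, the attribute names A and B, and the helper names `cross_then_select` and `hash_join` are illustrative assumptions, not part of the lecture. Both functions return the same tuples, but the first enumerates every pair of the Cartesian product.

```python
from collections import defaultdict
from itertools import product

def cross_then_select(r, s):
    """Build r x s, then keep pairs with r.A == s.B (inspects len(r) * len(s) pairs)."""
    return [(x, y) for x, y in product(r, s) if x["A"] == y["B"]]

def hash_join(r, s):
    """Equi-join on r.A == s.B using a hash table on s (inspects only matching pairs)."""
    index = defaultdict(list)
    for y in s:
        index[y["B"]].append(y)
    return [(x, y) for x in r for y in index.get(x["A"], [])]

# Hypothetical data: 100 tuples on each side, 50 matching pairs.
r = [{"A": i} for i in range(100)]
s = [{"B": i} for i in range(0, 200, 2)]
same = cross_then_select(r, s) == hash_join(r, s)
```

With these inputs the first variant examines 10,000 pairs while the second probes the hash table 100 times, which is the kind of gap the cost estimate has to expose.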
Introduction (Cont.)
Relations generated by two equivalent expressions have the same set
of attributes and contain the same set of tuples, although their
attributes may be ordered differently.
Introduction (Cont.)
• Generation of query-evaluation plans for an expression
involves several steps:
1. Generating logically equivalent expressions
• Use equivalence rules to transform an expression
into an equivalent one.
2. Annotating resultant expressions to get alternative
query plans
3. Choosing the cheapest plan based on estimated cost
• The overall process is called cost based optimization.
Equivalence Rules
1. Conjunctive selection operations can be deconstructed
into a sequence of individual selections.
$\sigma_{\theta_1 \wedge \theta_2}(E) = \sigma_{\theta_1}(\sigma_{\theta_2}(E))$
2. Selection operations are commutative.
$\sigma_{\theta_1}(\sigma_{\theta_2}(E)) = \sigma_{\theta_2}(\sigma_{\theta_1}(E))$
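3. Only the last in a sequence of projection operations is
needed, the others can be omitted.
$\Pi_{t_1}(\Pi_{t_2}(\ldots(\Pi_{t_n}(E))\ldots)) = \Pi_{t_1}(E)$
4. Selections can be combined with Cartesian products and
theta joins.
a. $\sigma_{\theta}(E_1 \times E_2) = E_1 \bowtie_{\theta} E_2$
b. $\sigma_{\theta_1}(E_1 \bowtie_{\theta_2} E_2) = E_1 \bowtie_{\theta_1 \wedge \theta_2} E_2$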
Equivalence Rules (Cont.)
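5. Theta-join operations (and natural joins) are commutative.
$E_1 \bowtie_{\theta} E_2 = E_2 \bowtie_{\theta} E_1$
6. (a) Natural join operations are associative:
$(E_1 \bowtie E_2) \bowtie E_3 = E_1 \bowtie (E_2 \bowtie E_3)$
(b) Theta joins are associative in the following manner:
$(E_1 \bowtie_{\theta_1} E_2) \bowtie_{\theta_2 \wedge \theta_3} E_3 = E_1 \bowtie_{\theta_1 \wedge \theta_3} (E_2 \bowtie_{\theta_2} E_3)$
where $\theta_2$ involves attributes from only $E_2$ and $E_3$.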
Pictorial Depiction of Equivalence Rules
Equivalence Rules (Cont.)
7. The selection operation distributes over the theta join
operation under the following two conditions:
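(a) When all the attributes in $\theta_0$ involve only the attributes
of one of the expressions ($E_1$) being joined.
$\sigma_{\theta_0}(E_1 \bowtie_{\theta} E_2) = (\sigma_{\theta_0}(E_1)) \bowtie_{\theta} E_2$
(b) When $\theta_1$ involves only the attributes of $E_1$ and $\theta_2$
involves only the attributes of $E_2$.
$\sigma_{\theta_1 \wedge \theta_2}(E_1 \bowtie_{\theta} E_2) = (\sigma_{\theta_1}(E_1)) \bowtie_{\theta} (\sigma_{\theta_2}(E_2))$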
Equivalence Rules (Cont.)
8. The projection operation distributes over the theta join
operation as follows:
(a) if $\theta$ involves only attributes from $L_1 \cup L_2$:
$\Pi_{L_1 \cup L_2}(E_1 \bowtie_{\theta} E_2) = (\Pi_{L_1}(E_1)) \bowtie_{\theta} (\Pi_{L_2}(E_2))$
(b) Consider a join $E_1 \bowtie_{\theta} E_2$.
– Let $L_1$ and $L_2$ be sets of attributes from $E_1$ and $E_2$,
respectively.
– Let $L_3$ be attributes of $E_1$ that are involved in join
condition $\theta$, but are not in $L_1 \cup L_2$, and
– let $L_4$ be attributes of $E_2$ that are involved in join
condition $\theta$, but are not in $L_1 \cup L_2$.
$\Pi_{L_1 \cup L_2}(E_1 \bowtie_{\theta} E_2) = \Pi_{L_1 \cup L_2}((\Pi_{L_1 \cup L_3}(E_1)) \bowtie_{\theta} (\Pi_{L_2 \cup L_4}(E_2)))$
Equivalence Rules (Cont.)
9. The set operations union and intersection are commutative
$E_1 \cup E_2 = E_2 \cup E_1$
$E_1 \cap E_2 = E_2 \cap E_1$
(set difference is not commutative).
10. Set union and intersection are associative.
$(E_1 \cup E_2) \cup E_3 = E_1 \cup (E_2 \cup E_3)$
$(E_1 \cap E_2) \cap E_3 = E_1 \cap (E_2 \cap E_3)$
11. The selection operation distributes over $\cup$, $\cap$ and $-$.
$\sigma_{\theta}(E_1 - E_2) = \sigma_{\theta}(E_1) - \sigma_{\theta}(E_2)$
and similarly for $\cup$ and $\cap$ in place of $-$
Also:
$\sigma_{\theta}(E_1 - E_2) = \sigma_{\theta}(E_1) - E_2$
and similarly for $\cap$ in place of $-$, but not for $\cup$
12. The projection operation distributes over union
$\Pi_{L}(E_1 \cup E_2) = (\Pi_{L}(E_1)) \cup (\Pi_{L}(E_2))$
Multiple Transformations (Cont.)
Optimizer strategies
• Heuristic
– Apply the transformation rules in a specific order such
that the cost converges to a minimum
• Cost based
– Simulated annealing
– Randomized generation of candidate QEP
– Problem: how to guarantee randomness
Memoization Techniques
• How to generate alternative Query Evaluation Plans?
– Early generation systems centred around a tree representation of
the plan
– Hardwired tree rewriting rules are deployed to enumerate part of
the space of possible QEP
– For each alternative the total cost is determined
– The best (alternatives) are retained for execution
– Problems: very large space to explore, duplicate plans, local
optima, expensive query cost evaluation.
– SQL Server optimizer contains about 300 rules to be deployed.
Memoization Techniques
• How to generate alternative Query Evaluation Plans?
– Keep a memo of partial QEPs and their cost.
– Use the heuristic rules to generate alternatives to build
more complex QEPs
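– Example: r1 $\bowtie$ r2 $\bowtie$ r3 $\bowtie$ r4
[Figure: memo structure for this join, with partial plans such as r1 r2, r2 r3, r3 r4 and r1 r4 organized in levels from Level 1 plans through Level 2 plans up to Level n plans]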
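The memo idea can be sketched as a bottom-up enumeration in which every set of relations is costed once and reused by all larger plans. This is a minimal illustration under a toy cost model (cost of a join equals the size of its result, one uniform selectivity); the function name `best_plan`, the cardinalities and the cost model are assumptions, not the optimizer described in the lecture.

```python
from itertools import combinations

def best_plan(cards, selectivity=0.01):
    """Return (cost, plan) for joining all relations in `cards` (name -> cardinality).

    The memo maps a set of relations to the cheapest (cost, size, plan) found
    for that set, so each partial plan is costed once and then looked up.
    """
    # Level 1: single relations cost nothing to "join".
    memo = {frozenset([n]): (0.0, float(c), n) for n, c in cards.items()}
    names = sorted(cards)
    for level in range(2, len(names) + 1):
        for subset in map(frozenset, combinations(names, level)):
            best = None
            # Try every split of the subset into two smaller memo entries.
            for k in range(1, level):
                for left in map(frozenset, combinations(sorted(subset), k)):
                    right = subset - left
                    lcost, lsize, lplan = memo[left]
                    rcost, rsize, rplan = memo[right]
                    size = lsize * rsize * selectivity
                    cost = lcost + rcost + size
                    if best is None or cost < best[0]:
                        best = (cost, size, (lplan, rplan))
            memo[subset] = best
    cost, _, plan = memo[frozenset(names)]
    return cost, plan
```

Calling `best_plan({"r1": 1000, "r2": 10, "r3": 10, "r4": 1000})` returns a nested tuple describing the join tree. The enumeration is exponential in the number of relations, which is why real optimizers prune the memo with heuristic rules.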
Distributed Query Processing
• For centralized systems, the primary criterion for
measuring the cost of a particular strategy is the number of
disk accesses.
• In a distributed system, other issues must be taken into
account:
– The cost of a data transmission over the network.
– The potential gain in performance from having several
sites process parts of the query in parallel.
Transformation rules for distributed systems
• Primary horizontally fragmented table:
– Rule 9: The union is commutative
$E_1 \cup E_2 = E_2 \cup E_1$
– Rule 10: Set union is associative.
$(E_1 \cup E_2) \cup E_3 = E_1 \cup (E_2 \cup E_3)$
– Rule 12: The projection operation distributes over union
$\Pi_{L}(E_1 \cup E_2) = (\Pi_{L}(E_1)) \cup (\Pi_{L}(E_2))$
• Derived horizontally fragmented table:
– The join through foreign-key dependency is already reflected in
the fragmentation criteria
Transformation rules for distributed systems
• Vertically fragmented tables:
– Rules: hint, look at the projection rules
Optimization in Par & Distr
• Cost model is changed!!!
– Network transport is a dominant cost factor
• The facilities for query processing are not homogeneously
distributed
– Light-resource systems form a bottleneck
– Need for dynamic load scheduling
Simple Distributed Join Processing
• Consider the following relational algebra expression in
which the three relations are neither replicated nor
fragmented
account $\bowtie$ depositor $\bowtie$ branch
• account is stored at site S1
• depositor at S2
• branch at S3
• For a query issued at site SI, the system needs to produce
the result at site SI
Possible Query Processing Strategies
• Ship copies of all three relations to site SI and choose a
strategy for processing the entire query locally at site SI.
• Ship a copy of the account relation to site S2 and compute
temp1 = account $\bowtie$ depositor at S2. Ship temp1 from S2 to
S3, and compute temp2 = temp1 $\bowtie$ branch at S3. Ship the
result temp2 to SI.
• Devise similar strategies, exchanging the roles of S1, S2, S3
• Must consider following factors:
– amount of data being shipped
– cost of transmitting a data block between sites
– relative processing speed at each site
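The first factor, the amount of data shipped, can be compared for the two strategies with a back-of-the-envelope sketch. All block counts are hypothetical, and the function names are illustrative. Transmission cost per block and site speed are deliberately left out.

```python
def ship_all_to_query_site(sizes):
    """Strategy 1: ship account, depositor and branch to SI; returns blocks shipped."""
    return sizes["account"] + sizes["depositor"] + sizes["branch"]

def pipeline_via_s2_s3(sizes, temp1_blocks, temp2_blocks):
    """Strategy 2: account -> S2, temp1 -> S3, temp2 -> SI; returns blocks shipped."""
    return sizes["account"] + temp1_blocks + temp2_blocks

# Hypothetical relation sizes in blocks.
sizes = {"account": 1000, "depositor": 2000, "branch": 50}
strategy1 = ship_all_to_query_site(sizes)
strategy2 = pipeline_via_s2_s3(sizes, temp1_blocks=1500, temp2_blocks=1500)
```

Which strategy wins depends entirely on the size of the intermediate results temp1 and temp2, which is why the optimizer needs statistics for intermediate results.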
Semijoin Strategy
• Let r1 be a relation with schema R1 stored at site S1
Let r2 be a relation with schema R2 stored at site S2
• Evaluate the expression r1 $\bowtie$ r2 and obtain the result at S1.
1. Compute temp1 $\leftarrow \Pi_{R1 \cap R2}$(r1) at S1.
2. Ship temp1 from S1 to S2.
3. Compute temp2 $\leftarrow$ r2 $\bowtie$ temp1 at S2
4. Ship temp2 from S2 to S1.
5. Compute r1 $\bowtie$ temp2 at S1. This is the same as r1 $\bowtie$ r2.
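The five steps can be traced with a small in-memory sketch. Relations are modeled as non-empty lists of dicts, "shipping" is only counted, and the helper names `natural_join`, `project` and `semijoin_strategy` are assumptions made for illustration.

```python
def natural_join(r, s):
    """Natural join of two lists of dicts on their common attribute names."""
    common = set(r[0]) & set(s[0]) if r and s else set()
    return [{**x, **y} for x in r for y in s
            if all(x[a] == y[a] for a in common)]

def project(r, attrs):
    """Duplicate-eliminating projection of r onto attrs."""
    seen = {tuple((a, t[a]) for a in sorted(attrs)) for t in r}
    return [dict(items) for items in sorted(seen)]

def semijoin_strategy(r1, r2):
    """Compute r1 join r2 at S1 following steps 1-5; returns (result, tuples shipped)."""
    common = set(r1[0]) & set(r2[0])
    temp1 = project(r1, common)        # step 1, computed at S1
    temp2 = natural_join(r2, temp1)    # steps 2-3, temp1 shipped to S2
    result = natural_join(r1, temp2)   # steps 4-5, temp2 shipped back to S1
    return result, len(temp1) + len(temp2)

# Hypothetical data: r1 lives at S1, r2 at S2.
r1 = [{"id": 1, "x": "a"}, {"id": 2, "x": "b"}, {"id": 3, "x": "c"}]
r2 = [{"id": 2, "y": "p"}, {"id": 4, "y": "q"}]
result, shipped = semijoin_strategy(r1, r2)
```

The strategy pays off when temp1 and temp2 together are much smaller than r2, that is, when few tuples of r2 contribute to the join.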
Formal Definition
• The semijoin of r1 with r2 is denoted by:
r1 $\ltimes$ r2
• It is defined by:
$\Pi_{R1}$(r1 $\bowtie$ r2)
• Thus, r1 $\ltimes$ r2 selects those tuples of r1 that contributed to r1 $\bowtie$ r2.
• In step 3 above, temp2 = r2 $\ltimes$ r1.
• For joins of several relations, the above strategy can be extended
to a series of semijoin steps.
Join Strategies that Exploit Parallelism
• Consider r1 $\bowtie$ r2 $\bowtie$ r3 $\bowtie$ r4 where relation ri is stored at site Si.
The result must be presented at site S1.
• r1 is shipped to S2 and r1 $\bowtie$ r2 is computed at S2; simultaneously r3
is shipped to S4 and r3 $\bowtie$ r4 is computed at S4
• S2 ships tuples of (r1 $\bowtie$ r2) to S1 as they are produced;
S4 ships tuples of (r3 $\bowtie$ r4) to S1
• Once tuples of (r1 $\bowtie$ r2) and (r3 $\bowtie$ r4) arrive at S1,
(r1 $\bowtie$ r2) $\bowtie$ (r3 $\bowtie$ r4) is computed in parallel with the
computation of (r1 $\bowtie$ r2) at S2 and the computation of (r3 $\bowtie$ r4) at S4.
Query plan generation
• Apers-Aho-Hopcroft
– Hill-climber, repeatedly split the multi-join query in
fragments and optimize its subqueries independently
• Apply centralized algorithms and rely on cost-model to
avoid expensive query execution plans.
Query evaluators
Query evaluation strategy
• Pipe-line query evaluation strategy
– Called Volcano query processing model
– Standard in commercial systems and MySQL
• Basic algorithm:
– Demand-driven evaluation of query tree.
– Operators exchange data in units such as records
– Each operator supports the following interface: open, next, close
• open() at top of tree results in cascade of opens down the tree.
• An operator getting a next() call may recursively make next()
calls from within to produce its next answer.
• close() at top of tree results in cascade of close down the tree
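The open/next/close protocol can be sketched with two toy operators. The class names `Scan` and `Select`, the driver `run`, and the use of `None` as the end-of-stream marker are assumptions made for this illustration (so records themselves must not be `None`); real systems differ in many details.

```python
class Scan:
    """Leaf operator: iterates over an in-memory list of records."""
    def __init__(self, rows):
        self.rows = rows
    def open(self):
        self.pos = 0
    def next(self):
        if self.pos >= len(self.rows):
            return None              # end of stream
        row = self.rows[self.pos]
        self.pos += 1
        return row
    def close(self):
        self.pos = None

class Select:
    """Filter operator: pulls from its child until a record satisfies pred."""
    def __init__(self, child, pred):
        self.child, self.pred = child, pred
    def open(self):
        self.child.open()            # open cascades down the tree
    def next(self):
        # One next() call here may trigger several next() calls on the child.
        while (row := self.child.next()) is not None:
            if self.pred(row):
                return row
        return None
    def close(self):
        self.child.close()           # close cascades down the tree

def run(root):
    """Demand-driven evaluation: the consumer pulls records one at a time."""
    root.open()
    out = []
    while (row := root.next()) is not None:
        out.append(row)
    root.close()
    return out
```

For example, `run(Select(Scan([1, 2, 3, 4, 5, 6]), lambda x: x % 2 == 0))` pulls the even numbers through the pipeline without materializing any intermediate result.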
Query evaluation strategy
• Pipe-line query evaluation strategy
– Evaluation:
• Oriented towards OLTP applications
– Granule size of data interchange
• Items produced one at a time
• No temporary files
– Choice of intermediate buffer size allocations
• Query executed as one process
• Generic interface, sufficient to add the iterator primitives for
the new containers.
• CPU intensive
• Amenable to parallelization
Query evaluation strategy
• Materialized evaluation strategy
– Used in MonetDB
– Basic algorithm:
• for each relational operator produce the complete intermediate
result using materialized operands
– Evaluation:
• Oriented towards decision support queries
• Limited internal administration and dependencies
• Basis for multi-query optimization strategy
• Memory intensive
• Amenable to distributed/parallel processing
Eddies: Continuously Adaptive Query processing
R. Avnur, J.M. Hellerstein
UCB
ACM Sigmod 2000
Problem Statement
• Context: large federated and shared-nothing databases
• Problem: assumptions made at query optimization rarely
hold during execution
• Hypothesis: do away with traditional optimizers, solve it
through adaptation
• Focus: scheduling in a tuple-based pipeline query
execution model
Problem Statement Refinement
• Large scale systems are unpredictable, because
– Hardware and workload complexity,
• bursty servers & networks, heterogeneity, hardware
characteristics
– Data complexity,
• Federated databases often come without proper
statistical summaries
– User Interface Complexity
• Online aggregation may involve user ‘control’
Research Laboratory setting
• Telegraph, a system designed to query all data available online
• River, a low-level distributed record management system for shared-nothing databases
• Eddies, a scheduler for dispatching work over operators in a query
graph
The Idea
• Relational algebra operators consume a stream from
multiple sources to produce a new stream
• A priori you don’t know how selectively or how fast tuples are
consumed/produced
• You have to adapt continuously and learn this information
on the fly
• Adapt the order of processing based on these lessons
The Idea
[Figure: a pipeline of three JOIN operators connected by next() calls]
The Idea
• Standard method: derive a spanning tree over the query graph
• Pre-optimize a query plan to determine operator pairs and their
algorithm, e.g. to exploit access paths
• Re-optimizing a query pipeline on the fly requires careful state
management, coupled with
– Synchronization barriers
• Operators have widely differing arrival rates for their operands
– This limits concurrency, e.g. merge-join algorithm
– Moments of symmetry
• The algorithm provides the option to exchange the roles of the operands
without too many complications
– E.g. switching the roles of R and S in a nested-loop join
[Figure: nested-loop join over R and S]
Join and sorting
• Index-joins are asymmetric, you can not easily change their role
– Combine index-join + operands as a unit in the process
• Sorting requires look-ahead
– Merge-joins are combined into a unit
• Ripple joins
– Break the space into smaller pieces and solve the join operation for
each piece individually
– The piece crossings are moments of symmetry
The Idea
[Figure: three JOIN operators connected through an Eddie with a tuple buffer; data is exchanged via next() calls]
Rivers and Eddies
Eddies are tuple routers that distribute arriving tuples to interested
operators
– What are efficient scheduling policies?
• Fixed strategy? Random ? Learning?
Static Eddies
• Delivery of tuples to operators can be hardwired in the Eddie to reflect
a traditional query execution plan
Naïve Eddie
• Operators are delivered tuples based on a priority queue
• Intermediate results get highest priority to avoid buffer congestion
Observations for selections
• Extended priority queue for the operators
– Receiving a tuple leads to a credit increment
– Returning a tuple leads to a credit decrement
– Priority is determined by “weighted lottery”
• Naïve Eddies exhibit back pressure in the tuple flow; production is
limited by the rate of consumption at the output
• Lottery Eddies approach the cost of optimal ordering, without the need
to determine the order a priori
• Lottery Eddies outperform heuristics
– Hash-use first, or Index-use first, Naive
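The credit scheme and the weighted lottery can be sketched for a set of selection operators. This is a minimal illustration, not the scheduler from the Eddies paper: the class name `LotteryEddy`, the predicate-style operators, the fixed random seed and the floor of one ticket per operator are all assumptions.

```python
import random

class LotteryEddy:
    """Routes tuples to operators chosen by a ticket-weighted lottery."""
    def __init__(self, operators, seed=0):
        self.operators = operators                      # name -> predicate
        self.tickets = {name: 1 for name in operators}  # every operator starts with 1
        self.rng = random.Random(seed)

    def pick(self, candidates):
        """Draw one operator name with probability proportional to its tickets."""
        weights = [self.tickets[n] for n in candidates]
        return self.rng.choices(candidates, weights=weights, k=1)[0]

    def route(self, tup):
        """Send tup through all operators; return True if it survives them all."""
        pending = sorted(self.operators)
        while pending:
            name = self.pick(pending)
            self.tickets[name] += 1                     # credit on receiving a tuple
            if not self.operators[name](tup):
                return False                            # tuple dropped, credit is kept
            # Debit on returning a tuple (assumed floor of one ticket).
            self.tickets[name] = max(1, self.tickets[name] - 1)
            pending.remove(name)
        return True

# Hypothetical operators: one highly selective, one that passes everything.
ops = {"selective": lambda t: t % 10 == 0, "unselective": lambda t: True}
eddy = LotteryEddy(ops)
survivors = [t for t in range(1000) if eddy.route(t)]
```

Operators that consume many tuples and return few accumulate tickets, so the lottery increasingly routes tuples to the selective operator first, which is the ordering a static optimizer would have to know a priori.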
Observations
• The dynamics during a run can be controlled by a learning scheme
– Split the processing in steps (‘windows’) to re-adjust the weight
during tuple delivery
• Initial delays can not be handled efficiently
• Research challenges:
– Better learning algorithms to adjust flow
– Aggressive adjustments
– Remove pre-optimization
– Balance ‘hostile’ parallel environment
– Deploy eddies to control degree of partitioning (and replication)
The tranquil database scene
• Traditional DBMS – data stored in finite, persistent data
sets, SQL-based applications to manage and access it
[Figure: an RDBMS serving a data entry application, an OLTP-web application, and ‘ad-hoc’ reporting]
The tranquil database scene
• The user community grows and many want up-to-the-second (aggregate) information from the database
[Figure: an RDBMS serving a data entry application, an OLTP-web application, ‘ad-hoc’ reporting, and informed reporting]
The tranquil database scene
• Database entry is taken over by a remote device which
issues a high volume of update transactions
[Figure: an RDBMS serving two data entry applications, an OLTP-web application, ‘ad-hoc’ reporting, and informed reporting]
The tranquil database scene
• Database entry is taken over by MANY remote devices
which issue a high volume of update transactions
[Figure: an RDBMS serving two data entry applications, an OLTP-web application, ‘ad-hoc’ reporting, and informed reporting]
The tranquil database scene
• Database solutions cannot carry the weight
[Figure: an RDBMS serving two data entry applications, an OLTP-web application, ‘ad-hoc’ reporting, and informed reporting]
Application domains
• Personalized financial tickers
• Personalized information delivery
• Personalized environment control
• Business to business middleware
• Web-services application based on XML exchange
• Monitoring the real-world environment (pollution, traffic)
• Monitoring the data flow in an ISP
• Monitoring web-traffic behaviour
• Monitoring the load on a telecom switch
• Monitoring external news-feeds
Application domains
[Slide build: the application list above overlaid with the labels QUERYING, WEB SERVICES and STREAM UPDATE]
Continuous queries
• Continuous query – the user observes the changes made to
the database through a query
– Query registration once
– Continuously up-to-date answers.
[Figure: continuous queries registered at an RDBMS]
Data Streams
• Data streams
– The database is in constant bulk load mode
– The update rate is often non-uniform
– The entries are time-stamped
– The source could be a web service, a sensor, or a wrapped
source
[Figure: a data entry application feeding a DSMS]
DSMS
Data Stream Management Systems (DSMS) support
high volume update streams and real-time response
to ad-hoc complex queries.
What can be salvaged from the DBMS core technology ?
What should be re-designed from scratch ?
[Figure: a data entry application feeding a DSMS, which serves informed reporting]
DBMS versus DSMS
DBMS                                      DSMS
• Persistent relations                    • Transient streams
• Transaction oriented                    • Query orientation
• One-time queries                        • Continuous queries
• Precise query answering                 • Best-effort query answering
• Access plan determines                  • Unpredictable data
  physical database design                  characteristics
Old technology to the rescue?
• Many stream based applications are low-volume with
simple queries
– Thus we can live with automatic query ‘refresh’
• Triggers are available for notification of changes
– They are hooked up to simple changes to the datastore
– There is no technology to merge/optimize trigger
groups
Outline of remainder
• Query processing over multiple streams
• Organizing hundreds of ad-hoc queries
• Sensor-network based querying
A stream application
• [Widom] Consider a network traffic system for an ISP
with a customer link and a backbone link, and two streams
keeping track of the IP traffic
PTc(saddr, daddr, id, length, timestamp)
PTb(saddr, daddr, id, length, timestamp)
[Figure: the streams PTc and PTb feeding the DSMS]
A stream application
• Q1 Compute the load on the backbone link averaged over
one minute period and notify the operator when the load
exceeds a threshold T
Select notifyoperator(sum(length))
From PTb
Group By getminute(timestamp)
Having sum(length) >T
With a low stream flow this could be handled with a DBMS trigger;
otherwise, sample the stream to get an approximate answer
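Q1 needs very little state when the stream arrives in timestamp order, as the following sketch suggests. It assumes packets are (timestamp in seconds, length) pairs and that a "minute" is a tumbling window; the function name `minute_loads` is hypothetical and the sampling variant is not shown.

```python
def minute_loads(packets, threshold):
    """Yield (minute, total_length) for each minute whose load exceeds threshold.

    `packets` is an iterable of (timestamp_seconds, length) in timestamp order;
    only the running sum for the current minute is kept in memory.
    """
    current, total = None, 0
    for ts, length in packets:
        minute = int(ts // 60)
        if current is not None and minute != current:
            if total > threshold:
                yield current, total     # the finished minute exceeded T
            total = 0
        current = minute
        total += length
    if current is not None and total > threshold:
        yield current, total             # flush the last (possibly incomplete) minute

# Hypothetical backbone packets.
packets = [(0, 500), (30, 600), (61, 100), (125, 2000)]
alerts = list(minute_loads(packets, threshold=1000))
```

The generator keeps a single running sum per open window, which matches the observation that this query needs only a small answer store.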
A stream application
• Q2 Find the fraction of traffic on the backbone link coming
from the customer network to check cause of congestion.
( Select count(*)
From PTc as C, PTb as B
Where C.saddr = B.saddr and C.daddr=B.daddr
and C.id=B.id ) /
( Select count(*) From PTb)
Both streams might require an unbounded resource to perform the join,
which could be avoided with an approximate answer and synopsis
A stream application
• Q3 Monitor the top 5% of source-to-destination pairs in terms of
traffic on the backbone.
With Load As (Select saddr, daddr,sum(length) as traffic
From PTb Group By saddr,daddr)
Select saddr, daddr, traffic
From Load as l1
Where (Select count(*) From Load as l2
Where l2.traffic <l1.traffic) >
(Select 0.95*count(*) From Load)
Order By Traffic
This query contains ‘blocking’ operators
STREAM architecture
[Figure: the streams PTc and PTb enter the DSMS, which comprises an Answer Store, a Scratch Area and Trash, and delivers the Answer]
• Q1 Compute the load on the backbone link averaged over
one minute period and notify the operator when the load
exceeds a threshold T
Select notifyoperator(sum(length))
From PTb
Group By getminute(timestamp)
Having sum(length) >T
The answer store area simply needs an integer
• Q2 Find the fraction of traffic on the backbone link coming
from the customer network to check cause of congestion.
( Select count(*)
From PTc as C, PTb as B
Where C.saddr = B.saddr and C.daddr=B.daddr
and C.id=B.id ) /
( Select count(*) From PTb)
The scratch area should maintain part of the two streams to
implement the join. Or a complete list of saddr and daddr.
Joining two tables
[Figure: nested loop join of RelA and RelB]
Joining two streams
[Figure: nested loop join of the streams PTa and PTb]
An unbounded store would be required
Joining two streams
[Figure: merge join of the streams PTa and PTb]
If the streams are ordered, a simple merge join is possible
with limited resource requirements
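The merge join over ordered streams can be sketched with Python iterators. The sketch assumes each stream delivers (key, value) pairs with strictly increasing keys, so duplicates per key are not handled; the name `merge_join` is illustrative.

```python
def merge_join(left, right):
    """Join two iterators of (key, value) pairs sorted by strictly increasing key.

    Holds only one pending element per stream, so memory use is constant.
    """
    left, right = iter(left), iter(right)
    l, r = next(left, None), next(right, None)
    while l is not None and r is not None:
        if l[0] < r[0]:
            l = next(left, None)         # left key too small: advance left
        elif l[0] > r[0]:
            r = next(right, None)        # right key too small: advance right
        else:
            yield l[0], l[1], r[1]       # keys match: emit joined tuple
            l, r = next(left, None), next(right, None)

# Hypothetical ordered streams.
pta = [(1, "a"), (3, "b"), (5, "c")]
ptb = [(2, "x"), (3, "y"), (5, "z"), (7, "w")]
joined = list(merge_join(pta, ptb))
```

Because both inputs are consumed in one forward pass, the same code works on unbounded generators, in contrast to the nested loop join, which has to remember one stream completely.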
Joining two streams
[Figure: a window and a histogram per stream (PTa, PTb) feeding a join synopsis]
A statistical summary could provide an approximate answer
• Q3 Monitor the top 5% of source-to-destination pairs in terms of
traffic on the backbone.
With Load As (Select saddr, daddr,sum(length) as traffic
From PTb Group By saddr,daddr)
Select saddr, daddr, traffic
From Load as l1
Where (Select count(*) From Load as l2
Where l2.traffic <l1.traffic) >
(Select 0.95*count(*) From Load)
Order By Traffic
The scratch area should maintain part of the two streams to
implement the join.
Finance
• [DeWitt] Consider a financial feed where thousands of
clients can register arbitrarily complex continuous queries.
– XML stream querying
[Figure: an XML stream feeding the DSMS]
Finance
• Q4 Notify me whenever the price of KPN stock drops
below 6 euro
Select notifyUser(name, price)
From ticker t1
Where t1.name = 'KPN' and t1.price < 6
Finance
• Q5 Notify me whenever the price of KPN stock drops by
5% over the last hour
Select notifyUser(name, price)
From ticker t1, t2
Where t1.name = 'KPN' and t2.name = t1.name
and getminutes(t1.timestamp - t2.timestamp) < 60
and t1.price < 0.95 * t2.price
Finance
• Q6 Notify me whenever the price of KPN stock drops by 5% over the
last hour and T-Mobile remains constant
Select notifyUser(name, price)
From ticker t1, t2, t3, t4
Where t1.name = 'KPN' and t2.name = t1.name
and getminutes(t1.timestamp - t2.timestamp) < 60
and t1.price < 0.95 * t2.price
and t1.timestamp = t3.timestamp and t2.timestamp = t4.timestamp
and t3.name = 'T-Mobile' and t4.name = t3.name
and getminutes(t3.timestamp - t4.timestamp) < 60
and t3.price = t4.price
Query signatures
• Traditional SQL applications already use the notion of
parameterised queries, i.e. some constants are replaced by
a program variable.
– Subsequent calls use the same query evaluation plan
• In a DSMS we should recognize such queries as quickly as
possible
– Organize similar queries into a group
– Decompose complex queries into smaller queries
– Manage the amount of intermediate store
Finance
• Queries can be organized in groups using a signature, and
evaluation can be replaced by a single multi-user request.
Select notifyUser(name, price)
From ticker t1
Where t1.name = 'KPN' and
t1.price < 6
Client          Name    Threshold price
192.871.12.1    KPN     6
192.777.021     ING     12
Finance
• Queries can be organized in groups using a signature, and
evaluation can be replaced by a single multi-user request.
Select notifyUser(c.client, t1.name, t1.price)
From ticker t1, clients c
Where t1.name = c.name and t1.price < c.price
Client          Name    Threshold price
192.871.12.1    KPN     6
192.777.021     ING     12
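The grouped evaluation can be sketched as a lookup from stock name to the registered thresholds, so that one pass over each ticker tuple serves all clients that share the query signature. The client identifiers, the tuple layout and the function names `build_index` and `on_tick` are assumptions made for this illustration.

```python
from collections import defaultdict

def build_index(clients):
    """Group registered (client, name, threshold) rows by stock name."""
    index = defaultdict(list)
    for client, name, threshold in clients:
        index[name].append((client, threshold))
    return index

def on_tick(index, name, price):
    """Return the notifications triggered by one ticker tuple."""
    return [(client, name, price)
            for client, threshold in index.get(name, [])
            if price < threshold]

# Hypothetical registrations sharing the signature "name = ? and price < ?".
clients = [("c1", "KPN", 6), ("c2", "ING", 12), ("c3", "KPN", 5)]
index = build_index(clients)
notifications = on_tick(index, "KPN", 5.5)
```

Each tick touches only the registrations for its own stock name, instead of re-evaluating one query per client.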
Finance
• Timer-based queries call for a stream window with
incremental evaluation
• Multiple requests can be organized by time-table and event
detection methods provided by database triggers.
Select notifyUser(name, price)
From ticker t1,t2
Where t1.name = 'KPN' and t2.name = t1.name
and getminutes(t1.timestamp-t2.timestamp) <60
and t1.price < 0.95 * t2.price
Finance
• Complex queries can be broken down into independent components
Select notifyUser(name, price)
From ticker t1,t2, t3,t4
Where t1.name = 'KPN' and t2.name = t1.name
and getminutes(t1.timestamp - t2.timestamp) < 60
and t1.price < 0.95 * t2.price
and t1.timestamp = t3.timestamp and t2.timestamp = t4.timestamp
and t3.name = 'T-Mobile' and t4.name = t3.name
and getminutes(t3.timestamp-t4.timestamp) <60
and t3.price = t4.price
Finance
• Intermediate results should be materialized. This can be
integrated in traditional query evaluation schemes
t1.timestamp=t3.timestamp and t2.timestamp=t4.timestamp
Sensor networks
• [Madden] Sensor networks are composed of thousands of
small devices, interconnected through radio links. This
network can be queried.
– Sensors have limited energy
– Sensors have limited reachability
– Sensors can be ‘crushed’
Aggregate Queries Over Ad-Hoc Wireless Sensor Networks
Sensor networks
• Q7 Give me the traffic density on the A1 for the last hour
Select avg(t.car)
From traffic t
Where t.segment in (Select segment From roads
Where name = 'A1')
Group By gethour(t.timestamp)
Sensor networks
• The sensors should organize themselves into a P2P
infrastructure
• An aggregate query is broadcast through the network
• Each mote calculates a partial answer and sends it to its
peers
• Peers aggregate the information to produce the final
answer.
• Problems
– The energy to broadcast some information is high
– Tuples and partial results may be dropped
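Partial aggregation of an average can be sketched with (sum, count) pairs that are merged on the way to the query site. The sketch assumes the motes have organized themselves into a routing tree, which is a simplification of the P2P infrastructure mentioned above; the names `partial`, `merge` and `aggregate` and all readings are hypothetical, and message loss is not modeled.

```python
def partial(reading):
    """Partial state for AVG at one mote: (sum, count)."""
    return (reading, 1)

def merge(a, b):
    """Combine two partial states; the operation is associative and commutative."""
    return (a[0] + b[0], a[1] + b[1])

def aggregate(tree, readings, node):
    """Combine the partial state of `node` with those of its children in `tree`."""
    state = partial(readings[node])
    for child in tree.get(node, []):
        state = merge(state, aggregate(tree, readings, child))
    return state

# Hypothetical routing tree (parent -> children) and sensor readings.
tree = {"root": ["a", "b"], "a": ["c"]}
readings = {"root": 10, "a": 20, "b": 30, "c": 40}
total, count = aggregate(tree, readings, "root")
average = total / count
```

Each mote transmits one small (sum, count) pair instead of forwarding every reading, which addresses the energy cost of broadcasting; a dropped partial result silently removes a whole subtree from the answer.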
Conclusions and outlook
• Data stream management technology requires changes in
our expectations of DBMS functionality
– Queries do not necessarily provide a precise answer
– Queries continue as long as we are interested in their
approximate result
– The persistent store does not necessarily contain a
consistent and timeless view on the state of the
database.
Conclusions and outlook
• Datastream management technology capitalizes upon
proven DBMS technology
• DSMS provide a basis for ambient home settings, sensor
networks, and globe spanning information systems
• It is realistic to expect that some of the properties to
support efficient datastream management will become part
of the major products
– Multi query optimization techniques should be added.
Literature
• NiagaraCQ: A Scalable Continuous Query System for
Internet Databases, J. Chen, D.J. DeWitt, F. Tian, Y. Wang,
University of Wisconsin
• Streaming Queries over Streaming Data, S. Chandrasekaran,
M.J. Franklin, University of California, Berkeley
• Continuous Queries over Data Streams, S. Babu, J. Widom,
Stanford University