Sliding - UCLA Computer Science


Data Stream Management Systems
CS240B Notes
by Carlo Zaniolo
Data Streams
• Continuous, unbounded, rapid, time-varying streams of data elements
• Occur in a variety of modern applications:
  - Network monitoring and traffic engineering
  - Sensor networks, RFID tags
  - Telecom call records
  - Financial applications
  - Web logs and click-streams
  - Manufacturing processes
• DSMS = Data Stream Management System

Many Research Projects …
• Amazon/Cougar (Cornell) – sensors
• Aurora (Brown/MIT) – sensor monitoring, dataflow
• Hancock (AT&T) – telecom streams
• Niagara (OGI/Wisconsin) – Internet DBs & XML
• OpenCQ (Georgia) – triggers, view maintenance
• Stream (Stanford) – general-purpose DSMS
• Tapestry (Xerox) – publish/subscribe filtering
• Telegraph (Berkeley) – adaptive engine for sensors
• Tribeca (Bellcore) – network monitoring
• Stream Mill (UCLA) – power & extensibility
• Gigascope (AT&T Labs) – network monitoring

The (Simplified) Big Picture
[Figure: clients register queries with the DSMS server and receive streamed results; the server processes the input streams, using a scratch store, an archive, and stored relations.]

Databases vs Data Streams
               Database Systems     Data Stream Systems
Model          persistent data      transient data
Table          set|bag of tuples    infinite sequence of tuples
Updates        all                  append-only
Query          transient            persistent
Query answer   exact                often approximate
Query eval.    multi-pass           one-pass
Operators      blocking OK          nonblocking only
Query plan     fixed                adaptive

Research Challenges
• Data models
  - Relational streams first; XML streams are important too
  - Tuple timestamping: order is important
  - Windows or other synopses
• Query languages: SQL or XQuery + extensions
  - Blocking operators and expressive power
• Query plans
  - Optimized scheduling for response time or memory
• Quality of service (QoS) and approximation
  - Load shedding, sampling
• Support for advanced applications
  - Data stream mining

Data Models
Relational data streams
  - Each data stream consists of relational tuples
  - The stream can be modelled as an append-only relation,
    but repetitions are allowed and order is very important!
  - Order is based on timestamps, or on arrival order
Streaming XML data
  - A stream of structured SAX elements

Timestamps
• Data streams are (basically) ordered according to their timestamps
• The meaning of windows, unions and joins is based on timestamps
• External timestamps
  - Injected by the data source
  - Model the real-world event represented by the tuple
  - Tuples may arrive out of order, but if they are nearly ordered they can be reordered with small buffers
• Internal timestamps
  - Introduced as a special field by the DSMS
  - Approximate, based on the time the tuples arrived
• Missing timestamps (called latent in Stream Mill)
  - The system assigns no timestamp to arriving tuples,
  - but tuples are still processed as ordered sequences
  - by operators whose semantics expect timestamps…
  - thus operators might instantiate timestamps as and when needed

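Reordering nearly ordered external timestamps with a small buffer can be sketched in a few lines of Python. This is a minimal illustration, not Stream Mill's mechanism. It assumes a hypothetical bound `slack`: no tuple arrives more than `slack` time units behind the largest timestamp already seen. Under that bound, a heap only needs to hold tuples until no late arrival can overtake them.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

def reorder(stream: Iterable[Tuple[int, str]], slack: int) -> Iterator[Tuple[int, str]]:
    """Re-emit (timestamp, payload) tuples in timestamp order.

    Assumption (hypothetical parameter): no tuple arrives more than `slack`
    time units behind the largest timestamp seen so far."""
    buffer: List[Tuple[int, str]] = []   # min-heap keyed on timestamp
    max_ts = None
    for ts, payload in stream:
        heapq.heappush(buffer, (ts, payload))
        max_ts = ts if max_ts is None else max(max_ts, ts)
        # No later tuple can have a timestamp below max_ts - slack, so these are safe to emit
        while buffer and buffer[0][0] <= max_ts - slack:
            yield heapq.heappop(buffer)
    # Only reached if the input is finite (e.g. in a test): flush what is left
    while buffer:
        yield heapq.heappop(buffer)

print(list(reorder([(1, "a"), (3, "c"), (2, "b"), (5, "e"), (4, "d")], slack=2)))
```

The buffer only ever holds tuples whose timestamps fall within the last `slack` time units. Its size therefore depends on how disordered the stream is, not on how long it is.
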
Data Stream Query Languages
Continuous Queries and Blocking Operators
Query Operators: Sample Stream
Traffic (sourceIP,    % source IP address
         sourcePort,  % port number on source
         destIP,      % destination IP address
         destPort,    % port number on destination
         length,      % length in bytes
         time         % timestamp
);

Blocking Query Operators
• No output until the entire input has been seen, i.e., until the last tuple in the input has been read; this is often detected only when we hit EOF.
• Streams: the input never ends, so blocking operators cannot be used as such
• Traditional SQL aggregates are blocking
• Many SQL operators have DBMS implementations that are blocking, but are not intrinsically blocking
  - group by, sort, and join can be implemented in blocking and nonblocking ways
• Other operators are intrinsically blocking
• Can we formally characterize which is which?
  We will see that the nonblocking operators are the monotonic ones

Problematic Operators for Data Streams
• Blocking query operators are those that must see everything in the input before they can return anything in the output
• Nonblocking query operators are those that can return results now, without seeing the rest of the stream
• Selection and projection are nonblocking
• Set difference and traditional aggregates are blocking
• Continuous aggregates are not

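The blocking/nonblocking distinction shows up directly in Python generators. In the sketch below, selection and a continuous (running) count produce output while an unbounded input is still arriving. A traditional count returns only after its input ends, so on a real stream it would never return. The tuple layout (a dict with a `length` field) is a hypothetical stand-in for Traffic tuples.

```python
import itertools
from typing import Callable, Iterable, Iterator

def select(stream: Iterable[dict], pred: Callable[[dict], bool]) -> Iterator[dict]:
    """Selection is nonblocking: a qualifying tuple is emitted as soon as it arrives."""
    for t in stream:
        if pred(t):
            yield t

def continuous_count(stream: Iterable) -> Iterator[int]:
    """Continuous (running) aggregate: emits the count so far after every tuple."""
    n = 0
    for _ in stream:
        n += 1
        yield n

def traditional_count(stream: Iterable) -> int:
    """Traditional aggregate: blocking, it answers only after the input ends,
    so on an unbounded stream it would never return."""
    return sum(1 for _ in stream)

# An unbounded stream of Traffic-like tuples (hypothetical lengths 0, 1, 2, ...)
traffic = ({"length": i} for i in itertools.count())
even = select(traffic, lambda t: t["length"] % 2 == 0)
print(list(itertools.islice(even, 3)))  # [{'length': 0}, {'length': 2}, {'length': 4}]
```

Calling `traditional_count` on `itertools.count()` would loop forever. That is exactly the behavior that rules traditional aggregates out on streams.
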
Aggregate Invocation: two Forms
• Traditional
    select G, F1 from S where P
    group by G having F2 op J
  G: grouping attributes; F1, F2: aggregate expressions
• With windows (SQL:2003 OLAP functions)
    traffic (sourceIP, sourcePort, destIP, destPort, length, Time)
    select sourceIP, Time,
           avg(length) over (partition by sourceIP order by Time rows 50 preceding)
    from traffic
  Cumulative (running) window:
    ... over (partition by sourceIP order by Time rows unbounded preceding)

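The windowed invocation above can be evaluated incrementally, one tuple at a time. Below is a minimal Python sketch. It assumes SQL:2003 `ROWS n PRECEDING` semantics (the current row plus up to n earlier rows of the same partition), tuples arriving in Time order, and a hypothetical `(sourceIP, Time, length)` tuple layout. Each partition keeps only its frame and a running sum.

```python
from collections import defaultdict, deque
from typing import DefaultDict, Deque, Iterable, Iterator, Tuple

def windowed_avg(stream: Iterable[Tuple[str, int, int]], preceding: int = 50
                 ) -> Iterator[Tuple[str, int, float]]:
    """avg(length) over (partition by sourceIP order by Time rows `preceding` preceding).

    Input tuples are (sourceIP, Time, length), assumed to arrive in Time order.
    The frame is the current row plus up to `preceding` earlier rows of the
    same sourceIP, so each partition keeps at most preceding + 1 lengths."""
    frames: DefaultDict[str, Deque[int]] = defaultdict(deque)  # sourceIP -> lengths in frame
    sums: DefaultDict[str, int] = defaultdict(int)              # sourceIP -> sum of that frame
    for ip, time, length in stream:
        frame = frames[ip]
        frame.append(length)
        sums[ip] += length
        if len(frame) > preceding + 1:       # expire the oldest row of this partition
            sums[ip] -= frame.popleft()
        yield ip, time, sums[ip] / len(frame)
```

Dropping the expiry step gives the cumulative (`unbounded preceding`) window, which needs only the running sum and count per partition.
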
Aggregate Function Properties
1. Distributive: sum, count, min, max
2. Algebraic: avg
3. Holistic: count-distinct, median
4. On-line aggregates, such as an exponentially decaying avg
5. User-defined aggregates (UDAs)
• Sliding-window invocation on 1–2: efficient computation, for both memory and CPU
• Sliding-window invocation on 3?
• Continuous window on these? Yes, also for 5.
• UDAs can be similar to any of those

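The difference between these classes can be made concrete with a small Python sketch. AVG is kept as a (sum, count) pair, which can be updated, expired, and merged in constant time. The exponentially decaying average is an on-line aggregate with a hypothetical smoothing factor `alpha`. A holistic aggregate such as median has no comparable fixed-size state.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional

@dataclass
class AvgState:
    """AVG is algebraic: a fixed-size state made of two distributive aggregates."""
    total: float = 0.0
    count: int = 0

    def add(self, x: float) -> None:
        self.total += x
        self.count += 1

    def remove(self, x: float) -> None:
        """Expire a value that slides out of the window."""
        self.total -= x
        self.count -= 1

    def merge(self, other: "AvgState") -> "AvgState":
        """Combine partial states, e.g. from two panes or partitions."""
        return AvgState(self.total + other.total, self.count + other.count)

    def value(self) -> float:
        return self.total / self.count

def decaying_avg(xs: Iterable[float], alpha: float) -> Iterator[float]:
    """On-line aggregate: exponentially decaying average with O(1) state.
    `alpha` in (0, 1] is a hypothetical smoothing parameter (weight of the newest value)."""
    avg: Optional[float] = None
    for x in xs:
        avg = x if avg is None else alpha * x + (1 - alpha) * avg
        yield avg
```
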
Avoiding Blocking Behavior
• Windows: aggregates on a limited-size window are approximate and nonblocking
• DSMSs support windows of all kinds:
  - Sliding windows (same as OLAP functions)
  - Tumbles: restart at every new window (traditional definition)
  - Panes: the window is broken up into panes
• Punctuation [Tucker, Maier, Sheard, Fegaras]
  - Assertion about future stream contents
  - Unblocks operators, reduces state
• Constructs used for avoiding blocking are also useful for avoiding infinite memory

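A tumbling window and a punctuation can be combined in one short Python sketch. It assumes data tuples `(timestamp, value)` in timestamp order and a hypothetical timestamp-only punctuation `("punct", t)`, asserting that no later tuple has a timestamp below t. Punctuations as proposed by Tucker et al. can be more general than this form.

```python
from typing import Iterable, Iterator, Optional, Tuple, Union

# Hypothetical encodings: data tuple (timestamp, value); punctuation ("punct", t)
Item = Union[Tuple[int, float], Tuple[str, int]]

def tumbling_sum(stream: Iterable[Item], width: int) -> Iterator[Tuple[int, float]]:
    """Sum of values per tumbling window [k*width, (k+1)*width), emitted as
    (window start, sum) once the stream, or a punctuation, has moved past it."""
    current: Optional[int] = None   # index k of the open window, if any
    total = 0.0
    for item in stream:
        is_punct = item[0] == "punct"
        bound = item[1] if is_punct else item[0]   # time the stream has progressed to
        # Close the open window once the stream has moved past its end
        if current is not None and bound >= (current + 1) * width:
            yield current * width, total
            current, total = None, 0.0
        if not is_punct:
            ts, value = item
            if current is None:
                current = ts // width
            total += value
```

Without the punctuation, the last window would stay open until some later data tuple arrived. The punctuation unblocks the operator and lets it release that state.
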
Joins
• The general case is problematic on streams: we may need to join arbitrarily far-apart stream tuples
• An equijoin on timestamps is easy to compute, but not very useful
• Most work focuses on joins between one stream and a window specified on the other:
    select A.sourceIP, B.sourceIP
    from Traffic1 as A [window TA],
         Traffic2 as B
    where A.destIP = B.destIP
• The symmetric case is also common:
    … Traffic2 as B [window TB] …
• Multi-way joins are less common but possible

Join of Stream S with a Table T
(where T is a DB relation or a window on a stream)
• When a new tuple z with timestamp ts(z) arrives in S, join it with all the tuples in T.
  - ts(z) is the timestamp of the tuples so produced
• If T is a window on a stream S’:
  - T must contain all the tuples up to and including ts(z): a cumulative window on S’
  - But we do not have infinite memory, so we must approximate T with a synopsis,
    e.g., the 30 minutes preceding ts(z)

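The join of a stream with a bounded window on another stream can be sketched in Python as follows. Assumptions: both streams arrive interleaved in timestamp order as hypothetical `(stream name, timestamp, sourceIP, destIP)` events, the join is on destIP, and `window` plays the role of the "30 minutes preceding" synopsis.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, Tuple

# Hypothetical event layout: (stream name "S" or "S'", timestamp, sourceIP, destIP)
Event = Tuple[str, int, str, str]

def stream_window_join(events: Iterable[Event], window: int) -> Iterator[Tuple[int, str, str]]:
    """Join each tuple z of S with the S' tuples of the last `window` time units
    (a bounded synopsis standing in for the cumulative window) on destIP.
    Events are assumed to arrive in timestamp order; results carry ts(z)."""
    recent: Deque[Tuple[int, str, str]] = deque()   # S' tuples, oldest first
    for name, ts, src, dst in events:
        # Expire S' tuples that have fallen out of the window
        while recent and recent[0][0] < ts - window:
            recent.popleft()
        if name == "S'":
            recent.append((ts, src, dst))
        else:  # a tuple z of S probes the window on S'
            for _, src2, dst2 in recent:
                if dst2 == dst:
                    yield ts, src, src2   # (ts(z), S.sourceIP, S'.sourceIP)
```

A real DSMS would typically index the window on the join attribute rather than scan it. The linear scan here keeps the sketch short.
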
Multi-way Sliding Window Joins
• Evaluation of n-way sliding window join queries
  - n streams with associated sliding windows
  - continuously evaluate the joins of all n windows
• Two natural join strategies
  - eager: the join is evaluated each time a new tuple arrives in any of the input streams
  - lazy: the join is evaluated with some pre-specified frequency, e.g., every t time units
• The computation is incremental, as in the differential fixpoint evaluation of recursive rules

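For the special case n = 2, eager evaluation can be sketched in Python as a symmetric sliding-window equijoin. Each arriving tuple first probes the other stream's window and then enters its own. Only the new results it contributes (the delta) are produced, so the join is never recomputed from scratch. The event layout `(stream "A" or "B", timestamp, key)` is a hypothetical assumption.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Iterator, Tuple

def eager_symmetric_join(events: Iterable[Tuple[str, int, str]], window: int
                         ) -> Iterator[Tuple[int, int]]:
    """Eager two-way sliding-window equijoin over events (stream, timestamp, key)
    arriving in timestamp order. Yields (ts of A tuple, ts of B tuple) pairs."""
    windows: Dict[str, Deque[Tuple[int, str]]] = {"A": deque(), "B": deque()}
    for name, ts, key in events:
        for w in windows.values():              # expire old tuples in both windows
            while w and w[0][0] < ts - window:
                w.popleft()
        other = "B" if name == "A" else "A"
        for ts2, key2 in windows[other]:        # probe the opposite window: the delta
            if key2 == key:
                yield (ts, ts2) if name == "A" else (ts2, ts)
        windows[name].append((ts, key))         # then insert into its own window
```

A lazy variant would buffer arrivals and run the same probe-then-insert step for the whole buffer every t time units. It trades latency for fewer, larger batches.
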
Query Optimization and Scheduling
• Scheduling to minimize response time or memory; there is no real change in CPU time
• Optimization based on sharing: query plans, operators, buffers, …

A Query Plan
[Figure: a query plan in which Stream1, Stream2 and Stream3 feed join operators (⋈) serving queries Q1 and Q2. Given the query plan and selectivity estimates, the scheduler schedules tuples through the operator chains.]

Schedulers and QoS Metrics
• Round Robin (RR) is perhaps the most basic:
  - operators in a circular queue are each given a fixed time slice
  - starvation is avoided, but there is little adaptivity
• FIFO: takes the first tuple in the input and moves it through the whole chain
  - minimal latency, poor memory usage
• Greedy algorithms:
  - buffers with the most tuples first
  - tuples that waited longest first
  - operators that release the most memory first

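The policies above differ only in how they pick the next operator to run, which a Python sketch can show side by side. This is a hypothetical representation: each operator's input buffer is a deque of tuple arrival times. Round robin hands out one tuple per turn as a stand-in for a fixed time slice. FIFO and the memory-release greedy variant are not shown.

```python
from collections import deque
from typing import Deque, Dict, List, Optional

Buffers = Dict[str, Deque[int]]   # operator name -> arrival times of queued tuples

def longest_queue(buffers: Buffers) -> Optional[str]:
    """Greedy: serve the operator whose input buffer holds the most tuples."""
    name = max(buffers, key=lambda op: len(buffers[op]))
    return name if buffers[name] else None

def oldest_tuple(buffers: Buffers) -> Optional[str]:
    """Greedy: serve the operator whose head tuple has waited longest."""
    waiting = [op for op, q in buffers.items() if q]
    return min(waiting, key=lambda op: buffers[op][0]) if waiting else None

class RoundRobin:
    """Visit operators in a fixed circular order, skipping empty buffers;
    every operator with work eventually gets a turn, so none starves."""
    def __init__(self, order: List[str]) -> None:
        self.order, self.i = order, 0

    def __call__(self, buffers: Buffers) -> Optional[str]:
        for _ in range(len(self.order)):
            op = self.order[self.i]
            self.i = (self.i + 1) % len(self.order)
            if buffers[op]:
                return op
        return None
```
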
Memory Optimization on a Chain
[Babcock, Babu, Datar, Motwani]
[Figure: an operator chain from Input through σ1, σ2 and σ3 to Output, with selectivity labels 0.2, 0.6 and 0.0, beside its progress chart (net selectivity vs. time) marking the best slope and the starvation point.]

Main ideas
• Operators are thought of as filters which
  - operate on a set of tuples
  - produce s tuples in return for each input tuple
• s: the selectivity of an operator
• If s = 0.2, we can interpret the value in two ways:
  - out of every 10 tuples, the operator outputs 2 tuples
  - if the input requires 1 unit of memory, the output will require 0.2 units of memory

The lower envelope
• Imagine there is a line from this point to every operator point (ti, si) to its right
• The operator that corresponds to the line with the steepest slope is called the “steepest descent operator point”

The Lower Envelope
• By starting at the first point (t0, s0) and repeatedly calculating the steepest descent operator point, we find the lower envelope P’ for a progress chart P
• Notice that the steepness of the segments is nonincreasing: each segment descends no more steeply than the one before it
• The operators in each segment form a chain
  - FIFO within a chain
  - Greedy across chains

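The steepest-descent construction of the lower envelope is short enough to sketch in Python. The progress chart is a list of points (t_i, s_i). Here t_i is the cumulative processing time after the i-th operator and s_i is the net selectivity, i.e. the product of the selectivities so far, with (t_0, s_0) = (0, 1) for the input. Ties go to the farther point. The numbers in the example below are hypothetical.

```python
from typing import List, Tuple

def lower_envelope(points: List[Tuple[float, float]]) -> List[int]:
    """Indices of the progress-chart points that lie on the lower envelope.

    points[0] = (t0, s0) is the input; points[i] = (ti, si) is the cumulative
    processing time and net selectivity after the i-th operator. From the
    current point, repeatedly jump to the steepest descent operator point."""
    envelope, cur = [0], 0
    while cur < len(points) - 1:
        t0, s0 = points[cur]
        # most negative slope wins; among equal slopes prefer the farther point
        cur = min(range(cur + 1, len(points)),
                  key=lambda j: ((points[j][1] - s0) / (points[j][0] - t0), -j))
        envelope.append(cur)
    return envelope

# Hypothetical chain: selectivities 0.2, 0.6, 0.0 and one time unit per operator,
# so net selectivities are 1, 0.2, 0.12, 0.0
print(lower_envelope([(0, 1.0), (1, 0.2), (2, 0.12), (3, 0.0)]))  # [0, 1, 3]
```

In this example the envelope segments are 0→1 and 1→3. σ1 forms one chain and σ2, σ3 another. Within each chain tuples move FIFO, and across chains the scheduler chooses greedily.
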
Scheduling
• Chain minimizes memory, which may be required in special overload situations
• But it increases response time (latency)
• Typically, though, we want to optimize for response time
• Different scheduling protocols optimize different objectives: latency, inaccuracy, memory use, computation, starvation, …
• Computational complexity is independent of the scheduler
• Different policies give significantly different results only for bursty loads
• Research issues:
  - Complex query plans (beyond simple paths)
  - Minimization of response time
  - Adaptive strategies: how do we switch between the two to adapt to load changes?

Optimization by Sharing
• In traditional multi-query optimization, sharing (of expressions, results, etc.) among queries can lead to improved performance
• Similar issues arise when processing queries on streams, for example:
  - sharing of query operators and expressions
  - sharing of sliding windows

Multi-query Processing on Streams
• Opportunities for optimization when windows are shared, e.g.:
    select sum(A.length)
    from Traffic1 A [window 1 hour], Traffic2 B [window 1 hour]
    where A.destIP = B.destIP

    select count(distinct A.sourceIP)
    from Traffic1 A [window 1 min], Traffic2 B [window 1 min]
    where A.destIP = B.destIP
• Strategies for scheduling the evaluation of shared joins:
  - Largest window only
  - Smallest window first
  - Process at any instant the tuple that is likely to benefit the largest number of joins (maximize throughput)

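The "largest window only" strategy can be sketched as a routing step in Python. The shared join runs once with the largest window (1 hour). Each result pair is then delivered to every query whose own window also covers it, i.e. whose window is at least as wide as the gap between the two joined tuples. The query names, the seconds-based window widths, and the `(tsA, tsB)` result format are hypothetical.

```python
from typing import Dict, Iterable, Iterator, Tuple

def route_shared_join(matches: Iterable[Tuple[int, int]], windows: Dict[str, int]
                      ) -> Iterator[Tuple[str, Tuple[int, int]]]:
    """'Largest window only': the shared join is evaluated once, with the
    largest window; each joined pair (tsA, tsB) is then routed to every query
    whose window is at least |tsA - tsB| wide."""
    for ts_a, ts_b in matches:
        gap = abs(ts_a - ts_b)
        for query, width in windows.items():
            if gap <= width:
                yield query, (ts_a, ts_b)

# Hypothetical: Q1 uses a 1-hour window, Q2 a 1-minute window (in seconds)
print(list(route_shared_join([(0, 120), (100, 101)], {"Q1": 3600, "Q2": 60})))
```

Each query then applies its own aggregate (sum, count distinct) to the pairs routed to it. The join itself is shared.
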
Shared Predicates
[Figure: the predicates R.A > 1, R.A > 7, R.A > 11, R.A < 3, R.A < 5, R.A = 6, R.A = 8 and R.A ≠ 9, grouped by comparison operator (>, <, =, ≠) with their constants in sorted order, and an incoming tuple with A = 8.]
[Niagara, Telegraph]
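Grouping predicates by operator lets one incoming tuple be checked against all of them with a few lookups instead of one test per predicate. The Python sketch below is a simplified illustration of that idea, not the Niagara or Telegraph data structure. The `>` and `<` constants are kept sorted so a binary search finds every satisfied predicate, and `=` uses a hash set. The class name and the string form of the results are hypothetical.

```python
from bisect import bisect_left, bisect_right
from typing import List, Set

class PredicateIndex:
    """Shared evaluation of many single-attribute predicates on R.A."""

    def __init__(self, gt: List[int], lt: List[int], eq: List[int], ne: List[int]) -> None:
        self.gt, self.lt = sorted(gt), sorted(lt)   # constants of R.A > c and R.A < c
        self.eq, self.ne = set(eq), set(ne)         # constants of R.A = c and R.A != c

    def matches(self, a: int) -> Set[str]:
        out = {f"R.A > {c}" for c in self.gt[:bisect_left(self.gt, a)]}   # all c < a
        out |= {f"R.A < {c}" for c in self.lt[bisect_right(self.lt, a):]}  # all c > a
        if a in self.eq:
            out.add(f"R.A = {a}")
        out |= {f"R.A != {c}" for c in self.ne if c != a}
        return out

# The predicates from the figure
index = PredicateIndex(gt=[1, 7, 11], lt=[3, 5], eq=[6, 8], ne=[9])
print(sorted(index.matches(8)))
```

For the tuple A = 8, the satisfied predicates are R.A > 1, R.A > 7, R.A = 8 and R.A ≠ 9.
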
QoS and Load Shedding
When the input stream rate exceeds system capacity, a stream manager can shed load (tuples)
• Load shedding affects queries and their answers
• Introducing load shedding in a data stream manager is a challenging problem
• Random and semantic load shedding

DSMS Quality of Service (QoS), Approximation and Load Shedding
QoS via Synopses and Approximation
• Synopsis: a bounded-memory approximation of the stream history
  - Succinct summary of old stream tuples
  - Like indexes/materialized views, but the base data is unavailable
  - Examples:
    - Sliding windows
    - Samples
    - Sketching techniques
    - Histograms
    - Wavelet representation
• Approximate algorithms: e.g., median, quantiles, …
• Fast and light data mining algorithms

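A sample is the simplest of these synopses. Below is a Python sketch of classic reservoir sampling (Vitter's Algorithm R), shown as one standard way to keep a uniform random sample of k tuples from a stream of unknown length in O(k) memory. The slides do not prescribe a particular sampling method.

```python
import random
from typing import Iterable, List, TypeVar

T = TypeVar("T")

def reservoir_sample(stream: Iterable[T], k: int, rng: random.Random) -> List[T]:
    """Uniform random sample of k items from a stream of unknown length."""
    sample: List[T] = []
    for n, item in enumerate(stream):   # n = number of items seen before this one
        if n < k:
            sample.append(item)         # fill the reservoir first
        else:
            j = rng.randint(0, n)       # inclusive bounds: n + 1 equally likely slots
            if j < k:                   # keep the new item with probability k / (n + 1)
                sample[j] = item
    return sample
```

On a real stream the reservoir is queried at any time rather than returned at the end. At every point it is a uniform sample of the tuples seen so far.
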
QoS and Load Shedding
When the input stream rate exceeds system capacity, a stream manager can shed load (tuples)
• Load shedding affects queries and their answers: drop the tasks and the tuples that will cause the least loss
• Introducing load shedding in a data stream manager is a challenging problem
• Random load shedding or semantic load shedding

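The two shedding styles can be contrasted in a short Python sketch. Random shedding keeps each tuple with probability capacity / arrival rate. Semantic shedding drops the tuples judged least useful to the queries. The `capacity`, `arrival_rate`, `utility` and `threshold` parameters are hypothetical stand-ins for whatever cost model a real DSMS uses.

```python
import random
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")

def random_shed(stream: Iterable[T], capacity: float, arrival_rate: float,
                rng: random.Random) -> Iterator[T]:
    """Random load shedding: when tuples arrive faster than they can be
    processed, keep each one with probability capacity / arrival_rate."""
    keep = min(1.0, capacity / arrival_rate)
    for t in stream:
        if rng.random() < keep:
            yield t

def semantic_shed(stream: Iterable[T], utility: Callable[[T], float],
                  threshold: float) -> Iterator[T]:
    """Semantic load shedding: drop the tuples whose (hypothetical) utility
    score for the registered queries falls below a threshold."""
    for t in stream:
        if utility(t) >= threshold:
            yield t
```

Random shedding is easy to reason about statistically, e.g. for scaling up sums and counts. Semantic shedding can do better when some tuples matter more to the answers than others.
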
XML Data Streams
XML Data Streams: Applications
• An XML data stream is a sequence of tokens
• Data and application integration
• Distributed monitoring of computing systems
• Message-based web services
• Purchase orders, retail transactions
• Personalized content delivery
XML Streams: Data Model
XML data: tree structure
Data stream: ~ SAX events
<Purchase_Doc>
  <PR_Number val="50"/>
  <Supp_Name>ABC</Supp_Name>
  <Address>
    <City>Florham Park</City>
    <State>New Jersey</State>
  </Address>
  <Line_Items>
    <Item>
      <Part_Number val="1050"/>
      <Quantity val="20"/>
    </Item>
  …
Corresponding event stream:
[element Purchase_Doc anyType]
[element PR_Number anyType]
[attribute val anySimpleType]
[chardata 50]
[end-attribute]
[end-element]
[element Supp_Name anyType]
[text ABC]
[end-element]
…
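Python's standard `xml.sax` module turns an XML document into this kind of event sequence. The sketch below records the events for a fragment of the purchase document. The tuple encoding of events and the `EventCollector` class are hypothetical. The typed events on the slide (`anyType`, `anySimpleType`, `chardata`) come from a type-aware tokenizer that this sketch does not reproduce.

```python
import xml.sax
from typing import List, Tuple

class EventCollector(xml.sax.ContentHandler):
    """Records a document as a flat sequence of SAX-style events."""

    def __init__(self) -> None:
        super().__init__()
        self.events: List[Tuple[str, ...]] = []

    def startElement(self, name, attrs):
        self.events.append(("element", name))
        for attr in attrs.getNames():
            self.events.append(("attribute", attr, attrs.getValue(attr)))

    def endElement(self, name):
        self.events.append(("end-element", name))

    def characters(self, content):
        if content.strip():                 # skip whitespace between tags
            self.events.append(("text", content))

doc = b'<Purchase_Doc><PR_Number val="50"/><Supp_Name>ABC</Supp_Name></Purchase_Doc>'
handler = EventCollector()
xml.sax.parseString(doc, handler)
for event in handler.events:
    print(event)
```

Because the parser pushes events as it reads, a stream processor can act on each element as it arrives. It never needs to build the whole tree.
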
XML Query Languages
• XQuery, XSLT, XPath
• Declarative matching of structured data and text
• Easy restructuring to meet the needs of data consumers

XML Streams: Research Issues
• Efficient processing of single/multiple queries (e.g., XFilter/YFilter)
• Blocking operators/constructs in XQuery: e.g., XQuery's new function definition mechanisms are blocking
• Integration of relational and XML DSMSs, just as relational and XML DBMSs are now being integrated

Prototype Systems
• Aurora (Brandeis, Brown, MIT) [CCC+02]
• Gigascope (AT&T) [CJSS03]
• Hancock (AT&T) [CFP+00]
• STREAM (Stanford) [MWA+03]
• Telegraph (Berkeley) [CCD+03]
• … Stream Mill [UCLA]

Aurora (Brandeis, Brown, MIT)
• Geared towards monitoring applications (streams, triggers, imprecise data, real-time requirements)
• Specified set of operators, connected in a dataflow graph
• Optimization of the dataflow graph
• Three query modes (continuous, ad hoc, view)
• Aurora accepts QoS specifications and attempts to optimize QoS for the outputs produced
• Real-time scheduling, introspection and load shedding

AT&T: Hancock and Gigascope
• Hancock: a C-based domain-specific language which facilitates signature extraction from transactional data streams
  - Signature: characterizes the behavior of a customer or service
  - Support for efficient and tunable representation of signature collections
  - Support for custom scalable persistent data structures
  - Elaborate statistics collection from streams
• Gigascope: SQL-based DSMS for monitoring network data

STREAM [Stanford University]
General-purpose stream data manager
• CQL (Continuous Query Language) for declarative query specification
• Query plan generation
• Resource management:
  - Operator scheduling
  - Static and dynamic approximations

Telegraph [UCB]
Continuous query processing system
• Support for stream-oriented operators
• Support for adaptivity in query processing
• Various aspects of optimized multi-query stream processing

Commercial Systems
• Sybase: publish-subscribe using MQ (Memory Queues)
  - MQs are in-memory tables processed using active rules and stored procedures
• Similar solutions in Oracle and Teradata. But IBM's MQSeries and Microsoft's MSMQ are web-service oriented: Java Message Service (JMS), WebSphere, CORBA.
• Two DSMS startups:
  - CORAL8: http://coral8.com/
  - Streambase: http://www.streambase.com/

More Tutorial Talks
• Brian Babcock, Shivnath Babu, Mayur Datar, Rajeev Motwani, Jennifer Widom. http://theory.stanford.edu/~rajeev/pods-full-talk.ppt
• Nick Koudas and Divesh Srivastava. Data stream query processing. Tutorial presented at the International Conference on Very Large Data Bases (VLDB), 1149, 2003.
• Nick Koudas et al. Matching XML Documents Approximately (with S. Yahia and D. Srivastava). Tutorial delivered at ICDE 2003.
• Nick Koudas et al. Stream Data Management: Research Directions and Opportunities. Invited talk at IDEAS 2002.
• Nick Koudas et al. Mining Data Streams (with S. Guha). Invited tutorial delivered at PAKDD 2003.

Implementation Approaches for Continuous Queries on Streaming XML
• Automata-based techniques:
  - XFilter [AF00]: finite state machine per path expression
  - XTrie [CFGR02]: shares common sub-paths of PC paths
  - YFilter [DF03]: single NFA for all path expressions
  - [GMOS03]: single DFA, limitations on flexibility
  - XPush [GS03]: pushdown automaton for tree patterns
• Index-based techniques:
  - MatchMaker [LP02]: shared tree patterns
  - IndexFilter [BGKS03]: shared path expressions, comparison

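The automaton idea behind these filters can be sketched in Python over a stream of start/end tag events. Each path expression is simulated as an NFA whose set of active states is pushed on a stack at every start tag and popped at the matching end tag. The path mini-syntax (`/` child steps, `//` descendant steps, `*` wildcard) and the event format are hypothetical. Unlike YFilter, this sketch does not merge the queries into one shared NFA; it only shows the per-query state-set mechanics.

```python
from typing import Dict, Iterable, List, Set, Tuple

def parse_path(path: str) -> List[Tuple[str, str]]:
    """'/a//b/c' -> [('/', 'a'), ('//', 'b'), ('/', 'c')] (hypothetical mini-syntax)."""
    steps: List[Tuple[str, str]] = []
    i = 0
    while i < len(path):
        axis = "//" if path.startswith("//", i) else "/"
        i += len(axis)
        j = path.find("/", i)
        j = len(path) if j == -1 else j
        steps.append((axis, path[i:j]))
        i = j
    return steps

def filter_paths(events: Iterable[Tuple[str, str]], queries: Dict[str, str]) -> Set[str]:
    """Names of the queries whose path expression matches some element.

    events: ("start", tag) / ("end", tag) pairs, as a SAX parser would report them.
    State k of a query means "its first k steps have matched"."""
    compiled = {q: parse_path(p) for q, p in queries.items()}
    stack: List[Dict[str, Set[int]]] = [{q: {0} for q in compiled}]
    matched: Set[str] = set()
    for kind, tag in events:
        if kind == "end":
            stack.pop()                      # restore the parent's active states
            continue
        top = stack[-1]
        new: Dict[str, Set[int]] = {}
        for q, steps in compiled.items():
            nxt: Set[int] = set()
            for k in top[q]:
                axis, name = steps[k]
                if axis == "//":
                    nxt.add(k)               # descendant step: may still match deeper down
                if name in (tag, "*"):
                    if k + 1 == len(steps):
                        matched.add(q)       # the whole path matched at this element
                    else:
                        nxt.add(k + 1)
            new[q] = nxt
        stack.append(new)
    return matched
```

Matching is existential: a query is reported as soon as one element satisfies its path, and the stack keeps memory proportional to the document depth rather than its size.
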
XML Stream Processing: Key Ideas
• Obtain bindings of for-clause path expression variables
  - Ordered sequence, no duplicates
• Filter bindings using where-clause path expression predicates
  - Existential check suffices
• Compute bindings of return-clause path expressions
  - Ordered (possibly null) sequence
• Goal: efficient matching/binding of XML path expressions
  - Very large number of path expressions