How to Build a Stream Database
Theodore Johnson
AT&T Labs - Research
What is a stream database?
• Query data from a stream
– A data feed with a schema
– You can also query conventional relations
• Examples
– Sensor data
– Stock market quotes
– Network monitoring data …
• Querying a stream forces some changes to the DBMS:
– Must use push-based rather than pull-based operators
– Must be able to provide partial answers
• E.g., you never finish the query
– One-pass
• E.g., you cannot (in general) rewind the stream.
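A minimal Python sketch of the push-based, one-pass style described above. The class and method names (`Select`, `RunningCount`, `push`) are hypothetical and chosen only for illustration; they are not taken from any real system.

```python
# Hypothetical push-based operators: the source drives the pipeline by
# calling push() on each arriving tuple, and nothing is ever rewound.

class Select:
    """Forward only the tuples that satisfy a predicate."""
    def __init__(self, predicate, downstream):
        self.predicate = predicate
        self.downstream = downstream

    def push(self, tup):
        if self.predicate(tup):
            self.downstream.push(tup)


class RunningCount:
    """Keep a partial answer that is valid after every tuple."""
    def __init__(self):
        self.count = 0

    def push(self, tup):
        self.count += 1


sink = RunningCount()
pipeline = Select(lambda t: t["port"] == 80, sink)

# Each tuple is seen exactly once, in arrival order.
for packet in [{"port": 80}, {"port": 22}, {"port": 80}]:
    pipeline.push(packet)
    # sink.count is a usable partial answer at this point

print(sink.count)
```

A pull-based operator would instead ask its input for the next tuple; with a stream there is no "next" until the network delivers it, so the data source has to drive the computation.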
Stream Databases for Network Measurements
• Continuing need to measure and monitor networks
– Router configuration, debugging, detecting network
attacks, verifying service agreements, …
• Very large amounts of data
– In principle, we’d like to query every packet flowing in
the network
– And in real time
• Data arrives in streams
– IP streams, NetFlow streams, SNMP streams, …
• Special queries : grouping by subsequences
– IP packets forming a flow, forming a TCP/IP session,
forming a user’s interactions, …
Query Language
• Typical queries:
– For each source IP address and each 5 minute interval,
count the number of bytes and number of packets
related to HTTP transfers
– Find the TCP/IP SYN packets with and without
matching FIN packets
– Compute the NetFlows in the packet stream, using a 30-second timeout between packets
• Pervasive use of time and sequence.
• We would like to express these queries using a
minimal change to SQL.
• We will rely on the query optimizer making use of
ordering properties of the data streams.
Basics
• Selection, projection, join, group-by, aggregation, etc.
– Mix stream with tables
• Some restrictions to ensure that we can answer the query in
limited space
– Join : When joining streams, the join predicate must define a
window in which the join must occur
• E.g. match SYN packets on an inbound link with SYNACK on an
outbound link.
– Group-by and Aggregation : We must be able to determine when
all tuples for a group have been processed
• E.g., number of packets during each 30 second interval
• More on this later.
Complex Aggregation
• Grouping Variables
– Analogous to table variables
– Represents the value of a correlated subquery
– Only aggregate values can be referenced
• Example:
Select SourceIP, tb, (count(*)+count(X)/2+count(Y)/4)/1.75
From Packets
Group By SourceIP, [ts/60, ts/60+1,ts/60+2] as tb, X, Y
Such that
X.SourceIP=SourceIP and X.ts/60+1=tb
Y.SourceIP=SourceIP and Y.ts/60+2=tb
X represents the query
Select * from Packets
where SourceIP=$SourceIP and ts/60+1 = $tb
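The example query can be read as a weighted three-minute packet count per source. The Python sketch below computes it in batch form under stated assumptions: `ts` is in seconds, `ts/60` is integer division, and `count(*)` is taken to cover the packets of minute `tb` itself (the slide does not spell this out). The function name `weighted_counts` is hypothetical.

```python
from collections import Counter

def weighted_counts(packets):
    """packets: iterable of (source_ip, ts) with ts in seconds.

    Returns {(source_ip, tb): weighted count}, under the assumed reading
    that count(*) covers minute tb, X minute tb-1, and Y minute tb-2.
    """
    per_minute = Counter((ip, ts // 60) for ip, ts in packets)
    result = {}
    for ip, minute in per_minute:
        # A packet in minute m opens the groups tb = m, m+1, m+2.
        for tb in (minute, minute + 1, minute + 2):
            current = per_minute.get((ip, tb), 0)
            x = per_minute.get((ip, tb - 1), 0)   # count(X)
            y = per_minute.get((ip, tb - 2), 0)   # count(Y)
            result[(ip, tb)] = (current + x / 2 + y / 4) / 1.75
    return result

sample = [("10.0.0.1", 5), ("10.0.0.1", 30), ("10.0.0.1", 70)]
weights = weighted_counts(sample)
```

The divisor 1.75 equals 1 + 1/2 + 1/4, so the result is a normalized average that weights recent minutes more heavily.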
Defining Sequences
• Count the packets in connection K between the
SYN packet and the FIN packet
Select K, ts, count(Y) from TCPIP
Where SYN=1
Group by K, ts : X, Y
Such That
X.K = K and X.ts > ts and X.FIN = 1
Y.K = K and Y.ts >= ts and Y.ts <= MIN(X.ts)
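As a reference for what this query returns, here is a batch (not streaming) Python sketch. It assumes packets are dictionaries with the keys `K`, `ts`, `SYN`, and `FIN`, and it skips SYN packets that have no later FIN, a case the slide does not address. The function name is hypothetical.

```python
def count_syn_to_fin(packets):
    """packets: list of dicts with keys K, ts, SYN, FIN.

    For every SYN packet, count the packets of the same connection K
    from the SYN up to and including the first later FIN.
    SYNs with no later FIN are skipped in this sketch.
    """
    results = []
    for syn in packets:
        if syn["SYN"] != 1:
            continue
        # X: FIN packets of connection K that arrive after the SYN.
        fin_times = [p["ts"] for p in packets
                     if p["K"] == syn["K"] and p["ts"] > syn["ts"] and p["FIN"] == 1]
        if not fin_times:
            continue
        first_fin = min(fin_times)  # MIN(X.ts)
        # Y: packets of connection K between the SYN and the first FIN.
        n = sum(1 for p in packets
                if p["K"] == syn["K"] and syn["ts"] <= p["ts"] <= first_fin)
        results.append((syn["K"], syn["ts"], n))
    return results

trace = [
    {"K": 1, "ts": 1, "SYN": 1, "FIN": 0},
    {"K": 1, "ts": 2, "SYN": 0, "FIN": 0},
    {"K": 2, "ts": 2, "SYN": 1, "FIN": 0},
    {"K": 1, "ts": 3, "SYN": 0, "FIN": 0},
    {"K": 2, "ts": 3, "SYN": 0, "FIN": 1},
    {"K": 1, "ts": 4, "SYN": 0, "FIN": 1},
    {"K": 1, "ts": 5, "SYN": 0, "FIN": 0},
]
sequences = count_syn_to_fin(trace)
```

The nested scans make the self-join explicit; a stream plan would avoid them by exploiting the ordering of `ts`.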
Ordering Properties
• The query language lets us express queries that
seem to require self-joins, etc.
• But the queries frequently have a temporal
component: timestamps as group-by variables,
timestamps in the join predicates, etc.
• If we can reason about timestamps, we can find a
stream evaluation plan for these queries
– But not all …
• We want to avoid cumbersome model restrictions,
e.g. sequence databases
• We want precise semantics, e.g. avoid “continuous
query” models.
Temporal Properties
• Define ordering properties on attributes of a stream.
– Allow for multiple ordering properties, e.g. multiple timestamps,
start time vs. end time, timestamp vs. sequence number, etc.
• Many types of ordering properties
– Increasing, nondecreasing, …
– Increasing within delta, banded-increasing(epsilon)
– Increasing in group G …
• Ordering properties are part of the data type.
Stream TCPIP{
Ullong timestamp {increasing};
Uint SourceIP;
…
Uint SequenceNbr {increasing_in_group(SourceIP, …) };
…
}
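The ordering properties can be stated as simple checks over a finite prefix of a stream. The Python sketch below is illustrative only; in particular, the definition used for banded-increasing(epsilon), that no value falls more than epsilon below the largest value seen so far, is an assumption, since the slide does not define it. All function names are hypothetical.

```python
def is_increasing(values):
    return all(a < b for a, b in zip(values, values[1:]))

def is_nondecreasing(values):
    return all(a <= b for a, b in zip(values, values[1:]))

def is_banded_increasing(values, epsilon):
    """Assumed definition: no value falls more than epsilon below the
    largest value seen so far."""
    high = None
    for v in values:
        if high is not None and v < high - epsilon:
            return False
        high = v if high is None else max(high, v)
    return True

def is_increasing_in_group(tuples, group_key, attr):
    """The attribute increases among tuples sharing the same group key."""
    last = {}
    for t in tuples:
        g = group_key(t)
        if g in last and t[attr] <= last[g]:
            return False
        last[g] = t[attr]
    return True

packets = [
    {"SourceIP": "a", "SequenceNbr": 1},
    {"SourceIP": "b", "SequenceNbr": 7},
    {"SourceIP": "a", "SequenceNbr": 2},
    {"SourceIP": "b", "SequenceNbr": 9},
]
ok = is_increasing_in_group(packets, lambda t: t["SourceIP"], "SequenceNbr")
```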
Stream Operators
• Power of relational algebra : closed algebra.
– Enable the composition of complex queries
– E.g., COUNT DISTINCT is a COUNT(*) over a GROUP BY
• Need stream operators which produce streams
– That is, we can deduce ordering properties of the output
• We have defined ordering properties to capture semantics
of the output of operators
– Increasing in group G : group-by and aggregation
– Banded-increasing : window join.
• Implementation detail : special operators
– Emulate complex network protocols, e.g. IP defragmentation
Basic Operators
• Selection, projection, non-stream join, etc.
– Scalar expressions : perform type imputation on
temporal properties, e.g. timestamp/5000 is nondecreasing
• Join between two streams:
– The join predicate must define a window between
ordered attributes
• E.g. R.ts BETWEEN(S.ts, S.ts+epsilon)
– Join algorithm can trade off buffer space for improved
ordering properties.
• R.ts and S.ts banded-increasing, vs. R.ts (S.ts) increasing and
S.ts (R.ts) banded-increasing.
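A window predicate is what keeps the join state bounded. The Python sketch below illustrates this for `R.ts BETWEEN S.ts AND S.ts + epsilon`. It assumes both streams are interleaved into one arrival sequence in nondecreasing `ts` order, with S before R on ties; the function name and tuple layout are hypothetical.

```python
from collections import deque

def window_join(arrivals, epsilon):
    """arrivals: iterable of (stream, ts, payload), stream is "R" or "S",
    assumed to arrive in nondecreasing ts order with S before R on ties.

    Yields (s_payload, r_payload) for R.ts BETWEEN S.ts AND S.ts + epsilon.
    """
    s_buffer = deque()  # S tuples that may still match a future R tuple
    for stream, ts, payload in arrivals:
        if stream == "S":
            s_buffer.append((ts, payload))
            continue
        # Evict S tuples whose window closed before this R tuple.
        while s_buffer and s_buffer[0][0] + epsilon < ts:
            s_buffer.popleft()
        for s_ts, s_payload in s_buffer:
            # s_ts <= ts holds because of the arrival order assumption.
            yield (s_payload, payload)

arrivals = [("S", 1, "s1"), ("S", 5, "s5"), ("R", 6, "r6"), ("R", 20, "r20")]
matches = list(window_join(arrivals, epsilon=3))
```

The buffer never holds S tuples older than epsilon, so its size depends on the window width and the arrival rate rather than on the length of the stream.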
Additional Operators
• Stream Union : Merge two streams
– Preserve an ordering property
• Stream sort
– Improve an ordering property
• User-defined operators
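A stream union that preserves an ordering property can be sketched as an ordered merge. The example below uses Python's standard `heapq.merge`, assuming each input is already nondecreasing on the chosen key; the wrapper name `stream_union` is hypothetical.

```python
import heapq

def stream_union(left, right, key):
    """Merge two streams that are each nondecreasing on `key`.
    The output is nondecreasing on `key` as well."""
    return heapq.merge(left, right, key=key)

left = [(1, "a"), (4, "b"), (9, "c")]
right = [(2, "x"), (4, "y"), (7, "z")]
merged = list(stream_union(left, right, key=lambda t: t[0]))
timestamps = [ts for ts, _ in merged]
```

The merge is lazy and holds only one pending tuple per input, which is the property a stream operator needs.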
Group-by and Aggregation
• We need to determine when to open and when to flush
groups based on the tuple stream
– GOPEN(t,G) : set of groups to create when tuple t arrives, and the
set of groups is G.
– GCLOSE(t,g): returns TRUE if group g will not receive any
further tuples, based on attributes of t.
• Complex aggregation : Each aggregate has an associated
predicate. A tuple contributes to the aggregate only if it
satisfies the predicate.
– Note: In general, this predicate defines a join condition
between G and the tuple stream.
– Correlated aggregates : In some cases (especially when defining
sequences) we can even compute correlated aggregates.
• Recall the example under "Defining Sequences".
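To make GOPEN and GCLOSE concrete, the Python sketch below counts packets per 30-second interval. It assumes timestamps arrive in nondecreasing order, which is what lets a group be closed as soon as a tuple from a later interval arrives. The helper names `gopen` and `gclose` mirror the slide, but their signatures here are assumptions.

```python
def count_per_interval(stream, width=30):
    """stream: iterable of timestamps in nondecreasing order (assumed).
    Yields (bucket, count) as soon as a bucket can no longer change."""
    open_groups = {}  # bucket -> running count

    def gopen(ts, groups):
        bucket = ts // width
        return [] if bucket in groups else [bucket]

    def gclose(ts, bucket):
        # Timestamps never decrease, so an older bucket is complete.
        return ts // width > bucket

    for ts in stream:
        for bucket in sorted(b for b in open_groups if gclose(ts, b)):
            yield bucket, open_groups.pop(bucket)
        for bucket in gopen(ts, open_groups):
            open_groups[bucket] = 0
        open_groups[ts // width] += 1
    # End of a finite test input; a real stream never reaches this point.
    for bucket in sorted(open_groups):
        yield bucket, open_groups[bucket]

counts = list(count_per_interval([1, 10, 29, 30, 61, 65]))
```

With an increasing timestamp at most one group is open at a time, so the aggregation runs in constant space.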
Optimization
• Conventional optimization
– Push selection, projection as low as possible
– Join order optimization
• Operator-specific optimization
– Better implementations …
– Search for predicates which allow operator-specific
optimizations
• Temporal property optimization
– Ordering properties of input vs. operator speed vs.
ordering properties of the output.
Gigascope
• Fast and flexible network monitor
– Submit SQL-like queries to obtain a monitoring stream
– Monitor Gigabit Ethernet (1 Gbps × 2 directions)
• Aggressive optimizations
– Execute some or all of the queries in the Network Interface Card
(NIC)
• Goals
– Execute queries over every byte of every packet in the link.
– Layer-7 queries
• Reconstruct TCP sessions, interpret streaming media control traffic,
etc.
• Gigascope is the motivation for the stream database
research.
• Demo at SIGMOD 2002
Gigascope Architecture
• Stream database
– Registry : record semantics of the executing query nodes.
– Stream manager : route tuples between query nodes and applications
• Two layer architecture
– Low-level queries : input is a sniffed packet stream.
– High-level queries : input is a tuple stream.
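[Figure: Gigascope architecture. Components shown: high-level queries HQ1 … HQn, applications App1 … Appm, the Registry, the Stream Manager, and low-level queries lq1 … lqn on each of NIC1 and NIC2.]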
Query Processing Architecture
• Each query node represents a single-block query and is implemented as
generated code.
• All query nodes live in a run-time system, and follow an
API
– Callbacks : initialize, accept_tuple, accept_command, free
– Functions : post_tuple, standard and user-defined functions
• Low-level queries
– Limited set of query nodes (selection/projection, aggregation)
– Tight constraints on resource usage
• High-level queries
– Much wider variety of operators
– Use operator templates, specialize with generated functors.
– Accept_tuple callback routes tuples through operators in the query
node.
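The callback API can be pictured with a small Python sketch. The callback names (`initialize`, `accept_tuple`, `accept_command`, `free`) and `post_tuple` come from the slide; everything else, including the argument lists, the command names, and the tuple layout, is a hypothetical illustration. The real query nodes are generated code running inside the run-time system.

```python
class QueryNode:
    """Hypothetical query node: select packets and project two fields."""

    def __init__(self, post_tuple):
        self.post_tuple = post_tuple  # function supplied by the run-time system
        self.min_len = 0
        self.seen = 0

    def initialize(self, params):
        self.min_len = params.get("min_len", 0)

    def accept_tuple(self, tup):
        self.seen += 1
        if tup["len"] >= self.min_len:                  # selection
            self.post_tuple((tup["src"], tup["len"]))   # projection

    def accept_command(self, command, args=None):
        if command == "load_params":
            self.initialize(args)
        elif command == "report_stats":
            return {"seen": self.seen}

    def free(self):
        self.post_tuple = None


output = []
node = QueryNode(post_tuple=output.append)
node.initialize({"min_len": 100})
node.accept_tuple({"src": "10.0.0.1", "len": 40})
node.accept_tuple({"src": "10.0.0.2", "len": 1500})
node.accept_command("load_params", {"min_len": 0})
node.accept_tuple({"src": "10.0.0.3", "len": 40})
stats = node.accept_command("report_stats")
node.free()
```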
Splitting a query
• Network packets are presented only to low-level
queries
• The NIC has two 88 MHz processors, but only
1 Mbyte of memory.
– Limited set of operators, available functions, etc.
• If a query cannot be executed entirely in the NIC,
it is split into low-level queries and high-level
queries
– That is, perform as much selection as possible in the
NIC
– Also perform partial aggregation. Complete the
aggregation in a high-level query.
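The split between partial and final aggregation can be sketched in Python as follows. The low-level function keeps a table of bounded size, standing in for the NIC's limited memory, and emits a partial count whenever it must evict an entry; the high-level function sums the partials. The eviction policy (oldest entry first) and both function names are assumptions made for illustration.

```python
from collections import Counter

def low_level_partial_counts(packets, max_groups):
    """Count packets per source in a table of at most max_groups entries.
    When the table is full, one entry is evicted as a partial count."""
    table = {}
    for src in packets:
        if src not in table and len(table) >= max_groups:
            victim = next(iter(table))          # evict the oldest entry
            yield victim, table.pop(victim)
        table[src] = table.get(src, 0) + 1
    yield from table.items()                    # flush at end of test input


def high_level_final_counts(partials):
    """Combine partial counts into final per-source counts."""
    totals = Counter()
    for src, n in partials:
        totals[src] += n
    return dict(totals)


packets = ["a", "b", "a", "c", "a", "b"]
partials = list(low_level_partial_counts(packets, max_groups=2))
final = high_level_final_counts(partials)
```

This works because counts can be combined by addition; an aggregate without such a combining step could not be split this way.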
Generating Code
• Parse the query
– Flex, Bison. Build the parse tree.
• Analyze the parse tree
– Build symbol tables
• Table references, column references, group-by variables, aggregate references,
etc.
– Determine type of query
• Selection, join, aggregation, etc.
– Analyze the predicates
• Convert to CNF
• Build query nodes (and query plan)
– Fill in placeholders (the selection predicate, etc.)
• Split the query
– Result is one or more queries
• Optimize the query plan
• Perform further code-generation time analysis
• Generate the code
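The "Convert to CNF" step can be illustrated with a short Python sketch. The predicate representation (strings for atoms, tuples for `not`, `and`, `or`) is a hypothetical stand-in for the parse tree; it is not the data structure the actual code generator uses.

```python
def to_cnf(expr):
    """expr is an atom (str) or a tuple: ("not", e), ("and", a, b), ("or", a, b).
    Returns an equivalent expression in conjunctive normal form."""
    if isinstance(expr, str):
        return expr
    op = expr[0]
    if op == "not":
        inner = expr[1]
        if isinstance(inner, str):
            return expr
        if inner[0] == "not":                       # double negation
            return to_cnf(inner[1])
        # De Morgan: push the negation below and/or.
        flipped = "or" if inner[0] == "and" else "and"
        return to_cnf((flipped, ("not", inner[1]), ("not", inner[2])))
    left, right = to_cnf(expr[1]), to_cnf(expr[2])
    if op == "and":
        return ("and", left, right)
    # op == "or": distribute OR over AND.
    if not isinstance(left, str) and left[0] == "and":
        return ("and", to_cnf(("or", left[1], right)), to_cnf(("or", left[2], right)))
    if not isinstance(right, str) and right[0] == "and":
        return ("and", to_cnf(("or", left, right[1])), to_cnf(("or", left, right[2])))
    return ("or", left, right)

cnf = to_cnf(("or", "a", ("and", "b", "c")))
```

CNF is convenient here because each top-level conjunct can be evaluated independently, so a conjunct that uses only cheap operations can be pushed into a low-level query while the rest stays in a high-level query.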
Other nice features
• Every query can accept parameters
– Necessary flexibility, because changing low-level queries requires
rebuilding the RTS.
• More generally, each query accepts commands
– Load new parameters, report statistics (and errors), etc.
– High-level queries relay the command to the low-level queries.
• Stream-based architecture
– Easy to add nested queries on-the-fly
– Easy extension to distributed queries (we think)
• Executables are self-documenting
– The source code contains the schema and the query
– Library for parsing and interpreting the query.
Any Questions?