Slawomir Kmiec

Parallel Computation of Skyline Queries
Implementation
COSC6490A Fall 2007
Slawomir Kmiec
Presentation Outline
• Skyline Concepts
• The Parallel Algorithm
• Data & Configuration
• Implementation Details
• Goals and Objectives
• Demonstration & Questions
Skyline Concepts
In a set of points (or records), identify the points that no other point
dominates: no other point is at least as good in every one of a given set
of attributes and strictly better in at least one.
Name        Rating   Avg. Price
Parthenon   5        $45.00
Olympus     4        $40.00
Coliseum    4        $30.00
Pyramid     3        $25.00
Bombay      5        $35.00
Paris       5        $40.00
Roma        4        $35.00
Palermo     3        $30.00
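Point $p_a$ is said to dominate point $p_b$ if $x_i(p_a) \le x_i(p_b)$ for all $i$
with $1 \le i \le d$, and at least one of those inequalities is strict.
(Smaller values count as better; for an attribute where higher is better,
such as the rating above, the values are negated.)
A point $p$ is a skyline point if it is not dominated by any other point in
the set $S$. The skyline of $S$ is denoted $\mathrm{sky}(S)$.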
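To make the definition concrete, the following Java sketch applies it to the hotel table with a plain nested-loops test. It assumes smaller is better in every attribute, so the rating is stored negated; the class and method names (HotelSkyline, dominates, skylineIndices) are illustrative only and are not taken from the project.

```java
import java.util.ArrayList;
import java.util.List;

// Nested-loops skyline over the hotel example. Attributes are stored so that
// smaller is better: the rating is negated, the price is kept as is.
class HotelSkyline {

    // a dominates b: a <= b in every attribute and a < b in at least one
    static boolean dominates(int[] a, int[] b) {
        boolean strictlyBetter = false;
        for (int i = 0; i < a.length; i++) {
            if (a[i] > b[i]) return false;
            if (a[i] < b[i]) strictlyBetter = true;
        }
        return strictlyBetter;
    }

    // O(d * n^2): keep the index of every point that no other point dominates
    static List<Integer> skylineIndices(int[][] points) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < points.length; i++) {
            boolean dominated = false;
            for (int j = 0; j < points.length && !dominated; j++) {
                if (j != i && dominates(points[j], points[i])) dominated = true;
            }
            if (!dominated) result.add(i);
        }
        return result;
    }

    static final String[] NAMES = {
        "Parthenon", "Olympus", "Coliseum", "Pyramid",
        "Bombay", "Paris", "Roma", "Palermo" };

    // {-rating, average price in dollars}
    static final int[][] HOTELS = {
        {-5, 45}, {-4, 40}, {-4, 30}, {-3, 25},
        {-5, 35}, {-5, 40}, {-4, 35}, {-3, 30} };

    public static void main(String[] args) {
        for (int i : skylineIndices(HOTELS)) System.out.println(NAMES[i]);
    }
}
```

Running main prints Coliseum, Pyramid and Bombay, one per line: every other hotel has a rating no better and a price no lower than Bombay or Coliseum, and is strictly worse in at least one of the two.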
The Parallel Algorithm
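• nested loops take $O(d \cdot n^2)$ comparisons: for 10 attributes and
100k points that is $10 \cdot (10^5)^2 = 10^{11}$
• the choice of the local skyline algorithm is orthogonal to the parallel
scheme and can even be made dynamically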
• in a $d$-dimensional data space with independent attributes, the expected
skyline size of $n$ points is $O((\ln n)^{d-1}/(d-1)!)$
• $p$ interconnected and independent processors with $O(n/p)$ memory
• processors can be physically separate nodes
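Because the local skyline algorithm is orthogonal to the parallel scheme, a worker could hide it behind an interface and pick an implementation at run time. This is a hypothetical Java sketch (LocalSkyline, Skylines, choose and its threshold are not from the project) that pairs the nested-loops method with Sort-Filter-Skyline (SFS), a different, well-known algorithm swapped in here purely for illustration: after sorting by coordinate sum, a point can only be dominated by points that precede it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plug-in point for the local skyline algorithm.
interface LocalSkyline {
    List<int[]> compute(List<int[]> points);
}

class Skylines {

    // a dominates b: a <= b in every attribute, a < b in at least one
    static boolean dominates(int[] a, int[] b) {
        boolean strictlyBetter = false;
        for (int i = 0; i < a.length; i++) {
            if (a[i] > b[i]) return false;
            if (a[i] < b[i]) strictlyBetter = true;
        }
        return strictlyBetter;
    }

    static long sum(int[] p) {
        long s = 0;
        for (int x : p) s += x;
        return s;
    }

    // Nested loops: compare every point with every other point, O(d * n^2).
    static final LocalSkyline NESTED_LOOPS = points -> {
        List<int[]> out = new ArrayList<>();
        for (int[] p : points) {
            boolean dominated = false;
            for (int[] q : points) {
                if (dominates(q, p)) { dominated = true; break; }
            }
            if (!dominated) out.add(p);
        }
        return out;
    };

    // Sort-Filter-Skyline: if q dominates p then sum(q) < sum(p), so after
    // sorting by sum each point is checked only against the skyline points
    // found so far (the "window").
    static final LocalSkyline SORT_FILTER = points -> {
        List<int[]> sorted = new ArrayList<>(points);
        sorted.sort(Comparator.comparingLong(Skylines::sum));
        List<int[]> window = new ArrayList<>();
        for (int[] p : sorted) {
            boolean dominated = false;
            for (int[] q : window) {
                if (dominates(q, p)) { dominated = true; break; }
            }
            if (!dominated) window.add(p);
        }
        return window;
    };

    // Dynamic choice; the 1,000-point threshold is an arbitrary assumption.
    static LocalSkyline choose(int n) {
        return n < 1_000 ? NESTED_LOOPS : SORT_FILTER;
    }
}
```

Both implementations return the same set of points, only in a different order. SFS keeps the $O(d \cdot n^2)$ worst case, but each point is usually compared against a much smaller window than the full input.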
The Parallel Algorithm (cont.)
• Principles:
→ data are divided equally and distributed among the processors
→ a local skyline is computed at each peer
→ the size of each local skyline is shared with all peers
→ if the combined local skylines fit on any single processor, then
→ local skylines are exchanged with peers
→ processor $p_i$ picks the $i$-th chunk of the combined skyline and
eliminates the points in it that the combined skyline dominates
→ local results are sent to the central process
→ end // of processing
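The branch in which the combined local skylines fit on one processor can be simulated in a single JVM. In this hypothetical sketch each list in localSkylines stands for one processor's local skyline, and chunk i is assumed to be the i-th contiguous block of ceil(m/p) points of the combined list; the slides do not say how chunks are formed, and CombinedFilter and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

class CombinedFilter {

    // a dominates b: a <= b in every attribute, a < b in at least one
    static boolean dominates(int[] a, int[] b) {
        boolean strictlyBetter = false;
        for (int i = 0; i < a.length; i++) {
            if (a[i] > b[i]) return false;
            if (a[i] < b[i]) strictlyBetter = true;
        }
        return strictlyBetter;
    }

    // Work of processor i out of p: keep the points of chunk i of the
    // combined skyline that no point of the combined skyline dominates.
    static List<int[]> filterChunk(List<int[]> combined, int i, int p) {
        int chunkSize = (combined.size() + p - 1) / p;       // ceil(m / p)
        int from = Math.min(combined.size(), i * chunkSize);
        int to = Math.min(combined.size(), from + chunkSize);
        List<int[]> kept = new ArrayList<>();
        for (int[] candidate : combined.subList(from, to)) {
            boolean dominated = false;
            for (int[] other : combined) {
                if (dominates(other, candidate)) { dominated = true; break; }
            }
            if (!dominated) kept.add(candidate);
        }
        return kept;
    }

    // Simulated run: every processor has received all local skylines,
    // filters its own chunk, and sends the survivors to the central process.
    static List<int[]> globalSkyline(List<List<int[]>> localSkylines) {
        int p = localSkylines.size();                        // assumes p >= 1
        List<int[]> combined = new ArrayList<>();
        for (List<int[]> local : localSkylines) combined.addAll(local);
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < p; i++) result.addAll(filterChunk(combined, i, p));
        return result;
    }
}
```

Checking each candidate against the combined skyline alone is enough: any point that dominates a candidate is either in some local skyline or is itself dominated by a point of that local skyline, which then dominates the candidate as well.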
The Parallel Algorithm (cont.)
• Principles (continued):
→ else // the combined results do not fit on some $p_i$
→ loop until the required number of results is available or all $p_i$
have finished:
→ each processor $p_i$ picks a random set of points (in proportion to
the size of its local skyline)
→ this set is sent to all peers, which mark the points they dominate
and return the marked points to the sender
→ each processor $p_i$ collects the points it submitted, removes the
marked ones and sends the remaining ones to the central process
→ end loop
→ end // of processing
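The branch where the combined local skylines are too large to gather can be simulated the same way. This hypothetical sketch shuffles each local skyline to stand in for the slide's random selection, submits it over a fixed number of rounds with batch sizes proportional to the local skyline size, lets every peer mark the submitted points its own local skyline dominates, and reports the unmarked points. The round count, the required limit, the seed and all names are assumptions, not the project's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class BatchedFilter {

    // a dominates b: a <= b in every attribute, a < b in at least one
    static boolean dominates(int[] a, int[] b) {
        boolean strictlyBetter = false;
        for (int i = 0; i < a.length; i++) {
            if (a[i] > b[i]) return false;
            if (a[i] < b[i]) strictlyBetter = true;
        }
        return strictlyBetter;
    }

    // localSkylines.get(i) is processor i's local skyline; returns the points
    // that reach the central process. Assumes rounds >= 1.
    static List<int[]> globalSkyline(List<List<int[]>> localSkylines,
                                     int rounds, int required, long seed) {
        int p = localSkylines.size();
        Random rnd = new Random(seed);
        List<List<int[]>> pending = new ArrayList<>();
        for (List<int[]> local : localSkylines) {
            List<int[]> copy = new ArrayList<>(local);
            Collections.shuffle(copy, rnd);           // random selection order
            pending.add(copy);
        }
        List<int[]> reported = new ArrayList<>();
        // until enough results are available or every processor has finished
        for (int round = 0; round < rounds && reported.size() < required; round++) {
            for (int i = 0; i < p; i++) {
                List<int[]> queue = pending.get(i);
                // batch size in proportion to the local skyline: ceil(size / rounds)
                int batchSize = (localSkylines.get(i).size() + rounds - 1) / rounds;
                int n = Math.min(batchSize, queue.size());
                List<int[]> batch = new ArrayList<>(queue.subList(0, n));
                queue.subList(0, n).clear();
                // every peer j != i marks the submitted points it dominates
                boolean[] marked = new boolean[n];
                for (int j = 0; j < p; j++) {
                    if (j == i) continue;
                    for (int k = 0; k < n; k++) {
                        for (int[] q : localSkylines.get(j)) {
                            if (dominates(q, batch.get(k))) { marked[k] = true; break; }
                        }
                    }
                }
                // the sender drops the marked points and reports the rest
                for (int k = 0; k < n; k++) {
                    if (!marked[k]) reported.add(batch.get(k));
                }
            }
        }
        return reported;
    }
}
```

Peers only compare against their own local skylines: if any point of a peer's partition dominates a submitted point, some point of that peer's local skyline does too. A point never needs checking against its own processor's local skyline, since it is already a member of it.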
Data and Configuration
• Configuration file:
localhost,40000
../data/set1.txt
../data/set1.sky
5
localhost,40001
localhost,40002
localhost,40003
localhost,40004
localhost,40005
• Input text file set1.txt:
100000
9
441084,675002,105152,606616,90578,963749,748812,739998,625168
679542,662041,183694,274049,571353,513841,673841,136017,348093
913693,908848,273936,405560,228917,540670,8469,549431,868311
…
• Output text file set1.sky:
990161,447432,254614,908555,355890,594119,35340,149796,191499
178453,428473,872989,121626,57614,318734,748950,287311,124463
673578,11433,327204,110384,946426,887381,714928,51188,511141
170357,699425,57272,468474,988612,425985,193800,234079,641191
…
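The file layouts can be read with a few lines of Java. The sketch below is hypothetical and infers the layouts from the samples: the configuration is taken to list the main process address, the input file, the output file, the number of workers and then one host,port line per worker, and the input file to start with the point count (100000) and the number of attributes (9). SkylineFiles and its members are illustrative names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;

class SkylineFiles {

    // host,port pair such as "localhost,40000"
    static class Endpoint {
        final String host;
        final int port;
        Endpoint(String host, int port) { this.host = host; this.port = port; }
        static Endpoint parse(String line) {
            String[] parts = line.trim().split(",");
            if (parts.length != 2) throw new IllegalArgumentException("expected host,port: " + line);
            return new Endpoint(parts[0].trim(), Integer.parseInt(parts[1].trim()));
        }
    }

    // Assumed layout: main process address, input file, output file,
    // worker count N, then one host,port line per worker.
    static class Config {
        Endpoint mainProcess;
        String inputFile;
        String outputFile;
        final List<Endpoint> workers = new ArrayList<>();
    }

    static Config parseConfig(List<String> lines) {
        Config c = new Config();
        c.mainProcess = Endpoint.parse(lines.get(0));
        c.inputFile = lines.get(1).trim();
        c.outputFile = lines.get(2).trim();
        int n = Integer.parseInt(lines.get(3).trim());
        for (int i = 0; i < n; i++) c.workers.add(Endpoint.parse(lines.get(4 + i)));
        return c;
    }

    // Assumed input layout: number of points, number of attributes,
    // then one comma-separated point per line.
    static List<int[]> readPoints(Reader source) throws IOException {
        BufferedReader in = new BufferedReader(source);
        int count = Integer.parseInt(in.readLine().trim());
        int dims = Integer.parseInt(in.readLine().trim());
        List<int[]> points = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String line = in.readLine();
            if (line == null) throw new IOException("expected " + count + " points, found " + i);
            String[] fields = line.split(",");
            if (fields.length != dims) throw new IOException("point " + (i + 1) + ": expected " + dims + " values");
            int[] p = new int[dims];
            for (int k = 0; k < dims; k++) p[k] = Integer.parseInt(fields[k].trim());
            points.add(p);
        }
        return points;
    }

    // .sky output: skyline points only, one comma-separated point per line
    static void writePoints(List<int[]> points, Writer sink) throws IOException {
        for (int[] p : points) {
            StringBuilder line = new StringBuilder();
            for (int k = 0; k < p.length; k++) {
                if (k > 0) line.append(',');
                line.append(p[k]);
            }
            sink.write(line.append('\n').toString());
        }
        sink.flush();
    }
}
```

With real files, the configuration lines could come from java.nio.file.Files.readAllLines and the point reader from Files.newBufferedReader.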
Implementation Details
• Java classes used:
java.util.List;
java.util.ArrayList;
java.io.InputStreamReader;
java.io.BufferedReader;
java.io.FileReader;
java.io.FileWriter;
java.io.PrintStream;
java.net.InetAddress;
java.net.Socket;
java.net.ServerSocket;
javax.swing.JFrame;
javax.swing.JLabel;
javax.swing.JProgressBar;
javax.swing.JScrollPane;
javax.swing.JTextArea;
• The developed classes:
SkylineMain
SkylineMainListener
SkylineMainHandler
SkylineWorker
SkylineWorkerListener
SkylineWorkerHandler
Implementation Details (cont.)
• 3 types of classes
SkylineMain and SkylineWorker - workflow classes
“Listener” classes - request managing classes
“Handler” classes - request handling classes
SkylineMain - Thread
SkylineMainListener - ServerSocket
SkylineMainHandler - Socket
SkylineWorker - Thread
SkylineWorkerListener - ServerSocket
SkylineWorkerHandler - Socket
Implementation Details (cont.)
SkylineWorkerListener
SkylineWorker parent;
int port;
void run( );
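public void run( )
{
    // requires java.io.IOException, java.net.ServerSocket, java.net.Socket;
    // accepts connections forever, one handler thread per request
    try ( ServerSocket listener = new ServerSocket( port ) )
    {
        while ( true )
        {
            Socket data = listener.accept( );
            SkylineWorkerHandler handler =
                new SkylineWorkerHandler( parent, data );
            handler.start( );
        }
    }
    catch ( IOException e )
    {
        System.err.println( "Listener on port " + port + " failed: " + e );
    }
}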
Implementation Details (cont.)
SkylineWorkerHandler
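SkylineWorker parent;
Socket data;
BufferedReader dataInp;
void run( );
void receiveData( );
void receiveLocalSkylineSize( );
void receiveLocalSkyline( );
void receiveChunk( );
void mergeChunk( );
void doTerminate( );
void doStop( );
public void run( ) {
    // one message per connection: the first line names the message type,
    // and the receive/merge methods read the rest of it from dataInp
    try {
        dataInp = new BufferedReader(
            new InputStreamReader( data.getInputStream( ) ) );
        String dataType = dataInp.readLine( );
        // constant-first equals( ) also copes with an empty message (null)
        if ( "data".equals( dataType ) )
            receiveData( );
        else if ( "local_skyline_size".equals( dataType ) )
            receiveLocalSkylineSize( );
        else if ( "local_skyline".equals( dataType ) )
            receiveLocalSkyline( );
        else if ( "chunk_data".equals( dataType ) )
            receiveChunk( );
        else if ( "chunk_result".equals( dataType ) )
            mergeChunk( );
        else if ( "stop".equals( dataType ) )
            doStop( );
        else if ( "termination".equals( dataType ) )
            doTerminate( );
        else
            System.out.println( "Unsupported data: " + dataType );
    }
    catch ( IOException e ) {
        System.err.println( "Error reading request: " + e );
    }
    finally {
        // always release the connection, even after an error
        try { data.close( ); } catch ( IOException ignored ) { }
    }
}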
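The handler's dispatch implies a simple line-based protocol. The following hypothetical Java sketch of that framing assumes one message per connection: a type line (such as local_skyline_size), then payload lines, after which the sender closes the connection. SkylineMessage, writeTo and readFrom are illustrative names, not part of the project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SkylineMessage {
    final String type;
    final List<String> payload;

    SkylineMessage(String type, List<String> payload) {
        this.type = type;
        this.payload = Collections.unmodifiableList(new ArrayList<>(payload));
    }

    // Sender side: the type line, then one line per payload item.
    void writeTo(Writer out) throws IOException {
        out.write(type);
        out.write('\n');
        for (String line : payload) {
            out.write(line);
            out.write('\n');
        }
        out.flush();
    }

    // Receiver side: the first line selects the handler method; the payload
    // is read until the sender closes the connection (end of stream).
    static SkylineMessage readFrom(BufferedReader in) throws IOException {
        String type = in.readLine();
        if (type == null) throw new IOException("connection closed before a message type was sent");
        List<String> payload = new ArrayList<>();
        for (String line = in.readLine(); line != null; line = in.readLine()) {
            payload.add(line);
        }
        return new SkylineMessage(type, payload);
    }
}
```

Over a socket, writeTo would be given an OutputStreamWriter wrapping socket.getOutputStream(), and readFrom a BufferedReader over socket.getInputStream(); the receiver reaches end of stream once the sender closes its side.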
Implementation Details (cont.)
SkylineWorker
…
…
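public void run( ) {
    listener.start( );                  // accept messages from peers and main
    waitForData( );                     // this worker's share of the input
    calculateLocalSkyline( );
    sendLocalSkylineSizeToAll( );
    waitForLocalSkylineSizesFromAll( );
    if ( niTotal <= npMax ) {
        // the combined local skylines fit on one processor: exchange them,
        // then filter the i-th chunk of the combined skyline
        sendLocalSkylineToAll( );
        waitForLocalSkylinesFromAll( );
        consolidateLocalSkylines( );
        selectIthConsolidatedSkylineChunk( );
        filterSelectSkylineChunk( );
        reportFilterSelectSkylineChunk( );
    }
    else {
        // too large to exchange: submit the local skyline chunk by chunk;
        // peers mark the points they dominate
        chunkLocalSkyline( );
        while ( !stopped && !terminated &&
                siChunkIndex * siChunkSize < siLocal.length ) {
            sendChunkToAll( siChunkIndex );
            waitForChunkFromAll( );
            reportFilterSelectSkylinePart( );
            siChunkIndex++;             // advance to the next chunk
        }
    }
    reportEndOfProcess( );
    waitForTermination( );              // keep serving peers until terminated
}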
Implementation Details (cont.)
• the volume of work: the steps described in the algorithm are very high
level, and implementing them took a lot of actual work and code
• stopping and termination: processing had to end gracefully in two
cases. When the application stops, it stops processing its own data but
stays open to queries from peers; when it terminates, it stops processing
both its own data and outside queries
• the application was developed so that the worker processes can run on
separate machines, so the SkylineWorker class had to be developed and
tested as a standalone application
• it also had to be flexible enough to run with the peer and limit
configuration given at runtime
• asynchronous communication, with coordination of message broadcast
and receipt
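The stop/terminate distinction and the wait-for-all-peers steps can be expressed with volatile flags and a java.util.concurrent.CountDownLatch. The sketch below is hypothetical (WorkerState and its methods are not from the project): stop ends the worker's own processing but keeps it serving peers, terminate ends both, and waitForAll polls the latch so that a stop or terminate request is noticed while waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WorkerState {
    private volatile boolean stopped;
    private volatile boolean terminated;
    private volatile CountDownLatch replies;

    // call before broadcasting a request to `peers` peers
    void expectReplies(int peers) { replies = new CountDownLatch(peers); }

    // called by a handler thread when one peer's reply has arrived
    void replyReceived() { replies.countDown(); }

    // stop: end own processing, keep answering peers
    void stop() { stopped = true; }

    // terminate: end own processing and stop answering peers
    void terminate() { stopped = true; terminated = true; }

    boolean shouldProcessOwnData() { return !stopped && !terminated; }
    boolean shouldServePeers() { return !terminated; }

    // true if every expected reply arrived; false on stop, terminate or timeout
    boolean waitForAll(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (shouldProcessOwnData()) {
            if (replies.await(50, TimeUnit.MILLISECONDS)) return true;
            if (System.currentTimeMillis() >= deadline) return false;
        }
        return false;
    }
}
```

expectReplies has to be called before the request is broadcast; otherwise a fast reply could count down the previous latch instead of the new one.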
Further Goals and Objectives
Can generic reusable higher-level
operations be developed that could be
used in other parallel computations?
all-to-all messaging
all-peer result consolidation
3-threaded processors
transmission of large datasets
process state maintenance and synchronization
Can a template design pattern be generalized for similar
divide-distribute-and-conquer parallel computations?
Can the count of dominated points be
incorporated in the result?
Can idle time on processors be utilized to
assist peers or to do work-ahead or
speculative preprocessing?
Demonstration & Questions
???