Transcript Document
GLAST Data Handling Pipeline
Daniel Flath, Stanford Linear Accelerator Center
For the GLAST Collaboration
Overview
The GLAST Data Handling Pipeline (the Pipeline) provides a uniform interface to the diverse data handling needs of the GLAST collaboration. Its goal is to generically process graphs of dependent tasks, maintaining a full record of its state, history, and data products. It will be used to process the down-linked data acquired from the satellite instrument after launch in 2007, acquiring the data from the Mission Operations Center and delivering science products to the GLAST collaboration and the Science Support Center. It is currently used to perform Monte Carlo simulation and analysis of data taken from the instrument during integration and commissioning.
This historical information serves as the base from which formal data catalogs will be built. By cataloging the relationships among data, analysis results, the software versions that produced them, and processing statistics (memory usage, CPU usage), we are able to fully track the provenance of all data products. We can then investigate and diagnose problems in software versions and determine a solution, which may include reprocessing of data.
Data reprocessing is inherently simple with the Pipeline. A representation of the processing chain and data flow is stored internally in ORACLE and specified by the user in an XML file, which is uploaded to the Pipeline with an ancillary utility. Minor changes to the XML file allow similar processing to be retried using a patched version of the code.
The bulk of the code is written in the Oracle PL/SQL language in the form of stored procedures compiled into the database. A middle tier of code linking the database to disk access and the SLAC LSF batch farm (3000+ CPUs) is implemented in Perl. An automatically generated library of Perl subroutines wraps each stored procedure and provides a simple entry-point interface between the two code bases.
We will address lessons learned in operating the Pipeline in terms of needed configuration flexibility and long-term maintenance.
Stored Procedures & Functions

Did you know?
•Oracle has an internal language called PL/SQL
•PL/SQL is a strongly typed language, much like Pascal, that blends pure SQL with conditional and program logic
•PL/SQL supports both stored procedures that perform an operation and stored functions that return a scalar value or a cursor containing table rows
•This powerful language allows SQL query results to be manipulated before they are returned, or bookkeeping to be performed to log user access history
•The code is compiled and stored in the database for rapid execution
•The GLAST Pipeline includes over 130 stored procedures and functions and over 2000 lines of PL/SQL code
•The pipeline does no direct manipulation of database tables; all database access is accomplished through these native subroutines
•Stored PL/SQL can be invoked from perl, python, java, even JSP via JDBC

Example stored procedure:

PROCEDURE deleteTaskProcessByPK(TaskProcess_PK_in IN int) IS
  RecordInfoList CURSORTYPE;
  RI_PK_iter RecordInfo.RecordInfo_PK%TYPE;
BEGIN
  -- Delete the RI_TaskProcess and RecordInfo rows:
  OPEN RecordInfoList FOR
    select RecordInfo_FK
    from RI_TaskProcess
    where TaskProcess_FK = TaskProcess_PK_in;
  LOOP
    FETCH RecordInfoList INTO RI_PK_iter;
    EXIT WHEN RecordInfoList%NOTFOUND;
    delete from RI_TaskProcess
    where RecordInfo_FK = RI_PK_iter;
    delete from RecordInfo
    where RecordInfo_PK = RI_PK_iter;
  END LOOP;
  CLOSE RecordInfoList;
  -- Delete TP_DS rows:
  delete from TP_DS
  where TaskProcess_FK = TaskProcess_PK_in;
  -- Delete the TaskProcess:
  delete from TaskProcess
  where TaskProcess_PK = TaskProcess_PK_in;
END;

Scheduling / Processing Logic

[State diagram: run states in blue (Waiting, Running, End_Success, End_Failure), job states in gold (Prepared, Submitted, Started, Ended, Failed; one run has a sequence of jobs), and utility scripts in pink (createRun, rollBackFailedRun). Job failure is possible during each running state; any job failure marks a run failure, and success of the final job marks run success.]

1. New run(s) created with the createRun utility and marked ready for scheduling
2. Scheduler wakes up and collects waiting runs
   •Run promoted to the Running state
   •Job prepared and passed to the BatchSub facility (see N. Golpayegani poster)
   •An error results in job failure
3. BatchSub scheduler wakes up and collects waiting jobs
   •Job submitted to the LSF compute farm
   •A submission error results in job failure, or
   •The Pipeline is notified via callback and marks the job Submitted
4. LSF scheduler decides to run the job
   •Job assigned to a farm machine
   •An LSF failure results in job failure, or
   •BatchSub is notified and invokes a callback to the Pipeline, which marks the job Started
5. Job finishes or runs out of its CPU allocation
   •BatchSub is notified and invokes a callback to the Pipeline with the exit status of the job
   •On failure, the Pipeline marks the job Failed, marks the run End_Failure, and processing halts until the user intervenes (see 6. below)
   •On success, the Pipeline marks the job Ended
      •If it was the last job in the sequence, the run is marked End_Success (processing done), or
      •If more jobs remain in the sequence, the run is marked Waiting for scheduling of the next job (return to 2.)
6. Failed run(s) rescheduled with the rollBackFailedRun utility
   •The failed job and all its data are deleted
   •The run is marked Waiting as if the last successful job had just finished (return to 2.)
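To make the polling behaviour in steps 2-5 concrete, the following is a minimal sketch of a scheduler loop. It is not the actual pipeline scheduler code: the subroutine names (get_waiting_runs, mark_run_running, prepare_next_job, submit_to_batchsub, mark_job_failed) are hypothetical placeholders standing in for the generated PL/SQL wrappers and the BatchSub interface.

#!/usr/bin/perl
# Minimal sketch of a scheduler polling loop (steps 2-5 above).
# All subroutine names are hypothetical placeholders, not the actual
# pipeline API; they stand in for calls to the generated PL/SQL wrappers
# and to the BatchSub facility.
use strict;
use warnings;

while (1) {
    # 2. Collect runs that are Waiting for their next job.
    for my $run (get_waiting_runs()) {

        # Promote the run to Running and prepare its next job in sequence.
        mark_run_running($run);
        my $job = prepare_next_job($run);

        # Hand the prepared job to BatchSub for LSF submission; BatchSub
        # calls back later to mark the job Submitted/Started/Ended/Failed.
        eval { submit_to_batchsub($job) };
        mark_job_failed($job) if $@;    # an error results in job failure
    }
    sleep 60;    # wake up again later
}

# Placeholder implementations; the real versions would call the generated
# PL/SQL wrapper subroutines (see "Perl PL/SQL Wrappers").
sub get_waiting_runs   { return () }
sub mark_run_running   { }
sub prepare_next_job   { return {} }
sub submit_to_batchsub { }
sub mark_job_failed    { }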
Database
(See Stored Procedures & Functions)
•Schema is normalized to provide efficient data access for both:
   •Management and administration of active processing, and
   •Retrieval during data mining to support analysis
•Schema contains three distinct table types:
   •Regular tables store the bulk of the records
   •Enumerated tables allow rows in "regular tables" to specify one of several possible metadata values without duplicating strings. This helps with query speed and promotes consistency
   •Relational tables allow many-to-many record relationships between two regular tables
•Schema consists of two tiers:
   •Management tables for defining pipeline tasks
      •Task table defines attributes of a task, such as real-data analysis or simulation
      •TaskProcess table defines a sequence of processing steps for each task
      •Dataset table defines the data products of a task, including the locations where they should be stored
      •TPI_DSI entries link TaskProcess and Dataset records to define the read/write relationships between processing and data for a task
   •Instance tables:
      •Each Run table record describes a specific (named) instantiation of processing for some task. There are many runs for every task; as an example, a Monte Carlo task may contain 1000 runs, each simulating and reconstructing 5000 particle/detector interaction events
      •For every Run, a TPInstance record is created for each TaskProcess in the Run's parent task. A TPInstance entry records statistics of the compute job that performed the processing defined in the corresponding TaskProcess
      •Similarly, for every Run, DSInstance records (recording disk file location and statistics) are created for each file corresponding to a Dataset record in the Task that owns the run
      •TPI_DSI records relate the TPInstance and DSInstance processing/data history for a run
•Access to data:
   •Management tables are administered by users via an XML upload/download web-based utility
      •Tasks are defined in an explicitly specified XML format (backed by an extensible XSD schema):
         •Specify paths to executables
         •Specify required input and output data files
         •Specify order of execution
         •Specify LSF batch queue and batch group
         •Specify other meta-data ("TaskType", "Dataset Type", "Dataset File Type")
      •Existing tasks can be downloaded in this XML format, used as a template, and the modifications re-uploaded to define a new task. This saves significant time
   •Instance tables are accessed by both the pipeline software and a web front end
      •Pipeline software access is only by stored procedures and functions, through the pipeline scheduler and command-line utility scripts
         •Scheduler manages and records processing
         •Utility scripts allow users to create new runs, reschedule failed runs, etc.
      •Web front end using read-only access provides a view of processing to the user for monitoring. Processing log files can be viewed, and if the user notices a problem he/she can then administrate using the command-line utility scripts. Eventually these utilities will be made available directly from the front-end interface.

Perl PL/SQL Wrappers
•We have developed a utility to generate Perl wrapper subroutines for every stored PL/SQL routine used by the pipeline
•These subroutines incorporate all of the tedious Perl DBI calls to translate data types, communicate with the database, and handle exceptions
•The user only has to include the generated Perl module and call its subroutines to gain all of the functionality that the PL/SQL code provides (a hypothetical usage sketch follows the generated wrapper below)

Generated wrapper for the CREATETASK stored function:

# FUNCTION: CREATETASK
# RETURNS:  NUMBER(22)
# INPUT:    TASKTYPE_FK_IN     IN NUMBER(22)
# INPUT:    TASKNAME_IN        IN VARCHAR2
# INPUT:    GLASTUSER_FK_IN    IN NUMBER(22)
# INPUT:    NOTATION_IN        IN VARCHAR2
# INPUT:    BASEFILEPATH_IN    IN VARCHAR2
# INPUT:    RUNLOGFILEPATH_IN  IN VARCHAR2
sub CREATETASK
{
    my $self=shift;

    # input arguments as hashref with keys=parameters
    my $args=shift;

    my $dbh=$self->{dbh};

    # function return value:
    my $func_result = undef;

    # PL/SQL function call
    my $sth1=$dbh->prepare('
        BEGIN
            :func_result := DPF.CREATETASK(:TASKTYPE_FK_IN, :TASKNAME_IN, :GLASTUSER_FK_IN,
                                           :NOTATION_IN, :BASEFILEPATH_IN, :RUNLOGFILEPATH_IN);
        END;
    ');

    # Bind function return value:
    $sth1->bind_param_inout(":func_result", \$func_result, 0);

    # Binding parameters:
    $sth1->bind_param(":TASKTYPE_FK_IN", $args->{TASKTYPE_FK_IN});
    $sth1->bind_param(":TASKNAME_IN", $args->{TASKNAME_IN});
    $sth1->bind_param(":GLASTUSER_FK_IN", $args->{GLASTUSER_FK_IN});
    $sth1->bind_param(":NOTATION_IN", $args->{NOTATION_IN});
    $sth1->bind_param(":BASEFILEPATH_IN", $args->{BASEFILEPATH_IN});
    $sth1->bind_param(":RUNLOGFILEPATH_IN", $args->{RUNLOGFILEPATH_IN});

    # execute
    $sth1->execute() || die $!;

    # return stored function result:
    return $func_result;
}
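As a usage illustration, the hypothetical sketch below calls the generated CREATETASK wrapper shown above. The connection string, credentials, argument values, and file paths are placeholders, and it assumes the generated module (which provides CREATETASK) has been loaded into the calling package; it is not the actual pipeline configuration.

#!/usr/bin/perl
# Hypothetical usage of the generated CREATETASK wrapper shown above.
# Connection details, foreign-key values, and paths are placeholders.
use strict;
use warnings;
use DBI;

# The wrappers only require an object carrying the DBI handle in {dbh}.
my $dbh = DBI->connect('dbi:Oracle:PIPELINE_DB', 'pipeline_user', 'password',
                       { RaiseError => 1, AutoCommit => 0 });
my $pipeline = { dbh => $dbh };

# Call the wrapper with the PL/SQL parameters passed by name in a hashref.
my $task_pk = CREATETASK($pipeline, {
    TASKTYPE_FK_IN    => 1,                        # placeholder foreign keys
    GLASTUSER_FK_IN   => 42,
    TASKNAME_IN       => 'example-task',
    NOTATION_IN       => 'illustrative task only',
    BASEFILEPATH_IN   => '/data/example',          # placeholder paths
    RUNLOGFILEPATH_IN => '/data/example/logs',
});

print "CREATETASK returned primary key $task_pk\n";
$dbh->commit;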
Web-Based User Interface Provides World-Wide Administration
•Pipeline monitoring is performed by humans via a web-based user interface consisting of three main views
•The Task Summary View provides an overview of processing for all
tasks (leftmost picture)
•Each task is listed with the status of the runs belonging to it
•Filtering can be performed by task name to reduce the number of
displayed tasks
•The results can be sorted by name or by status counts
•Clicking a task name drills down to the Run Summary View for that
task
•The Run Summary View displays more specific information about the
ongoing processing for a task (middle picture)
•Each stage of processing (TaskProcess) within the task is displayed along with its job instances totaled by processing state
•Clicking on a TaskProcess name brings up the detail view for jobs
of that type
•Clicking on one of the totals brings up the detail view for jobs of
that TaskProcess type in only the corresponding processing state
•The Individual Runs View displays details about the jobs selected in the Run Summary View (rightmost picture)
•Statistics for each job are displayed including current status
•FTP links to processing log files are provided and can be clicked to display the corresponding logs. These include:
•Log: Job output (stdout/stderr)
•Files: A list of links to all log files for download
•Out: Standard output of pipeline decision logic during job
management and scheduling
•Err: Standard error of pipeline decision logic, if any
•This interface will soon be updated to include buttons that invoke administrative utilities currently available only on the command line, including deletion of runs, rescheduling of failed runs, and scheduling of new runs (see the sketch below)
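For illustration, a command-line administrative utility in the spirit of rollBackFailedRun (step 6 of the scheduling logic) might be structured as sketched below. The poster does not show the utility's internals, so the option name and the placeholder subroutines are assumptions, not the real interface.

#!/usr/bin/perl
# Hypothetical sketch of a command-line utility like rollBackFailedRun.
# The subroutine names are placeholders standing in for calls to the
# generated PL/SQL wrappers; the real utility's interface is not shown here.
use strict;
use warnings;
use Getopt::Long;

my $run_pk;
GetOptions('run=i' => \$run_pk) && defined $run_pk
    or die "usage: rollBackFailedRun --run <RUN_PK>\n";

# Delete the failed job and its data products, then mark the run Waiting
# as if the last successful job had just finished (see step 6 above).
delete_failed_job_and_data($run_pk);
mark_run_waiting($run_pk);

print "Run $run_pk rescheduled; the scheduler will pick it up on its next pass.\n";

# Placeholder implementations; the real versions would invoke the
# generated PL/SQL wrapper subroutines.
sub delete_failed_job_and_data { }
sub mark_run_waiting           { }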