Data Cloud Frameworks
Author - Shailendra Mishra
Head of Data Architecture (PayPal)
Data at PayPal
• Enable online, offline, and mobile payments
• 128M customers worldwide
• $160B payment volume processed annually
• Major retail locations accepting PayPal
  – 20K today, 2M by end of 2013
• PayPal Here launching in US and international markets
• Petabytes of high-value data and growing
The Data Landscape
Enterprise Data - Painpoints
Enterprise integration platform
OPPORTUNITIES
• Cost effective, scalable ETL server grid
• Comprehensive capabilities
• Eliminate need for DB Standbys
• Minimize multiple versions / copies of data
• Enterprise agreement on cross network architecture for data integration backend

• Transformation logic in BTEQ scripts in TD
• Inefficient handshake between CDC & ETL
• Landing data into files multiple times
• Limited visibility into impact / lineage
• Limited to data movement
Enterprise Data Landscape (ii)
Achievements & Opportunities
Enterprise integration platform
ACHIEVEMENTS
• Cost effective, scalable ETL server grid
• Comprehensive capabilities
• Eliminated need for DB Standbys
• Minimized multiple versions / copies of data
• Enterprise agreement on cross network architecture for data integration backend
OPPORTUNITIES
• Transformation logic in BTEQ scripts in TD
• Inefficient handshake between CDC & ETL
• Landing data into files multiple times
• Limited visibility into impact / lineage
• Limited to data movement
Business view
[Diagram labels: Data governance; Real time decision; Analytical data stores & warehousing; Operational reporting; Data analysts; Profiling; Cloud; Data stores; Quality; Customers; Analytical tools; Metadata; Pipe; Source of record; Information delivery; Mask & subset; Files; Databases; Message bus; Data cloud services; Users; Enterprise data modeling]
GOALS
• Real time integration
• Self-service data delivery
• Enterprise Data Governance
• Interactive querying
• Analytics & Information Products Lifecycle
• Text search & Analytics

• Unified data and metadata dictionary
• Data lineage and comprehensive data quality functions
• PD-DM Partnership for Data Centric Solutions
• Streaming Analytics & Real time dashboards
• Enterprise approach across all DM disciplines
• Machine Learning
• Cloud computing
Data cloud Architecture
[Architecture diagram, labels grouped by layer as extracted]
• Development Environment: IDE, Dashboard Builder, Web Services
• Data Cloud: QoS (latency, uptime) services
• Applications: Interactive Query, Reporting, Streaming Analytics, Machine Learning, Graph Algorithms, Natural Language Processing, Analytics, Text Search
• Data Acquisition & Indexing Services: ETL, Adapters, Bulk Data Acquisition, Indexing Services
• Infrastructure Services: Monitoring, Scheduling, Orchestration
• Core Services: Distributed Memory Store, Distributed File System, Big Table DB, OLTP DB, DW DB, Stream Processing
• Runtime: JVM, App Containers, OCC
Data Storage
HBase
Data Storage
• Improved handshake between CDC & ETL
• Ability to process some event data
• Data services
• Metadata
• Quality
• Profiling
• Mask
• Subset
• Near real-time data
• All data transformations in centralized grid
• Hadoop capability (Read / Write / Process)
• Data interchange (external)
Data Integration Grid (Real time)
[Diagram components: ETL, Feeds, Distributed Cache, GG User Exits, GG, HBase]
Data Integration Grid (Batch)
[Diagram components: GG + ETL, Flume, GG, MR (Load), MR (Index), Pig, Hive, HBase]
Data Stream Processing
[Diagram components: HBase, Stream Processor]
Data Stream Processing
• A data stream is defined as a sequence of elements ("events") of the form (K, A), where K and A are the tuple-valued keys and attributes, respectively
• The objective is to create a stream computing platform that consumes such a stream, computes intermediate values, and possibly emits other streams
• The design elements of the stream computing platform are:
  – Processing Elements (PEs): the basic computational elements, or building blocks
  – Processing Nodes (PNs): logical hosts to PEs
  – Communication Layer (CL): provides cluster management and automatic failover to standby nodes, and maps physical nodes to logical nodes
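The (K, A) event shape and the consume/compute/emit objective above can be sketched in plain Java. This is only an illustration: StreamEvent, StreamSketch, countByKey, and the "merchant" and "count" names are hypothetical and come from neither the slides nor any stream-processing library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stream element of the form (K, A). */
final class StreamEvent {
    private final String type;                  // event type, e.g. "Payment"
    private final Map<String, String> keys;     // K: tuple-valued key
    private final Map<String, Object> attrs;    // A: tuple-valued attributes

    StreamEvent(String type, Map<String, String> keys, Map<String, Object> attrs) {
        this.type = type;
        this.keys = Collections.unmodifiableMap(new LinkedHashMap<String, String>(keys));
        this.attrs = Collections.unmodifiableMap(new LinkedHashMap<String, Object>(attrs));
    }

    String type() { return type; }
    String key(String name) { return keys.get(name); }
    Object attribute(String name) { return attrs.get(name); }
}

final class StreamSketch {
    /**
     * Consumes a stream, keeps a running count per key value (the
     * intermediate values), and emits one derived "Count" event per input.
     */
    static List<StreamEvent> countByKey(List<StreamEvent> input, String keyName) {
        Map<String, Long> counts = new HashMap<String, Long>();
        List<StreamEvent> output = new ArrayList<StreamEvent>();
        for (StreamEvent e : input) {
            String k = e.key(keyName);
            long n = counts.merge(k, 1L, Long::sum);
            output.add(new StreamEvent("Count",
                    Collections.singletonMap(keyName, k),
                    Collections.<String, Object>singletonMap("count", n)));
        }
        return output;
    }
}
```

A real platform would process events one at a time as they arrive; the list-in, list-out form here just keeps the sketch easy to run.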
Processing Elements
• Processing Elements (PEs) are the basic computational elements, identified by the following properties:
  – Functionality, as defined by a PE class and its associated configuration
  – Types of events that it consumes
  – Keyed attribute in those events
  – Value of the keyed attribute in the events that it consumes
• A library of PEs is available for standard tasks
• PE objects are accorded a TTL, so if no event arrives at a PE for the TTL, the PE is reaped
• Special PEs
  • Keyless PEs can consume all events of the type with which they are associated. These are normally used as input PEs, where the key is still being assigned
  • An abstract PE has only three components of its identity (functionality, event type, keyed attribute)
    – The attribute value is unassigned
    – It is configured on initialization and, for any value V, it can clone itself to create a fully qualified PE of that class with identical configuration and value V for the keyed attribute
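The four-part PE identity, the cloning of an abstract PE, and TTL-based reaping can be sketched as follows. PeIdentity, PeRegistry, and all of their methods are hypothetical names chosen for illustration, and the PE configuration is left out for brevity; this is not the platform's actual API.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/** Hypothetical PE identity: functionality, event type, keyed attribute, value. */
final class PeIdentity {
    final String peClass;         // functionality (configuration omitted)
    final String eventType;       // type of event consumed
    final String keyedAttribute;  // null for a keyless PE
    final String keyValue;        // null while the PE is abstract

    PeIdentity(String peClass, String eventType, String keyedAttribute, String keyValue) {
        this.peClass = peClass;
        this.eventType = eventType;
        this.keyedAttribute = keyedAttribute;
        this.keyValue = keyValue;
    }

    boolean isKeyless() { return keyedAttribute == null; }

    boolean isAbstract() { return keyedAttribute != null && keyValue == null; }

    /** An abstract PE clones itself into a fully qualified PE for value v. */
    PeIdentity cloneFor(String v) {
        if (!isAbstract()) {
            throw new IllegalStateException("only an abstract PE can be cloned");
        }
        return new PeIdentity(peClass, eventType, keyedAttribute, Objects.requireNonNull(v));
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof PeIdentity)) return false;
        PeIdentity p = (PeIdentity) o;
        return Objects.equals(peClass, p.peClass)
            && Objects.equals(eventType, p.eventType)
            && Objects.equals(keyedAttribute, p.keyedAttribute)
            && Objects.equals(keyValue, p.keyValue);
    }

    @Override public int hashCode() {
        return Objects.hash(peClass, eventType, keyedAttribute, keyValue);
    }
}

/** Hypothetical registry that tracks live PEs and reaps idle ones. */
final class PeRegistry {
    private final long ttlMillis;
    private final Map<PeIdentity, Long> lastEventAt = new HashMap<PeIdentity, Long>();

    PeRegistry(long ttlMillis) { this.ttlMillis = ttlMillis; }

    /** Finds or clones the PE for this key value and records the event time. */
    PeIdentity route(PeIdentity prototype, String value, long nowMillis) {
        PeIdentity pe = prototype.cloneFor(value);
        lastEventAt.put(pe, nowMillis);
        return pe;
    }

    /** Removes PEs that have seen no event for the TTL; returns how many. */
    int reap(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<PeIdentity, Long>> it = lastEventAt.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() { return lastEventAt.size(); }
}
```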
Processing Nodes
• Processing Nodes (PNs) are logical hosts to PEs
• PNs are responsible for listening to events, executing operations on incoming events, dispatching events with the assistance of the communication layer, and emitting output events
• Each event is routed to PNs based on a hash function of the values of the known keyed attributes in that event
• A single event may be routed to multiple PNs
• The set of all possible keying attributes is known from the configuration of the cluster
• The event listener in the PN passes incoming events to the processing element container (PEC), which invokes the appropriate PEs
• All events with a particular value of a keyed attribute are guaranteed to arrive at a particular corresponding PN, and to be routed to the corresponding PE instances within it
• Every keyed PE can be mapped to exactly one PN based on the value of the hash function applied to the value of the keyed attribute of that PE
• Keyless PEs may be instantiated on every PN
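The hash-based routing described above can be illustrated with a small sketch. PnRouter and its methods are hypothetical, and the hash is an assumption (String.hashCode reduced modulo the node count), since the slides do not say which function is used.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical router from keyed attribute values to logical PNs. */
final class PnRouter {
    private final int nodeCount;
    private final Set<String> keyedAttributes;  // known from the cluster configuration

    PnRouter(int nodeCount, Set<String> keyedAttributes) {
        if (nodeCount <= 0) {
            throw new IllegalArgumentException("nodeCount must be positive");
        }
        this.nodeCount = nodeCount;
        this.keyedAttributes = keyedAttributes;
    }

    /** Logical PN for one (attribute, value) pair; same input, same node. */
    int nodeFor(String attribute, String value) {
        int h = (attribute + "=" + value).hashCode();
        return Math.floorMod(h, nodeCount);   // non-negative even when h < 0
    }

    /** One event may go to several PNs: one per keyed attribute it carries. */
    Set<Integer> route(Map<String, String> eventAttributes) {
        Set<Integer> targets = new TreeSet<Integer>();
        for (String attr : keyedAttributes) {
            String value = eventAttributes.get(attr);
            if (value != null) {
                targets.add(nodeFor(attr, value));
            }
        }
        return targets;
    }
}
```

Because the target node depends only on the attribute name and value, every event carrying the same keyed value lands on the same PN, which is the guarantee the keyed PEs rely on.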
Processing Node
[Diagram components]
• Processing Node: Processing Element Container (PE1, PE2, ..., PEn), Event Listener, Dispatcher, Emitter
• Communication Layer: Routing and Load Balancing, Failover Management, Transport Protocols, ZooKeeper
Programming Model
• The high-level programming paradigm is to write generic, reusable, and configurable PEs that can be used across various applications
• PEs are assembled into applications
• The PE API is fairly simple and flexible, consisting of handlers such as onCreate, onTime, and onEvent, plus setDownstream and a facility to define state variables
  – onEvent is invoked for each incoming event of the types the PE has subscribed to. It implements the logic for input event handling, typically an update of the internal PE state
  – onTime is invoked by the PE timer. By default it is synchronized with the onEvent and onTrigger methods
  – onTrigger is used for count-based windows. It adds a new slot when the current slot reaches capacity
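As a rough illustration of this programming model, the sketch below defines a hypothetical base class with the handler names listed above and a counting PE built on it. SketchPE, CountingPE, CollectingPE, and emit are invented for this example; the real PE API and its signatures may differ, and onTrigger is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical PE base class; handler names follow the slide. */
abstract class SketchPE {
    private SketchPE downstream;

    void setDownstream(SketchPE pe) { this.downstream = pe; }

    /** Passes an output event to the downstream PE, if one is set. */
    protected void emit(Object event) {
        if (downstream != null) {
            downstream.onEvent(event);
        }
    }

    void onCreate() { }                   // one-time initialization hook
    abstract void onEvent(Object event);  // called for each subscribed event
    void onTime() { }                     // called by the PE timer
}

/** Counts events in a state variable and emits the count on each timer tick. */
final class CountingPE extends SketchPE {
    private long count;   // state variable

    @Override synchronized void onEvent(Object event) {
        count++;
    }

    // Synchronized so that a timer tick never interleaves with onEvent.
    @Override synchronized void onTime() {
        emit(count);
        count = 0;
    }
}

/** Terminal PE that records whatever reaches it. */
final class CollectingPE extends SketchPE {
    final List<Object> seen = new ArrayList<Object>();

    @Override void onEvent(Object event) {
        seen.add(event);
    }
}
```

Wiring `counter.setDownstream(sink)` and then calling onEvent three times followed by onTime delivers a single event with the count 3 to the sink.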
Communication Layer
The communication layer:
• Provides cluster management and automatic failover to standby nodes, and maps physical nodes to logical nodes
• Automatically detects hardware failures and updates the mapping accordingly
• Emitters specify only logical nodes when sending messages
• Emitters are unaware of physical nodes, and of logical nodes being remapped due to failures
• An API can be used to send input events in a round-robin fashion to nodes in an S4 cluster. These input events are then processed by keyless PEs
• Uses a pluggable architecture to select the network protocol. Events may be sent with or without a guarantee
  – Control messages may require guaranteed delivery, while data may be sent without a guarantee to maximize throughput
• Uses ZooKeeper to help coordinate between PNs in a cluster
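The logical-to-physical mapping and failover behaviour can be sketched with an in-memory table. ClusterMap and the host names are hypothetical; in the platform described above this state would be coordinated through ZooKeeper rather than held in a local map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the logical-to-physical node mapping. */
final class ClusterMap {
    private final Map<Integer, String> logicalToPhysical = new HashMap<Integer, String>();
    private final Deque<String> standbys = new ArrayDeque<String>();

    void assign(int logicalNode, String host) {
        logicalToPhysical.put(logicalNode, host);
    }

    void addStandby(String host) {
        standbys.addLast(host);
    }

    /** Emitters name only a logical node; the layer resolves the host. */
    String resolve(int logicalNode) {
        return logicalToPhysical.get(logicalNode);
    }

    /**
     * On a detected failure, remaps every logical node hosted on the failed
     * machine to a standby. Returns true if any mapping changed.
     */
    boolean failover(String failedHost) {
        boolean remapped = false;
        for (Map.Entry<Integer, String> e : logicalToPhysical.entrySet()) {
            if (e.getValue().equals(failedHost)) {
                String standby = standbys.pollFirst();
                if (standby == null) {
                    throw new IllegalStateException("no standby node available");
                }
                e.setValue(standby);   // logical node id stays the same
                remapped = true;
            }
        }
        return remapped;
    }
}
```

An emitter that keeps sending to logical node 0 needs no change after `failover("hostA")`; only the result of `resolve(0)` differs.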
Graph processing
[Diagram components: HBase, Apache Giraph]
Large scale graph processing
• Giraph provides libraries for large scale graph processing
  – Modeled after Google Pregel
  – Bulk synchronous parallel execution model
  – Fault tolerant using checkpointing
  – Computation is executed in memory
  – Can be a job in a map-reduce pipeline (Hadoop, Hive)
  – Uses ZooKeeper for synchronization
Example usage
• User rank (page rank)
  – Can be personalized for a user or "type" of user
  – Determining popular users, news, jobs, etc.
• Shortest paths
  – Many variants: single-source, s-t shortest paths, all-to-all shortest paths (memory/storage prohibitive)
  – How are users, groups connected?
• Clustering, semi-clustering
  – Max clique, triangle closure, label propagation algorithms
  – Finding related people, groups, interests
• Hidden inferences in communities
  – Discover inferences through a graph approach and extract them
[Diagram: one superstep across Processor-1 to Processor-5, showing local computation, communication, and barrier synchronization]
Application Client - Giraph
Giraph basics
• Deployment on big data processing infrastructure (no need to create/maintain separate graph processing cluster)
• Dynamic resource management
  – Handle failures gracefully
  – Integrate new resources when available
• Based on Bulk synchronous parallel model
• 3 main attributes
  – Components that process and/or provide storage
  – Router to deliver point-to-point messages
  – Synchronization of all or a subset of components through regular intervals (supersteps)
• Computation is done when all components are done
• Parallelization of computation/messaging during a superstep
  – Components can only communicate by messages delivered out-of-order in the next superstep
• Fault-tolerant/dynamic resource utilization
  – Supersteps are atomic units of parallel computation
  – Any superstep can be restarted from a checkpoint (need not be user defined)
  – A new superstep provides an opportunity for rebalancing of components among available resources
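The superstep cycle can be illustrated with a single-threaded simulation of the classic "propagate the maximum value" computation. This is plain Java rather than the Giraph API, and BspMaxValue and run are hypothetical names. It assumes every vertex has an entry in the values map, and the end of each loop iteration stands in for the barrier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-threaded simulation of BSP supersteps. */
final class BspMaxValue {
    /**
     * Each vertex adopts the largest value it has seen and forwards it.
     * Runs until no messages are in flight; returns the superstep count.
     */
    static int run(Map<Integer, List<Integer>> edges, Map<Integer, Integer> values) {
        Map<Integer, List<Integer>> inbox = new HashMap<Integer, List<Integer>>();
        int superstep = 0;
        while (true) {
            Map<Integer, List<Integer>> outbox = new HashMap<Integer, List<Integer>>();
            // Local computation: each vertex sees only messages from the previous superstep.
            for (Integer v : values.keySet()) {
                int current = values.get(v);
                int best = current;
                for (int m : inbox.getOrDefault(v, Collections.<Integer>emptyList())) {
                    best = Math.max(best, m);
                }
                boolean changed = best > current;
                if (changed) {
                    values.put(v, best);
                }
                // Communication: send in superstep 0, or whenever the value changed.
                if (superstep == 0 || changed) {
                    for (Integer n : edges.getOrDefault(v, Collections.<Integer>emptyList())) {
                        outbox.computeIfAbsent(n, k -> new ArrayList<Integer>()).add(best);
                    }
                }
            }
            superstep++;
            // Barrier: messages sent now become visible only in the next superstep.
            if (outbox.isEmpty()) {
                return superstep;
            }
            inbox = outbox;
        }
    }
}
```

Because a vertex reads only the previous superstep's inbox, the order in which vertices are visited inside one superstep does not affect the result. That independence is what lets a real system run them in parallel and restart a superstep from a checkpoint.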
Data Science
[Diagram components: HBase, Cloudera ML]
Apache Mahout
• Mahout provides scalable machine learning libraries
• Oldest product among machine learning libraries, with widely deployed algorithms; an incomplete list follows:
  – User and item based recommenders
  – k-means and fuzzy k-means clustering
  – Mean shift clustering
  – Dirichlet process clustering
  – Latent Dirichlet allocation
  – Singular value decomposition
  – Parallel frequent pattern mining
  – Random forest decision tree based classifier
• Challenge: the gap between the latest ML algorithms and Mahout's implementations
MRv2 and YARN
• Eliminates JobTracker bottlenecks
• Separates resource tracker and scheduler
• Moves map/reduce into user space
• Allows Hadoop to run all sorts of jobs
  – Native BSP (Giraph)
  – AllReduce, GraphLab
Apache Crunch
• Apache Crunch provides a framework for writing, testing, and running MapReduce pipelines
• It simplifies tasks like joining and data aggregation that are tedious to implement on plain MapReduce
• The APIs are useful when processing data that does not fit naturally into the relational model, such as time series, serialized object formats like protocol buffers or Avro records, and HBase rows and columns
Apache Crunch API
• The Crunch API is centered around three interfaces that represent distributed datasets:
  – PCollection<T> represents a distributed, unordered collection of elements of type T, e.g. a file is represented as a PCollection of strings
    • PCollection provides a parallelDo operation that applies a function to each element in the PCollection
  – PTable<K, V> is a sub-interface of PCollection that represents an unordered multimap
    • In addition to parallelDo, PTable provides a groupByKey operation
    • groupByKey triggers the sort phase of a map-reduce job
    • The result of groupByKey is a PGroupedTable<K, V>, a sorted distributed map from keys of type K to an iterable collection of values of type V
  – PCollection, PTable, and PGroupedTable all support a union operation, which takes a series of distinct PCollections and treats them as a single, virtual PCollection
    • Union is required by operations that combine multiple inputs
Example
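import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // args[0]: input text path, args[1]: output path
    Pipeline pipeline = new MRPipeline(WordCount.class);
    PCollection<String> lines = pipeline.readTextFile(args[0]);
    // Split each line on whitespace and emit one element per word
    PCollection<String> words = lines.parallelDo("my splitter", new DoFn<String, String>() {
      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());
    // Count occurrences of each distinct word
    PTable<String, Long> counts = Aggregate.count(words);
    pipeline.writeTextFile(counts, args[1]);
    // Nothing executes until the pipeline is run
    pipeline.run();
  }
}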
Cloudera ML
• Cloudera ML provides a command line tool for running data preparation and model evaluation tasks
  – summary
  – sample
  – pivot
  – header
  – normalize
  – showvec
  – ksketch
  – kmeans
  – lloyds
  – kassign
• Cloudera ML is just a start, but coupled with Apache Crunch it lets one try out ML algorithms written in Java