Implement Writable (Serialization for network/MR)


Scala/Spark Review/demos
Theme: web search results are wrong
Small Scala
AVRO/Parquet
Server Actors
Scala Arrays
http://codingbat.com/java/Array-1
 Rotate; use a split
def rotateLeft3(nums: Array[Int]): Array[Int] = {
  // splitAt returns a tuple of (front, back); concatenating back ++ front rotates left
  val (front, back) = nums.splitAt(1) // rotate left by one: {1,2,3} -> {2,3,1}
  back ++ front
}

Scala Rotate websearch
http://aperiodic.net/phil/scala/s-99/p19.scala
object P19 {
  def rotate[A](n: Int, ls: List[A]): List[A] = {
    val nBounded = if (ls.isEmpty) 0 else n % ls.length
    if (nBounded < 0) rotate(nBounded + ls.length, ls)
    else (ls drop nBounded) ::: (ls take nBounded)
  }
}
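A quick REPL check of the rotate behavior (these calls and results were worked out by hand, not taken from the linked page):
//scala> P19.rotate(3, List(1, 2, 3, 4, 5, 6))
//res0: List[Int] = List(4, 5, 6, 1, 2, 3)
//scala> P19.rotate(-2, List(1, 2, 3, 4, 5, 6))
//res1: List[Int] = List(5, 6, 1, 2, 3, 4)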
ReduceRight
def reduceRight[B >: A](op: (A, B) ⇒ B): B
Applies a binary operator to all elements of this sequence, going right to left.
//scala> a.reduceRight((elem,result)=>{println("elem:"+elem+" result:"+result);elem})
//elem:5 result:6
//elem:4 result:5
//elem:3 result:4
//elem:2 result:3
//elem:1 result:2
ReduceRight question
//Given an int array length 3, if there is a 2 in the array immediately followed by a 3,
//set the 3 element to 0. Return the changed array.
//fix23({1, 2, 3}) → {1, 2, 0}
//fix23({2, 3, 5}) → {2, 0, 5}
//fix23({1, 2, 1}) → {1, 2, 1}
//how to set the elem to a value?
def fix23(nums:Array[Int]):Array[Int]= {
nums.reduceRight((elems,result) =>{})
}
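One possible answer (a sketch; it swaps reduceRight for foldRight, because reduceRight on an Array[Int] must produce a single value whose type is a supertype of Int, so it cannot rebuild an array, while foldRight with a List accumulator can "set" an element while walking right to left):
def fix23(nums: Array[Int]): Array[Int] =
  nums.foldRight(List.empty[Int]) {
    case (2, 3 :: rest) => 2 :: 0 :: rest // a 2 immediately before a 3: zero out the 3
    case (n, rest)      => n :: rest      // otherwise keep the element as-is
  }.toArray
//fix23(Array(1, 2, 3)) → Array(1, 2, 0)
//fix23(Array(2, 3, 5)) → Array(2, 0, 5)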
Spark Build with Hadoop


Need YARN, Hadoop 2.2+
Should see this under assembly:
NewHadoop RDD ops

Understand small scale ops to define larger
scale APIs on data. Define API, implement in
Scala primitives, iterate to improve performance


Repartition data or define new primitives
RDD API:

Data Manipulation



count(), collect(), filter(), flatMap(), foreach(), groupBy(),
intersection(), zip(), zipWithIndex(), zipPartitions(),
foreachPartition(), mapPartitionsWithIndex(),
countApproxDistinct(), toLocalIterator() (returns an iterator,
not a Traversable), pipe() (a few of these are sketched below)
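A minimal sketch of a few of these data-manipulation ops (assuming a SparkContext sc is already available; the example data is made up):
val nums = sc.parallelize(1 to 10, numSlices = 4)

nums.filter(_ % 2 == 0).collect()            // Array(2, 4, 6, 8, 10)
nums.flatMap(n => Seq(n, n * 10)).count()    // 20
nums.zipWithIndex().take(3)                  // (value, index) pairs
nums.groupBy(_ % 3).collect()                // grouped by remainder

// per-partition variants
nums.mapPartitionsWithIndex((idx, it) => it.map(n => (idx, n))).collect()
nums.countApproxDistinct(relativeSD = 0.05)  // approximate distinct count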
RDD Ops

Cluster



repartition() (manual vs. automatic partitioning, hotspots); cache();
checkpoint() (saves the RDD to reliable storage and truncates the lineage/DAG);
compute(); dependencies(); etc.
RDDs are parallelized across YARN executors; accumulators and
broadcast variables share state with the tasks (see the sketch below).
Different RDD subclasses: CoGroupedRDD, JdbcRDD,
OrderedRDD, HadoopRDD,
PartitionPruningRDD, ShuffledRDD, UnionRDD
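A minimal sketch of accumulators and broadcast variables (assuming a SparkContext sc and the Spark 1.x accumulator API that matches the era of these notes; the lookup table is made up):
// broadcast: read-only data shipped once per executor
val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))

// accumulator: tasks only add to it; the driver reads the total
val misses = sc.accumulator(0, "misses")

val labelled = sc.parallelize(Seq(1, 2, 5, 3, 7)).map { n =>
  lookup.value.getOrElse(n, { misses += 1; "unknown" })
}
labelled.collect()     // Array(one, two, unknown, three, unknown)
println(misses.value)  // 2 (reliable here because collect() ran exactly once)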
Zip example; need index pos


//Given an array of ints, return the number of times that two 6's are next to each other
//in the array. Also count instances where the second "6" is actually a 7.
//array667({6, 6, 2}) → 1
//array667({6, 6, 2, 6}) → 1
//array667({6, 7, 2, 6}) → 1
Zip
def array667(nums: Array[Int]): Int = {
  // pair each element with its right neighbour (zipping with the tail drops the last, unpaired element)
  val pairs = nums zip nums.tail
  // count pairs where a 6 is immediately followed by a 6 or a 7
  pairs.count(p => p._1 == 6 && (p._2 == 6 || p._2 == 7))
}
AVRO/Parquet




AVRO comes from MR programs
Important because:
 To address a design problem in DB conversion

 Designed for parallel processing in Hadoop AND Spark

 Java/Python support; data scientists want Python
Given records in a DB, how do we convert them into parallel MR
operations/queries?
Easy, obvious first step: convert DB rows/objects into Hadoop
objects. Can use primitive data types to map DB primitives:
Varchar -> Text, Int -> IntWritable (see the Writable sketch below).
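A minimal sketch of that mapping as a hand-coded Hadoop Writable (the class and field names are made up for illustration; this is exactly the per-application hand coding that AVRO is meant to standardize away):
import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.{IntWritable, Text, Writable}

// One DB row (e.g. name VARCHAR, age INT) mapped onto Hadoop types.
// Hadoop creates Writables reflectively, so keep the no-arg constructor.
class PersonWritable extends Writable {
  val name = new Text()        // VARCHAR -> Text
  val age  = new IntWritable() // INT     -> IntWritable

  def set(n: String, a: Int): PersonWritable = { name.set(n); age.set(a); this }

  // serialize the fields in a fixed order for network/MR transfer
  override def write(out: DataOutput): Unit = { name.write(out); age.write(out) }

  // deserialize in exactly the same order
  override def readFields(in: DataInput): Unit = { name.readFields(in); age.readFields(in) }

  override def toString: String = s"${name.toString},${age.get}"
}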
Avro


Schema (at the object level, not the DB/API level)
Definition? RPC messages? These used to be hand-coded, with APIs
defined per application. Define AVRO Protocols instead.
Standardize using AVRO:

 Human-readable JSON option

 Language-independent schemas, code generation

 Input splittable for MR
Avro Java example



No Hadoop
http://www.ibm.com/developerworks/library/bdavrohadoop/index.html?ca=drs
3 components to Avro:

1) create avsc or avro schema file
{
  "namespace": "com.example.avroSample.model",
  "type": "record",
  "name": "Automobile",
  "fields": [
    Omitted... each field gives a name and an AVRO type such as string
  ]
}
Avro Maven
Autogenerate the POJO

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.5</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
POJO/Java Class
public class Automobile extends org.apache.avro.specific.SpecificRecordBase
    implements org.apache.avro.specific.SpecificRecord {

  public static final org.apache.avro.Schema SCHEMA$ =
      new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Au$

  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  @Deprecated public java.lang.CharSequence modelName;
  @Deprecated public java.lang.CharSequence make;
  @Deprecated public int modelYear;
  @Deprecated public int passengerCapacity;


These aren't Hadoop types. Requirements for Hadoop? Implement
Writable (serialization for network/MR), InputFormat/InputSplit, and a
RecordReader.
Avro Hadoop Example



Objects are input splittable.
Object processing in M/R: you have to define
partitioning/sorting/grouping for the M/R pipeline,
and translate the default AVRO ordering into
application-level sorting.
Spark is different: you have to define RDD
transformation stages over your AVRO and
POJO objects (see the sketch below), while keeping backward
compatibility with M/R parallelism.
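A minimal sketch of the Spark side: read the Avro data into an RDD and then chain ordinary transformation stages over it (assuming a SparkContext sc; the path is made up, and the classes come from the standard avro-mapred/avro-mapreduce modules rather than anything specific to this talk):
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark

// each record arrives wrapped in an AvroKey; the value side is NullWritable
val autos = sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](
  "hdfs:///data/automobiles.avro")

// the input format reuses record objects, so project out plain values first,
// then chain normal RDD transformation stages
val countsByMake = autos
  .map { case (k, _) => k.datum().get("make").toString }
  .groupBy(identity)
  .mapValues(_.size)

countsByMake.collect().foreach(println)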
AVRO WC
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, AvroWrapper<Pair<CharSequence, Integer>>, NullWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<AvroWrapper<Pair<CharSequence, Integer>>, NullWritable> output,
      Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    // wrap the (word, count) result in an Avro Pair for the Avro output format
    output.collect(new AvroWrapper<Pair<CharSequence, Integer>>(
        new Pair<CharSequence, Integer>(key.toString(), sum)), NullWritable.get());
  }
}
Balance between native Java types for cross-language use vs. Writables for Hadoop
processing.
Customize AVRO




http://patrick.jaromin.com/
e.g. allows you to do joins; there are many approaches
here. Note to self: don't waste time here.
Be careful with web search; much of the information is very old.
Just because it's out there doesn't mean you should do this.
Spark is not like this: define the Scala API (filter,
collect, map, groupBy) and build higher-level APIs on
top of it, with performance numbers. Compare against
SQL algebra: projection, join, etc. (see the sketch below).
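A minimal sketch of how SQL-style projection, join, and aggregation map onto the core Scala RDD API (assuming a SparkContext sc; the tables and columns are made up):
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark

// two "tables" as key/value RDDs: customers(id, name) and orders(id, amount)
val customers = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
val orders    = sc.parallelize(Seq((1, 250.0), (1, 75.0), (3, 40.0)))

// projection: SELECT name FROM customers        -> map
val names = customers.map { case (_, name) => name }

// join: customers JOIN orders ON id             -> pair-RDD join
val joined = customers.join(orders)              // RDD[(Int, (String, Double))]
  .map { case (_, (name, amount)) => (name, amount) }

// aggregation: GROUP BY name, SUM(amount)       -> reduceByKey
val totals = joined.reduceByKey(_ + _)
totals.collect().foreach(println)                // (alice,325.0) and (carol,40.0), in some order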
Spark AVRO


Kryo Serializer
Not sure if this is true: it doesn't work all the time:
http://apache-spark-developerslist.1001551.n3.nabble.com/bug-using-kryo-asclosure-serializer-td6473.html

To enable, override spark.serializer in the Spark config
(see the sketch below), or set it via the SPARK_JAVA_OPTS
environment variable, per
https://groups.google.com/forum/#!topic/sparkusers/bXXG3-JHXHk
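A minimal sketch of turning Kryo on through SparkConf (registerKryoClasses is the Spark 1.2+ convenience; older versions set a spark.kryo.registrator class instead, and the Automobile class here is just the generated POJO from above, used as an example):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-demo")
  // swap the default Java serializer for Kryo
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // registering classes keeps Kryo from writing full class names into every record
  .registerKryoClasses(Array(classOf[com.example.avroSample.model.Automobile]))

val sc = new SparkContext(conf)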
Spark doesn't operate at this API level



Have to learn to make RDDs and indexed RDDs,
and create functional interfaces for these RDDs over
your dataset. Have to support AVRO/Parquet in
the RDDs.
RDDs talk about pairs and K/V (see the sketch below):
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations
Should have different RDDs to implement APIs on
different data collections; e.g. an IndexedRDD for
search indexes, or collection/partitioned RDDs for
large data sets requiring fast read times.
Most of the performance is available through the RDD.
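A minimal sketch of the pair (K/V) RDD style, including pre-partitioning and caching for data sets that need fast repeated reads (assuming a SparkContext sc; keys, values, and the partition count are made up):
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._ // pair-RDD implicits on older Spark

// a key/value collection: (userId, pageViews)
val views = sc.parallelize(Seq(("u1", 3), ("u2", 7), ("u1", 2), ("u3", 1)))

// classic pair-RDD aggregation
val perUser = views.reduceByKey(_ + _)

// partition by key and cache, so later joins/lookups against this RDD
// reuse the partitioning instead of reshuffling every time
val indexed = perUser.partitionBy(new HashPartitioner(8)).cache()

indexed.lookup("u1") // Seq(5)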
Actors


JobServer/Actors. Initialize the actor system first to
enable loading of actors.
Modified example from the Spray examples. Had to
create a build.sbt; this doesn't exist in the Spray
examples.
seq(Revolver.settings: _*)
name := "TestSpray"
organization := "com.example"
version := "0.1-SNAPSHOT"
scalaVersion := "2.10.2"
scalacOptions := Seq("-encoding", "utf8",
"-target:jvm-1.7"
)
Build.sbt for HelloWorldServer
Stole this from a GitHub repo (link lost); have to modify it. Not completely correct.
resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"Spray Repository" at "http://repo.spray.io/"
)
libraryDependencies ++= {
  val akkaVersion  = "2.2.0"
  val sprayVersion = "1.2-20130727"
  Seq(
    "com.typesafe.akka"      %% "akka-actor"      % akkaVersion,
    "com.typesafe.akka"      %% "akka-slf4j"      % akkaVersion,
    "io.spray"               %  "spray-can"       % sprayVersion,
    "io.spray"               %  "spray-routing"   % sprayVersion,
    "io.spray"               %  "spray-client"    % sprayVersion,
    "io.spray"               %% "spray-json"      % "1.2.5",
    "com.github.nscala-time" %% "nscala-time"     % "0.4.2",
    "ch.qos.logback"         %  "logback-classic" % "1.0.12"
  )
}
Main server
package com.example

import akka.actor.ActorSystem
import akka.io.IO
import spray.can.Http
import akka.actor.Props

object Main extends App {
  // bring up the actor system and bind the HTTP listener to the handler actor
  implicit val system = ActorSystem()
  val handler = system.actorOf(Props[DemoService], "handler")
  IO(Http) ! Http.Bind(listener = handler, interface = "0.0.0.0", port = 5000)
}
REST GET w/ HTTP Actor per Connection from Spray-Can
class DemoService extends Actor {
  implicit val timeout: Timeout = 1.second // do we need this?
  import context.dispatcher

  def receive = {
    case _: Http.Connected => sender ! Http.Register(self)
    case HttpRequest(GET, Uri.Path("/"), _, _, _) => sender ! index
    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => sender ! HttpResponse(entity = "PONG")
    case HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
      // since the Props creator is executed asynchronously we need to save the sender ref
      val peer = sender
      context actorOf Props(new Streamer(peer, 25))
    case _: HttpRequest => sender ! HttpResponse(status = 404, entity = "Unknown resource!")
  }
Response
lazy val index = HttpResponse(
  entity = HttpEntity(`text/html`,
    <html>
      <body>
        <h1>Say hello to <i>spray-can</i>!</h1>
        <p>Defined resources:</p>
        <ul>
          <li><a href="/ping">/ping</a></li>
          <li><a href="/stream">/stream</a></li>
        </ul>
      </body>
    </html>.toString()
  )
)
Streaming Actor
class Streamer(client: ActorRef, count: Int) extends Actor with ActorLogging {
  log.debug("Starting streaming response ...")
  client ! ChunkedResponseStart(HttpResponse(entity = " " * 2048)).withAck(Ok(count))

  def receive = {
    case Ok(0) =>
      log.info("Finalizing response stream ...")
      client ! MessageChunk("\nStopped...")
      client ! ChunkedMessageEnd
      context.stop(self)
    case Ok(remaining) =>
      log.info("Sending response chunk ...")
      context.system.scheduler.scheduleOnce(100 millis span) {
        client ! MessageChunk(DateTime.now.toIsoDateTimeString + ", ").withAck(Ok(remaining - 1))
      }
    case x: Http.ConnectionClosed =>
      log.info("Canceling response stream due to {} ...", x)
      context.stop(self)
  } … left off rest