Implement Writable (Serialization for network/MR)
Scala/Spark Review/demos
Theme: web search results are wrong
Small Scala
AVRO/Parquet
Server Actors
Scala Arrays
http://codingbat.com/java/Array-1
Rotate; use a split
def rotateLeft3(nums: Array[Int]): Array[Int] = {
  val splitArray = nums.splitAt(1)   // splitAt returns a (Array, Array) tuple, not an indexable pair
  splitArray._2 ++ splitArray._1     // rotate left by one: {1,2,3} -> {2,3,1}
}
Scala Rotate websearch
http://aperiodic.net/phil/scala/s-99/p19.scala
object P19 {
  def rotate[A](n: Int, ls: List[A]): List[A] = {
    val nBounded = if (ls.isEmpty) 0 else n % ls.length
    if (nBounded < 0) rotate(nBounded + ls.length, ls)
    else (ls drop nBounded) ::: (ls take nBounded)
  }
}
ReduceRight
def reduceRight[B >: A](op: (A, B) ⇒ B): B
Applies a binary operator to all elements of this sequence, going right to left.
// assuming val a = Array(1, 2, 3, 4, 5, 6) (inferred from the trace below)
//scala> a.reduceRight((elem, result) => {println("elem:" + elem + " result:" + result); elem})
//elem:5 result:6
//elem:4 result:5
//elem:3 result:4
//elem:2 result:3
//elem:1 result:2
ReduceRight question
//Given an int array length 3, if there is a 2 in the array immediately followed by a 3,
//set the 3 element to 0. Return the changed array.
//fix23({1, 2, 3}) → {1, 2, 0}
//fix23({2, 3, 5}) → {2, 0, 5}
//fix23({1, 2, 1}) → {1, 2, 1}
//how to set the element to a value?
def fix23(nums: Array[Int]): Array[Int] = {
  // reduceRight folds the array down to a single value, so it can't set an element in place;
  // instead, zero out any 3 that immediately follows a 2
  nums.zipWithIndex.map { case (v, i) => if (v == 3 && i > 0 && nums(i - 1) == 2) 0 else v }
}
Spark Build with Hadoop
Need YARN, Hadoop 2.2+
Should see this under assembly:
NewHadoop RDD ops
Understand small-scale ops to define larger-scale APIs on data. Define the API, implement it
in Scala primitives, and iterate to improve performance: repartition data or define new primitives.
A sketch of the basic data-manipulation ops follows this list.
RDD API:
Data Manipulation
count(), collect(), filter(), flatMap(), foreach(), groupBy(),
intersection(), zip(), zipWithIndex(),
zipPartitions(), foreachPartition(), mapPartitionsWithIndex(),
countApproxDistinct(), toLocalIterator() (no Traversable iterator),
pipe()
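A minimal sketch of a few of these ops against a local SparkContext; the dataset, the object name RddOpsSketch, and the local[2] master are illustrative assumptions, and exact method availability can vary by Spark version.

import org.apache.spark.{SparkConf, SparkContext}

object RddOpsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-ops").setMaster("local[2]"))
    val nums = sc.parallelize(1 to 10)                  // small illustrative dataset
    println(nums.count())                               // 10
    println(nums.filter(_ % 2 == 0).collect().toList)   // List(2, 4, 6, 8, 10)
    println(nums.flatMap(n => Seq(n, -n)).count())      // 20
    println(nums.zipWithIndex().take(3).toList)         // (value, index) pairs
    println(nums.countApproxDistinct())                 // approximate distinct count
    sc.stop()
  }
}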
RDD Ops
Cluster
repartition() (manual vs. auto, hotspots); cache();
checkpoint() (saves the RDD and truncates its lineage/DAG); compute(); dependencies();
etc. A sketch of these cluster-side ops follows this list.
RDD process of parallelization across YARN
executors; accumulators, broadcast variables
Different RDDs: CoGroupedRDD, JdbcRDD,
OrderedRDD, HadoopRDD,
PartitionPruningRDD, ShuffledRDD, UnionRDD
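A sketch of the cluster-side pieces (repartition, cache, accumulator, broadcast), assuming a Spark 1.x-era API to match these notes; names and data are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object ClusterOpsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cluster-ops").setMaster("local[2]"))
    val data = sc.parallelize(1 to 100000, numSlices = 4)
    val repartitioned = data.repartition(8)    // manual repartition, e.g. to spread a hotspot
    repartitioned.cache()                      // keep it in memory across actions
    val overThreshold = sc.accumulator(0)      // counter aggregated back on the driver
    val threshold = sc.broadcast(99990)        // read-only value shipped once to executors
    repartitioned.foreach { n => if (n > threshold.value) overThreshold += 1 }
    println(overThreshold.value)
    sc.stop()
  }
}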
Zip example; need index pos
//Given an array of ints, return the number of times that two 6's are next to each other
//in the array. Also count instances where the second "6" is actually a 7.
//array667({6, 6, 2}) → 1
//array667({6, 6, 2, 6}) → 1
//array667({6, 7, 2, 6}) → 1
Zip
def array667(nums: Array[Int]): Int = {
  // pair each element with its successor; zipping with the tail handles odd lengths,
  // so no dummy padding element is needed
  val pairs = nums zip nums.tail
  // count pairs where a 6 is immediately followed by a 6 or a 7
  pairs.count(p => p._1 == 6 && (p._2 == 6 || p._2 == 7))
}
AVRO/Parquet
AVRO comes from MR programs.
Important because:
it addresses the design problem of DB conversion,
it is designed for parallel processing in Hadoop AND Spark,
and it has Java/Python support (Data Scientists want Python).
Given records in a DB, how do we convert them to parallel MR operations/queries?
Easy, obvious step: convert DB rows/objects into Hadoop objects. Primitive data types can map
the DB primitives: Varchar -> Text, Int -> IntWritable (a small sketch follows).
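A tiny sketch of that mapping in Scala; the CustomerRow type and its fields are hypothetical, only Text and IntWritable come from Hadoop.

import org.apache.hadoop.io.{IntWritable, Text}

// hypothetical DB row; the field names are illustrative, not from the notes
case class CustomerRow(name: String, age: Int)

object RowToWritables {
  // map DB primitives onto Hadoop Writables: Varchar -> Text, Int -> IntWritable
  def toWritables(row: CustomerRow): (Text, IntWritable) =
    (new Text(row.name), new IntWritable(row.age))
}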
Avro
Schema (at the object level, not the DB/API level)
Definition? RPC messages? We used to hand-code these and define APIs per application;
define AVRO Protocols instead.
Standardize using AVRO:
human-readable JSON option,
language-independent schemas, code generation,
input splittable for MR
Avro Java example
No Hadoop
http://www.ibm.com/developerworks/library/bdavrohadoop/index.html?ca=drs
3 components to Avro:
1) create .avsc or avro schema file
{
  "namespace": "com.example.avroSample.model",
  "type": "record",
  "name": "Automobile",
  "fields": [
    Omitted... string (AVRO type)
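For reference, a hedged reconstruction of what the complete schema might look like, parsed with Avro's Schema.Parser; the field list is inferred from the generated POJO shown below, not copied from the article.

import org.apache.avro.Schema

object AutomobileSchema {
  // field list inferred from the POJO fields below (CharSequence -> string, int -> int)
  val schemaJson: String =
    """{
      |  "namespace": "com.example.avroSample.model",
      |  "type": "record",
      |  "name": "Automobile",
      |  "fields": [
      |    {"name": "modelName", "type": "string"},
      |    {"name": "make", "type": "string"},
      |    {"name": "modelYear", "type": "int"},
      |    {"name": "passengerCapacity", "type": "int"}
      |  ]
      |}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaJson)
}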
Avro Maven
Autogenerate the POJO
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.5</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
POJO/Java Class
public class Automobile extends org.apache.avro.specific.SpecificRecordBase
    implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ =
      new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Au$
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public java.lang.CharSequence modelName;
  @Deprecated public java.lang.CharSequence make;
  @Deprecated public int modelYear;
  @Deprecated public int passengerCapacity;
These aren't Hadoop types. Requirements for Hadoop?
Implement Writable (serialization for network/MR), InputFormat/Split, and a RecordReader.
A hedged Writable sketch follows.
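A minimal sketch of just the Writable piece, in Scala, wrapping the Automobile fields by hand; this is an illustrative assumption (Avro's own mapred/mapreduce wrappers normally provide this), and the InputFormat/RecordReader parts are omitted.

import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.Writable

// hand-rolled Writable over the Automobile fields
class AutomobileWritable(var modelName: String = "",
                         var make: String = "",
                         var modelYear: Int = 0,
                         var passengerCapacity: Int = 0) extends Writable {

  // serialize the fields for the network/MR shuffle
  override def write(out: DataOutput): Unit = {
    out.writeUTF(modelName)
    out.writeUTF(make)
    out.writeInt(modelYear)
    out.writeInt(passengerCapacity)
  }

  // deserialize the fields in the same order they were written
  override def readFields(in: DataInput): Unit = {
    modelName = in.readUTF()
    make = in.readUTF()
    modelYear = in.readInt()
    passengerCapacity = in.readInt()
  }
}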
Avro Hadoop Example
Objects are input splittable.
Object processing in M/R: you have to define partitioning/sorting/grouping for the M/R pipeline,
and translate the default AVRO ordering to application-level sorting.
Spark is different: you have to define RDD transformation stages with your AVRO and POJO
objects, while keeping backward compatibility with M/R parallelism. A hedged sketch of reading
Avro into an RDD follows.
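A sketch of one way to load Avro container files into an RDD via the standard Avro MapReduce input format; the path is hypothetical and the exact classes can differ by Avro/Spark version.

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

object AvroToRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("avro-rdd").setMaster("local[2]"))
    // read Avro container files as (AvroKey[GenericRecord], NullWritable) pairs
    val records = sc.newAPIHadoopFile(
      "hdfs:///data/automobiles.avro",              // hypothetical path
      classOf[AvroKeyInputFormat[GenericRecord]],
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable])
    // pull one field out of each record and count by it
    val byMake = records.map { case (k, _) => k.datum().get("make").toString }.countByValue()
    byMake.foreach(println)
    sc.stop()
  }
}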
AVRO WC
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, AvroWrapper<Pair<CharSequence, Integer>>, NullWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<AvroWrapper<Pair<CharSequence, Integer>>, NullWritable> output,
      Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(new AvroWrapper<Pair<CharSequence, Integer>>(
        new Pair<CharSequence, Integer>(key.toString(), sum)), NullWritable.get());
  }
}
Balance between native Java types (for cross-language use) and Writables for Hadoop processing.
Customize AVRO
http://patrick.jaromin.com/
e.g. it allows you to do joins; there are many approaches here. Note to self: don't waste time here.
Be careful of web search; very old information. It doesn't mean you should do this.
Spark is not like this: define the Scala API (filter, collect, map, groupBy) and build higher-level
APIs on this with performance numbers. Compare vs. SQL algebra: projection, join, etc.
Spark AVRO
Kryo Serializer
Not sure if this is true:
Doesn't work all the time:
http://apache-spark-developerslist.1001551.n3.nabble.com/bug-using-kryo-asclosure-serializer-td6473.html
To enable, override spark.serializer in spark.conf, or use the SPARK_JAVA_OPTS environment
variable setting, per
https://groups.google.com/forum/#!topic/sparkusers/bXXG3-JHXHk
A hedged config sketch follows.
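A minimal sketch of enabling Kryo programmatically through SparkConf, as an alternative to editing spark.conf or SPARK_JAVA_OPTS; the app name and master are illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object KryoEnabled {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-demo")
      .setMaster("local[2]")
      // switch the data serializer from Java serialization to Kryo
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}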
Spark doesn't operate at this API level.
You have to learn to make RDDs and indexed RDDs, and create functional interfaces for these
RDDs over your dataset. You have to support AVRO/Parquet in the RDDs.
RDDs talk about Pairs and K/V:
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations
You should have different RDDs to implement APIs on different data collections; e.g. an
IndexedRDD for search indexes, collection/partitioned RDDs for large data sets requiring fast
read times. Most of the performance is available through the RDD. A pair-RDD sketch follows.
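A small sketch of the K/V (pair RDD) style the programming guide describes; the word data here is an illustrative assumption.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object PairRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-rdd").setMaster("local[2]"))
    val lines = sc.parallelize(Seq("avro parquet avro", "spark avro"))
    // classic K/V pattern: map to (key, 1) pairs, then reduceByKey across the cluster
    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}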
Actors
JobServer/Actors. Initialize the Actor system first to enable loading of actors.
Modified example from the Spray examples. Had to create build.sbt; this doesn't exist in the
Spray examples.
seq(Revolver.settings: _*)
name := "TestSpray"
organization := "com.example"
version := "0.1-SNAPSHOT"
scalaVersion := "2.10.2"
scalacOptions := Seq("-encoding", "utf8",
"-target:jvm-1.7"
)
Build.sbt for HelloWorldServer
Stole this from a GitHub repo (link lost); have to modify. Not completely correct.
resolvers ++= Seq("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"Spray Repository" at "http://repo.spray.io/"
)
libraryDependencies ++= {
  val akkaVersion = "2.2.0"
  val sprayVersion = "1.2-20130727"
  Seq(
    "com.typesafe.akka"      %% "akka-actor"      % akkaVersion,
    "com.typesafe.akka"      %% "akka-slf4j"      % akkaVersion,
    "io.spray"               %  "spray-can"       % sprayVersion,
    "io.spray"               %  "spray-routing"   % sprayVersion,
    "io.spray"               %  "spray-client"    % sprayVersion,
    "io.spray"               %% "spray-json"      % "1.2.5",
    "com.github.nscala-time" %% "nscala-time"     % "0.4.2",
    "ch.qos.logback"         %  "logback-classic" % "1.0.12"
  )
}
Main server
package com.example

import akka.actor.ActorSystem
import akka.actor.Props
import akka.io.IO
import spray.can.Http

object Main extends App {
  implicit val system = ActorSystem()
  val handler = system.actorOf(Props[DemoService], "handler")
  IO(Http) ! Http.Bind(listener = handler, interface = "0.0.0.0", port = 5000)
}
REST GET w/HTTP Actor per Connection from Spray-Can
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.util.Timeout
import scala.concurrent.duration._
import spray.can.Http
import spray.http._
import spray.http.HttpMethods.GET
import spray.http.MediaTypes.`text/html`

class DemoService extends Actor {
  implicit val timeout: Timeout = 1.second // do we need this?
  import context.dispatcher
  def receive = {
    case _: Http.Connected => sender ! Http.Register(self)
    case HttpRequest(GET, Uri.Path("/"), _, _, _) => sender ! index
    case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => sender ! HttpResponse(entity = "PONG")
    case HttpRequest(GET, Uri.Path("/stream"), _, _, _) =>
      // since the Props creator is executed asynchronously we need to save the sender ref
      val peer = sender
      context actorOf Props(new Streamer(peer, 25))
    case _: HttpRequest => sender ! HttpResponse(status = 404, entity = "Unknown resource!")
  }
Response
lazy val index = HttpResponse(
  entity = HttpEntity(`text/html`,
    <html>
      <body>
        <h1>Say hello to <i>spray-can</i>!</h1>
        <p>Defined resources:</p>
        <ul>
          <li><a href="/ping">/ping</a></li>
          <li><a href="/stream">/stream</a></li>
        </ul>
      </body>
    </html>.toString()
  )
)
Streaming Actor
class Streamer(client: ActorRef, count: Int) extends Actor with ActorLogging {
  // `Ok` is the chunk-ack message (roughly case class Ok(remaining: Int) in the spray example)
  import context.dispatcher
  log.debug("Starting streaming response ...")
  client ! ChunkedResponseStart(HttpResponse(entity = " " * 2048)).withAck(Ok(count))

  def receive = {
    case Ok(0) =>
      log.info("Finalizing response stream ...")
      client ! MessageChunk("\nStopped...")
      client ! ChunkedMessageEnd
      context.stop(self)
    case Ok(remaining) =>
      log.info("Sending response chunk ...")
      context.system.scheduler.scheduleOnce(100.millis) {
        client ! MessageChunk(DateTime.now.toIsoDateTimeString + ", ").withAck(Ok(remaining - 1))
      }
    case x: Http.ConnectionClosed =>
      log.info("Canceling response stream due to {} ...", x)
      context.stop(self)
  } … left off rest