public static void


Observables
Reactive Libs
Review Observable/Design Pattern
Observables Streams
RxJava Demo
RxScala Shell Demo
RxLibs

Microsoft open source:
https://rx.codeplex.com/
Open source implementations of ReactiveX:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxScala
Netflix open source:
http://techblog.netflix.com/2013/02/rxjava-netflixapi.html
Additions on ReactiveX, competition to akka
Observable/Observer not reactive
java.util.Observable/Observer is the classic Java observer design pattern (pre-2000). It is not useful for async or distributed systems.
package com.example;

import java.util.Observable;
import java.util.Observer;

// Models a customer going to the coffee shop; could later change name or leave (stop observing).
public class MyObserver implements Observer {
    private String name = null;
    private boolean atCoffeeShop = false;

    MyObserver(String n) {
        this.name = n;
    }

    // Callback invoked by the Observable from notifyObservers().
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("calling update callback for:" + name);
    }
}
Observable
package com.example;

import java.util.Observable;

public class CoffeeShop extends Observable {
    public void enterCoffeeShop(MyObserver obs) {
        addObserver(obs);
    }

    // setChanged() must be called first, otherwise notifyObservers() does nothing.
    public void notifyCustomers() {
        setChanged();
        notifyObservers();
    }
}
Main.java
package com.example;

public class Main {
    public static void main(String[] args) {
        CoffeeShop cs = new CoffeeShop();
        cs.addObserver(new MyObserver("cust1"));
        cs.addObserver(new MyObserver("cust2"));
        cs.addObserver(new MyObserver("cust3"));
        cs.notifyCustomers();
    }
}
Output:
calling update callback for:cust3
calling update callback for:cust2
calling update callback for:cust1
Run instr:
Download zip:
https://github.com/dougc333/TestCode
Unarchive, cd TestCode-master/TestObservable, mvn compile
$ mvn exec:java -Dexec.mainClass=com.example.Main
[WARNING] Warning: killAfter is now deprecated. Do you need it ? Please comment on MEXEC-6.
calling update callback for:cust3
calling update callback for:cust2
calling update callback for:cust1
RxJava
NOT ASYNC: the CoffeeShop Observable notifies all of its observers through a plain callback on the caller's thread.
RxJava and RxScala add to this design pattern to support event streams and to make async programming possible.
Event streams = subscriptions
Async: Try / onNext, onCompleted
No locks, threads, etc.; they are abstracted away
Note: the code style is cleaner than Observer/Observable. Everything is in the subscription class; no separate driver program is needed.
http://docs.couchbase.com/prebuilt/java-sdk-2.0-beta/topics/observables.html
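The "not async" point can be checked directly. The following is a minimal sketch using only the JDK; the class name SyncCheck and the method observedOnCallerThread are made up for illustration. It records which thread runs update() and compares it with the thread that called notifyObservers().

```java
import java.util.Observable;
import java.util.Observer;

// Sketch only: SyncCheck and observedOnCallerThread are hypothetical names.
@SuppressWarnings("deprecation") // java.util.Observable is deprecated since Java 9
class SyncCheck {
    // Returns true when update() ran on the same thread that called notifyObservers().
    static boolean observedOnCallerThread() {
        final String[] callbackThread = new String[1];
        Observable shop = new Observable() {
            @Override
            public void notifyObservers() {
                setChanged();            // protected, so it is called from a subclass
                super.notifyObservers();
            }
        };
        shop.addObserver(new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                callbackThread[0] = Thread.currentThread().getName();
            }
        });
        shop.notifyObservers();          // returns only after every update() has finished
        return Thread.currentThread().getName().equals(callbackThread[0]);
    }

    public static void main(String[] args) {
        System.out.println("same thread: " + observedOnCallerThread());
    }
}
```

Because the callback runs on the caller's thread, a slow observer holds up the code that notified it, which is the gap the Rx libraries address.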
RxJava
Create an observable stream using Observable.just(1,2,3)
Add a subscriber (lazy eval). It is similar to an observer but adds the duality with iterables.
Iterable/Observer stream programming.
Iterables block, which is bad for performance. RxJava adds threads + async.
onNext ↔ next()
onCompleted ↔ hasNext() returning false
onError ↔ throws exception
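The correspondence above can be sketched without RxJava. In this illustration PushObserver and Duality are hypothetical names, not RxJava types: the push() method walks an ordinary Iterator and turns each pull operation into the matching callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch only: PushObserver and Duality are hypothetical names, not RxJava types.
interface PushObserver<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable error);
}

class Duality {
    // Pull side: the consumer drives the Iterator. Push side: each pull becomes a callback.
    static <T> void push(Iterable<T> source, PushObserver<T> observer) {
        try {
            Iterator<T> it = source.iterator();
            while (it.hasNext()) {
                observer.onNext(it.next());  // next() <-> onNext
            }
            observer.onCompleted();          // hasNext() == false <-> onCompleted
        } catch (RuntimeException e) {
            observer.onError(e);             // thrown exception <-> onError
        }
    }

    // Records the callbacks as strings so the order of events is visible.
    static List<String> record(Iterable<Integer> source) {
        final List<String> events = new ArrayList<String>();
        push(source, new PushObserver<Integer>() {
            @Override public void onNext(Integer v) { events.add("onNext: " + v); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList(1, 2, 3)));
    }
}
```

This sketch still runs on one thread; it only shows the shape of the interface, not the threading that RxJava adds.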
RxJava Create stream, print
package com.example;

import rx.Observable;

public class TestObservable {
    public static void main(String[] args) {
        System.out.println("asdf");
        Observable.just(5, 6, 7, 8).subscribe(new MySubscriber<Integer>());
        // endless stream
        // Observable.just(5, 6, 7, 8).repeat().subscribe(new MySubscriber<Integer>());
    }
}
Subscriber
package com.example;

import rx.Subscriber;

// Prints every event it receives.
class MySubscriber<T> extends Subscriber<T> {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onNext(T item) {
        System.out.println("onNext: " + item);
    }
}
Run instr:
cd TestObservable/TestJavaRx
mvn clean;mvn compile
$ mvn exec:java -Dexec.mainClass=com.example.TestObservable
[WARNING] Warning: killAfter is now deprecated. Do you need it ? Please comment on MEXEC-6.
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onCompleted
RxJava Create
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
Returns an Observable that will execute the specified function when a Subscriber subscribes to it.
Create a separate class
package com.example;

import rx.Observable;
import rx.Subscriber;

// Demo only: create() lets you decide when to call onNext(), onError(), onCompleted().
public class CreateObservable implements Observable.OnSubscribe<Integer> {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        try {
            if (!subscriber.isUnsubscribed()) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
}
Add subscribe. No drivers!!!
package com.example;

import rx.Observable;
import rx.functions.Action1;

public class ObservableCreate {
    public static void main(String[] args) {
        Observable.create(new CreateObservable()).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("onNext: " + integer);
            }
        });
    }
}
Run instr:
Run from eclipse
The OnSubscribe function runs every time a subscriber subscribes, and the Action1 callback receives each onNext value. Evaluation is lazy: you have to call subscribe to start data processing.
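Lazy evaluation can be illustrated with a toy stand-in for create(). LazySource below is a hypothetical class written for this sketch, not part of RxJava, and it uses Java 8 lambdas for brevity. It stores the producer function and counts how often that function is started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: LazySource is a hypothetical stand-in for Observable.create(), not RxJava.
class LazySource<T> {
    private final Consumer<Consumer<T>> onSubscribe; // the producer function
    private int runs = 0;                            // how many times the producer has started

    // Storing the producer does not run it.
    LazySource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Each subscribe() call runs the producer once for that subscriber.
    void subscribe(Consumer<T> onNext) {
        runs++;
        onSubscribe.accept(onNext);
    }

    int runs() {
        return runs;
    }

    // Emits 0..4, like the loop in CreateObservable.
    static LazySource<Integer> firstFive() {
        return new LazySource<Integer>(subscriber -> {
            for (int i = 0; i < 5; i++) {
                subscriber.accept(i);
            }
        });
    }

    public static void main(String[] args) {
        LazySource<Integer> source = firstFive();
        System.out.println("runs before subscribe: " + source.runs());
        List<Integer> received = new ArrayList<Integer>();
        source.subscribe(received::add);
        System.out.println("runs after subscribe: " + source.runs() + ", received " + received);
    }
}
```

Nothing is produced until subscribe() is called, and a second subscriber triggers a second, independent run of the producer.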
RxJava API Example
Some things are easier in RxJava, such as creating a counter that fires once per second: no locks, no explicit threads. Errors and cancellation (unsubscribe) are built into the API, so there is less custom code and less testing.
Timer.java
package com.example;

import rx.*;
import rx.functions.Action1;
import java.util.concurrent.*;

public class Timer {
    // Keeps main alive until five ticks have arrived.
    static CountDownLatch latch = new CountDownLatch(5);

    public static void main(String args[]) throws InterruptedException {
        Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            public void call(Long counter) {
                latch.countDown();
                System.out.println("Timer Secs:" + counter);
            }
        });
        latch.await();
    }
}
Run instr:
Run in eclipse
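For comparison, here is a rough sketch of the same periodic counter written directly against java.util.concurrent. ManualTimer and tick are hypothetical names, and the period is a parameter so the example can run faster than once per second. It shows the executor, shared counter, and shutdown handling that Observable.interval keeps out of user code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: ManualTimer is a hypothetical hand-written equivalent of the interval example.
class ManualTimer {
    // Collects `count` ticks, one every `periodMillis` milliseconds, then stops the executor.
    static List<Long> tick(int count, long periodMillis) {
        final List<Long> ticks = Collections.synchronizedList(new ArrayList<Long>());
        final CountDownLatch latch = new CountDownLatch(count);
        final AtomicLong counter = new AtomicLong();
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (latch.getCount() > 0) {   // ignore any tick that fires after we are done
                    ticks.add(counter.getAndIncrement());
                    latch.countDown();
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            latch.await();                    // block the caller until all ticks have arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();               // manual cancellation
        }
        return new ArrayList<Long>(ticks);
    }

    public static void main(String[] args) {
        System.out.println(tick(5, 1000));
    }
}
```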
RxJava Map Java7
Java 7 has no lambdas, so Func1 objects replace the x => f(x) notation.
Modify the timer example to use a map that prints out the thread it is operating on.
Use the testing code.
Use the countdown latch for interval sequences, where another thread creates the sequence.
Interval is threaded, see the scheduler operator table: https://github.com/ReactiveX/RxJava/wiki/Scheduler#using-schedulers
Timer threads

The observable does the work in one thread
and the results via the subscriber are displayed
in another thread.
class RxThread[T](o: Observable[T]) {
  def execAsync[T] = {
    o.subscribeOn(Schedulers.newThread)
      .observeOn(AndroidSchedulers.mainThread())
      .materialize
  }
}

/**
 * Implicitly wraps an Observable in an RxThread so that execAsync can be called on it.
 */
object RxThread {
  implicit def Observable2Notification[T](o: Observable[T]) = new RxThread(o)
}
Add func
// Add to Timer.java; also needs: import rx.functions.Func1;
public static void debugTimer() throws InterruptedException {
    Observable.interval(1, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
        @Override
        public Long call(Long i) {
            // Print the thread the map step runs on.
            System.out.println(Thread.currentThread().getName());
            return i;
        }
    }).subscribe(new Action1<Long>() {
        public void call(Long counter) {
            latch.countDown();
            System.out.println("Timer Secs:" + counter);
        }
    });
    latch.await();
}
How about a sequence?
public static void debugSeq() {
    Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer i) {
            // Print the thread the map step runs on.
            System.out.println(Thread.currentThread().getName());
            return i;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("onNext: " + i);
        }
    });
}
Run instr:
Comment out either debugTimer() or debugSeq(), then run Timer.java in Eclipse or maven
RxJava Twitter Ex. Building APIs



http://java.dzone.com/articles/turning-twitter4jrxjavas
Turning Twitter4J into Observable. Creates API
for stream processing/multiuser nonblocking.
Goal: API for user, rolling count of tweets
Making RxJava Async and Parallel
Trick from lecture: if the return type is T, the call is blocking. If it is Observable<T> or Future<T>, the call is async.
Look at the BlockingObservable return types.
Marble diagram => parallelism
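The return-type trick can be shown with the JDK alone. In this sketch ReturnTypes and both method names are hypothetical, and CompletableFuture (a Future implementation from Java 8) stands in for the async return type.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: ReturnTypes and its methods are hypothetical names.
class ReturnTypes {
    // Returns int (a plain T): the caller's thread is held until the value exists.
    static int slowSquareBlocking(int x) {
        pause(50);
        return x * x;
    }

    // Returns CompletableFuture<Integer>: the call returns at once and the
    // value is computed on another thread.
    static CompletableFuture<Integer> slowSquareAsync(final int x) {
        return CompletableFuture.supplyAsync(() -> slowSquareBlocking(x));
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> pending = slowSquareAsync(4); // does not wait
        System.out.println("blocking: " + slowSquareBlocking(3));
        System.out.println("async: " + pending.join());          // waits only here
    }
}
```

Calling join() turns the async result back into a blocking one, which is the same move BlockingObservable makes for an Observable.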
RxJava parallel
RxJava controls thread pools using schedulers, as described in the lecture when EM compared ExecutionContext from Promises/Futures to Observables.
Just using flatMap doesn't guarantee multiple threads. A flatMap returns Observable<T>, which is the first criterion for parallelism.
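The same point holds for plain Java: work only moves to other threads when it is handed to a pool. The sketch below uses an ExecutorService as a rough analogue of a scheduler; PoolDemo and its methods are hypothetical names. It records which thread handles each item with and without a pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: PoolDemo is hypothetical; an ExecutorService plays the role of a scheduler.
class PoolDemo {
    // No pool: every item is handled on the calling thread.
    static List<String> threadsWithoutPool(int items) {
        List<String> names = new ArrayList<String>();
        for (int i = 0; i < items; i++) {
            names.add(Thread.currentThread().getName());
        }
        return names;
    }

    // With a pool: every item is submitted as a task and handled by a pool thread.
    static List<String> threadsWithPool(int items, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Callable<String> task = new Callable<String>() {
                @Override
                public String call() {
                    return Thread.currentThread().getName();
                }
            };
            List<Future<String>> pending = new ArrayList<Future<String>>();
            for (int i = 0; i < items; i++) {
                pending.add(pool.submit(task));
            }
            List<String> names = new ArrayList<String>();
            for (Future<String> f : pending) {
                names.add(f.get());      // wait for each task's result
            }
            return names;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("no pool: " + threadsWithoutPool(3));
        System.out.println("pool:    " + threadsWithPool(4, 2));
    }
}
```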
Schedulers add parallelism
Debug: Thread.currentThread().getName(), getCount()
Debug threads in RxJava/Java by adding to map() using Func1
Debug in Rx/Scala by adding .debug() to package.scala
http://www.grahamlea.com/2014/07/rxjavathreading-examples/
Multiple ways to add threads
The docs are not clear; look at the RxJava test code.
The test code does this: creating your own threads
In the observable call?
In onNext()?
The blog does this: using the thread pool from the scheduler
Add Spark executors or create your own executor framework using ZooKeeper (TBD)
Rx/Scala
Scala wrapper around RxJava
https://github.com/ReactiveX/RxScala
Scala REPL for Observables
Not available from a web search
Change $TOOL_PATH under $SCALA_HOME/bin or copy the jars from suggestions/lib_managed/com.netflix.rxjava and rxjava-scala into the $SCALA_HOME/lib dir.
RxJava
The APIs are different between RxJava/RxScala/Rx.NET
From lecture:
Create a stream, notify subscribers on change to represent processing,
schedulers manage thread pools vs. ExecutionContext in Futures,
apply ops to streams which return Observables
Repl lib path
Scala REPL bug
scala> import rx.lang.scala._
scala> val ob = Observable(1,2,3,4)
java.lang.NoSuchMethodError: scala.collection.JavaConverters$.asJavaIterableConverter(Lscala/collection/Iterable;)Lscala/collection/convert/Decorators$AsJava;
at rx.lang.scala.Observable$.apply(Observable.scala:1924)
Upgrade to latest scala version; 2.11.2; 2 I/Fs, RxScala
scala> import rx.lang.scala._
import rx.lang.scala._
scala> val ob = Observable(1,2,3,4)
ob: rx.lang.scala.Observable[Int] = rx.lang.scala.Observable$$anon$9@750cdd5e
scala>
Importing the wrong lib (RxJava instead of RxScala): the four values are typed as a single tuple
scala> import rx._
import rx._
scala> val foo = Observable.just(1,2,3,4)
foo: rx.Observable[(Int, Int, Int, Int)] = rx.Observable@18a8422d
scala>
Import the netflix jars(no scheduler)
Download from maven central, use the 0.20.4 versions not the 0.15.0 versions in the hw
rxjava-async-util-0.20.4.jar
rxjava-computation-expressions-0.20.4.jar
rxjava-core-0.15.0.jar
rxjava-core-0.20.4.jar
rxjava-scala-0.15.0.jar
rxjava-scala-0.20.4.jar
Subscribe to the Observable, much easier than Java7
scala> foo.subscribe(x=>println(x))
1
2
3
4
res0: rx.lang.scala.Subscription = rx.lang.scala.subscriptions.Subscription$$anon$1@4f91659c
Observables
Repeat Rx in Scala
Observables require subscribers, which they notify when data is ready in the Observable. An observable is the dual of an iterator, but it can return zero or many events, or an error. The key is the zero-event case.
Doesn't have to be async
An observer interface contains:
onNext
onError
onCompleted
RxScala

Homework

apply() (for maps, flatMap)

Add what on top of adding subscribers to
observables?
Subjects
For async programming, a subject is like a promise. A promise contains a future: the promise writes the data, and the future waits for the promise to write it.
A subject contains an Observable<T> or Observer<T>?
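In Rx a Subject is both: it is an Observer on the write side and an Observable on the read side, in the same way a promise is the write side of a future. The toy class below is a sketch of that idea only. MiniSubject is a hypothetical name, it is not the RxJava Subject class, and it leaves out onError, onCompleted and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: MiniSubject is a hypothetical toy, not the RxJava Subject class.
class MiniSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<Consumer<T>>();

    // Observable side: register interest in future values.
    void subscribe(Consumer<T> onNext) {
        subscribers.add(onNext);
    }

    // Observer side: accept a value and forward it to every current subscriber.
    void onNext(T value) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    // Pushes two values while subscribers join at different times.
    static List<String> demo() {
        final List<String> log = new ArrayList<String>();
        MiniSubject<Integer> subject = new MiniSubject<Integer>();
        subject.subscribe(v -> log.add("a:" + v));
        subject.onNext(1);   // only "a" is subscribed at this point
        subject.subscribe(v -> log.add("b:" + v));
        subject.onNext(2);   // both "a" and "b" receive this value
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A subscriber that joins late misses earlier values in this sketch, so "b" never sees the value 1.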