Links, materials and notes about reactive programming with Reactor
Reactive programming [...] is a subset of asynchronous programming and a paradigm where the availability of new information drives the logic forward rather than having control flow driven by a thread-of-execution. [ref]
They are not the same thing!
A reactive system is an architectural style that allows multiple individual applications to coalesce as a single unit, reacting to its surroundings, while remaining aware of each other—this could manifest as being able to scale up/down, load balancing, and even taking some of these steps proactively. [ref]
We are focusing on Reactive Programming (the coding part) here, not on Reactive Systems.
- Reactive Streams: The API + TCK
- RxJava 2: Java 6 (and Android) compatible implementation of Reactive Streams
- Reactor: Java 8 based implementation of Reactive Streams. Developed by the Spring Team, to be used in Spring 5 and onwards.
- Java 9 Flow API: The Reactive Streams API copy-pasted into the java.util.concurrent.Flow class. More details here
We are focusing on Reactor here.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
public interface Subscription {
void request(long n);
void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Example:
[ref]
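To make the order of the signals concrete, here is a minimal sketch that uses the JDK copies of these interfaces (`java.util.concurrent.Flow`, Java 9+) so it runs without any extra dependency. The class `SignalOrderDemo` and its helper methods are hypothetical names, and the publisher is deliberately simplified: it is synchronous and makes no attempt to satisfy the full Reactive Streams rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; not part of Reactor or Reactive Streams.
class SignalOrderDemo {

    // A simplified, synchronous Publisher of the given items.
    // Assumption: single-threaded use only; this is not TCK-compliant.
    static Flow.Publisher<String> publisherOf(List<String> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, never more than were requested.
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                // Complete once every item has been delivered.
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes and records every signal it sees, in order.
    static List<String> recordSignals(Flow.Publisher<String> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");
                s.request(Long.MAX_VALUE); // unbounded demand
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(a), onNext(b), onComplete]
        System.out.println(recordSignals(publisherOf(List.of("a", "b"))));
    }
}
```

Nothing is emitted until the subscriber calls `request(n)`: the demand travels upstream through the `Subscription`, the data travels downstream through `onNext`.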
Backpressure: the ability for the consumer to signal the producer that the rate of emission is too high for it to keep up [ref]
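One way to see this signalling in action is a subscriber that only ever asks for one element at a time. The sketch below uses the JDK's `SubmissionPublisher` and `Flow` (Java 9+) rather than Reactor, purely so that it is self-contained. `OneAtATimeDemo` is a hypothetical name and the 10 second timeout is an arbitrary choice.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, JDK only.
class OneAtATimeDemo {

    // Publishes the input and returns what a one-at-a-time subscriber received.
    static List<Integer> consumeOneAtATime(List<Integer> input) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // initial demand: exactly one element
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ask for the next one only when ready
                }

                @Override
                public void onError(Throwable t) {
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    finished.countDown();
                }
            });
            // submit() blocks the producer while the subscriber's buffer is full.
            input.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items are delivered

        try {
            if (!finished.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneAtATime(List.of(1, 2, 3)));
    }
}
```

The publisher never pushes more than the subscriber has requested; if the producer runs ahead, it is the producer that ends up waiting in `submit()`.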

A Flux represents a stream of 0 to many elements. Sends 0..N signals + an error or completion signal.
It is similar to java.util.stream.Stream but even more similar to java.lang.Iterable, as a Flux is reusable.
Example:
Flux<String> flux = Flux.just("Hello,", " world!"); // creates Flux with a static method
flux.subscribe(); // need to subscribe to start the data flow
flux.blockLast(); // subscribes + waits for the last element (Flux has blockFirst()/blockLast(), not block())... this operation exits the reactor world.
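The comparison with Stream and Iterable above comes down to reusability, which can be checked with plain JDK types. The sketch below (hypothetical class `ReuseDemo`) shows that a Stream can be consumed only once, while an Iterable hands out a fresh iterator on every pass, much like a Flux created with Flux.just() replays its elements for every new subscriber.

```java
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, JDK only.
class ReuseDemo {

    // Returns true if consuming the same Stream a second time fails.
    static boolean streamFailsOnSecondUse() {
        Stream<String> stream = Stream.of("Hello,", " world!");
        stream.forEach(s -> { }); // first terminal operation: fine
        try {
            stream.forEach(s -> { }); // second terminal operation on the same Stream
            return false;
        } catch (IllegalStateException e) {
            return true; // the stream has already been operated upon
        }
    }

    // Iterates the same Iterable twice; each for-loop obtains a new Iterator.
    static String joinTwice(Iterable<String> iterable) {
        StringBuilder result = new StringBuilder();
        for (String s : iterable) {
            result.append(s);
        }
        for (String s : iterable) {
            result.append(s);
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(streamFailsOnSecondUse());                  // true
        System.out.println(joinTwice(List.of("Hello,", " world!"))); // Hello, world!Hello, world!
    }
}
```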
A Mono represents 0 to 1 elements. Sends 0..1 signals + an error or completion signal.
It is similar to java.util.Optional and java.util.concurrent.CompletableFuture. Mono is reusable as well.
Example:
Mono<String> mono = Mono.just("Hello, world!");
mono.subscribe(); // need to subscribe to start the data flow
mono.block(); // subscribes + waits... this operation exits the reactor world.
Nothing happens asynchronously - unless we say so explicitly.
This is where Schedulers come into the picture: just think of them as thread pools. You use different kinds of thread pools for different kinds of operations.
- Schedulers.parallel() -> CPU-intensive operations
- Schedulers.elastic() -> everything that involves waiting, for example DB/network/file/LDAP reads and writes (deprecated since Reactor 3.4 in favor of Schedulers.boundedElastic())
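Note the difference from Collection.parallelStream() and CompletableFuture.supplyAsync(): by default both run on the shared ForkJoinPool.commonPool(), and that shared default pool is the problem, whereas Reactor makes you choose the Scheduler explicitly.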
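To illustrate the default-pool concern with plain JDK code: CompletableFuture.supplyAsync() without an executor argument runs on a JVM-wide shared pool, so blocking work submitted there competes with every other user of that pool, including parallel streams. Passing an executor explicitly is the JDK counterpart of choosing a Scheduler. The sketch below is an illustration under these assumptions; `ExecutorChoiceDemo` and the thread name `io-worker` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, JDK only.
class ExecutorChoiceDemo {

    // No executor argument: the task runs on the JVM-wide default async pool.
    static String threadNameOnDefaultPool() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    // Explicit executor: the caller decides where the (possibly blocking) task runs.
    static String threadNameOnDedicatedPool() {
        ExecutorService ioPool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "io-worker"); // hypothetical name
            thread.setDaemon(true);
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioPool)
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Usually a ForkJoinPool.commonPool worker; the exact name depends on the JVM.
        System.out.println(threadNameOnDefaultPool());
        // prints io-worker
        System.out.println(threadNameOnDedicatedPool());
    }
}
```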
Example:
Flux<Integer> flux = Flux.just(10, 15)
.log() // built in logging, uses slf4j if on classpath
.publishOn(Schedulers.parallel()) // or subscribeOn()
.log()
.flatMap(i -> Mono.fromSupplier(() -> i * 2).log().publishOn(Schedulers.parallel()))
.log()
.publishOn(Schedulers.parallel())
.log()
.map(i -> i - 2)
.log();
flux.blockLast();
Output:
[main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[main] INFO reactor.Flux.PublishOn.2 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.FlatMap.3 - onSubscribe(FluxFlatMap.FlatMapMain)
[main] INFO reactor.Flux.PublishOn.4 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[main] INFO reactor.Flux.MapFuseable.5 - | request(unbounded)
[main] INFO reactor.Flux.PublishOn.4 - | request(unbounded)
[main] INFO reactor.Flux.FlatMap.3 - request(256)
[main] INFO reactor.Flux.PublishOn.2 - | request(256)
[main] INFO reactor.Flux.Array.1 - | request(256)
[main] INFO reactor.Flux.Array.1 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onNext(15)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(10)
[main] INFO reactor.Flux.Array.1 - | onComplete()
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onNext(15)
[parallel-3] INFO reactor.Flux.FlatMap.3 - onNext(20)
[parallel-2] INFO reactor.Flux.PublishOn.2 - | onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(20)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(18)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onNext(30)
[parallel-4] INFO reactor.Flux.FlatMap.3 - onComplete()
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onNext(30)
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onNext(28)
[parallel-1] INFO reactor.Flux.PublishOn.4 - | onComplete()
[parallel-1] INFO reactor.Flux.MapFuseable.5 - | onComplete()
Another example:
Scheduler emitter = Schedulers.newElastic("emitter");
Scheduler transformer = Schedulers.newElastic("transformer");
Scheduler consumer = Schedulers.newParallel("consumer");
Flux<Integer> flux = Flux.<Integer>push(e -> {
// imagine reading from DB row by row or from file line by line
for (int fi = 0; fi < 30; fi++) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
e.next(fi);
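}
e.complete(); // without this the Flux never completes and collectList().block() below waits forever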
})
.log()
.subscribeOn(emitter)
.log();
flux.flatMap(
// could be some other IO like reading from a second database
i -> Mono.fromSupplier(() -> i + " - " + i * 2)
.log()
.subscribeOn(transformer)
.log()
.publishOn(consumer))
.log()
.collectList()
.log().block();
Another example (what is the use case?):
Flux<LocalDateTime> flux = Flux.<LocalDateTime>create(e -> {
Schedulers.newSingle("brc", true)
.schedulePeriodically(
() -> {
LOGGER.info("calculating...");
e.next(LocalDateTime.now(ZoneOffset.UTC));
},
0, 100, TimeUnit.MILLISECONDS);
}, OverflowStrategy.LATEST).cache(1);
// ...
flux.blockFirst();
- synchronized
- Lock, ReentrantLock, Semaphore
- CountDownLatch, CyclicBarrier
- ThreadPoolExecutor, ScheduledThreadPoolExecutor
- Future, CompletableFuture
As with any concurrent programming tool, anything (typically a library) that relies on the thread not changing will break if you run it on multiple threads.
- ThreadLocal. A typical programmer should never (or at least very, very rarely) use ThreadLocal directly. However, libraries do use it:
  - Slf4j MDC (Mapped Diagnostic Context).
  - Transaction handling with Spring JDBC or Hibernate or MQs: now this is a tricky one. If you have a transaction, then you have the following options (as I see it):
    - Leave the whole operation single-threaded.
    - Create a single-threaded Scheduler per transaction and push every transactional operation onto that scheduler (including transaction begin and transaction commit/rollback).
    - Say goodbye to your battle-proven libraries and roll your own transaction handling which does not depend on ThreadLocal. Only do this if you feel invincible.
- Make sure you don't overflow your downstreams (which do not yet support reactive programming).
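The ThreadLocal pitfall above can be reproduced with plain JDK threads. In this minimal sketch a value stored in a ThreadLocal is visible on the thread that set it but not on any other thread, which is the situation a library ends up in when an operator such as publishOn() hands work to a different thread. `ThreadLocalDemo` and `REQUEST_ID` are hypothetical names standing in for something like an MDC entry.

```java
// Hypothetical demo class, JDK only.
class ThreadLocalDemo {

    // Stands in for library state such as an MDC entry or a bound transaction.
    private static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Sets the value and reads it back on the same thread.
    static String readOnSameThread(String value) {
        REQUEST_ID.set(value);
        try {
            return REQUEST_ID.get();
        } finally {
            REQUEST_ID.remove();
        }
    }

    // Sets the value on the calling thread, then reads it from a new thread.
    static String readOnOtherThread(String value) {
        REQUEST_ID.set(value);
        try {
            String[] seen = new String[1];
            Thread other = new Thread(() -> seen[0] = REQUEST_ID.get());
            other.start();
            other.join(); // join() makes the write to seen[0] visible here
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnSameThread("req-42"));  // req-42
        System.out.println(readOnOtherThread("req-42")); // null
    }
}
```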
- Reactive Programming wikipedia
- The Reactive Manifesto
- Reactive Streams
- Reactive programming vs. Reactive systems
- An update on Reactive Streams and what's coming in Java 9 by Viktor Klang
- Introduction to Reactive Programming
- Servlet vs Reactive Stacks in Five Use Cases - presentation
- The Reactive Scrabble benchmarks by akarnokd blog
- An Introduction to Functional Reactive Programming
- Introduction to Reactive Streams for Java Developers
- RxMarbles: Interactive diagrams of Rx Observables
- Marble diagrams examples
- Reactor reference
- Reactor javadoc
- Project Reactor
- Reactor github
- Reactor by Example
- Lite Rx API Hands-On with Reactor Core 3
- Reactive Programming With Spring Reactor
- Reactor – Simple Ways to create Flux/Mono
- Reactive Programming with Reactor 3 (interview + playground)
- Building Reactive Rest APIs with Spring WebFlux and Reactive MongoDB
- File Reading in Reactor