Getting Started with Reactive Programming using Project Reactor: Kotlin Mono Examples
by Henry Brown
If you were convinced by my last post: “Why you may want to consider building your next API reactively” you may be interested to step up you reactive game by learning a reactive library. In this post, I would like to start that journey by looking at Project Reactor, one of the most popular reactive implementations in the Java World. It is also the default reactive library for the Spring framework.
Of course, there are many other implementations such as RxJava (which itself is a JVM implementation of the Reactive extensions project) and Akka, and Vert.x. Bringing all these implementations together is the Reactive Streams initiative which defines the Reactive Streams Specification. This provides a good jumping off point to start talking about reactive programming in the Java world.
Reactive Streams Specification
This specification provides a set of standard interfaces to provide asynchronous, non-blocking communication with an additional capability called: Back-pressure. The specification had input from many organisations that have knowledge in streaming data such as Lightbend, Netflix and Twitter. The specification limits itself to defining how a stream of data can be moved between the interfaces it defines without concerning itself with any operations that you may want to perform on that data. Implementors of the specification are free to add additional capabilities on top of these interfaces to handle operations that may be performed on top of the stream of data. to this end, the specification defines a set of core interfaces.
The Core Interfaces
The following core interfaces are defined by the specification:
Publisher
- a component that can produce/send data to any number of subscribers;Subscriber
- a component that can receive data from a publisher;Subscription
- a connection between a publisher and subscriber that specifies the rate that data can be sent from the publisher to the subscriber; andProcessor
- a component that is both a publisher, and a subscriber at the same time.
Most of the time, you are only concerned with using the first 3 with Processors
mostly implemented by the implementors of the specification. These interfaces are also very simply defined. A Publisher
defines only a single method:
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
The Publisher
only allows a subscriber to register itself with a publisher by calling this single method.
The Subscriber
interface defines the following 4 methods:
public interface Subscriber<T> {
void onSubscribe(Subscription subscription);
void onNext(T item);
void onError(Throwable throwable);
void onComplete();
}
The onSubscribe
method is called once the subscriber subscribes to a publisher and is provided an instance of a Subscription
.
The onNext
method is called to provide the subscriber with the next data element from the publisher.
The onError
method is called when an error occurs, and the publisher cannot produce more data for the subscriber.
The onComplete
method is called when the publisher has sent all the data to the publisher and no more data will be sent.
The Subscription
interface has 2 methods:
public interface Subscription {
void request(long n);
void cancel();
}
The request
method is called to let the Publisher know how many data elements to send to the subscriber and is the backbone of the Backpressure mechanism still to be discussed. The cancel
method allows the subscriber to signal to the publisher that it is not interested in the data anymore and that the publisher can stop sending data. The flow of data between these components are also well-defined by the specification.
Finally, and perhaps as expected, the Processor
interface defines no additional methods beyond those defined in the Subscriber
and Publisher
interfaces.
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
Flow of Data
The following sequence describes the flow of data between these components:
- The
subscribe
method is called on thePublisher
; - A
Subscription
object is created and theonSubscribe
method of theSubscriber
is called with thisSubscription
object; - To start receiving elements, the
Subscriber
calls therequest
method of theSubscription
indicating the number of elements it would like to receive; - The
Publisher
starts to send data to theSubscriber
using theonNext
method; This continues until one of the following happens:- The
Publisher
has no more elements to send and completes the subscription by calling theonComplete
method; or - An error occurs, and the publisher cancels the subscription by passing the error to the
onError
method; or - The publisher sends the subscriber the number of items requested, and the subscriber can either request more items by using the
request
method again or cancel the subscription by calling thecancel
method.
- The
Looking at this description of the components and how the data flows between them, you may be forgiven for thinking that it seems remarkably similar to the Observer pattern from the Gang of Four that you may already be familiar with. In this, you would be correct. There is however, one mechanism that is worth discussing that has already been hinted at several times: Back-pressure.
Back-pressure
One of the problems that can occur when building the kind of decoupled systems demanded by modern architectures, is that subscribers may not be able to keep up with the rate of data produced by publishers. In a reactive system, this is handled by allowing subscribers to specify to publishers how many elements they are able to handle and request more elements when they are ready to consume more. This places the control in the hands of the subscriber and thus ensures a more resilient system since a subscriber cannot be overwhelmed by a publisher producing data at a faster rate than it can handle.
Specifically, the request
method on the subscription method is always called to inform the publisher of the number of elements to be produced by the publisher for this subscriber. When no explicit request
call is made, an implicit call is still made informing the publisher to send an unbounded number of elements (technically the Long.MAX_VALUE
amount of elements). By periodically requesting the number of elements it can consume, the subscriber ensures that it is never given elements at a faster rate than it can process.
Now that we have covered the core interfaces and how they interact, we can turn our attention to the specific implementation provided by Project Reactor.
Project Reactor implementation
Project reactor provides 2 implementations of the Publisher
interface:
Mono
which supplies 0 or 1 element; andFlux
which supplies 0 to many items.
The reason the project provides 2 implementations of the Publisher
interface is more a matter of readability than anything else. This is because there are multiple occasions where a process can only produce a maximum of one element. It thus makes send to document this by returning a Mono
rather than a Flux
with only a single element.
Given this distinction, it makes sense to think of Mono
as a reactive version of a Java 8 Optional
and a Flux
as a reactive version of a List
. However, these reactive publishers do have one important distinction that sets them apart from their non-reactive “versions” That is, the publishers are lazy! That is to say that unless you call the subscribe
method these publishers will not produce any data.
Subscribing to a Publisher
In order to get data to flow from the publisher to the subscriber, you have to invoke one of the many versions of the subscribe
method. These include the following overloaded versions:
subscribe()
This first version of the subscribe method, which accept no arguments, is there simply to trigger the flow of data.
The second version of the subscribe
method allows us to provide a single Consumer
argument which is provided with the value sent to the onNext
method.
In the case of a Mono
this consumer will be invoked at most once whereas with a Flux
it may be invoked multiple times.
subscribe(Consumer<? super T> onNextConsumer)
The next version of the subscribe
method allows us to specify 2 consumers. Along with the consumer to handle the value provided by the
onNext
call, we can also provide a consumer to handle any error that may occur.
subscribe(
Consumer<? super T> onNextConsumer,
Consumer<? super t> errorConsumer
)
The next version fo the subscribe
method has the same 2 Consumer arguments as the first 2 parameters but accepts a third argument of type Runnable
.
This argument allows us to execute some code when the onComplete
method of the subscriber is called.
subscribe(
Consumer<? super T> onNextConsumer,
Consumer<? super t> errorConsumer,
Runnable completeConsumer
)
The final version of the subscribe
method adds a fourth parameter to the previous version which accepts another Consumer
.
This consumer allows us to perform some operations on the Subscription
we get when the onSubscribe
method of the subscription is invoked.
subscribe(
Consumer<? super T> c,
Consumer<? super t> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer>
)
As you can, the arguments to the various overloaded subscribe
methods map very closely to the 4 methods defined on the Subscriber
interface.
To get a better feel for these methods, let us focus on the simpler publisher: Mono
and see how we can construct a publisher to produce either a single or no value as well as how we can handle any errors that may occur.
Mono: Creating and subscribing
We can construct a Mono
that produces a value using the just
method.
@Test
internal fun `creating a Mono using just`() {
Mono.just("A")
.log()
.subscribe()
}
If we execute, the above we will see output similar to:
[Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
[Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
[Test worker] INFO reactor.Mono.Just.1 - | onNext(A)
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()
Looking at the output of this method we can see the invocations of all the methods discussed on the core interfaces. Firstly, we see that the onSubscribe
method was invoked with an implementation of the Subscription
interface. Then, we can see that even though the code we created did not explicitly make a call to the request
method we can see that the method has been invoked with an unbounded number. That occurs whenever we do not explicitly set the number of elements we want to receive from the publisher. We can also see that the onNext
method was invoked with the argument from our parameter to the just
method before concluding the data stream with a call to the onComplete
method.
Note that, all of these interactions only happened because we concluded our test with a call to the subscribe
method. If we remove/omit that call, then nothing would happen.
Incidentally, we were able to view this nice logging output to confirm our description of the data flow because we included the call to log
. This method can often come in useful when debugging when we want to see the order of interactions between our components.
In our first example, we provided no consumer to be able to handle the value from the publisher. We can change this by using a different overload of the subscribe
method:
@Test
fun `using the value from the consumer`() {
Mono.just("A")
.log()
.subscribe { println(it) }
}
The output confirms that between the invocation of the onNext
and onComplete
method, we can see the print statement:
[Test worker] INFO reactor.Mono.Just.1 - | onNext(A)
A
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()
Similarly, using another overload of the subscribe
method, we can run some code once the publisher has sent us all the items:
@Test
fun `running code on completion`() {
Mono.just("A")
.log()
.subscribe(
{ println(it) },
null,
{ println("Done!") }
)
}
Output:
[Test worker] INFO reactor.Mono.Just.1 - | onNext(A)
A
Done!
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()
Recall, that the consumer to be invoked on the completion is the third argument to teh subscribe
method.
Besides, creating a mono that produces a single value, we can also produce a Mono
that does not produce any value. We do this, using the empty
method:
@Test
internal fun `creating a Mono that does not produce a value`() {
Mono.empty<Unit>()
.log()
.subscribe()
}
The output confirms that there are no invocations of the onNext
method:
[Test worker] INFO reactor.Mono.Empty.4 - onSubscribe([Fuseable] Operators.EmptySubscription)
[Test worker] INFO reactor.Mono.Empty.4 - request(unbounded)
[Test worker] INFO reactor.Mono.Empty.4 - onComplete()
You may be wondering how creating a Mono that is empty could prove useful. An empty Mono
is useful in situations where you want to emulate a void
return type. For example, take the following contrived example:
@Test
internal fun `using an empty mono`() {
val result = if(System.currentTimeMillis() % 2 == 0L) {
Mono.just("A")
} else {
Mono.empty()
}
result
.log()
.subscribe()
}
Here we only create a value if the current time millis is even, otherwise we create an empty mono to represent the absence of a value.
We can also use the defaultIfEmpty
mono to supply a default value in cases where a Mono would produce an empty result:
@Test
fun `using defaultIfEmpty on an empty Mono`() {
Mono.empty<String>()
.defaultIfEmpty("B")
.log()
.subscribe { println("Got: $it") }
}
This produces:
[Test worker] INFO reactor.Mono.Just.1 - | onNext(B)
Got: B
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()
Besides, creating a Mono that a single value or none, we can also create a Mono that will produce an error:
@Test
fun `creating a Mono that produces an error`() {
Mono.error<RuntimeException>(RuntimeException("runtime error"))
.log()
.subscribe(
null
) { println("** ERROR: ${it.message}") }
}
Output:
[Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
[Test worker] INFO reactor.Mono.Error.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
[Test worker] INFO reactor.Mono.Error.1 - request(unbounded)
[Test worker] ERROR reactor.Mono.Error.1 - onError(java.lang.RuntimeException: runtime error)
[Test worker] ERROR reactor.Mono.Error.1 -
java.lang.RuntimeException: runtime error
at dev.hbrown.demo.MonoExamples.creating a Mono that produces an error(MonExamples.kt:65)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
// ... rest of stacktrace snipped ...
at java.base/java.lang.Thread.run(Thread.java:832)
** ERROR: runtime error
From the output above we can see that the error is thrown resulting in a stacktrace for the method.
We can also see that the error consumer has been invoked printing the message: ** ERROR: runtime error
to the console.
It turns out that we get the exact same behaviour for checked and unchecked exceptions:
@Test
fun `creating a Mono that produces a checked exception`() {
Mono.error<Exception>(Exception("checked exception"))
.log()
.subscribe(
null
) { println("** EXCEPTION: ${it.message}") }
}
The usual try/catch
mechanism is not used in reactive programming. Instead, as you have seen, you can use the error consumer to be able to get access to the exception. Furthermore, you can also replace an error with a different Mono:
@Test
fun `resuming after error with alternate Mono`() {
Mono.error<String>(Exception("error"))
.onErrorResume {
Mono.just("B")
}
.log()
.subscribe({
println("Next: $it")
}) {
println("Error: $it")
}
}
Output:
[Test worker] INFO reactor.Mono.OnErrorResume.6 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[Test worker] INFO reactor.Mono.OnErrorResume.6 - request(unbounded)
[Test worker] INFO reactor.Mono.OnErrorResume.6 - onNext(B)
Next: B
[Test worker] INFO reactor.Mono.OnErrorResume.6 - onComplete()
Looking at the output above, you can see that the error has been swallowed and only the onNext
consumer has been invoked.
If we simply want to replace the value it may be more convenient to use the onErrorReturn
method:
@Test
fun `resuming after error with alternate value`() {
Mono.error<String>(Exception("error"))
.onErrorReturn("B")
.log()
.subscribe()
}
Output:
[Test worker] INFO reactor.Mono.OnErrorResume.4 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[Test worker] INFO reactor.Mono.OnErrorResume.4 - request(unbounded)
[Test worker] INFO reactor.Mono.OnErrorResume.4 - onNext(B)
[Test worker] INFO reactor.Mono.OnErrorResume.4 - onComplete()
We get the same output as previously without having to artificially construct a Mono
to get the returned value.
Summary
In this post, we started exploring reactive programming using the Reactor project.
Specifically we looked at various ways we can construct a Mono
in Kotlin and how passing in different arguments to the subscribe
method allowed us to handle the different methods defined on the core interfaces defined by the Reactive Streams specification.
As usual, all the code examples are available on my Github repository.
If you prefer moving pictures to reading, check out accompanying video on my YouTube channel.
tags: kotlin - springboot - reactive - reactor