hbrown.dev

Welcome to my developer page. Contact me on: henry.g.brown@hey.com

25 April 2021

Getting Started with Reactive Programming using Project Reactor: Kotlin Mono Examples

by Henry Brown

If you were convinced by my last post, “Why you may want to consider building your next API reactively”, you may be interested in stepping up your reactive game by learning a reactive library. In this post, I would like to start that journey by looking at Project Reactor, one of the most popular reactive implementations in the Java world. It is also the default reactive library for the Spring framework.

Of course, there are many other implementations, such as RxJava (itself a JVM implementation of the Reactive Extensions project), Akka and Vert.x. Bringing all these implementations together is the Reactive Streams initiative, which defines the Reactive Streams Specification. This provides a good jumping-off point to start talking about reactive programming in the Java world.

Reactive Streams Specification

This specification provides a set of standard interfaces for asynchronous, non-blocking communication with an additional capability called back-pressure. The specification had input from many organisations with experience in streaming data, such as Lightbend, Netflix and Twitter. It limits itself to defining how a stream of data can be moved between the interfaces it defines, without concerning itself with any operations that you may want to perform on that data. Implementors of the specification are free to add capabilities on top of these interfaces to handle operations on the stream of data. To this end, the specification defines a set of core interfaces.

The Core Interfaces

The following core interfaces are defined by the specification:

  1. Publisher - a component that can produce/send data to any number of subscribers;
  2. Subscriber - a component that can receive data from a publisher;
  3. Subscription - a connection between a publisher and subscriber that specifies the rate at which data can be sent from the publisher to the subscriber; and
  4. Processor - a component that is both a publisher and a subscriber at the same time.

Most of the time, you are only concerned with the first 3, with Processors mostly left to the implementors of the specification. These interfaces are also very simply defined. A Publisher defines only a single method:

public interface Publisher<T> {
   void subscribe(Subscriber<? super T> subscriber);
}

The Publisher only allows a subscriber to register itself with a publisher by calling this single method. The Subscriber interface defines the following 4 methods:

public interface Subscriber<T> {
   void onSubscribe(Subscription subscription);
   void onNext(T item);
   void onError(Throwable throwable);
   void onComplete();
}

The onSubscribe method is called once the subscriber subscribes to a publisher and is provided with an instance of a Subscription. The onNext method is called to provide the subscriber with the next data element from the publisher. The onError method is called when an error occurs and the publisher cannot produce more data for the subscriber. The onComplete method is called when the publisher has sent all the data to the subscriber and no more data will be sent.

The Subscription interface has 2 methods:

public interface Subscription {
    void request(long n);
    void cancel();
}

The request method is called to let the Publisher know how many data elements to send to the subscriber and is the backbone of the back-pressure mechanism still to be discussed. The cancel method allows the subscriber to signal to the publisher that it is not interested in the data anymore and that the publisher can stop sending data. The flow of data between these components is also well-defined by the specification.

Finally, and perhaps as expected, the Processor interface defines no additional methods beyond those defined in the Subscriber and Publisher interfaces.

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

Flow of Data

The following sequence describes the flow of data between these components:

  1. The subscribe method is called on the Publisher;
  2. A Subscription object is created and the onSubscribe method of the Subscriber is called with this Subscription object;
  3. To start receiving elements, the Subscriber calls the request method of the Subscription indicating the number of elements it would like to receive;
  4. The Publisher starts to send data to the Subscriber using the onNext method. This continues until one of the following happens:
    1. The Publisher has no more elements to send and completes the subscription by calling the onComplete method; or
    2. An error occurs, and the publisher terminates the subscription by passing the error to the onError method; or
    3. The publisher sends the subscriber the number of items requested, and the subscriber can either request more items by using the request method again or cancel the subscription by calling the cancel method.
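The sequence above can be sketched in plain Kotlin. This is only an illustration under some assumptions: the three interfaces are local copies that mirror the ones shown earlier (so the sketch runs without any dependencies), and ListPublisher and BatchingSubscriber are hypothetical names invented for this example. It also ignores many of the specification's rules, such as thread safety and validating the requested amount.

```kotlin
// Local copies mirroring the Reactive Streams interfaces (assumption: for illustration only).
interface Subscription {
    fun request(n: Long)
    fun cancel()
}

interface Subscriber<T> {
    fun onSubscribe(subscription: Subscription)
    fun onNext(item: T)
    fun onError(throwable: Throwable)
    fun onComplete()
}

interface Publisher<T> {
    fun subscribe(subscriber: Subscriber<in T>)
}

// Hypothetical publisher that emits the elements of a list on demand.
class ListPublisher<T>(private val items: List<T>) : Publisher<T> {
    override fun subscribe(subscriber: Subscriber<in T>) {
        // Step 2: create the Subscription and hand it to the Subscriber.
        subscriber.onSubscribe(object : Subscription {
            private var index = 0
            private var done = false

            override fun request(n: Long) {
                // Step 4: send at most n elements, then wait for the next request.
                var remaining = n
                while (!done && remaining > 0 && index < items.size) {
                    subscriber.onNext(items[index++])
                    remaining--
                }
                // Step 4.1: nothing left to send, so complete the subscription.
                if (!done && index == items.size) {
                    done = true
                    subscriber.onComplete()
                }
            }

            override fun cancel() {
                done = true
            }
        })
    }
}

// Hypothetical subscriber that records every signal and asks for batchSize elements at a time.
class BatchingSubscriber<T>(private val batchSize: Long) : Subscriber<T> {
    val events = mutableListOf<String>()
    private lateinit var subscription: Subscription
    private var receivedInBatch = 0L

    override fun onSubscribe(subscription: Subscription) {
        this.subscription = subscription
        events += "onSubscribe"
        requestBatch() // Step 3: the first request starts the flow of data.
    }

    override fun onNext(item: T) {
        events += "onNext($item)"
        // Step 4.3: the batch has been delivered, so request another one.
        if (++receivedInBatch == batchSize) requestBatch()
    }

    override fun onError(throwable: Throwable) {
        events += "onError(${throwable.message})"
    }

    override fun onComplete() {
        events += "onComplete"
    }

    private fun requestBatch() {
        receivedInBatch = 0
        events += "request($batchSize)"
        subscription.request(batchSize)
    }
}

fun runFlow(): List<String> {
    val subscriber = BatchingSubscriber<String>(batchSize = 2)
    ListPublisher(listOf("A", "B", "C")).subscribe(subscriber) // Step 1
    return subscriber.events
}

fun main() {
    runFlow().forEach { println(it) }
}
```

Running main prints onSubscribe, request(2), onNext(A), onNext(B), request(2), onNext(C) and onComplete, one per line, which follows the numbered steps above.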

Looking at this description of the components and how the data flows between them, you may be forgiven for thinking that it seems remarkably similar to the Observer pattern from the Gang of Four that you may already be familiar with. In this, you would be correct. There is, however, one mechanism worth discussing that has already been hinted at several times: back-pressure.

Back-pressure

One of the problems that can occur when building the kind of decoupled systems demanded by modern architectures is that subscribers may not be able to keep up with the rate of data produced by publishers. In a reactive system, this is handled by allowing subscribers to tell publishers how many elements they are able to handle, and to request more elements when they are ready to consume more. This places the control in the hands of the subscriber and thus ensures a more resilient system, since a subscriber cannot be overwhelmed by a publisher producing data at a faster rate than it can handle.

Specifically, the request method on the subscription is always called to inform the publisher of the number of elements it should produce for this subscriber. When we do not make an explicit request call, as with Reactor's lambda-based subscribe methods shown later, an implicit call is still made asking the publisher for an unbounded number of elements (technically Long.MAX_VALUE elements). By periodically requesting only the number of elements it can consume, the subscriber ensures that it is never given elements at a faster rate than it can process.
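Jumping ahead slightly, here is a minimal sketch of back-pressure using Reactor itself. It assumes reactor-core is on the classpath and uses Flux (introduced in the next section) together with Reactor's BaseSubscriber. OneAtATimeSubscriber and consumeFirstThree are hypothetical names for this example only. The subscriber asks for a single element at a time and cancels once it has seen enough, so the publisher never gets the chance to send all 10 values.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Hypothetical subscriber that only ever has one outstanding request.
class OneAtATimeSubscriber : BaseSubscriber<Int>() {
    val received = mutableListOf<Int>()

    override fun hookOnSubscribe(subscription: Subscription) {
        // Replace the default unbounded request with a request for one element.
        request(1)
    }

    override fun hookOnNext(value: Int) {
        received += value
        // Ask for the next element only when we are ready for it; stop after 3.
        if (value < 3) request(1) else cancel()
    }
}

fun consumeFirstThree(): List<Int> {
    val subscriber = OneAtATimeSubscriber()
    Flux.range(1, 10).subscribe(subscriber)
    return subscriber.received
}

fun main() {
    println(consumeFirstThree()) // prints [1, 2, 3]
}
```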

Now that we have covered the core interfaces and how they interact, we can turn our attention to the specific implementation provided by Project Reactor.

Project Reactor implementation

Project Reactor provides 2 implementations of the Publisher interface:

  1. Mono which supplies 0 or 1 element; and
  2. Flux which supplies 0 to many items.

The reason the project provides 2 implementations of the Publisher interface is more a matter of readability than anything else. There are many occasions where a process can only produce a maximum of one element. It thus makes sense to document this by returning a Mono rather than a Flux with only a single element.

Given this distinction, it makes sense to think of a Mono as a reactive version of a Java 8 Optional and a Flux as a reactive version of a List. However, these reactive publishers have one important distinction that sets them apart from their non-reactive “versions”: the publishers are lazy! That is to say, unless you call the subscribe method, these publishers will not produce any data.
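A small sketch can make this laziness visible. It assumes reactor-core is on the classpath; lazinessDemo is a hypothetical helper written for this illustration. It uses Mono.fromSupplier, which only runs the supplier when a subscriber arrives, and counts how often that happens.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

fun lazinessDemo(): List<Int> {
    val invocations = AtomicInteger()

    // Declaring the Mono does not run the supplier.
    val mono = Mono.fromSupplier<Int> { invocations.incrementAndGet() }
    val beforeSubscribe = invocations.get()

    // Each subscription triggers the supplier once.
    mono.subscribe()
    val afterFirstSubscribe = invocations.get()
    mono.subscribe()
    val afterSecondSubscribe = invocations.get()

    return listOf(beforeSubscribe, afterFirstSubscribe, afterSecondSubscribe)
}

fun main() {
    println(lazinessDemo()) // prints [0, 1, 2]
}
```

The counter stays at 0 until the first call to subscribe, and every further subscription runs the supplier again.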

Subscribing to a Publisher

In order to get data to flow from the publisher to the subscriber, you have to invoke one of the many versions of the subscribe method. These include the following overloaded versions:

subscribe()

This first version of the subscribe method, which accepts no arguments, is there simply to trigger the flow of data.

The second version of the subscribe method allows us to provide a single Consumer argument which is provided with the value sent to the onNext method. In the case of a Mono this consumer will be invoked at most once whereas with a Flux it may be invoked multiple times.

subscribe(Consumer<? super T> onNextConsumer)

The next version of the subscribe method allows us to specify 2 consumers. Along with the consumer to handle the value provided by the onNext call, we can also provide a consumer to handle any error that may occur.

subscribe(
	Consumer<? super T> onNextConsumer,
	Consumer<? super Throwable> errorConsumer
)

The next version of the subscribe method has the same 2 Consumer arguments as its first 2 parameters but accepts a third argument of type Runnable. This argument allows us to execute some code when the onComplete method of the subscriber is called.

subscribe(
	Consumer<? super T> onNextConsumer,
	Consumer<? super Throwable> errorConsumer,
	Runnable completeConsumer
)

The final version of the subscribe method adds a fourth parameter to the previous version, which accepts another Consumer. This consumer allows us to perform some operations on the Subscription we get when the onSubscribe method of the subscriber is invoked.

subscribe(
	Consumer<? super T> onNextConsumer,
	Consumer<? super Throwable> errorConsumer,
	Runnable completeConsumer,
	Consumer<? super Subscription> subscriptionConsumer
)

As you can see, the arguments to the various overloaded subscribe methods map very closely to the 4 methods defined on the Subscriber interface. To get a better feel for these methods, let us focus on the simpler publisher, Mono, and see how we can construct a publisher that produces either a single value or no value, as well as how we can handle any errors that may occur.
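To make that mapping concrete, here is a minimal sketch that skips the lambda overloads and passes a hand-written Subscriber to a Mono instead. It assumes reactor-core (and with it the org.reactivestreams interfaces) is on the classpath; RecordingSubscriber and recordSignals are hypothetical names for this example. Each of the 4 interface methods simply records that it was called.

```kotlin
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono

// Hypothetical subscriber that records every signal it receives.
class RecordingSubscriber : Subscriber<String> {
    val events = mutableListOf<String>()

    override fun onSubscribe(subscription: Subscription) {
        events += "onSubscribe"
        // Without a request no data would flow; a Mono has at most one element.
        subscription.request(1)
    }

    override fun onNext(item: String) {
        events += "onNext($item)"
    }

    override fun onError(throwable: Throwable) {
        events += "onError(${throwable.message})"
    }

    override fun onComplete() {
        events += "onComplete"
    }
}

fun recordSignals(): List<String> {
    val subscriber = RecordingSubscriber()
    Mono.just("A").subscribe(subscriber)
    return subscriber.events
}

fun main() {
    println(recordSignals()) // prints [onSubscribe, onNext(A), onComplete]
}
```

The lambda overloads save us from writing this class: each lambda we pass stands in for one of these methods.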

Mono: Creating and subscribing

We can construct a Mono that produces a value using the just method.

@Test
internal fun `creating a Mono using just`() {
   Mono.just("A")
       .log()
       .subscribe()
}

If we execute the above, we will see output similar to:

[Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
[Test worker] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[Test worker] INFO reactor.Mono.Just.1 - | request(unbounded)
[Test worker] INFO reactor.Mono.Just.1 - | onNext(A)
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()

Looking at the output of this method, we can see the invocations of all the methods discussed on the core interfaces. Firstly, we see that the onSubscribe method was invoked with an implementation of the Subscription interface. Then, even though our code did not explicitly call the request method, we can see that it has been invoked with an unbounded number. That occurs whenever we do not explicitly set the number of elements we want to receive from the publisher. We can also see that the onNext method was invoked with the argument we passed to the just method, before the data stream concluded with a call to the onComplete method.

Note that all of these interactions only happened because we concluded our test with a call to the subscribe method. If we omit that call, nothing happens.

Incidentally, we were able to view this nice logging output to confirm our description of the data flow because we included the call to log. This method is often useful when debugging, as it shows the order of interactions between our components.

In our first example, we provided no consumer to be able to handle the value from the publisher. We can change this by using a different overload of the subscribe method:

@Test
fun `using the value from the consumer`() {
   Mono.just("A")
       .log()
       .subscribe { println(it) }
}

The output confirms that between the invocation of the onNext and onComplete method, we can see the print statement:

[Test worker] INFO reactor.Mono.Just.1 - | onNext(A)
A
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()

Similarly, using another overload of the subscribe method, we can run some code once the publisher has sent us all the items:

    @Test
    fun `running code on completion`() {
        Mono.just("A")
            .log()
            .subscribe(
                { println(it) },
                null,
                { println("Done!") }
            )
    }

Output:

[Test worker] INFO reactor.Mono.Just.1 - | onNext(A)
A
Done!
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()

Recall that the Runnable to be invoked on completion is the third argument to the subscribe method.

Besides creating a Mono that produces a single value, we can also create a Mono that does not produce any value. We do this using the empty method:

   @Test
   internal fun `creating a Mono that does not produce a value`() {
      Mono.empty<Unit>()
          .log()
          .subscribe()
   }

The output confirms that there are no invocations of the onNext method:

[Test worker] INFO reactor.Mono.Empty.4 - onSubscribe([Fuseable] Operators.EmptySubscription)
[Test worker] INFO reactor.Mono.Empty.4 - request(unbounded)
[Test worker] INFO reactor.Mono.Empty.4 - onComplete()

You may be wondering how creating a Mono that is empty could prove useful. An empty Mono is useful in situations where you want to emulate a void return type. For example, take the following contrived example:

    @Test
    internal fun `using an empty mono`() {
        val result = if (System.currentTimeMillis() % 2 == 0L) {
            Mono.just("A")
        } else {
            Mono.empty<String>()
        }

        result
            .log()
            .subscribe()
    }

Here we only create a value if the current time in milliseconds is even; otherwise we create an empty Mono to represent the absence of a value.

We can also use the defaultIfEmpty method to supply a default value in cases where a Mono would produce an empty result:

@Test
fun `using defaultIfEmpty on an empty Mono`() {
   Mono.empty<String>()
      .defaultIfEmpty("B")
      .log()
      .subscribe { println("Got: $it") }
}

This produces:

[Test worker] INFO reactor.Mono.Just.1 - | onNext(B)
Got: B
[Test worker] INFO reactor.Mono.Just.1 - | onComplete()

Besides creating a Mono that produces a single value or none, we can also create a Mono that will produce an error:

@Test
fun `creating a Mono that produces an error`() {
   Mono.error<RuntimeException>(RuntimeException("runtime error"))
      .log()
      .subscribe(
         null
      ) { println("** ERROR: ${it.message}") }
}

Output:

[Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
[Test worker] INFO reactor.Mono.Error.1 - onSubscribe([Fuseable] Operators.EmptySubscription)
[Test worker] INFO reactor.Mono.Error.1 - request(unbounded)
[Test worker] ERROR reactor.Mono.Error.1 - onError(java.lang.RuntimeException: runtime error)
[Test worker] ERROR reactor.Mono.Error.1 - 
java.lang.RuntimeException: runtime error
	at dev.hbrown.demo.MonoExamples.creating a Mono that produces an error(MonExamples.kt:65)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
// ... rest of stacktrace snipped ...
	at java.base/java.lang.Thread.run(Thread.java:832)
** ERROR: runtime error

From the output above, we can see that the error is logged at ERROR level together with its stack trace. We can also see that the error consumer has been invoked, printing the message ** ERROR: runtime error to the console.

It turns out that we get the exact same behaviour for checked and unchecked exceptions:

@Test
fun `creating a Mono that produces a checked exception`() {
   Mono.error<Exception>(Exception("checked exception"))
      .log()
      .subscribe(
         null
      ) { println("** EXCEPTION: ${it.message}") }
}
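Errors do not have to come from Mono.error. As a minimal sketch, assuming reactor-core is on the classpath, the following uses Mono.fromCallable with a callable that throws. The exception is not thrown back at the code that calls subscribe. Instead, it arrives at the error consumer just like in the examples above. The function name captureError is hypothetical and only exists for this illustration.

```kotlin
import reactor.core.publisher.Mono

fun captureError(): String? {
    var captured: Throwable? = null

    // The callable throws, and Reactor turns the exception into an onError signal.
    Mono.fromCallable<String> { throw IllegalStateException("boom") }
        .subscribe(
            { println("Next: $it") }, // never invoked, no value is produced
            { captured = it }         // receives the IllegalStateException
        )

    return captured?.message
}

fun main() {
    println(captureError()) // prints boom
}
```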

In reactive programming, errors are not thrown back to the caller to be handled with the usual try/catch mechanism; they travel down the stream as an onError signal. As you have seen, you can use the error consumer to get access to the exception. Furthermore, you can also replace an error with a different Mono:

@Test
fun `resuming after error with alternate Mono`() {
   Mono.error<String>(Exception("error"))
      .onErrorResume {
         Mono.just("B")
      }
      .log()
      .subscribe({
         println("Next: $it")
      }) {
         println("Error: $it")
      }
}

Output:

[Test worker] INFO reactor.Mono.OnErrorResume.6 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[Test worker] INFO reactor.Mono.OnErrorResume.6 - request(unbounded)
[Test worker] INFO reactor.Mono.OnErrorResume.6 - onNext(B)
Next: B
[Test worker] INFO reactor.Mono.OnErrorResume.6 - onComplete()

Looking at the output above, you can see that the error has been swallowed and only the onNext consumer has been invoked. If we simply want to replace the value it may be more convenient to use the onErrorReturn method:

@Test
fun `resuming after error with alternate value`() {
   Mono.error<String>(Exception("error"))
      .onErrorReturn("B")
      .log()
      .subscribe()
}

Output:

[Test worker] INFO reactor.Mono.OnErrorResume.4 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
[Test worker] INFO reactor.Mono.OnErrorResume.4 - request(unbounded)
[Test worker] INFO reactor.Mono.OnErrorResume.4 - onNext(B)
[Test worker] INFO reactor.Mono.OnErrorResume.4 - onComplete()

We get the same output as previously without having to artificially construct a Mono to get the returned value.

Summary

In this post, we started exploring reactive programming using Project Reactor. Specifically, we looked at various ways we can construct a Mono in Kotlin and how passing different arguments to the subscribe method allows us to handle the different methods defined on the core interfaces of the Reactive Streams specification.

As usual, all the code examples are available on my GitHub repository.

If you prefer moving pictures to reading, check out the accompanying video on my YouTube channel.

tags: kotlin - springboot - reactive - reactor