Reactive microservices development with Micronaut

There is some fresh air in the microservice development stack for Java developers. They usually look to Spring Boot, which provides a platform for building production-grade services, helps implement a microservice architecture, and targets Spring developers. A couple of new frameworks could reshape this landscape in the coming years. Micronaut, launched a year ago, looks very promising, and Quarkus, launched this month, is one to watch. The major difference I see with Quarkus is that it embraces the Eclipse MicroProfile standards and uses a programming model familiar to Enterprise Java developers (CDI/JAX-RS/Servlets).

There are tons of resources available about Micronaut, so my goal is not to write another introduction. One thing that impresses me about Micronaut is its official documentation, which is comprehensive, and the support on Gitter is super helpful.

In this post I am going to focus on reactive service development in Micronaut using RxJava. I have been wanting to learn RxJava for some time. Micronaut makes this programming model a joy to learn, and building non-blocking services could not be simpler.

Let us implement a simple map service composed of a geolocation service and a direction service. Micronaut lets us declare the HTTP clients for both as annotated interfaces.

Here is an example GeoClient interface, which takes a place as input and returns lat-lng data wrapped in an RxJava Single.

import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.annotation.Client;
import io.reactivex.Single;

@Client("/")
public interface GeoClient {

    @Get("/places/{place}")
    Single<LatLng> getLatLng(String place);
}

Here is an example DirectionClient interface. It takes a map provider (Apple or Google Maps) and a pair of lat-lng values for the source and destination, and returns a list of directions wrapped in an RxJava Single.

import io.micronaut.http.annotation.Post;
import io.micronaut.http.client.annotation.Client;
import io.reactivex.Single;

import javax.validation.constraints.NotBlank;
import java.util.List;

@Client("/")
public interface DirectionClient {

    @Post("/directions/{provider}")
    Single<List<Direction>> getDirections(@NotBlank MapProvider provider, LatLngPair latLngPair);
}

Each Micronaut endpoint is implemented as a controller backed by a service that holds the implementation details.

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.reactivex.Single;

@Controller
public class GeoController {

    private final GeoService geoService;

    public GeoController(GeoService geoService) {
        this.geoService = geoService;
    }

    @Get("/places/{place}")
    public Single<LatLng> getLatLng(Places place) {
        return geoService.getLatLng(place);
    }
}

The backend service returns data from the provider. The sample data contains just two places: the Apple and Google headquarters.

import io.reactivex.Single;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;

@Singleton
public class GeoService {

    private static final Logger LOG = LoggerFactory.getLogger(GeoService.class);

    public Single<LatLng> getLatLng(Places place) {
        LOG.info("Fetching latlng for {}", place);
        if (place.equals(Places.apple)) {
            return Single.just(MapsSampleData.apple);
        } else if (place.equals(Places.google)) {
            return Single.just(MapsSampleData.google);
        } else {
            return Single.error(new RuntimeException("Invalid place. Supported places are apple and google"));
        }
    }
}

The direction controller has a similar setup to the geo controller.

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.reactivex.Single;

import javax.validation.constraints.NotBlank;
import java.util.List;

@Controller
public class DirectionController {

    private final DirectionService directionService;

    public DirectionController(DirectionService directionService) {
        this.directionService = directionService;
    }

    @Post("/directions/{provider}")
    public Single<List<Direction>> getDirections(@NotBlank String provider, LatLngPair latLngPair) {
        return directionService.getDirections(provider, latLngPair);
    }

}

The direction service provides a sample data set for our predefined places.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.reactivex.Single;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static maps.service.MapsSampleData.appleToGoogle;
import static maps.service.MapsSampleData.googleToApple;

@Singleton
public class DirectionService {

    private static final Logger LOG = LoggerFactory.getLogger(DirectionService.class);

    private static ObjectMapper mapper = new ObjectMapper();

    private Map<LatLngPair, String> appleDirections = new HashMap<>();
    private Map<LatLngPair, String> googleDirections = new HashMap<>();

    public DirectionService() {
        LOG.info("Initializing sample data for maps ...");
        appleDirections.put(appleToGoogle, MapsSampleData.APPLE_MAPS_AAPL_HQ_TO_GOOG_HQ);
        appleDirections.put(googleToApple, MapsSampleData.APPLE_MAPS_GOOGL_HQ_TO_AAPL_HQ);
        googleDirections.put(appleToGoogle, MapsSampleData.GOOGLE_MAPS_AAPL_HQ_TO_GOOG_HQ);
        googleDirections.put(googleToApple, MapsSampleData.GOOGLE_MAPS_GOOG_HQ_TO_AAPL_HQ);
    }


    public Single<List<Direction>> getDirections(String provider, LatLngPair latLngPair) {

        LOG.info("Requesting directions from provider {} for co-ordinates : {}", provider, latLngPair);
        // mock up the response
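        // provider is a String here, so compare it with the enum constant's name;
        // String.equals(enumConstant) would always be false and silently pick the Apple data
        String json = MapProvider.google.name().equals(provider) ? googleDirections.get(latLngPair) : appleDirections.get(latLngPair);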

        List<Direction> directions;

        try {
            directions = mapper.readValue(json, new TypeReference<List<Direction>>() {
            });
        } catch (IOException e) {
            LOG.error("Failed to bind json", e);
            return Single.error(e);
        }

        return Single.just(directions);
    }
}
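
One detail worth calling out: DirectionService looks up its sample data in a HashMap keyed by LatLngPair, and the pair that arrives with a request is a freshly deserialized object. The lookup therefore only works if LatLngPair (and the LatLng inside it) implement value-based equals and hashCode. The post does not show these classes, so the following is only a minimal sketch. The field names (lat, lng, source, destination) are assumptions, and the real classes would also need whatever accessors Jackson requires for binding.

```java
import java.util.Objects;

// Hypothetical shape of the coordinate type; field names are assumptions.
final class LatLng {
    private final double lat;
    private final double lng;

    LatLng(double lat, double lng) {
        this.lat = lat;
        this.lng = lng;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LatLng)) return false;
        LatLng other = (LatLng) o;
        // Double.compare avoids the pitfalls of == with NaN and -0.0
        return Double.compare(lat, other.lat) == 0 && Double.compare(lng, other.lng) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(lat, lng);
    }
}

// Hypothetical shape of the map key; the (LatLng, LatLng) constructor matches
// the LatLngPair::new reference used with Single.zip in MapService.
final class LatLngPair {
    private final LatLng source;
    private final LatLng destination;

    LatLngPair(LatLng source, LatLng destination) {
        this.source = source;
        this.destination = destination;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LatLngPair)) return false;
        LatLngPair other = (LatLngPair) o;
        // Order matters: apple-to-google is a different key than google-to-apple
        return source.equals(other.source) && destination.equals(other.destination);
    }

    @Override
    public int hashCode() {
        return Objects.hash(source, destination);
    }
}
```

With this in place, two separately constructed pairs holding the same coordinates hit the same map entry, while the reversed pair does not.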

Now that we have two services, we can compose them in a MapController backed by a MapService, which implements the different options for a map service: get directions, shortest route, and fastest route. Micronaut's programming model is simple and provides injection and validation support for beans.

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.reactivex.Single;

import javax.validation.constraints.NotBlank;
import java.util.List;

@Controller("/maps")
public class MapController {

    private final MapService mapService;

    public MapController(MapService mapService) {
        this.mapService = mapService;
    }

    @Get("/{provider}")
    public Single<List<Direction>> map(@NotBlank MapProvider provider, @NotBlank @QueryValue String src, @NotBlank @QueryValue String dest) {
        return mapService.map(provider, src, dest);
    }

    @Get("/shortest")
    public Single<Directions> shortest(@NotBlank @QueryValue String src, @NotBlank @QueryValue String dest) {
        return mapService.shortest(src, dest);
    }

    @Get("/fastest")
    public Single<Directions> fastest(@NotBlank @QueryValue String src, @NotBlank @QueryValue String dest) {
        return mapService.fastest(src, dest);
    }
}

The meat of the controller is in its service, which builds a chain of reactive calls. The result is a completely non-blocking implementation executed on Netty's event loop.

import io.micronaut.http.annotation.QueryValue;
import io.reactivex.Single;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton;
import javax.validation.constraints.NotBlank;
import java.util.List;
import java.util.Optional;

@Singleton
public class MapService {

    private static final Logger LOG = LoggerFactory.getLogger(MapService.class);

    private final GeoClient geoClient;
    private final DirectionClient directionClient;

    public MapService(GeoClient geoClient, DirectionClient directionClient) {
        this.geoClient = geoClient;
        this.directionClient = directionClient;
    }

    public Single<List<Direction>> map(@NotBlank MapProvider provider, @NotBlank @QueryValue String src, @NotBlank @QueryValue String dest) {
        LOG.info("Mapping directions from {} to {} using provider {}", src, dest, provider);
        return getLatLngPair(src, dest).flatMap(pair -> directionClient.getDirections(provider, pair));
    }

    public Single<Directions> shortest(@NotBlank @QueryValue String src, @NotBlank @QueryValue String dest) {
        LOG.info("Finding shortest route from {} to {}", src, dest);
        return getLatLngPair(src, dest).flatMap(pair -> findShortestRoute(directionClient.getDirections(MapProvider.google, pair), directionClient.getDirections(MapProvider.apple, pair)));
    }

    public Single<Directions> fastest(@NotBlank @QueryValue String src, @NotBlank @QueryValue String dest) {
        LOG.info("Finding fastest route from {} to {}", src, dest);
        return getLatLngPair(src, dest).flatMap(pair -> findFastestRoute(directionClient.getDirections(MapProvider.google, pair), directionClient.getDirections(MapProvider.apple, pair)));
    }

    private Single<LatLngPair> getLatLngPair(String src, String dest) {
        return Single.zip(geoClient.getLatLng(src), geoClient.getLatLng(dest), LatLngPair::new);
    }

    private Single<Directions> findShortestRoute(Single<List<Direction>> googleMapsDirections, Single<List<Direction>> appleMapsDirections) {
        return Single.zip(googleMapsDirections, appleMapsDirections, (googleDirections, appleDirections) -> {
                    double totalGoogleDistance = googleDirections.stream().mapToDouble(Direction::getDistanceInMiles).sum();
                    double totalAppleDistance = appleDirections.stream().mapToDouble(Direction::getDistanceInMiles).sum();
                    if (totalAppleDistance < totalGoogleDistance) {
                        return new Directions(appleDirections, Optional.of(totalAppleDistance), Optional.empty());
                    } else {
                        return new Directions(googleDirections, Optional.of(totalGoogleDistance), Optional.empty());
                    }
                }
        );
    }

    private Single<Directions> findFastestRoute(Single<List<Direction>> googleMapsDirections, Single<List<Direction>> appleMapsDirections) {
        return Single.zip(googleMapsDirections, appleMapsDirections, (googleDirections, appleDirections) -> {
                    long totalGoogleTime = googleDirections.stream().mapToLong(Direction::getTimeInMinutes).sum();
                    long totalAppleTime = appleDirections.stream().mapToLong(Direction::getTimeInMinutes).sum();
                    if (totalAppleTime < totalGoogleTime) {
                        return new Directions(appleDirections, Optional.empty(), Optional.of(totalAppleTime));
                    } else {
                        return new Directions(googleDirections, Optional.empty(), Optional.of(totalGoogleTime));
                    }
                }
        );
    }
}
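
To see the shape of that chain in isolation, here is a rough, dependency-free sketch of the shortest flow. It deliberately swaps RxJava for the JDK's CompletableFuture so that it runs without Micronaut: thenCombine plays the role of Single.zip and thenCompose the role of flatMap. The class name ShortestRouteSketch, the stub methods, and all coordinates and distances are made up for illustration and are not the sample data from the application.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for MapService.shortest, using only the JDK.
final class ShortestRouteSketch {

    // Stand-in for GeoClient.getLatLng: resolves a place name to {lat, lng}.
    static CompletableFuture<double[]> getLatLng(String place) {
        double[] latLng = place.equals("apple")
                ? new double[] {1.0, 2.0}   // made-up coordinates
                : new double[] {3.0, 4.0};
        return CompletableFuture.supplyAsync(() -> latLng);
    }

    // Stand-in for DirectionClient.getDirections: per-leg distances in miles.
    static CompletableFuture<List<Double>> getDirections(String provider, double[] src, double[] dest) {
        List<Double> legs = provider.equals("google")
                ? Arrays.asList(2.0, 3.0)   // made-up legs, 5.0 miles in total
                : Arrays.asList(1.5, 3.0);  // made-up legs, 4.5 miles in total
        return CompletableFuture.supplyAsync(() -> legs);
    }

    static double total(List<Double> legs) {
        return legs.stream().mapToDouble(Double::doubleValue).sum();
    }

    static CompletableFuture<Double> shortest(String src, String dest) {
        return getLatLng(src)
                // fan-in: both geo lookups must complete before the pair exists (like Single.zip)
                .thenCombine(getLatLng(dest), (s, d) -> new double[][] {s, d})
                // chain: the pair feeds two provider calls, which are combined again (like flatMap + zip)
                .thenCompose(pair -> getDirections("google", pair[0], pair[1])
                        .thenCombine(getDirections("apple", pair[0], pair[1]),
                                (google, apple) -> Math.min(total(google), total(apple))));
    }
}
```

Calling ShortestRouteSketch.shortest("apple", "google").join() yields 4.5 with these made-up legs, since the apple stub totals 4.5 miles against 5.0 for google. Nothing blocks until join() is called, which is the same property the RxJava chain gives the real service.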

Micronaut provides a test framework so services can be tested easily end to end. Here is a Spock test that exercises our map service implementation.

import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
import io.micronaut.http.client.RxHttpClient
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.test.annotation.MicronautTest
import io.reactivex.Flowable
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification

import javax.inject.Inject

@MicronautTest
class MapControllerSpec extends Specification {

    @Inject
    EmbeddedServer embeddedServer

    @Shared
    @AutoCleanup
    RxHttpClient client

    void setup() {
        client = embeddedServer.applicationContext.createBean(RxHttpClient, embeddedServer.getURL())
    }

    void map() {
        given:
        HttpResponse<List<Direction>> response = client.toBlocking().exchange("/maps/google?src=apple&dest=google", List.class)

        expect:
        !response.body().empty
    }

    void shortest() {
        given:
        HttpResponse<Directions> response = client.toBlocking().exchange("/maps/shortest?src=apple&dest=google", Directions.class)

        expect:
        !response.body().directions.empty
        response.body().timeInMinutes == null
        response.body().distanceInMiles.get() == 8.5
    }

    void fastest() {
        given:
        HttpResponse<Directions> response = client.toBlocking().exchange("/maps/fastest?src=apple&dest=google", Directions.class)

        expect:
        !response.body().directions.empty
        response.body().distanceInMiles == null
        response.body().timeInMinutes.get() == 12
    }

    void "test reactive get shortest route"() {
        when:
        Flowable<HttpResponse<Directions>> call = client.exchange(HttpRequest.GET("/maps/shortest?src=google&dest=apple"), Directions.class)

        then:
        HttpResponse<Directions> response = call.blockingFirst()
        Optional<Directions> directions = response.getBody(Directions.class)
        directions.isPresent()
        !directions.get().directions.empty
        directions.get().timeInMinutes == null
        directions.get().distanceInMiles.get() == 9.0
    }

    void "test reactive get fastest route"() {
        when:
        Flowable<HttpResponse<Directions>> call = client.exchange(HttpRequest.GET("/maps/fastest?src=google&dest=apple"), Directions.class)

        then:
        HttpResponse<Directions> response = call.blockingFirst()
        Optional<Directions> directions = response.getBody(Directions.class)
        directions.isPresent()
        !directions.get().directions.empty
        directions.get().distanceInMiles == null
        directions.get().timeInMinutes.get() == 11
    }
}

Micronaut provides a mechanism to scale these services by introducing a gateway in front of them, and it integrates with Consul for discovery and Prometheus for monitoring. I can't wait to explore that in the next post.

The source code for this sample application is available on GitHub.
