Spring Boot Starter : Chapter 4, monitoring aspects and asynchronous processing

TSBS Project repository and archives

As a shortcut, TSBS stands for Tooling Spring Boot Starter until I find a better name.

You can find the TSBS sources on the GitHub project page, and this chapter's archive under its own tag (Chapter 4 archive).

See Chapter 1 introduction to read more about this project.

Objectives

We now have the basics to monitor our Spring app. But we still have to call the monitoring methods explicitly, which is inefficient and makes it easy to miss something.

In this chapter, we will use Spring AOP to bring automatic monitoring to our app. Aspect-oriented programming applied to Spring components lets us intercept specific method calls, such as Spring controller methods.

Once done, we will apply the same logic to @Async methods, with some modifications to fit the requirements of asynchronous processing.

Check the previous chapter if you missed it!

Controller methods interception

ControllerLogAspect is our controller interceptor class. Annotated with @Aspect, it contains a method that is called when the conditions defined in its @Around annotation are met:

@Around("@annotation(org.springframework.web.bind.annotation.RequestMapping)"
 + " && !@annotation(net.kprod.tooling.spring.starter.annotation.MonitoringDisable)")

This advice wraps every method annotated with @RequestMapping, unless the method is also annotated with @MonitoringDisable, in which case the monitoring system skips it.
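
To make the matching rule concrete, here is a minimal plain-Java sketch that evaluates the same condition by reflection. The annotations Mapped and Skip and the SampleController class are hypothetical stand-ins for @RequestMapping, @MonitoringDisable and a real controller. Spring AOP evaluates the real pointcut itself, so this only illustrates the logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class PointcutSketch {
    // Hypothetical stand-in for @RequestMapping
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Mapped {}

    // Hypothetical stand-in for @MonitoringDisable
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Skip {}

    // Hypothetical controller used only for this illustration
    static class SampleController {
        @Mapped public String foo() { return "foo"; }
        @Mapped @Skip public String health() { return "ok"; }
        public String helper() { return "helper"; }
    }

    // Same rule as the pointcut: annotated with Mapped AND NOT annotated with Skip
    static boolean isMonitored(Method method) {
        return method.isAnnotationPresent(Mapped.class)
                && !method.isAnnotationPresent(Skip.class);
    }

    // Convenience overload that looks up a no-arg method of SampleController by name
    static boolean isMonitored(String methodName) {
        try {
            return isMonitored(SampleController.class.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"foo", "health", "helper"}) {
            System.out.println(name + " monitored: " + isMonitored(name));
        }
    }
}
```

Only foo passes the check: health is excluded by the second condition, and helper never matches the first one.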

Our aspect code is quite simple and does the following:

  • Get the Controller name

  • Get the controller method name

  • Keep the system time in milliseconds

  • Start the monitoring

  • Run the original method, using Object proceed = joinPoint.proceed();

  • End monitoring and log execution time

ControllerLogAspect class
public Object controllerInterceptor(ProceedingJoinPoint joinPoint) throws Throwable {
    //serviceName is composed of Controller name and Controller method name
    //get the method name
    String methodName = joinPoint.getSignature().getName();

    //get the controller name ; it could be either a RestController or a Controller
    String controllerName = null;
    Controller controllerAnnotation = joinPoint.getTarget().getClass().getAnnotation(Controller.class);
    RestController restControllerAnnotation = joinPoint.getTarget().getClass().getAnnotation(RestController.class);
    if(restControllerAnnotation != null) {
        controllerName = restControllerAnnotation.value();
    } else if(controllerAnnotation != null) {
        controllerName = controllerAnnotation.value();
    }
    if (StringUtils.isEmpty(controllerName)) {
        controllerName = joinPoint.getTarget().getClass().getSimpleName();
    }

    //keep current time
    long start = System.currentTimeMillis();
    //start monitoring
    monitoringService.start(controllerName, methodName);
    //execute controller method
    Object proceed = joinPoint.proceed();
    //get execution time
    Long elapsedTime = System.currentTimeMillis() - start;

    monitoringService.end(elapsedTime);
    return proceed;
}

In summary, this aspect class intercepts controller methods, starts monitoring before the method runs, and records the end of processing once it returns.
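
For readers who want to see the start / proceed / end sequence without Spring, here is a minimal sketch built on a JDK dynamic proxy. Greeter, PoliteGreeter and the EVENTS list are hypothetical names used only for this illustration. Unlike the aspect shown above, the sketch records the end in a finally block; that is a choice of the sketch, not something taken from the TSBS sources.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AroundSketch {
    // Hypothetical service interface; a JDK proxy needs an interface to implement
    interface Greeter {
        String greet(String name);
    }

    static class PoliteGreeter implements Greeter {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Events recorded by the interceptor, standing in for monitoring log lines
    static final List<String> EVENTS = new ArrayList<>();

    // Wraps the target so that every call is surrounded by start/end events
    static Greeter monitored(Greeter target) {
        InvocationHandler handler = (proxy, method, args) -> {
            String process = target.getClass().getSimpleName() + "." + method.getName();
            long start = System.currentTimeMillis();
            EVENTS.add("start " + process);
            try {
                // equivalent of joinPoint.proceed()
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                // rethrow the exception raised by the target itself
                throw e.getCause();
            } finally {
                long elapsed = System.currentTimeMillis() - start;
                EVENTS.add("end " + process);
                System.out.println(process + " took " + elapsed + " ms");
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = monitored(new PoliteGreeter());
        System.out.println(greeter.greet("foo"));
        System.out.println(EVENTS);
    }
}
```

The value returned by the handler is what the caller receives, which is why the aspect above returns the result of joinPoint.proceed().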

Monitoring is now started and ended automatically by the controllerInterceptor aspect method. We can remove any explicit calls to MonitoringService.start(…) and MonitoringService.end(), as in FooService.

A controller method without explicit monitoringService calls
@RequestMapping(value = "/foo", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Response> foo() throws Exception {
    //monitoringService.start(this.getClass().getSimpleName() + "#foo");
    Response response = fooService.foo();
    LOG.info("Response [{}]", response.getMessage());
    //monitoringService.end();
    return ResponseEntity.ok(response);
}

Asynchronous processing

Spring @Async

Spring provides a simple way to start asynchronous processing with the @Async annotation. See this Spring guide to learn more.

In short: a method annotated with @Async runs on a separate thread when called. It must be public and must be called from another bean, because Spring applies @Async through a proxy and a call made from within the same class bypasses that proxy.
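
The "class outside its caller" constraint comes from proxying: only calls that go through the proxy can be intercepted. The following sketch reproduces the effect with a JDK dynamic proxy as a stand-in for the proxy Spring creates. Worker, WorkerImpl and the helper methods are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationSketch {
    // Hypothetical service with two methods we would like to intercept
    interface Worker {
        String outer();
        String inner();
    }

    static class WorkerImpl implements Worker {
        @Override
        public String outer() {
            // plain call on "this": it never goes through the proxy
            return "outer>" + inner();
        }

        @Override
        public String inner() {
            return "inner";
        }
    }

    // Wraps the target in a proxy that records every call it sees
    static Worker proxied(Worker target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (Worker) Proxy.newProxyInstance(
                Worker.class.getClassLoader(), new Class<?>[] {Worker.class}, handler);
    }

    // Returns the method names the proxy saw for a single call to outer()
    static List<String> interceptedByOuterCall() {
        List<String> intercepted = new ArrayList<>();
        proxied(new WorkerImpl(), intercepted).outer();
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedByOuterCall()); // prints [outer]
    }
}
```

The nested call to inner() is invisible to the proxy. With @Async, the equivalent self-call would simply run synchronously on the caller's thread.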

In our case, the MDC context is lost because it is bound to the calling thread, so our monitoring data is lost too, as the following shows:

Testing @Async
//caller service class
@Override
public Response bar() throws ServiceException {
    LOG.info("bar method");
    asyncService.asyncProcess();
    return new Response("processing");
}

//async service class
@Async
@Override
public void asyncProcess() throws ServiceException {
    LOG.info("asyncProcess method");
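    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        //restore the interrupt flag so the executor can still see the interruption
        Thread.currentThread().interrupt();
        LOG.error("sleep was interrupted", e);
    }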
    LOG.info("async processing completed");
}
Log with lost Async monitoring context
[...] [a9743c7fea49] [ControllerRoot.bar] [] Start monitoring processName [ControllerRoot.bar] id [5a94a1f0-f66c-4071-8392-a9743c7fea49]
[...] [a9743c7fea49] [ControllerRoot.bar] [] bar method
[...] [a9743c7fea49] [ControllerRoot.bar] [] Response [processing]
[...] [] [] [] asyncProcess method
[...] [a9743c7fea49] [ControllerRoot.bar] [] End monitoring processName [ControllerRoot.bar] id [5a94a1f0-f66c-4071-8392-a9743c7fea49] took [6737] ms
[...] [] [] [] async processing completed
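
The empty brackets in the async log lines are the visible symptom of thread-bound storage. Here is a minimal sketch of the same effect, using a plain ThreadLocal as a stand-in for the MDC. PROCESS_ID and readFromOtherThread are hypothetical names.

```java
class ContextLossSketch {
    // Stand-in for the MDC: a value bound to the current thread only
    static final ThreadLocal<String> PROCESS_ID = new ThreadLocal<>();

    // Reads the context from a freshly started thread and returns what it saw
    static String readFromOtherThread() {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = PROCESS_ID.get());
        worker.start();
        try {
            // join() also guarantees that the write to seen[0] is visible here
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        PROCESS_ID.set("a9743c7fea49");
        System.out.println("caller thread: " + PROCESS_ID.get()); // prints caller thread: a9743c7fea49
        System.out.println("worker thread: " + readFromOtherThread()); // prints worker thread: null
    }
}
```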

Futures

We rely on Java's Future API, more precisely CompletableFuture, to be notified when an async process ends.
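
As a minimal sketch of that idea, the following code registers a callback that runs only once the async work has completed, while the caller carries on. The class name, the latch and the event strings are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

class FutureSketch {
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch responseSent = new CountDownLatch(1);

        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            await(responseSent); // simulate work that outlasts the caller
            return System.currentTimeMillis() - start;
        });
        // registered now, executed only when the async work completes
        CompletableFuture<Void> done = future.thenAccept(ms -> events.add("async completed"));

        events.add("response sent");
        responseSent.countDown();
        done.join(); // only the demo waits here; a real caller would return immediately
        return events;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [response sent, async completed]
    }
}
```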

@Async Supplier

To keep monitoring active when our async method is detached, we need to re-create the MDC context once the async thread is running. We can't do this transparently, but SupplyAsync provides a simple way to achieve it.

SupplyAsync implements the Supplier functional interface. It is used to keep track of our monitoring context when a process is detached by calling an @Async method. Its constructor signature is SupplyAsync(MonitoringService monitoringService, MonitoringData monitorData, AsyncRunnable runnable), with:

  • MonitoringService instance

  • current MonitoringData

  • async method to execute as an AsyncRunnable, which is a Runnable.

Usage, as in AsyncService:

Call @Async method using SupplyAsync
public CompletableFuture<AsyncResult> asyncProcess(MonitoringData monitoringData) throws ServiceException {
    SupplyAsync sa = new SupplyAsync(monitoringService, monitoringData, () -> runAsyncProcess());
    return CompletableFuture.supplyAsync(sa);
}

SupplyAsync is a wrapper that makes CompletableFuture.supplyAsync easier to use with our monitoring features.

SupplyAsync class
public class SupplyAsync implements Supplier<AsyncResult> {
    private final MonitoringData monitorData;
    private final AsyncRunnable runnable;
    private final MonitoringService monitoringService;

    public SupplyAsync(MonitoringService monitoringService, MonitoringData monitorData, AsyncRunnable runnable) throws ServiceException {
        if(monitoringService == null ||
                monitorData == null ||
                runnable == null) {
            //FIXME
            throw new ServiceException("All parameters must be defined");
        }
        this.monitoringService = monitoringService;
        this.monitorData = monitorData;
        this.runnable = runnable;
    }

    @Override
    public AsyncResult get() {
        //runs on the async thread ; restore the monitoring context there
        monitoringService.keep(monitorData,"async");
        long start = System.currentTimeMillis();

        try {
            runnable.runThrows();
        } catch (Exception e) {
            //wrap the failure so that the future still completes normally
            return AsyncResult.failure(System.currentTimeMillis() - start, e);
        }

        return AsyncResult.success(System.currentTimeMillis() - start);
    }
}
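
The principle behind this class can be tried out without Spring or the starter. The sketch below captures a context value on the caller thread and restores it on the worker thread before running the task. It uses a ThreadLocal in place of the MDC, and ThrowingTask, ContextSupplier and submit are hypothetical names, not TSBS classes. The cleanup in the finally blocks is a choice of the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class ContextSupplierSketch {
    // Stand-in for the MDC
    static final ThreadLocal<String> PROCESS_ID = new ThreadLocal<>();

    // Hypothetical counterpart of AsyncRunnable: a task that may throw
    interface ThrowingTask {
        void run() throws Exception;
    }

    // Captures the caller's context at construction, restores it in the worker thread
    static class ContextSupplier implements Supplier<String> {
        private final String capturedId;
        private final ThrowingTask task;

        ContextSupplier(ThrowingTask task) {
            this.capturedId = PROCESS_ID.get(); // runs on the caller thread
            this.task = task;
        }

        @Override
        public String get() {
            PROCESS_ID.set(capturedId); // runs on the worker thread
            try {
                task.run();
                return "success [" + PROCESS_ID.get() + "]";
            } catch (Exception e) {
                return "failure [" + PROCESS_ID.get() + "] " + e.getMessage();
            } finally {
                PROCESS_ID.remove(); // pooled threads are reused, so clean up
            }
        }
    }

    // Sets the context on the caller thread, runs the task asynchronously, waits for the result
    static String submit(String processId, ThrowingTask task) {
        PROCESS_ID.set(processId);
        try {
            return CompletableFuture.supplyAsync(new ContextSupplier(task)).join();
        } finally {
            PROCESS_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(submit("02d8de4b6c2a", () -> {}));
        // prints success [02d8de4b6c2a]
        System.out.println(submit("2c146651256a", () -> { throw new IllegalStateException("bang"); }));
        // prints failure [2c146651256a] bang
    }
}
```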

@Async interceptor

We will write another Aspect interceptor, ControllerLogAsyncAspect.

Its purpose is to intercept @Async annotated methods, unless @MonitoringDisable is set, and to react to the completion of the process, using the CompletableFuture<AsyncResult> returned by the method. This lets us mark the end of the monitoring, or report the exception if something has failed.

ControllerLogAsyncAspect class
@Around("@annotation(org.springframework.scheduling.annotation.Async)"
        + " && !@annotation(net.kprod.tooling.spring.starter.annotation.MonitoringDisable)")
public Object asyncMethodInterceptor(ProceedingJoinPoint joinPoint) throws Throwable {
    Object proceed = joinPoint.proceed();
    if(proceed instanceof CompletableFuture) {
        CompletableFuture<?> future = (CompletableFuture<?>) proceed;
        //register a callback ; it runs once the async process has completed
        future.thenAccept(futureReturnValue -> {

            if (futureReturnValue instanceof AsyncResult) {
                AsyncResult result = (AsyncResult) futureReturnValue;
                if(result.isSuccessful()) {
                    monitoringService.end(result.getRunTime());
                } else {
                    monitoringService.processException(result.getException());
                }
            }
        });
    } else {
        LOG.error("Async method does not return CompletableFuture");
    }
    //hand the original result back to the caller ; a void advice would return null instead
    return proceed;
}
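
One detail worth illustrating: a thenAccept callback is skipped when the future completes exceptionally. That is why the failure has to travel inside the result object, as SupplyAsync does by catching the exception. A minimal sketch, where Outcome is a hypothetical, simplified counterpart of AsyncResult:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenAcceptSketch {
    // Hypothetical, simplified counterpart of AsyncResult
    static class Outcome {
        final Exception error; // null on success

        Outcome(Exception error) {
            this.error = error;
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Case 1: the exception escapes, so the future completes exceptionally
        CompletableFuture<Outcome> raw = new CompletableFuture<>();
        raw.completeExceptionally(new IllegalStateException("bang"));
        raw.thenAccept(o -> events.add("raw: callback called")); // never runs

        // Case 2: the exception is caught and wrapped, so the future completes normally
        CompletableFuture<Outcome> wrapped =
                CompletableFuture.completedFuture(new Outcome(new IllegalStateException("bang")));
        wrapped.thenAccept(o -> events.add(
                o.error == null ? "wrapped: success" : "wrapped: failure " + o.error.getMessage()));

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [wrapped: failure bang]
    }
}
```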

@Async methods

Our async methods should return a CompletableFuture<AsyncResult> in order to be processed by the ControllerLogAsyncAspect interceptor; for any other return type, the interceptor only logs an error.

Asynchronous testing

Setup

A new service, AsyncService, is created.

We defined two methods:

  • asyncProcess, a nominal test, mapped to /bar

  • asyncProcessError, a failure test, mapped to /barError

We’ve also modified our main controller ControllerRoot and FooService to map test requests.

Note that each async call uses SupplyAsync.

AsyncService class
@Async
@Override
public CompletableFuture<AsyncResult> asyncProcess(MonitoringData monitoringData) throws ServiceException {
    SupplyAsync sa = new SupplyAsync(monitoringService, monitoringData, () -> runAsyncProcess());
    return CompletableFuture.supplyAsync(sa);
}

@Async
@Override
public CompletableFuture<AsyncResult> asyncProcessError(MonitoringData monitoringData) throws ServiceException {
    SupplyAsync sa = new SupplyAsync(monitoringService, monitoringData, () -> runAsyncProcessError());
    return CompletableFuture.supplyAsync(sa);
}

private void runAsyncProcess() {
    //...
}

private void runAsyncProcessError() {
    throw new NullPointerException("bang");
}

Nominal test

A request to localhost:8080/bar ends nicely:

Async log
[02d8de4b6c2a] [ControllerRoot.bar] [] Start monitoring processName [ControllerRoot.bar] id [8dc95c84-c1ba-4662-bbdf-02d8de4b6c2a]
[02d8de4b6c2a] [ControllerRoot.bar] [] bar method
[02d8de4b6c2a] [ControllerRoot.bar] [async] Start monitoring processName [ControllerRoot.bar] id [8dc95c84-c1ba-4662-bbdf-02d8de4b6c2a]
[02d8de4b6c2a] [ControllerRoot.bar] [async] asyncProcess method
[02d8de4b6c2a] [ControllerRoot.bar] [] Response [processing]
[02d8de4b6c2a] [ControllerRoot.bar] [] End monitoring processName [ControllerRoot.bar] id [8dc95c84-c1ba-4662-bbdf-02d8de4b6c2a] took [545] ms
[02d8de4b6c2a] [ControllerRoot.bar] [async] async processing completed
[02d8de4b6c2a] [ControllerRoot.bar] [async] End monitoring processName [ControllerRoot.bar] id [8dc95c84-c1ba-4662-bbdf-02d8de4b6c2a] took [2001] ms

Note that the client response is sent before the async process completes.

Failure test

Async failure log
[2c146651256a] [ControllerRoot.barError] [] Start monitoring processName [ControllerRoot.barError] id [b5cc5cc9-7125-4260-add2-2c146651256a]
[2c146651256a] [ControllerRoot.barError] [] bar method
[2c146651256a] [ControllerRoot.barError] [async] Start monitoring processName [ControllerRoot.barError] id [b5cc5cc9-7125-4260-add2-2c146651256a]
[2c146651256a] [ControllerRoot.barError] [async] asyncProcess method
[2c146651256a] [ControllerRoot.barError] [] Response [processing]
[2c146651256a] [ControllerRoot.barError] [] End monitoring processName [ControllerRoot.barError] id [b5cc5cc9-7125-4260-add2-2c146651256a] took [504] ms
[2c146651256a] [ControllerRoot.barError] [async] Exception message [bang]
[2c146651256a] [ControllerRoot.barError] [async] java.lang.NullPointerException: bang
        at net.kprod.tooling.spring.app.service.impl.AsyncServiceImpl.runAsyncProcessError(AsyncServiceImpl.java:61)
        at net.kprod.tooling.spring.app.service.impl.AsyncServiceImpl.lambda$asyncProcessError$1(AsyncServiceImpl.java:34)
        at net.kprod.tooling.spring.starter.async.SupplyAsync.get(SupplyAsync.java:49)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1771)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1763)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Note that the client response is still sent; our async method fails and logs its full stack trace.

Summary of this chapter

We simplified our monitoring with an @Aspect interceptor, and we also provided a fairly simple way to set up monitoring in an @Async context.

We tested these new cases with our embedded tooling-app test application, with satisfying log output.

To be continued…​

In chapter 5…​ We’ll see that later :)

All available chapters

  • chapter 1 : Custom Spring Boot 2 Starter with Spring 5, chapter 1

  • chapter 2 : Spring Boot Starter : Chapter 2, error handling

  • chapter 3 : Spring Boot Starter : Chapter 3, logger and process unique id

  • chapter 4 : Spring Boot Starter : Chapter 4, aspects and async processing
