Real code example: Spring boot with OpenFeign client, Hystrix and Spring Service Locator pattern — Part 3
First part: Spring service locator pattern
https://sunhong22187.medium.com/real-code-spring-boot-with-openfeign-client-hystrix-and-spring-service-locator-pattern-part-1-19b089d4fe56
Second part: OpenFeign client
https://sunhong22187.medium.com/real-code-example-spring-boot-with-openfeign-client-hystrix-and-spring-service-locator-pattern-1f0e59012348
Hystrix Command
In this article, we are gonna use Hystrix command for building asynchronous Feign client call and reactive Feign client call.
References:
https://github.com/Netflix/Hystrix/wiki/How-To-Use
AsynchronousService interface
I created this interface for an abstraction of asynchronous service. Currently, we use Hystrix to implement, however, we may be able to use another technology to do so. At that time, we only need to declare another implementation as our AsynchronousService bean, we can leave those consumer(s) unchanged.
In this interface, we have two abstract method which are need to be implemented.
public List<U> asynchronousCreatePost(List<T> postRequest, Function<T, U> fallBackMethod) throws InterruptedException, ExecutionException;
This method is to do an asynchronous execution. Input list of T and output list of U.
public List<Observable<U>> reactiveCreatePost(List<T> postRequest, Function<T, U> fallBackMethod) throws InterruptedException, ExecutionException;
This method to do a reactive execution. Input list of T and output list of Observable of U. They can be “cold” observable or “hot” observable, depends on our real implementation later.
PostAsynchronousService
This is implementation for our abstract AsynchronousService interface. In this class, we use Hystrix command as our main execution.
I created an inner class HystrixAsynchronousCommand which extends HystrixCommand abstract class. This class will call the Feign client POST method.
This inner class will override run(…) method of HystrixCommand abstract class. In run(…) method, we simply use our Feign client to execute its POST method.
@Override
protected PostModel run() throws Exception{
log.info("HystrixAsynchronousCommand: calling create post connector...");
return postClient.createPost(postRequest);
}
Additionally, we also can override getFallback() method to return or execute something in case our run(…) method occurs exception.
In this demo, we simply recall the Function interface which is provided by the consumer class.
The fallback method simply save a error message into PostModel request.
@Override
protected PostModel getFallback() {
log.info("HystrixAsynchronousCommand: calling create post connector fail, using fallback...");
return fallBackMethod.apply(postRequest);
}
Asynchronous execution
For executing asynchronous with Hystrix, we call queue() method of Hystrix command, this method returns a Future result.
Whenever, we call Future.get(), it will execute HystrixCommand in another thread other than main thread.
In our demo, we have asynchronousCreatePost(…) method that will execute Hystrix command asynchronously.
We will build a list of Future of create PostModel
List<Future<PostModel>> futureCreateResult = postRequests.stream(). .map(request -> new HystrixAsynchronousCommand(request, fallBackMethod).queue()).collect(Collectors.toList());
After that, we will call Future get() for each Future result in parallel stream
futureCreateResult.parallelStream().forEach(futureResult -> {
try {
results.add(futureResult.get());
} catch (Exception e) {
log.error("Error execute HystrixAsynchronousCommand...", e);
}
});
Future result will be added into a concurrent array list. After all Future result (Hystrix command) are executed done, we store the result into that concurrent array list.
In case a Hystrix command executes unsuccessfully, its getFallBack() method will be called then executing the implementation inside this method.
Reactive execution
For reactive execution with Hystrix, we call observable() or toObservable() to return an observable result.
observable(): return “HOT” observable result. The command is executed immediately and result is emitted.
toObservable(): return “COLD” observable result. The command is executed when being subscribed and result is emitted.
Here is an article to explain difference between them, easily understand: https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339
In this demo, I return “cold” observable, because “the producer” is created outside HystrixCommand, for every execution, we are gonna reuse this “producer”. Additionally, I want the HystrixCommand be executed only when it is subscribed.
We have a list of “cold” observable
@Override
public List<Observable<PostModel>> reactiveCreatePost(List<PostModel> postRequests, Function<PostModel, PostModel> fallBackMethod) throws InterruptedException, ExecutionException {
return postRequests.stream().map(request -> new HystrixAsynchronousCommand(request, fallBackMethod).toObservable()).collect(Collectors.toList());
}
For testing purpose in unit test, because we really want the result(s) from Feign client PostClient, for asserting the implementation, so we convert those observable into BlockingObservable to block the current thread, wait until all observable instances are completed or error.
List<PostModel> resultList = observaleResultList.parallelStream()
.map(observable -> observable.toBlocking().single())
.collect(Collectors.toList());
In real project, for non-blocking execution, we can subscribe to those observable instances with Observer instance.
List<PostModel> results = new CopyOnWriteArrayList<>();Action1<PostModel> createObserver = new Action1<PostModel>() {
@Override
public void call(PostModel t) {
results.add(t);
}
};observaleResultList.parallelStream().forEach(observable -> observable.subscribe(createObserver));Thread.currentThread().sleep(2000);assertFalse(results.isEmpty());
In example above, we subscribe createObserver with each observable instance and in createObserver action, we store emitted result to a concurrent array list for retrieving later.
We have to make current thread sleep for few seconds, in order to wait for those observable instance complete or error, then send notifications to our createObserver instance. This action is non-blocking, that’s why we have to manually block the main thread.