A very small taste of WebSockets with WebFlux, Reactive MongoDB data on Spring 5

Hello again.

Today's post is about a small experiment with Spring 5 and WebFlux: I used them to create a small WebSocket controller that "simulates" an e-mail inbox. The idea was to send some dummy text to the backend, persist it in MongoDB and, from time to time, check for new messages and send them to the clients connected to the WebSocket endpoint.

The Cast

Let's start with an overview of all the actors involved in this small experiment. This text will probably be much longer than the code needed to get everything working with Spring 5.

I'll assume that Spring doesn't need any introduction, not least because it's hard to define nowadays. Back in 2002-ish I would have said that it's an IoC container, blah blah blah, but today? Maybe magic for Java.

MongoDB

MongoDB is a free and open-source cross-platform document-oriented database program. Classified as a NoSQL database program, MongoDB uses JSON-like documents with schemata.

MongoDB Reactive Streams Java Driver

MongoDB Reactive Streams Java Driver, providing asynchronous stream processing with non-blocking back pressure for MongoDB.

Project Reactor

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactive Extensions inspired API and efficient event streaming support.

The reactive design pattern is an event-based architecture for asynchronous handling of a large volume of concurrent service requests coming from single or multiple service handlers.

Project Reactor is based on this pattern and has the clear and ambitious goal of building asynchronous, reactive applications on the JVM.
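To make "non-blocking back pressure" a bit more concrete, here is a minimal sketch that uses only the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow, so Java 9 or newer is assumed) instead of Reactor. The class and method names are made up for illustration. The subscriber asks the publisher for exactly one item at a time, so it is never flooded with more than it asked for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a subscriber that pulls one item at a time (back pressure).
class BackPressureSketch {

    // Collects items, asking the publisher for exactly one more after each delivery.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        private final List<String> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item only
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ready for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the given items and returns what the subscriber saw, in order.
    static List<String> publishAndCollect(List<String> items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals completion to the subscriber
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("first", "second", "third")));
    }
}
```

Reactor's Flux and Mono are built around the same publisher/subscriber contract, with a much richer set of operators on top.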

WebFlux

Spring WebFlux is an asynchronous framework from the bottom up. It can run on Servlet containers using the Servlet 3.1 non-blocking IO API as well as on other async runtime environments such as Netty or Undertow.

WebSocket

WebSocket is a computer communications protocol, providing full-duplex communication channels over a single TCP connection.

The WebSocket protocol enables interaction between a web client (such as a browser) and a web server with lower overheads, facilitating real-time data transfer from and to the server. This is made possible by providing a standardized way for the server to send content to the client without being first requested by the client, and allowing messages to be passed back and forth while keeping the connection open. In this way, a two-way ongoing conversation can take place between the client and the server.

The actual code

pom.xml

Let's start with the pom.xml file, so you'll have what it takes to play with your own WebSockets. We'll just check the most important parts.

The parent POM (or BOM) is used to predefine the dependency versions, so you can add your dependencies without having to figure out version compatibility between the components.

BOM stands for Bill Of Materials. A BOM is a special kind of POM that is used to control the versions of a project’s dependencies and provide a central place to define and update those versions.

BOM provides the flexibility to add a dependency to our module without worrying about the version that we should depend on.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.4.RELEASE</version>
</parent>

Next come the dependencies I've used. Notice that, except for the most specific ones, I didn't configure any version; that's all done by the parent POM.

<dependencies>
    <!-- Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- Tooling -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
    </dependency>

    <!-- Websocket -->
    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>webjars-locator-core</artifactId>
    </dependency>

    <!-- Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.easytesting</groupId>
        <artifactId>fest-assert</artifactId>
        <version>1.4</version>
        <scope>test</scope>
    </dependency>
</dependencies>

I won't add the whole POM here because, if you want, you can generate your own from the Spring Initializr page, and it will certainly be up to date no matter when you end up here. Pick the functionality you need for your project and it will generate a skeleton for you.

application.properties

This one is nice to have, especially if you're going to use database connections: you can place your configuration here instead of having it hardcoded and spread through your classes. If you need more configuration and don't know exactly which property to use, check the Common application properties.

To play today, all I needed was a port definition, logging, and the MongoDB connection URI.

server.port=8080

#logging
logging.level.org.springframework.data=debug
logging.level.=debug

#mongodb
spring.data.mongodb.uri=mongodb://r640:27017/chat

WebSocket configurations

The code below is all you need for your initial WebSocket configuration. I saw some more complex ones doing essentially the same thing, but I suppose they were written for previous versions of Spring. To play on Spring 5, that's all you need to start.

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

   @Override
   public void configureMessageBroker(MessageBrokerRegistry config) {
      config.enableSimpleBroker("/topic");
      config.setApplicationDestinationPrefixes("/app");
   }

   @Override
   public void registerStompEndpoints(StompEndpointRegistry registry) {
      registry.addEndpoint("/email");
      registry.addEndpoint("/email").withSockJS();
      registry.addEndpoint("/emails");
      registry.addEndpoint("/emails").withSockJS();
   }
}

There are a few interesting points here. First, with the @EnableWebSocketMessageBroker annotation you're telling Spring that you want WebSocket message handling in your application. config.enableSimpleBroker("/topic") tells Spring that any destination starting with /topic is handled by the simple in-memory broker, and that's where clients subscribe. config.setApplicationDestinationPrefixes("/app") tells Spring that clients will send data to your application through destinations starting with /app. Finally, registry.addEndpoint("/email") registers the endpoints that speak the STOMP protocol (notice that we're overriding registerStompEndpoints). In this example their names also match the @MessageMapping values on the controller.
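To see what those prefixes mean on the wire, here is a rough sketch of the STOMP frames a client would send: a SUBSCRIBE to a /topic destination and a SEND to an /app destination. The helper class is hypothetical and only renders text. It ignores header escaping and the extra framing SockJS adds, so take it as an illustration of the frame layout rather than a client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that renders STOMP client frames as plain text.
class StompFrameSketch {

    // A frame is: COMMAND, header lines, a blank line, the body, then a NUL character.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        headers.forEach((name, value) -> sb.append(name).append(':').append(value).append('\n'));
        return sb.append('\n').append(body).append('\0').toString();
    }

    // SUBSCRIBE to a broker destination (the "/topic" prefix from the config).
    static String subscribe(String id, String destination) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", id);
        headers.put("destination", destination);
        return frame("SUBSCRIBE", headers, "");
    }

    // SEND to an application destination ("/app" plus the @MessageMapping value).
    static String send(String destination, String jsonBody) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", destination);
        headers.put("content-type", "application/json");
        return frame("SEND", headers, jsonBody);
    }

    public static void main(String[] args) {
        // Replace the NUL terminator so the frames are readable on a console.
        System.out.println(subscribe("sub-0", "/topic/email/updates").replace('\0', '^'));
        System.out.println(send("/app/email", "{\"content\":\"hi\"}").replace('\0', '^'));
    }
}
```

The SEND frame goes to the application (and ends up in a @MessageMapping method), while the SUBSCRIBE frame is handled by the broker, which later delivers whatever is published to that topic.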

Model

Here we're defining what the documents in the MongoDB collection will look like. Notice the @Document and @Id annotations: they tell Spring that this class maps to entries in a collection called email, identified by the id field. If you want the collection to have a different name, you can define your document like this: @Document(collection = "another_name").

@Data
@Builder
@Document
public class Email {
   @Id
   private String id;
   private String content;
   private final Date sentAt = new Date();
}

Repository

This part is shocking: if you don't need anything special from your repository, the only thing you need is an interface extending ReactiveMongoRepository. Basic operations are provided out of the box.

To get just the newest emails since the last check, all I had to do was declare an abstract method with a @Query annotation; everything else is handled by Spring. What a wonderful world we're living in these days :)

@Repository
public interface EmailRepository extends ReactiveMongoRepository<Email, String> {
   @Query("{ 'sentAt' : { $gte : ?0 }}")
   Flux<Email> findLastOnes(Date lastExecution);
}
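If the query syntax looks cryptic, the filter it expresses is simple: keep every document whose sentAt is greater than or equal to the date passed in. Here is the same idea as a plain in-memory sketch, with no Spring or MongoDB involved. The Message class and the sample data are made up for illustration. Note that $gte is inclusive, so a message stamped exactly at the cut-off is returned too.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory equivalent of { 'sentAt' : { $gte : ?0 } }.
class FindLastOnesSketch {

    static final class Message {
        final String content;
        final Instant sentAt;

        Message(String content, Instant sentAt) {
            this.content = content;
            this.sentAt = sentAt;
        }
    }

    // Returns the content of every message sent at or after lastExecution.
    static List<String> findLastOnes(List<Message> all, Instant lastExecution) {
        return all.stream()
                .filter(m -> !m.sentAt.isBefore(lastExecution)) // "not before" means >=
                .map(m -> m.content)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant cutOff = Instant.parse("2018-09-01T10:00:00Z");
        List<Message> all = List.of(
                new Message("before", cutOff.minusSeconds(1)),
                new Message("boundary", cutOff),
                new Message("after", cutOff.plusSeconds(1)));
        System.out.println(findLastOnes(all, cutOff));
    }
}
```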

The controller

Please ignore the private Date lastExecution = new Date() over there; by default Spring creates singleton beans, so we're safe here. And please, if you're thinking about a service layer, stop: we don't need one here either.

@Controller
@EnableScheduling
public class EmailController {

   @Autowired
   private EmailRepository repository;

   @Autowired
   private SimpMessagingTemplate template;

   private Date lastExecution = new Date();

   @MessageMapping("/email")
   public void email(final Email message) {
      repository.save(message).block();
   }

   @MessageMapping("/emails")
   public void emails() {
      repository.findAll().subscribe(new EmailSubscriber<>());
   }

   @Scheduled(fixedRate = 10000)
   public void updateClients() {
      repository.findLastOnes(lastExecution).subscribe(new EmailSubscriber<>());
   }

   private class EmailSubscriber<T> extends BaseSubscriber<T> {
      @Override
      protected void hookOnComplete() {
         lastExecution = new Date();
      }

      @Override
      protected void hookOnError(Throwable throwable) {
         template.convertAndSend("/topic/email/errors", throwable.getMessage());
      }

      @Override
      protected void hookOnNext(T value) {
         final Email email = (Email) value;
         email.setContent(HtmlUtils.htmlEscape(email.getContent()));
         template.convertAndSend("/topic/email/updates", email);
      }
   }
}

Now, the interesting parts. The @MessageMapping annotated methods are the ones that clients send requests to. Just remember that, from the client side, they need to hit for example /app/email instead of just /email. Do you remember config.setApplicationDestinationPrefixes("/app") in WebSocketConfig?

To simulate the small delay between the moment someone sends us an email and the moment we receive it, I've configured a schedule that runs every 10 seconds; take a look at updateClients. Some interesting things are going on here. First, we ask the repository for the new messages since the last check but, instead of waiting for the answer, we subscribe to be notified when something comes back. This way we're not blocking the execution. Yes, this is all about not blocking, and pushing content to the connected clients when, and only when, we have something to push.
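The "schedule a poll, register a callback, return immediately" idea can be sketched with plain JDK classes. This is not what Spring or Reactor do internally, just an analogy: a ScheduledExecutorService stands in for @Scheduled, and a CompletableFuture stands in for the repository call (it resolves once with a whole list, whereas a Flux would emit item by item). All names and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class NonBlockingPollSketch {

    // Hypothetical stand-in for the repository call: the result arrives later, on another thread.
    static CompletableFuture<List<String>> findNewMessages() {
        return CompletableFuture.supplyAsync(() -> List.of("hello", "world"));
    }

    // Polls at a fixed rate until `ticks` polls have delivered, then returns what was pushed.
    static List<String> pollAtFixedRate(long periodMillis, int ticks) {
        List<String> pushed = new CopyOnWriteArrayList<>();
        CountDownLatch delivered = new CountDownLatch(ticks);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(
                    // The tick only registers a callback and returns; it never waits for the result.
                    () -> findNewMessages().thenAccept(messages -> {
                        pushed.addAll(messages); // plays the role of convertAndSend
                        delivered.countDown();
                    }),
                    0, periodMillis, TimeUnit.MILLISECONDS);
            delivered.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(pushed);
    }

    public static void main(String[] args) {
        System.out.println(pollAtFixedRate(100, 2));
    }
}
```

The scheduler thread is free again as soon as the callback is registered, which is the whole point: nobody sits around waiting for the database.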

subscribe is a method provided by Flux, which in the Reactor world represents a reactive stream of 0 to N items; for zero or one result, Mono is the representation. OK, we have some results, what happens now? The subscriber's hookOnNext method is called for each result found, and here we can do some heavy work like escaping characters. Because we need at least one method with more than one line, right? From this point we just send it to the proper topic and the subscribed clients get updated.
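Why escape at all? The content ends up in a browser, so anything that looks like markup should be neutralised first. The sketch below is a hand-rolled stand-in that only handles the five usual suspects. It is not Spring's HtmlUtils implementation, which covers many more characters, but it shows the kind of transformation applied to each email before it is pushed.

```java
// Hypothetical, simplified stand-in for an HTML escaping utility.
class HtmlEscapeSketch {

    // Replaces the characters that could be interpreted as markup by a browser.
    static String escape(String input) {
        StringBuilder out = new StringBuilder(input.length());
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            switch (c) {
                case '&': out.append("&amp;"); break;
                case '<': out.append("&lt;"); break;
                case '>': out.append("&gt;"); break;
                case '"': out.append("&quot;"); break;
                case '\'': out.append("&#39;"); break;
                default: out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("<script>alert('hi')</script>"));
    }
}
```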

Testing

Testing it was simple too: with a single @SpringBootTest annotation the whole stack was available, so I just had to do some initial setup to have an integration test up and running.

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class EmailControllerTest {

   private static final String SUBSCRIBE_TOPIC_ENDPOINT = "/topic/email/updates";
   private static final String SEND_EMAIL_ENDPOINT = "/app/email";

   @Autowired
   private EmailRepository repository;

   @Autowired
   private ReactiveCrudRepository<Email, String> operations;

   @Value("${local.server.port}")
   private int port;
   private String URL;

   private CompletableFuture<Email> completableFuture;
   private WebSocketStompClient stompClient;
   private StompSession stompSession;

   @Before
   public void setup() throws InterruptedException, ExecutionException, TimeoutException {
      completableFuture = new CompletableFuture<>();
      URL = String.format("ws://localhost:%d/email", port);

      stompClient = new WebSocketStompClient(new SockJsClient(createTransportClient()));
      stompClient.setMessageConverter(new MappingJackson2MessageConverter());
      stompSession = stompClient.connect(URL, new StompSessionHandlerAdapter() {
      }).get(1, TimeUnit.SECONDS);
   }

   @Test
   public void receiveEmails() throws InterruptedException, ExecutionException, TimeoutException {
      // Adding a dummy email
      final Email email = Email
            .builder()
            .id(UUID.randomUUID().toString())
            .content(RandomStringUtils.randomAlphabetic(10))
            .build();

      // Send the email through webSocket to be persisted
      stompSession.send(SEND_EMAIL_ENDPOINT, email);

      // Subscribing to the notification endpoint as a client
      stompSession.subscribe(SUBSCRIBE_TOPIC_ENDPOINT, new MessageStompFrameHandler());

      // Waiting up to 10s for a notification to match the scheduler
      final Email emailReceived = completableFuture.get(10, TimeUnit.SECONDS);

      // Validating content
      assertThat(emailReceived).isNotNull();
      assertThat(emailReceived.getId()).isEqualTo(email.getId());
      assertThat(emailReceived.getContent()).isEqualTo(email.getContent());
      assertThat(emailReceived.getSentAt()).isEqualTo(email.getSentAt());

      // Removing dummy
      operations.delete(email).block();

      // Verifying if is really gone
      final Email foundEmail = repository.findById(email.getId()).block();
      assertThat(foundEmail).isNull();
   }

   private List<Transport> createTransportClient() {
      final List<Transport> transports = new ArrayList<>(1);
      transports.add(new WebSocketTransport(new StandardWebSocketClient()));
      return transports;
   }

   private class MessageStompFrameHandler implements StompFrameHandler {

      @Override
      public Type getPayloadType(StompHeaders stompHeaders) {
         return Email.class;
      }

      @Override
      public void handleFrame(StompHeaders stompHeaders, Object o) {
         completableFuture.complete((Email) o);
      }
   }
}
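The trick that makes this test work is the CompletableFuture: the frame handler completes it from the WebSocket client's thread, while the test thread waits on it with a timeout. Stripped of everything Spring-related, the pattern looks roughly like the sketch below. The class name, the delayed thread and the "hello" payload are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AwaitCallbackSketch {

    // Waits up to timeoutMillis for a value delivered by a callback on another thread.
    // Returns null when nothing arrives in time.
    static String awaitMessage(long delayMillis, long timeoutMillis) {
        CompletableFuture<String> received = new CompletableFuture<>();

        // Hypothetical listener thread; in the test this role is played by handleFrame.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            received.complete("hello");
        });
        listener.setDaemon(true);
        listener.start();

        try {
            return received.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // the notification did not show up in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessage(50, 2000));
    }
}
```

In the real test the timeout is 10 seconds so that it lines up with the scheduler's fixed rate.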

The client

To play here I've written some simple HTML, JS and CSS files, placed them inside the src/main/resources/static folder and started the project. When you hit localhost:8080 the client connects automatically and the fun is complete. I'm not proud of them, but you can have them: index.html, index.js and index.css.

Conclusion

Once again Spring did a very good job of providing ways to use great technology with low effort. With a few lines of code we could see a new paradigm in action in Java, in every single layer. And this is not just available for WebSocket endpoints: you can use basically the same dependencies and approach to get similar results with, for example, REST endpoints.

I hope it gave you a glimpse of what's possible with this new Spring version and how much fun you can have playing around with these new toys :)

References

Spring Boot

Spring 5

WebFlux

MongoDB

WebSocket

WebSocket Support on Spring 5

Stomp

Project Reactor

Reactive Streams

Spring Initializr

Common Spring application properties
