messaging: add a simple message broker

Add a simple message broker which uses the chain of responsibility design pattern for
message handling.

Inspired by this article: http://www.softwarematters.org/message-broker.html#java-implementation

In this design, a message handler can contain a reference to the next
message handler. If the current handler cannot handle the message given
to him by the MessageBroker, he passes this message to the next
MessageHandler.

The chain of responsibility pattern is used in order to decouple the creation of a
concrete message instance from the broker communication logic. This adheres to the Open
Closed Principle, because it is possible to add a new message type without modifying the
message broker.
This commit is contained in:
Thibaud Gasser 2019-10-19 17:48:53 +02:00
parent eea8074d32
commit c4477aca38
12 changed files with 376 additions and 0 deletions

36
messaging/pom.xml Normal file
View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>fr.gasser</groupId>
<artifactId>java-cookbook</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>messaging</artifactId>
<properties>
<hamcrest-junit.version>2.0.0.0</hamcrest-junit.version>
<mockito-mockito-core.version>3.1.0</mockito-mockito-core.version>
</properties>
<dependencies>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-junit</artifactId>
<version>${hamcrest-junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito-mockito-core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,39 @@
package messagebroker;
import java.io.PrintWriter;
public abstract class BaseMessageHandler implements MessageHandler {
private MessageHandler nextHandler;
private final MessageHandlerContext context;
protected BaseMessageHandler(MessageHandlerContext context) {
this.context = context;
this.nextHandler = null;
}
public void handleMessage(String message) throws UnknownMessageFormatException {
if (nextHandler != null) {
nextHandler.handleMessage(message);
} else {
String msg = "Unable to handle message " + message;
throw new UnknownMessageFormatException(msg);
}
}
@Override
public void setNextHandler(MessageHandler nextHandler) {
this.nextHandler = nextHandler;
}
protected MessageHandlerContext getContext() {
return this.context;
}
protected void sendResponse(String response) {
try (PrintWriter writer = new PrintWriter(getContext().getOutputStream())) {
writer.println(response);
writer.flush();
}
}
}

View File

@ -0,0 +1,88 @@
package messagebroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.net.ServerSocket;
import java.net.Socket;
public class MessageBroker {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class);
private final ServerSocket serverSocket;
private MessageHandlerFactory messageHandlerFactory;
private final MessageHandlerContext context;
private MessageBroker(int port) throws IOException {
LOGGER.info("Initializing MessageBroker on port {}", port);
this.serverSocket = new ServerSocket(port);
context = new MessageHandlerContext(this);
this.messageHandlerFactory = new MessageHandlerFactory(context);
}
private void run() {
boolean running = true;
while (running) {
try {
Socket accept = serverSocket.accept();
ClientHandler handler = new ClientHandler(accept);
new Thread(handler).start();
} catch (IOException e) {
LOGGER.error("Accept failed", e);
running = false;
}
}
}
public void addHandler(String handlerName) throws ReflectiveOperationException {
messageHandlerFactory.addHandler(handlerName);
}
private class ClientHandler implements Runnable {
private final Socket socket;
ClientHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String message = reader.readLine();
LOGGER.info("Received message : {}", message);
var outputStream = socket.getOutputStream();
context.setOutputStream(outputStream);
try {
messageHandlerFactory.getHandler().handleMessage(message);
} catch (UnknownMessageFormatException e) {
LOGGER.warn("{}", e.getMessage());
}
socket.close();
} catch (IOException e) {
LOGGER.error("IOException on read");
}
}
}
public static void main(String[] args) throws IOException, ReflectiveOperationException {
if (args.length != 1) {
System.out.printf("usage : java %s <port>%n", MessageBroker.class.getName());
System.exit(0);
}
int port = Integer.parseInt(args[0]);
MessageBroker broker = new MessageBroker(port);
broker.addHandler("messagebroker.handlers.AddHandler");
broker.run();
}
}

View File

@ -0,0 +1,6 @@
package messagebroker;
public interface MessageHandler {
void handleMessage(String message) throws UnknownMessageFormatException;
void setNextHandler(MessageHandler nextHandler);
}

View File

@ -0,0 +1,29 @@
package messagebroker;
import java.io.OutputStream;
public class MessageHandlerContext {
private final MessageBroker broker;
private OutputStream outputStream;
MessageHandlerContext(MessageBroker broker) {
this(broker, null);
}
private MessageHandlerContext(MessageBroker broker, OutputStream outputStream) {
this.broker = broker;
this.outputStream = outputStream;
}
public MessageBroker getBroker() {
return broker;
}
public OutputStream getOutputStream() {
return outputStream;
}
public void setOutputStream(OutputStream outputStream) {
this.outputStream = outputStream;
}
}

View File

@ -0,0 +1,46 @@
package messagebroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
class MessageHandlerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerFactory.class);
private final MessageHandlerContext context;
private MessageHandler firstHandler = null;
private MessageHandler lastHandler = null;
MessageHandlerFactory(MessageHandlerContext context) {
this.context = context;
}
void addHandler(String handlerName)
throws ReflectiveOperationException {
Class<? extends MessageHandler> handlerClass = Class.forName(handlerName)
.asSubclass(MessageHandler.class);
LOGGER.debug("Class {} found for handler name {}", handlerClass, handlerName);
MessageHandler next = newMessageHandler(handlerClass);
if (firstHandler == null) {
firstHandler = next;
lastHandler = firstHandler;
} else {
lastHandler.setNextHandler(next);
lastHandler = next;
}
}
MessageHandler getHandler() {
return firstHandler;
}
private MessageHandler newMessageHandler(Class<? extends MessageHandler> handlerClass)
throws ReflectiveOperationException {
var ctor = handlerClass.getConstructor(MessageHandlerContext.class);
return ctor.newInstance(context);
}
}

View File

@ -0,0 +1,7 @@
package messagebroker;
public class UnknownMessageFormatException extends Exception {
public UnknownMessageFormatException(String message) {
super(message);
}
}

View File

@ -0,0 +1,40 @@
package messagebroker.handlers;
import messagebroker.BaseMessageHandler;
import messagebroker.MessageHandlerContext;
import messagebroker.UnknownMessageFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class AddHandlerMessageHandler extends BaseMessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(AddHandlerMessageHandler.class);
private static final Pattern msgPattern = Pattern.compile("CONTROL\\.ADD_HANDLER (\\S+)$");
public AddHandlerMessageHandler(MessageHandlerContext context) {
super(context);
}
@Override
public void handleMessage(String message) throws UnknownMessageFormatException {
Matcher matcher = msgPattern.matcher(message);
if (matcher.matches()) {
String handlerName = matcher.group(1);
LOGGER.info("Parsed handler name {}", handlerName);
try {
getContext().getBroker().addHandler(handlerName);
sendResponse("OK");
LOGGER.info("Added handler {} to broker.", handlerName);
} catch (ReflectiveOperationException e) {
String msg = "Unable to add handler";
LOGGER.error(msg + "{}", e.toString(), e);
sendResponse(msg);
}
} else {
super.handleMessage(message);
}
}
}

View File

@ -0,0 +1,31 @@
package messagebroker.handlers;
import messagebroker.BaseMessageHandler;
import messagebroker.MessageHandlerContext;
import messagebroker.UnknownMessageFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.util.regex.Pattern;
public class EchoMessageHandler extends BaseMessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(EchoMessageHandler.class);
private static final Pattern PATTERN = Pattern.compile("ECHO (.*)$");
public EchoMessageHandler(MessageHandlerContext context) {
super(context);
}
@Override
public void handleMessage(String message) throws UnknownMessageFormatException {
var matcher = PATTERN.matcher(message);
if (matcher.matches()) {
String parsed = matcher.group(1);
LOGGER.info("Parsed message {}", parsed);
sendResponse(parsed);
} else {
super.handleMessage(message);
}
}
}

View File

@ -0,0 +1,19 @@
package messagebroker.handlers;
import messagebroker.BaseMessageHandler;
import messagebroker.MessageHandlerContext;
import messagebroker.UnknownMessageFormatException;
public class NewHandler extends BaseMessageHandler {
private final MessageHandlerContext context;
public NewHandler(MessageHandlerContext context) {
super(context);
this.context = context;
}
@Override
public void handleMessage(String message) throws UnknownMessageFormatException {
super.handleMessage(message);
}
}

View File

@ -0,0 +1,34 @@
package messagebroker.handlers;
import messagebroker.MessageBroker;
import messagebroker.MessageHandlerContext;
import messagebroker.UnknownMessageFormatException;
import org.junit.Test;
import static org.mockito.Mockito.*;
public class AddHandlerMessageHandlerTest {
private MessageHandlerContext context = mock(MessageHandlerContext.class);
public AddHandlerMessageHandlerTest() {
var broker = mock(MessageBroker.class);
when(context.getBroker()).thenReturn(broker);
}
@Test
public void shouldHandleMessageWithAddHandlerPattern() throws ReflectiveOperationException, UnknownMessageFormatException {
var handlerName = "AnHandler";
AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context);
handler.handleMessage("CONTROL.ADD_HANDLER " + handlerName);
verify(context.getBroker()).addHandler(handlerName);
}
@Test(expected = UnknownMessageFormatException.class)
public void shouldThrowWhenMessageIsUnknown() throws ReflectiveOperationException, UnknownMessageFormatException {
var handlerName = "AnHandler";
AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context);
handler.handleMessage("BAD " + handlerName);
verify(context.getBroker()).addHandler(handlerName);
}
}

View File

@ -12,6 +12,7 @@
<module>async</module>
<module>lang</module>
<module>persistence</module>
<module>messaging</module>
</modules>
<properties>