messaging: add a simple message broker
Add a simple message broker which uses the chain of responsibility design pattern for message handling. Inspired by this article: http://www.softwarematters.org/message-broker.html#java-implementation In this design, a message handler can contain a reference to the next message handler. If the current handler cannot handle the message given to him by the MessageBroker, he passes this message to the next MessageHandler. The chain of responsibility pattern is used in order to decouple the creation of a concrete message instance from the broker communication logic. This adheres to the Open Closed Principle, because it is possible to add a new message type without modifying the message broker.
This commit is contained in:
parent
eea8074d32
commit
c4477aca38
36
messaging/pom.xml
Normal file
36
messaging/pom.xml
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>fr.gasser</groupId>
|
||||||
|
<artifactId>java-cookbook</artifactId>
|
||||||
|
<version>1.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>messaging</artifactId>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<hamcrest-junit.version>2.0.0.0</hamcrest-junit.version>
|
||||||
|
<mockito-mockito-core.version>3.1.0</mockito-mockito-core.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.hamcrest</groupId>
|
||||||
|
<artifactId>hamcrest-junit</artifactId>
|
||||||
|
<version>${hamcrest-junit.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-core</artifactId>
|
||||||
|
<version>${mockito-mockito-core.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
</project>
|
@ -0,0 +1,39 @@
|
|||||||
|
package messagebroker;
|
||||||
|
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
|
||||||
|
public abstract class BaseMessageHandler implements MessageHandler {
|
||||||
|
|
||||||
|
private MessageHandler nextHandler;
|
||||||
|
private final MessageHandlerContext context;
|
||||||
|
|
||||||
|
protected BaseMessageHandler(MessageHandlerContext context) {
|
||||||
|
this.context = context;
|
||||||
|
this.nextHandler = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleMessage(String message) throws UnknownMessageFormatException {
|
||||||
|
if (nextHandler != null) {
|
||||||
|
nextHandler.handleMessage(message);
|
||||||
|
} else {
|
||||||
|
String msg = "Unable to handle message " + message;
|
||||||
|
throw new UnknownMessageFormatException(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNextHandler(MessageHandler nextHandler) {
|
||||||
|
this.nextHandler = nextHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MessageHandlerContext getContext() {
|
||||||
|
return this.context;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void sendResponse(String response) {
|
||||||
|
try (PrintWriter writer = new PrintWriter(getContext().getOutputStream())) {
|
||||||
|
writer.println(response);
|
||||||
|
writer.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
88
messaging/src/main/java/messagebroker/MessageBroker.java
Normal file
88
messaging/src/main/java/messagebroker/MessageBroker.java
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
package messagebroker;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.net.ServerSocket;
|
||||||
|
import java.net.Socket;
|
||||||
|
|
||||||
|
public class MessageBroker {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class);
|
||||||
|
|
||||||
|
private final ServerSocket serverSocket;
|
||||||
|
private MessageHandlerFactory messageHandlerFactory;
|
||||||
|
private final MessageHandlerContext context;
|
||||||
|
|
||||||
|
private MessageBroker(int port) throws IOException {
|
||||||
|
LOGGER.info("Initializing MessageBroker on port {}", port);
|
||||||
|
this.serverSocket = new ServerSocket(port);
|
||||||
|
context = new MessageHandlerContext(this);
|
||||||
|
this.messageHandlerFactory = new MessageHandlerFactory(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void run() {
|
||||||
|
boolean running = true;
|
||||||
|
while (running) {
|
||||||
|
try {
|
||||||
|
Socket accept = serverSocket.accept();
|
||||||
|
|
||||||
|
ClientHandler handler = new ClientHandler(accept);
|
||||||
|
new Thread(handler).start();
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.error("Accept failed", e);
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addHandler(String handlerName) throws ReflectiveOperationException {
|
||||||
|
messageHandlerFactory.addHandler(handlerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ClientHandler implements Runnable {
|
||||||
|
private final Socket socket;
|
||||||
|
|
||||||
|
ClientHandler(Socket socket) {
|
||||||
|
this.socket = socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
||||||
|
String message = reader.readLine();
|
||||||
|
LOGGER.info("Received message : {}", message);
|
||||||
|
|
||||||
|
var outputStream = socket.getOutputStream();
|
||||||
|
context.setOutputStream(outputStream);
|
||||||
|
|
||||||
|
try {
|
||||||
|
messageHandlerFactory.getHandler().handleMessage(message);
|
||||||
|
} catch (UnknownMessageFormatException e) {
|
||||||
|
LOGGER.warn("{}", e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOGGER.error("IOException on read");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException, ReflectiveOperationException {
|
||||||
|
if (args.length != 1) {
|
||||||
|
System.out.printf("usage : java %s <port>%n", MessageBroker.class.getName());
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
int port = Integer.parseInt(args[0]);
|
||||||
|
MessageBroker broker = new MessageBroker(port);
|
||||||
|
broker.addHandler("messagebroker.handlers.AddHandler");
|
||||||
|
broker.run();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,6 @@
|
|||||||
|
package messagebroker;
|
||||||
|
|
||||||
|
public interface MessageHandler {
|
||||||
|
void handleMessage(String message) throws UnknownMessageFormatException;
|
||||||
|
void setNextHandler(MessageHandler nextHandler);
|
||||||
|
}
|
@ -0,0 +1,29 @@
|
|||||||
|
package messagebroker;
|
||||||
|
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
public class MessageHandlerContext {
|
||||||
|
private final MessageBroker broker;
|
||||||
|
private OutputStream outputStream;
|
||||||
|
|
||||||
|
MessageHandlerContext(MessageBroker broker) {
|
||||||
|
this(broker, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MessageHandlerContext(MessageBroker broker, OutputStream outputStream) {
|
||||||
|
this.broker = broker;
|
||||||
|
this.outputStream = outputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageBroker getBroker() {
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
public OutputStream getOutputStream() {
|
||||||
|
return outputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOutputStream(OutputStream outputStream) {
|
||||||
|
this.outputStream = outputStream;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,46 @@
|
|||||||
|
package messagebroker;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
class MessageHandlerFactory {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerFactory.class);
|
||||||
|
|
||||||
|
private final MessageHandlerContext context;
|
||||||
|
|
||||||
|
private MessageHandler firstHandler = null;
|
||||||
|
private MessageHandler lastHandler = null;
|
||||||
|
|
||||||
|
MessageHandlerFactory(MessageHandlerContext context) {
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
void addHandler(String handlerName)
|
||||||
|
throws ReflectiveOperationException {
|
||||||
|
Class<? extends MessageHandler> handlerClass = Class.forName(handlerName)
|
||||||
|
.asSubclass(MessageHandler.class);
|
||||||
|
|
||||||
|
LOGGER.debug("Class {} found for handler name {}", handlerClass, handlerName);
|
||||||
|
MessageHandler next = newMessageHandler(handlerClass);
|
||||||
|
if (firstHandler == null) {
|
||||||
|
firstHandler = next;
|
||||||
|
lastHandler = firstHandler;
|
||||||
|
} else {
|
||||||
|
lastHandler.setNextHandler(next);
|
||||||
|
lastHandler = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MessageHandler getHandler() {
|
||||||
|
return firstHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MessageHandler newMessageHandler(Class<? extends MessageHandler> handlerClass)
|
||||||
|
throws ReflectiveOperationException {
|
||||||
|
var ctor = handlerClass.getConstructor(MessageHandlerContext.class);
|
||||||
|
return ctor.newInstance(context);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,7 @@
|
|||||||
|
package messagebroker;
|
||||||
|
|
||||||
|
public class UnknownMessageFormatException extends Exception {
|
||||||
|
public UnknownMessageFormatException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,40 @@
|
|||||||
|
package messagebroker.handlers;
|
||||||
|
|
||||||
|
import messagebroker.BaseMessageHandler;
|
||||||
|
import messagebroker.MessageHandlerContext;
|
||||||
|
import messagebroker.UnknownMessageFormatException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
public class AddHandlerMessageHandler extends BaseMessageHandler {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(AddHandlerMessageHandler.class);
|
||||||
|
private static final Pattern msgPattern = Pattern.compile("CONTROL\\.ADD_HANDLER (\\S+)$");
|
||||||
|
|
||||||
|
public AddHandlerMessageHandler(MessageHandlerContext context) {
|
||||||
|
super(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(String message) throws UnknownMessageFormatException {
|
||||||
|
Matcher matcher = msgPattern.matcher(message);
|
||||||
|
if (matcher.matches()) {
|
||||||
|
String handlerName = matcher.group(1);
|
||||||
|
LOGGER.info("Parsed handler name {}", handlerName);
|
||||||
|
try {
|
||||||
|
getContext().getBroker().addHandler(handlerName);
|
||||||
|
sendResponse("OK");
|
||||||
|
LOGGER.info("Added handler {} to broker.", handlerName);
|
||||||
|
} catch (ReflectiveOperationException e) {
|
||||||
|
String msg = "Unable to add handler";
|
||||||
|
LOGGER.error(msg + "{}", e.toString(), e);
|
||||||
|
sendResponse(msg);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
super.handleMessage(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
package messagebroker.handlers;
|
||||||
|
|
||||||
|
import messagebroker.BaseMessageHandler;
|
||||||
|
import messagebroker.MessageHandlerContext;
|
||||||
|
import messagebroker.UnknownMessageFormatException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.PrintWriter;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
public class EchoMessageHandler extends BaseMessageHandler {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(EchoMessageHandler.class);
|
||||||
|
private static final Pattern PATTERN = Pattern.compile("ECHO (.*)$");
|
||||||
|
|
||||||
|
public EchoMessageHandler(MessageHandlerContext context) {
|
||||||
|
super(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(String message) throws UnknownMessageFormatException {
|
||||||
|
var matcher = PATTERN.matcher(message);
|
||||||
|
if (matcher.matches()) {
|
||||||
|
String parsed = matcher.group(1);
|
||||||
|
LOGGER.info("Parsed message {}", parsed);
|
||||||
|
sendResponse(parsed);
|
||||||
|
} else {
|
||||||
|
super.handleMessage(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,19 @@
|
|||||||
|
package messagebroker.handlers;
|
||||||
|
|
||||||
|
import messagebroker.BaseMessageHandler;
|
||||||
|
import messagebroker.MessageHandlerContext;
|
||||||
|
import messagebroker.UnknownMessageFormatException;
|
||||||
|
|
||||||
|
public class NewHandler extends BaseMessageHandler {
|
||||||
|
private final MessageHandlerContext context;
|
||||||
|
|
||||||
|
public NewHandler(MessageHandlerContext context) {
|
||||||
|
super(context);
|
||||||
|
this.context = context;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleMessage(String message) throws UnknownMessageFormatException {
|
||||||
|
super.handleMessage(message);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package messagebroker.handlers;
|
||||||
|
|
||||||
|
import messagebroker.MessageBroker;
|
||||||
|
import messagebroker.MessageHandlerContext;
|
||||||
|
import messagebroker.UnknownMessageFormatException;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
|
public class AddHandlerMessageHandlerTest {
|
||||||
|
|
||||||
|
private MessageHandlerContext context = mock(MessageHandlerContext.class);
|
||||||
|
|
||||||
|
public AddHandlerMessageHandlerTest() {
|
||||||
|
var broker = mock(MessageBroker.class);
|
||||||
|
when(context.getBroker()).thenReturn(broker);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldHandleMessageWithAddHandlerPattern() throws ReflectiveOperationException, UnknownMessageFormatException {
|
||||||
|
var handlerName = "AnHandler";
|
||||||
|
AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context);
|
||||||
|
handler.handleMessage("CONTROL.ADD_HANDLER " + handlerName);
|
||||||
|
verify(context.getBroker()).addHandler(handlerName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = UnknownMessageFormatException.class)
|
||||||
|
public void shouldThrowWhenMessageIsUnknown() throws ReflectiveOperationException, UnknownMessageFormatException {
|
||||||
|
var handlerName = "AnHandler";
|
||||||
|
AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context);
|
||||||
|
handler.handleMessage("BAD " + handlerName);
|
||||||
|
verify(context.getBroker()).addHandler(handlerName);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user