From c4477aca3885c69a1a3bab98fa88fdd2ff57455c Mon Sep 17 00:00:00 2001 From: Thibaud Date: Sat, 19 Oct 2019 17:48:53 +0200 Subject: [PATCH] messaging: add a simple message broker Add a simple message broker which uses the chain of responsibility design pattern for message handling. Inspired by this article: http://www.softwarematters.org/message-broker.html#java-implementation In this design, a message handler can contain a reference to the next message handler. If the current handler cannot handle the message given to him by the MessageBroker, he passes this message to the next MessageHandler. The chain of responsibility pattern is used in order to decouple the creation of a concrete message instance from the broker communication logic. This adheres to the Open Closed Principle, because it is possible to add a new message type without modifying the message broker. --- messaging/pom.xml | 36 ++++++++ .../messagebroker/BaseMessageHandler.java | 39 ++++++++ .../java/messagebroker/MessageBroker.java | 88 +++++++++++++++++++ .../java/messagebroker/MessageHandler.java | 6 ++ .../messagebroker/MessageHandlerContext.java | 29 ++++++ .../messagebroker/MessageHandlerFactory.java | 46 ++++++++++ .../UnknownMessageFormatException.java | 7 ++ .../handlers/AddHandlerMessageHandler.java | 40 +++++++++ .../handlers/EchoMessageHandler.java | 31 +++++++ .../messagebroker/handlers/NewHandler.java | 19 ++++ .../AddHandlerMessageHandlerTest.java | 34 +++++++ pom.xml | 1 + 12 files changed, 376 insertions(+) create mode 100644 messaging/pom.xml create mode 100644 messaging/src/main/java/messagebroker/BaseMessageHandler.java create mode 100644 messaging/src/main/java/messagebroker/MessageBroker.java create mode 100644 messaging/src/main/java/messagebroker/MessageHandler.java create mode 100644 messaging/src/main/java/messagebroker/MessageHandlerContext.java create mode 100644 messaging/src/main/java/messagebroker/MessageHandlerFactory.java create mode 100644 messaging/src/main/java/messagebroker/UnknownMessageFormatException.java create mode 100644 messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java create mode 100644 messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java create mode 100644 messaging/src/main/java/messagebroker/handlers/NewHandler.java create mode 100644 messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java diff --git a/messaging/pom.xml b/messaging/pom.xml new file mode 100644 index 0000000..301841a --- /dev/null +++ b/messaging/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + + fr.gasser + java-cookbook + 1.0-SNAPSHOT + + messaging + + + 2.0.0.0 + 3.1.0 + + + + + org.hamcrest + hamcrest-junit + ${hamcrest-junit.version} + test + + + + org.mockito + mockito-core + ${mockito-mockito-core.version} + test + + + + + \ No newline at end of file diff --git a/messaging/src/main/java/messagebroker/BaseMessageHandler.java b/messaging/src/main/java/messagebroker/BaseMessageHandler.java new file mode 100644 index 0000000..e0e07c9 --- /dev/null +++ b/messaging/src/main/java/messagebroker/BaseMessageHandler.java @@ -0,0 +1,39 @@ +package messagebroker; + +import java.io.PrintWriter; + +public abstract class BaseMessageHandler implements MessageHandler { + + private MessageHandler nextHandler; + private final MessageHandlerContext context; + + protected BaseMessageHandler(MessageHandlerContext context) { + this.context = context; + this.nextHandler = null; + } + + public void handleMessage(String message) throws UnknownMessageFormatException { + if (nextHandler != null) { + nextHandler.handleMessage(message); + } else { + String msg = "Unable to handle message " + message; + throw new UnknownMessageFormatException(msg); + } + } + + @Override + public void setNextHandler(MessageHandler nextHandler) { + this.nextHandler = nextHandler; + } + + protected MessageHandlerContext getContext() { + return this.context; + } + + protected void sendResponse(String response) { + try (PrintWriter writer = new PrintWriter(getContext().getOutputStream())) { + writer.println(response); + writer.flush(); + } + } +} diff --git a/messaging/src/main/java/messagebroker/MessageBroker.java b/messaging/src/main/java/messagebroker/MessageBroker.java new file mode 100644 index 0000000..4897e8c --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageBroker.java @@ -0,0 +1,88 @@ +package messagebroker; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.net.ServerSocket; +import java.net.Socket; + +public class MessageBroker { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class); + + private final ServerSocket serverSocket; + private MessageHandlerFactory messageHandlerFactory; + private final MessageHandlerContext context; + + private MessageBroker(int port) throws IOException { + LOGGER.info("Initializing MessageBroker on port {}", port); + this.serverSocket = new ServerSocket(port); + context = new MessageHandlerContext(this); + this.messageHandlerFactory = new MessageHandlerFactory(context); + } + + private void run() { + boolean running = true; + while (running) { + try { + Socket accept = serverSocket.accept(); + + ClientHandler handler = new ClientHandler(accept); + new Thread(handler).start(); + + } catch (IOException e) { + LOGGER.error("Accept failed", e); + running = false; + } + } + } + + public void addHandler(String handlerName) throws ReflectiveOperationException { + messageHandlerFactory.addHandler(handlerName); + } + + private class ClientHandler implements Runnable { + private final Socket socket; + + ClientHandler(Socket socket) { + this.socket = socket; + } + + @Override + public void run() { + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String message = reader.readLine(); + LOGGER.info("Received message : {}", message); + + var outputStream = socket.getOutputStream(); + context.setOutputStream(outputStream); + + try { + messageHandlerFactory.getHandler().handleMessage(message); + } catch (UnknownMessageFormatException e) { + LOGGER.warn("{}", e.getMessage()); + } + + socket.close(); + } catch (IOException e) { + LOGGER.error("IOException on read"); + } + } + } + + public static void main(String[] args) throws IOException, ReflectiveOperationException { + if (args.length != 1) { + System.out.printf("usage : java %s %n", MessageBroker.class.getName()); + System.exit(0); + } + + int port = Integer.parseInt(args[0]); + MessageBroker broker = new MessageBroker(port); + broker.addHandler("messagebroker.handlers.AddHandler"); + broker.run(); + } +} diff --git a/messaging/src/main/java/messagebroker/MessageHandler.java b/messaging/src/main/java/messagebroker/MessageHandler.java new file mode 100644 index 0000000..1290dab --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageHandler.java @@ -0,0 +1,6 @@ +package messagebroker; + +public interface MessageHandler { + void handleMessage(String message) throws UnknownMessageFormatException; + void setNextHandler(MessageHandler nextHandler); +} diff --git a/messaging/src/main/java/messagebroker/MessageHandlerContext.java b/messaging/src/main/java/messagebroker/MessageHandlerContext.java new file mode 100644 index 0000000..1727546 --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageHandlerContext.java @@ -0,0 +1,29 @@ +package messagebroker; + +import java.io.OutputStream; + +public class MessageHandlerContext { + private final MessageBroker broker; + private OutputStream outputStream; + + MessageHandlerContext(MessageBroker broker) { + this(broker, null); + } + + private MessageHandlerContext(MessageBroker broker, OutputStream outputStream) { + this.broker = broker; + this.outputStream = outputStream; + } + + public MessageBroker getBroker() { + return broker; + } + + public OutputStream getOutputStream() { + return outputStream; + } + + public void setOutputStream(OutputStream outputStream) { + this.outputStream = outputStream; + } +} diff --git a/messaging/src/main/java/messagebroker/MessageHandlerFactory.java b/messaging/src/main/java/messagebroker/MessageHandlerFactory.java new file mode 100644 index 0000000..b42bf50 --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageHandlerFactory.java @@ -0,0 +1,46 @@ +package messagebroker; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +class MessageHandlerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerFactory.class); + + private final MessageHandlerContext context; + + private MessageHandler firstHandler = null; + private MessageHandler lastHandler = null; + + MessageHandlerFactory(MessageHandlerContext context) { + this.context = context; + } + + void addHandler(String handlerName) + throws ReflectiveOperationException { + Class handlerClass = Class.forName(handlerName) + .asSubclass(MessageHandler.class); + + LOGGER.debug("Class {} found for handler name {}", handlerClass, handlerName); + MessageHandler next = newMessageHandler(handlerClass); + if (firstHandler == null) { + firstHandler = next; + lastHandler = firstHandler; + } else { + lastHandler.setNextHandler(next); + lastHandler = next; + } + } + + MessageHandler getHandler() { + return firstHandler; + } + + private MessageHandler newMessageHandler(Class handlerClass) + throws ReflectiveOperationException { + var ctor = handlerClass.getConstructor(MessageHandlerContext.class); + return ctor.newInstance(context); + } +} diff --git a/messaging/src/main/java/messagebroker/UnknownMessageFormatException.java b/messaging/src/main/java/messagebroker/UnknownMessageFormatException.java new file mode 100644 index 0000000..bcf23a0 --- /dev/null +++ b/messaging/src/main/java/messagebroker/UnknownMessageFormatException.java @@ -0,0 +1,7 @@ +package messagebroker; + +public class UnknownMessageFormatException extends Exception { + public UnknownMessageFormatException(String message) { + super(message); + } +} diff --git a/messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java b/messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java new file mode 100644 index 0000000..8734c8b --- /dev/null +++ b/messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java @@ -0,0 +1,40 @@ +package messagebroker.handlers; + +import messagebroker.BaseMessageHandler; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class AddHandlerMessageHandler extends BaseMessageHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(AddHandlerMessageHandler.class); + private static final Pattern msgPattern = Pattern.compile("CONTROL\\.ADD_HANDLER (\\S+)$"); + + public AddHandlerMessageHandler(MessageHandlerContext context) { + super(context); + } + + @Override + public void handleMessage(String message) throws UnknownMessageFormatException { + Matcher matcher = msgPattern.matcher(message); + if (matcher.matches()) { + String handlerName = matcher.group(1); + LOGGER.info("Parsed handler name {}", handlerName); + try { + getContext().getBroker().addHandler(handlerName); + sendResponse("OK"); + LOGGER.info("Added handler {} to broker.", handlerName); + } catch (ReflectiveOperationException e) { + String msg = "Unable to add handler"; + LOGGER.error(msg + "{}", e.toString(), e); + sendResponse(msg); + } + } else { + super.handleMessage(message); + } + } + +} diff --git a/messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java b/messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java new file mode 100644 index 0000000..879d25f --- /dev/null +++ b/messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java @@ -0,0 +1,31 @@ +package messagebroker.handlers; + +import messagebroker.BaseMessageHandler; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.util.regex.Pattern; + +public class EchoMessageHandler extends BaseMessageHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(EchoMessageHandler.class); + private static final Pattern PATTERN = Pattern.compile("ECHO (.*)$"); + + public EchoMessageHandler(MessageHandlerContext context) { + super(context); + } + + @Override + public void handleMessage(String message) throws UnknownMessageFormatException { + var matcher = PATTERN.matcher(message); + if (matcher.matches()) { + String parsed = matcher.group(1); + LOGGER.info("Parsed message {}", parsed); + sendResponse(parsed); + } else { + super.handleMessage(message); + } + } +} diff --git a/messaging/src/main/java/messagebroker/handlers/NewHandler.java b/messaging/src/main/java/messagebroker/handlers/NewHandler.java new file mode 100644 index 0000000..4844af1 --- /dev/null +++ b/messaging/src/main/java/messagebroker/handlers/NewHandler.java @@ -0,0 +1,19 @@ +package messagebroker.handlers; + +import messagebroker.BaseMessageHandler; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; + +public class NewHandler extends BaseMessageHandler { + private final MessageHandlerContext context; + + public NewHandler(MessageHandlerContext context) { + super(context); + this.context = context; + } + + @Override + public void handleMessage(String message) throws UnknownMessageFormatException { + super.handleMessage(message); + } +} diff --git a/messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java b/messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java new file mode 100644 index 0000000..f93381a --- /dev/null +++ b/messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java @@ -0,0 +1,34 @@ +package messagebroker.handlers; + +import messagebroker.MessageBroker; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class AddHandlerMessageHandlerTest { + + private MessageHandlerContext context = mock(MessageHandlerContext.class); + + public AddHandlerMessageHandlerTest() { + var broker = mock(MessageBroker.class); + when(context.getBroker()).thenReturn(broker); + } + + @Test + public void shouldHandleMessageWithAddHandlerPattern() throws ReflectiveOperationException, UnknownMessageFormatException { + var handlerName = "AnHandler"; + AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context); + handler.handleMessage("CONTROL.ADD_HANDLER " + handlerName); + verify(context.getBroker()).addHandler(handlerName); + } + + @Test(expected = UnknownMessageFormatException.class) + public void shouldThrowWhenMessageIsUnknown() throws ReflectiveOperationException, UnknownMessageFormatException { + var handlerName = "AnHandler"; + AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context); + handler.handleMessage("BAD " + handlerName); + verify(context.getBroker()).addHandler(handlerName); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index a148649..8f935c1 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ async lang persistence + messaging