diff --git a/messaging/pom.xml b/messaging/pom.xml new file mode 100644 index 0000000..301841a --- /dev/null +++ b/messaging/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + + fr.gasser + java-cookbook + 1.0-SNAPSHOT + + messaging + + + 2.0.0.0 + 3.1.0 + + + + + org.hamcrest + hamcrest-junit + ${hamcrest-junit.version} + test + + + + org.mockito + mockito-core + ${mockito-mockito-core.version} + test + + + + + \ No newline at end of file diff --git a/messaging/src/main/java/messagebroker/BaseMessageHandler.java b/messaging/src/main/java/messagebroker/BaseMessageHandler.java new file mode 100644 index 0000000..e0e07c9 --- /dev/null +++ b/messaging/src/main/java/messagebroker/BaseMessageHandler.java @@ -0,0 +1,39 @@ +package messagebroker; + +import java.io.PrintWriter; + +public abstract class BaseMessageHandler implements MessageHandler { + + private MessageHandler nextHandler; + private final MessageHandlerContext context; + + protected BaseMessageHandler(MessageHandlerContext context) { + this.context = context; + this.nextHandler = null; + } + + public void handleMessage(String message) throws UnknownMessageFormatException { + if (nextHandler != null) { + nextHandler.handleMessage(message); + } else { + String msg = "Unable to handle message " + message; + throw new UnknownMessageFormatException(msg); + } + } + + @Override + public void setNextHandler(MessageHandler nextHandler) { + this.nextHandler = nextHandler; + } + + protected MessageHandlerContext getContext() { + return this.context; + } + + protected void sendResponse(String response) { + try (PrintWriter writer = new PrintWriter(getContext().getOutputStream())) { + writer.println(response); + writer.flush(); + } + } +} diff --git a/messaging/src/main/java/messagebroker/MessageBroker.java b/messaging/src/main/java/messagebroker/MessageBroker.java new file mode 100644 index 0000000..4897e8c --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageBroker.java @@ -0,0 +1,88 @@ +package messagebroker; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.net.ServerSocket; +import java.net.Socket; + +public class MessageBroker { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageBroker.class); + + private final ServerSocket serverSocket; + private MessageHandlerFactory messageHandlerFactory; + private final MessageHandlerContext context; + + private MessageBroker(int port) throws IOException { + LOGGER.info("Initializing MessageBroker on port {}", port); + this.serverSocket = new ServerSocket(port); + context = new MessageHandlerContext(this); + this.messageHandlerFactory = new MessageHandlerFactory(context); + } + + private void run() { + boolean running = true; + while (running) { + try { + Socket accept = serverSocket.accept(); + + ClientHandler handler = new ClientHandler(accept); + new Thread(handler).start(); + + } catch (IOException e) { + LOGGER.error("Accept failed", e); + running = false; + } + } + } + + public void addHandler(String handlerName) throws ReflectiveOperationException { + messageHandlerFactory.addHandler(handlerName); + } + + private class ClientHandler implements Runnable { + private final Socket socket; + + ClientHandler(Socket socket) { + this.socket = socket; + } + + @Override + public void run() { + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String message = reader.readLine(); + LOGGER.info("Received message : {}", message); + + var outputStream = socket.getOutputStream(); + context.setOutputStream(outputStream); + + try { + messageHandlerFactory.getHandler().handleMessage(message); + } catch (UnknownMessageFormatException e) { + LOGGER.warn("{}", e.getMessage()); + } + + socket.close(); + } catch (IOException e) { + LOGGER.error("IOException on read"); + } + } + } + + public static void main(String[] args) throws IOException, ReflectiveOperationException { + if (args.length != 1) { + System.out.printf("usage : java %s %n", MessageBroker.class.getName()); + System.exit(0); + } + + int port = Integer.parseInt(args[0]); + MessageBroker broker = new MessageBroker(port); + broker.addHandler("messagebroker.handlers.AddHandler"); + broker.run(); + } +} diff --git a/messaging/src/main/java/messagebroker/MessageHandler.java b/messaging/src/main/java/messagebroker/MessageHandler.java new file mode 100644 index 0000000..1290dab --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageHandler.java @@ -0,0 +1,6 @@ +package messagebroker; + +public interface MessageHandler { + void handleMessage(String message) throws UnknownMessageFormatException; + void setNextHandler(MessageHandler nextHandler); +} diff --git a/messaging/src/main/java/messagebroker/MessageHandlerContext.java b/messaging/src/main/java/messagebroker/MessageHandlerContext.java new file mode 100644 index 0000000..1727546 --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageHandlerContext.java @@ -0,0 +1,29 @@ +package messagebroker; + +import java.io.OutputStream; + +public class MessageHandlerContext { + private final MessageBroker broker; + private OutputStream outputStream; + + MessageHandlerContext(MessageBroker broker) { + this(broker, null); + } + + private MessageHandlerContext(MessageBroker broker, OutputStream outputStream) { + this.broker = broker; + this.outputStream = outputStream; + } + + public MessageBroker getBroker() { + return broker; + } + + public OutputStream getOutputStream() { + return outputStream; + } + + public void setOutputStream(OutputStream outputStream) { + this.outputStream = outputStream; + } +} diff --git a/messaging/src/main/java/messagebroker/MessageHandlerFactory.java b/messaging/src/main/java/messagebroker/MessageHandlerFactory.java new file mode 100644 index 0000000..b42bf50 --- /dev/null +++ b/messaging/src/main/java/messagebroker/MessageHandlerFactory.java @@ -0,0 +1,46 @@ +package messagebroker; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +class MessageHandlerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerFactory.class); + + private final MessageHandlerContext context; + + private MessageHandler firstHandler = null; + private MessageHandler lastHandler = null; + + MessageHandlerFactory(MessageHandlerContext context) { + this.context = context; + } + + void addHandler(String handlerName) + throws ReflectiveOperationException { + Class handlerClass = Class.forName(handlerName) + .asSubclass(MessageHandler.class); + + LOGGER.debug("Class {} found for handler name {}", handlerClass, handlerName); + MessageHandler next = newMessageHandler(handlerClass); + if (firstHandler == null) { + firstHandler = next; + lastHandler = firstHandler; + } else { + lastHandler.setNextHandler(next); + lastHandler = next; + } + } + + MessageHandler getHandler() { + return firstHandler; + } + + private MessageHandler newMessageHandler(Class handlerClass) + throws ReflectiveOperationException { + var ctor = handlerClass.getConstructor(MessageHandlerContext.class); + return ctor.newInstance(context); + } +} diff --git a/messaging/src/main/java/messagebroker/UnknownMessageFormatException.java b/messaging/src/main/java/messagebroker/UnknownMessageFormatException.java new file mode 100644 index 0000000..bcf23a0 --- /dev/null +++ b/messaging/src/main/java/messagebroker/UnknownMessageFormatException.java @@ -0,0 +1,7 @@ +package messagebroker; + +public class UnknownMessageFormatException extends Exception { + public UnknownMessageFormatException(String message) { + super(message); + } +} diff --git a/messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java b/messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java new file mode 100644 index 0000000..8734c8b --- /dev/null +++ b/messaging/src/main/java/messagebroker/handlers/AddHandlerMessageHandler.java @@ -0,0 +1,40 @@ +package messagebroker.handlers; + +import messagebroker.BaseMessageHandler; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class AddHandlerMessageHandler extends BaseMessageHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(AddHandlerMessageHandler.class); + private static final Pattern msgPattern = Pattern.compile("CONTROL\\.ADD_HANDLER (\\S+)$"); + + public AddHandlerMessageHandler(MessageHandlerContext context) { + super(context); + } + + @Override + public void handleMessage(String message) throws UnknownMessageFormatException { + Matcher matcher = msgPattern.matcher(message); + if (matcher.matches()) { + String handlerName = matcher.group(1); + LOGGER.info("Parsed handler name {}", handlerName); + try { + getContext().getBroker().addHandler(handlerName); + sendResponse("OK"); + LOGGER.info("Added handler {} to broker.", handlerName); + } catch (ReflectiveOperationException e) { + String msg = "Unable to add handler"; + LOGGER.error(msg + "{}", e.toString(), e); + sendResponse(msg); + } + } else { + super.handleMessage(message); + } + } + +} diff --git a/messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java b/messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java new file mode 100644 index 0000000..879d25f --- /dev/null +++ b/messaging/src/main/java/messagebroker/handlers/EchoMessageHandler.java @@ -0,0 +1,31 @@ +package messagebroker.handlers; + +import messagebroker.BaseMessageHandler; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.util.regex.Pattern; + +public class EchoMessageHandler extends BaseMessageHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(EchoMessageHandler.class); + private static final Pattern PATTERN = Pattern.compile("ECHO (.*)$"); + + public EchoMessageHandler(MessageHandlerContext context) { + super(context); + } + + @Override + public void handleMessage(String message) throws UnknownMessageFormatException { + var matcher = PATTERN.matcher(message); + if (matcher.matches()) { + String parsed = matcher.group(1); + LOGGER.info("Parsed message {}", parsed); + sendResponse(parsed); + } else { + super.handleMessage(message); + } + } +} diff --git a/messaging/src/main/java/messagebroker/handlers/NewHandler.java b/messaging/src/main/java/messagebroker/handlers/NewHandler.java new file mode 100644 index 0000000..4844af1 --- /dev/null +++ b/messaging/src/main/java/messagebroker/handlers/NewHandler.java @@ -0,0 +1,19 @@ +package messagebroker.handlers; + +import messagebroker.BaseMessageHandler; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; + +public class NewHandler extends BaseMessageHandler { + private final MessageHandlerContext context; + + public NewHandler(MessageHandlerContext context) { + super(context); + this.context = context; + } + + @Override + public void handleMessage(String message) throws UnknownMessageFormatException { + super.handleMessage(message); + } +} diff --git a/messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java b/messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java new file mode 100644 index 0000000..f93381a --- /dev/null +++ b/messaging/src/test/java/messagebroker/handlers/AddHandlerMessageHandlerTest.java @@ -0,0 +1,34 @@ +package messagebroker.handlers; + +import messagebroker.MessageBroker; +import messagebroker.MessageHandlerContext; +import messagebroker.UnknownMessageFormatException; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class AddHandlerMessageHandlerTest { + + private MessageHandlerContext context = mock(MessageHandlerContext.class); + + public AddHandlerMessageHandlerTest() { + var broker = mock(MessageBroker.class); + when(context.getBroker()).thenReturn(broker); + } + + @Test + public void shouldHandleMessageWithAddHandlerPattern() throws ReflectiveOperationException, UnknownMessageFormatException { + var handlerName = "AnHandler"; + AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context); + handler.handleMessage("CONTROL.ADD_HANDLER " + handlerName); + verify(context.getBroker()).addHandler(handlerName); + } + + @Test(expected = UnknownMessageFormatException.class) + public void shouldThrowWhenMessageIsUnknown() throws ReflectiveOperationException, UnknownMessageFormatException { + var handlerName = "AnHandler"; + AddHandlerMessageHandler handler = new AddHandlerMessageHandler(context); + handler.handleMessage("BAD " + handlerName); + verify(context.getBroker()).addHandler(handlerName); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index a148649..8f935c1 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ async lang persistence + messaging