diff --git a/SymphonyManager/Dockerfile b/SymphonyManager/Dockerfile index 3624fdd..d4f0c5c 100644 --- a/SymphonyManager/Dockerfile +++ b/SymphonyManager/Dockerfile @@ -1,7 +1,7 @@ FROM ubuntu:latest LABEL authors="lumijiez" -FROM maven:3.9.8-eclipse-temurin-21 AS build +FROM maven:3.9.9-eclipse-temurin-21 AS build WORKDIR /app diff --git a/SymphonyManager/pom.xml b/SymphonyManager/pom.xml index 3a4ee54..e952658 100644 --- a/SymphonyManager/pom.xml +++ b/SymphonyManager/pom.xml @@ -10,6 +10,41 @@ jar + + com.rabbitmq + amqp-client + 5.23.0 + + + + com.google.code.gson + gson + 2.11.0 + + + + org.slf4j + slf4j-api + 1.7.32 + + + + ch.qos.logback + logback-classic + 1.5.12 + + + + ch.qos.logback + logback-core + 1.5.12 + + + + org.slf4j + slf4j-simple + 1.7.32 + @@ -24,6 +59,20 @@ + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.maven.plugins maven-jar-plugin @@ -38,5 +87,4 @@ - diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java new file mode 100644 index 0000000..445955a --- /dev/null +++ b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -0,0 +1,53 @@ +package io.github.lumijiez; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class BrokerConnector { + private static final String QUEUE_NAME = "random_json_queue"; + private static final String RABBITMQ_HOST = "rabbitmq"; + private static final String RABBITMQ_USER = "symphony"; + private static final String RABBITMQ_PASSWORD = "symphony"; + + public static void connect() { + ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + + reconnectExecutor.scheduleWithFixedDelay(() -> { + try { + connectToRabbitMQ(); + } catch (Exception e) { + System.err.println("Awaiting broker connection: " + e.getMessage()); + } + }, 0, 5, TimeUnit.SECONDS); + } + + private static void connectToRabbitMQ() throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(RABBITMQ_HOST); + factory.setUsername(RABBITMQ_USER); + factory.setPassword(RABBITMQ_PASSWORD); + + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println("Waiting for messages. To exit press CTRL+C"); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + System.out.println("Received: " + message); + }; + + channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); + + Thread.currentThread().join(); + } + } +} diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/Main.java b/SymphonyManager/src/main/java/io/github/lumijiez/Main.java index c634d85..07409b3 100644 --- a/SymphonyManager/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyManager/src/main/java/io/github/lumijiez/Main.java @@ -1,7 +1,8 @@ package io.github.lumijiez; public class Main { + public static void main(String[] args) { - System.out.print("Manager started!"); + BrokerConnector.connect(); } } \ No newline at end of file diff --git a/SymphonyProducer/pom.xml b/SymphonyProducer/pom.xml index 8fa328a..3c865e1 100644 --- a/SymphonyProducer/pom.xml +++ b/SymphonyProducer/pom.xml @@ -39,6 +39,12 @@ logback-core 1.5.12 + + + org.slf4j + slf4j-simple + 1.7.32 + diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java new file mode 100644 index 0000000..5e0cf59 --- /dev/null +++ b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -0,0 +1,83 @@ +package io.github.lumijiez; + +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class BrokerConnector { + private static final String QUEUE_NAME = "random_json_queue"; + private static final String RABBITMQ_HOST = "rabbitmq"; + private static final String RABBITMQ_USER = "symphony"; + private static final String RABBITMQ_PASSWORD = "symphony"; + + public static void connect() { + CountDownLatch latch = new CountDownLatch(1); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("Shutdown signal received."); + latch.countDown(); + })); + + boolean success = connectToRabbitMQ(latch); + System.out.println("Success: " + success); + } + + private static boolean connectToRabbitMQ(CountDownLatch latch) { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(RABBITMQ_HOST); + factory.setUsername(RABBITMQ_USER); + factory.setPassword(RABBITMQ_PASSWORD); + + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor()) { + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println("Connected to RabbitMQ and queue declared."); + + scheduler.scheduleAtFixedRate(() -> { + try { + String jsonMessage = generateRandomJson(); + channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); + System.out.println("Sent: " + jsonMessage); + } catch (IOException e) { + System.err.println("Failed to send message: " + e.getMessage()); + } + }, 0, 10, TimeUnit.SECONDS); + + latch.await(); + scheduler.shutdown(); + + return scheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + System.err.println("Awaiting broker connection: " + e.getMessage()); + try { + Thread.sleep(5000); + connectToRabbitMQ(latch); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + return false; + } + + private static String generateRandomJson() { + Random random = new Random(); + JsonObject jsonObject = new JsonObject(); + jsonObject.add("id", new JsonPrimitive(random.nextInt(1000))); + jsonObject.add("name", new JsonPrimitive("Item_" + random.nextInt(100))); + jsonObject.add("value", new JsonPrimitive(random.nextDouble() * 100)); + + return jsonObject.toString(); + } +} diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java b/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java index a7f564a..07409b3 100644 --- a/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java @@ -1,82 +1,8 @@ package io.github.lumijiez; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.CountDownLatch; - public class Main { - private static final String QUEUE_NAME = "random_json_queue"; - private static final String RABBITMQ_HOST = "rabbitmq"; - private static final String RABBITMQ_USER = "symphony"; - private static final String RABBITMQ_PASSWORD = "symphony"; public static void main(String[] args) { - CountDownLatch latch = new CountDownLatch(1); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("Shutdown signal received."); - latch.countDown(); - })); - - boolean success = connectToRabbitMQ(latch); - } - - private static boolean connectToRabbitMQ(CountDownLatch latch) { - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(RABBITMQ_HOST); - factory.setUsername(RABBITMQ_USER); - factory.setPassword(RABBITMQ_PASSWORD); - - try (Connection connection = factory.newConnection(); - Channel channel = connection.createChannel(); - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor()) { - - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - System.out.println("Connected to RabbitMQ and queue declared."); - - scheduler.scheduleAtFixedRate(() -> { - try { - String jsonMessage = generateRandomJson(); - channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); - System.out.println("Sent: " + jsonMessage); - } catch (IOException e) { - System.err.println("Failed to send message: " + e.getMessage()); - } - }, 0, 10, TimeUnit.SECONDS); - - latch.await(); - scheduler.shutdown(); - - return scheduler.awaitTermination(5, TimeUnit.SECONDS); - } catch (Exception e) { - System.err.println("RabbitMQ connection error: " + e.getMessage()); - try { - Thread.sleep(5000); - connectToRabbitMQ(latch); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } - return false; - } - - private static String generateRandomJson() { - Random random = new Random(); - JsonObject jsonObject = new JsonObject(); - jsonObject.add("id", new JsonPrimitive(random.nextInt(1000))); - jsonObject.add("name", new JsonPrimitive("Item_" + random.nextInt(100))); - jsonObject.add("value", new JsonPrimitive(random.nextDouble() * 100)); - - return jsonObject.toString(); + BrokerConnector.connect(); } } \ No newline at end of file diff --git a/config/rabbitmq/rabbit.conf b/config/rabbitmq/rabbit.conf new file mode 100644 index 0000000..099a149 --- /dev/null +++ b/config/rabbitmq/rabbit.conf @@ -0,0 +1,2 @@ +log.console = false +log.console.level = error diff --git a/docker-compose.yml b/docker-compose.yml index dc6e1bd..e9fe6d1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,14 +27,11 @@ services: - RABBITMQ_DEFAULT_PASS=symphony - RABBITMQ_NODENAME=rabbit@rabbitmq volumes: - - rabbitmq_data:/var/lib/rabbitmq + - ./config/rabbitmq/rabbit.conf:/etc/rabbitmq/rabbitmq.conf:ro networks: - symphony-network restart: always networks: symphony-network: - driver: bridge - -volumes: - rabbitmq_data: \ No newline at end of file + driver: bridge \ No newline at end of file