diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java index 3dc4148..a4214aa 100644 --- a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java +++ b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -5,6 +5,8 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -14,17 +16,23 @@ public class BrokerConnector { private static final String RABBITMQ_HOST = "rabbitmq"; private static final String RABBITMQ_USER = "symphony"; private static final String RABBITMQ_PASSWORD = "symphony"; + private static final CountDownLatch shutdownLatch = new CountDownLatch(1); public static void connect() { - ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor(); + try (ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor()) { + reconnectExecutor.scheduleWithFixedDelay(() -> { + try { + connectToRabbitMQ(); + } catch (Exception e) { + System.err.println("Awaiting broker connection: " + e.getMessage()); + } + }, 0, 5, TimeUnit.SECONDS); - reconnectExecutor.scheduleWithFixedDelay(() -> { - try { - connectToRabbitMQ(); - } catch (Exception e) { - System.err.println("Awaiting broker connection: " + e.getMessage()); - } - }, 0, 5, TimeUnit.SECONDS); + shutdownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.err.println("Connector interrupted: " + e.getMessage()); + } } private static void connectToRabbitMQ() throws Exception { @@ -40,15 +48,18 @@ public class BrokerConnector { System.out.println("Connected to RabbitMQ and queue declared."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { - // String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8)); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); - Thread.currentThread().join(); + shutdownLatch.await(); } } -} + public static void shutdown() { + shutdownLatch.countDown(); + } +} \ No newline at end of file diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java index fd50780..394eee5 100644 --- a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java +++ b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -53,7 +53,7 @@ public class BrokerConnector { } catch (IOException e) { System.err.println("Failed to send message: " + e.getMessage()); } - }, 0, 1, TimeUnit.NANOSECONDS); + }, 0, 1, TimeUnit.SECONDS); latch.await(); scheduler.shutdown();