From fbd07659c788f1727b281385283a733daa7c858f Mon Sep 17 00:00:00 2001 From: Lumijiez Date: Fri, 22 Nov 2024 20:14:20 +0200 Subject: [PATCH] Producer doesn't busy-wait now --- .../main/java/io/github/lumijiez/Main.java | 95 +++++++++---------- 1 file changed, 45 insertions(+), 50 deletions(-) diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java b/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java index eecb332..a7f564a 100644 --- a/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyProducer/src/main/java/io/github/lumijiez/Main.java @@ -2,16 +2,17 @@ package io.github.lumijiez; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.CountDownLatch; public class Main { private static final String QUEUE_NAME = "random_json_queue"; @@ -20,59 +21,53 @@ public class Main { private static final String RABBITMQ_PASSWORD = "symphony"; public static void main(String[] args) { + CountDownLatch latch = new CountDownLatch(1); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("Shutdown signal received."); + latch.countDown(); + })); + + boolean success = connectToRabbitMQ(latch); + } + + private static boolean connectToRabbitMQ(CountDownLatch latch) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RABBITMQ_HOST); factory.setUsername(RABBITMQ_USER); factory.setPassword(RABBITMQ_PASSWORD); - Connection connection = null; - Channel channel = null; + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor()) { - while (true) { + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println("Connected to RabbitMQ and queue declared."); + + scheduler.scheduleAtFixedRate(() -> { + try { + String jsonMessage = generateRandomJson(); + channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); + System.out.println("Sent: " + jsonMessage); + } catch (IOException e) { + System.err.println("Failed to send message: " + e.getMessage()); + } + }, 0, 10, TimeUnit.SECONDS); + + latch.await(); + scheduler.shutdown(); + + return scheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + System.err.println("RabbitMQ connection error: " + e.getMessage()); try { - System.out.println("Attempting to connect to RabbitMQ..."); - - connection = factory.newConnection(); - channel = connection.createChannel(); - - channel.queueDeclare(QUEUE_NAME, false, false, false, null); - System.out.println("Connected to RabbitMQ and queue declared."); - - Timer timer = new Timer(true); - Channel finalChannel = channel; - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try { - String jsonMessage = generateRandomJson(); - finalChannel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); - System.out.println("Sent: " + jsonMessage); - } catch (IOException e) { - System.err.println("Failed to send message: " + e.getMessage()); - } - } - }, 0, 10000); - - System.out.println("Press Ctrl+C to exit."); - Thread.sleep(Long.MAX_VALUE); - break; - - } catch (IOException | TimeoutException | InterruptedException e) { - System.err.println("Failed to connect to RabbitMQ: " + e.getMessage()); - System.err.println("Retrying in 5 seconds..."); - try { - if (connection != null && connection.isOpen()) connection.close(); - if (channel != null && channel.isOpen()) channel.close(); - } catch (IOException | TimeoutException ignored) { - } - - try { - Thread.sleep(5000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } + Thread.sleep(5000); + connectToRabbitMQ(latch); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } } + return false; } private static String generateRandomJson() { @@ -84,4 +79,4 @@ public class Main { return jsonObject.toString(); } -} +} \ No newline at end of file