From 20b7465d6b99499fd41a90d8814c842328b92070 Mon Sep 17 00:00:00 2001 From: Daniel <59575049+lumijiez@users.noreply.github.com> Date: Fri, 13 Dec 2024 00:03:02 +0200 Subject: [PATCH] queues, and posting to db works --- SymphonyDatabaseNode/pom.xml | 6 ++- .../main/java/io/github/lumijiez/Main.java | 51 +++++++++++++++++-- .../lumijiez/data/entities/PushData.java | 50 ++++++++++++++++++ .../github/lumijiez/data/entities/User.java | 47 ----------------- .../java/io/github/lumijiez/raft/Raft.java | 2 +- .../main/resources/META-INF/persistence.xml | 2 +- .../io/github/lumijiez/AddressHolder.java | 18 +++---- .../io/github/lumijiez/BrokerConnector.java | 36 ++++++++++++- .../io/github/lumijiez/BrokerConnector.java | 35 +++++++++---- 9 files changed, 171 insertions(+), 76 deletions(-) create mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java delete mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java diff --git a/SymphonyDatabaseNode/pom.xml b/SymphonyDatabaseNode/pom.xml index 0fe6c02..410d9dd 100644 --- a/SymphonyDatabaseNode/pom.xml +++ b/SymphonyDatabaseNode/pom.xml @@ -15,7 +15,11 @@ gson 2.11.0 - + + io.javalin + javalin + 6.3.0 + org.hibernate.orm hibernate-core diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java index 02d93ac..a02db49 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java @@ -1,16 +1,59 @@ package io.github.lumijiez; +import io.github.lumijiez.data.Data; +import io.github.lumijiez.data.entities.PushData; import io.github.lumijiez.raft.Raft; +import io.javalin.Javalin; +import io.javalin.http.Context; +import jakarta.persistence.EntityManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class Main { public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost"); public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); + public static final int HTTP_PORT = Integer.parseInt(System.getenv().getOrDefault("PORT", "8080")); + + private static final Logger logger = LogManager.getLogger(Main.class); + public static void main(String[] args) { - try { + Thread raftThread = new Thread(() -> { Raft raft = new Raft(); - Thread.currentThread().join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + Thread.currentThread().join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Raft thread interrupted", e); + } + }); + raftThread.start(); + + Javalin app = Javalin.create().start(HTTP_PORT); + + app.post("/push", Main::handlePush); + + logger.info("HTTP server started on port {}", HTTP_PORT); + } + + private static void handlePush(Context ctx) { + String data = ctx.body(); + logger.info("Received data for push: {}", data); + + EntityManager entityManager = Data.getEntityManager(); + try { + entityManager.getTransaction().begin(); + + PushData entity = new PushData(HOST + ":" + PORT, data); + entityManager.persist(entity); + + entityManager.getTransaction().commit(); + ctx.status(200).result("Data pushed successfully"); + } catch (Exception e) { + logger.error("Error saving data to database", e); + entityManager.getTransaction().rollback(); + ctx.status(500).result("Internal Server Error"); + } finally { + entityManager.close(); } } } diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java new file mode 100644 index 0000000..c17a8b3 --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java @@ -0,0 +1,50 @@ +package io.github.lumijiez.data.entities; + +import jakarta.persistence.*; + +@Entity +@Table(name = "push_data") +public class PushData { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(nullable = false) + private String nodeAddress; + + @Column(nullable = false) + private String data; + + public PushData() { + } + + public PushData(String nodeAddress, String data) { + this.nodeAddress = nodeAddress; + this.data = data; + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getNodeAddress() { + return nodeAddress; + } + + public void setNodeAddress(String nodeAddress) { + this.nodeAddress = nodeAddress; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } +} + diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java deleted file mode 100644 index ca56251..0000000 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java +++ /dev/null @@ -1,47 +0,0 @@ -package io.github.lumijiez.data.entities; - -import jakarta.persistence.Entity; -import jakarta.persistence.Id; -import jakarta.persistence.Table; - -@Entity -@Table(name="users") -public class User { - - @Id - private Long id; - private String name; - private String email; - - public User() {} - - public User(Long id, String name, String email) { - this.id = id; - this.name = name; - this.email = email; - } - - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getEmail() { - return email; - } - - public void setEmail(String email) { - this.email = email; - } -} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java index 53aa169..02c0155 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java @@ -135,7 +135,7 @@ public class Raft { String jsonBody = String.format( "{\"leaderHost\": \"%s\", \"leaderPort\": %d}", - selfAddress.split(":")[0], selfPort + selfAddress.split(":")[0], Main.HTTP_PORT ); try (OutputStream os = connection.getOutputStream()) { diff --git a/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml b/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml index b50a1e9..c01882b 100644 --- a/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml +++ b/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml @@ -7,7 +7,7 @@ org.hibernate.jpa.HibernatePersistenceProvider - io.github.lumijiez.data.entities.User + io.github.lumijiez.data.entities.PushData false diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java index b143257..746808e 100644 --- a/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java +++ b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java @@ -1,24 +1,22 @@ package io.github.lumijiez; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Random; import java.util.Timer; import java.util.TimerTask; +import static io.github.lumijiez.Main.logger; + public class AddressHolder { private String host; private int port; private static AddressHolder INSTANCE; - private AddressHolder() { -// Timer timer = new Timer(true); -// timer.scheduleAtFixedRate(new TimerTask() { -// @Override -// public void run() { -// Main.logger.info("Host: {}, Port: {}", host, port); -// } -// }, 0, 1000); - } - public static AddressHolder getInstance() { if (INSTANCE == null) { INSTANCE = new AddressHolder(); diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java index e3c0e3b..cf9757c 100644 --- a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java +++ b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -7,13 +7,18 @@ import com.rabbitmq.client.DeliverCallback; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static io.github.lumijiez.Main.logger; + public class BrokerConnector { - private static final String QUEUE_NAME = "random_json_queue"; + private static final String QUEUE_NAME = "random_sha"; private static final String RABBITMQ_HOST = "rabbitmq"; private static final String RABBITMQ_USER = "symphony"; private static final String RABBITMQ_PASSWORD = "symphony"; @@ -49,11 +54,38 @@ public class BrokerConnector { channel.queueDeclare(QUEUE_NAME, false, false, false, null); logger.info("Connected to RabbitMQ and queue declared"); - DeliverCallback deliverCallback = (consumerTag, delivery) -> channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + System.out.println("Received message: " + message); + + try { + sendPostRequest(AddressHolder.getInstance().getHost(), AddressHolder.getInstance().getPort(), message); + } catch (Exception e) { + throw new RuntimeException(e); + } + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); shutdownLatch.await(); } } + + private static void sendPostRequest(String host, int port, String data) throws Exception { + URL url = new URL("http://" + host + ":" + port + "/push"); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "text/plain"); + connection.setDoOutput(true); + + byte[] postData = data.getBytes(StandardCharsets.UTF_8); + connection.getOutputStream().write(postData); + + int responseCode = connection.getResponseCode(); + logger.info("POST to {}:{}/push with data: {}, Response Code: {}", host, port, data, responseCode); + + connection.getInputStream().close(); + } + } \ No newline at end of file diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java index fe1b151..8b297f2 100644 --- a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java +++ b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -8,6 +8,8 @@ import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -17,7 +19,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class BrokerConnector { - private static final String QUEUE_NAME = "random_json_queue"; + private static final String QUEUE_NAME = "random_sha"; private static final String RABBITMQ_HOST = "rabbitmq"; private static final String RABBITMQ_USER = "symphony"; private static final String RABBITMQ_PASSWORD = "symphony"; @@ -50,9 +52,9 @@ public class BrokerConnector { scheduler.scheduleAtFixedRate(() -> { try { - String jsonMessage = generateRandomJson(); + String jsonMessage = generateRandomSHA256(); channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); -// System.out.println("Sent: " + jsonMessage); + System.out.println("Sent: " + jsonMessage); } catch (IOException e) { logger.error("Failed to send message: {}", e.getMessage()); } @@ -74,13 +76,26 @@ public class BrokerConnector { return false; } - private static String generateRandomJson() { - Random random = new Random(); - JsonObject jsonObject = new JsonObject(); - jsonObject.add("id", new JsonPrimitive(random.nextInt(1000))); - jsonObject.add("name", new JsonPrimitive("Item_" + random.nextInt(100))); - jsonObject.add("value", new JsonPrimitive(random.nextDouble() * 100)); + private static String generateRandomSHA256() { + try { + Random random = new Random(); + byte[] randomBytes = new byte[32]; + random.nextBytes(randomBytes); - return jsonObject.toString(); + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(randomBytes); + + StringBuilder hexString = new StringBuilder(); + for (byte b : hash) { + String hex = Integer.toHexString(0xff & b); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 algorithm not available", e); + } } }