diff --git a/SymphonyDatabaseNode/pom.xml b/SymphonyDatabaseNode/pom.xml
index 0fe6c02..410d9dd 100644
--- a/SymphonyDatabaseNode/pom.xml
+++ b/SymphonyDatabaseNode/pom.xml
@@ -15,7 +15,11 @@
gson
2.11.0
-
+
+ io.javalin
+ javalin
+ 6.3.0
+
org.hibernate.orm
hibernate-core
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java
index 02d93ac..a02db49 100644
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java
@@ -1,16 +1,59 @@
package io.github.lumijiez;
+import io.github.lumijiez.data.Data;
+import io.github.lumijiez.data.entities.PushData;
import io.github.lumijiez.raft.Raft;
+import io.javalin.Javalin;
+import io.javalin.http.Context;
+import jakarta.persistence.EntityManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class Main {
public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost");
public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
+ public static final int HTTP_PORT = Integer.parseInt(System.getenv().getOrDefault("PORT", "8080"));
+
+ private static final Logger logger = LogManager.getLogger(Main.class);
+
public static void main(String[] args) {
- try {
+ Thread raftThread = new Thread(() -> {
Raft raft = new Raft();
- Thread.currentThread().join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ try {
+ Thread.currentThread().join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Raft thread interrupted", e);
+ }
+ });
+ raftThread.start();
+
+ Javalin app = Javalin.create().start(HTTP_PORT);
+
+ app.post("/push", Main::handlePush);
+
+ logger.info("HTTP server started on port {}", HTTP_PORT);
+ }
+
+ private static void handlePush(Context ctx) {
+ String data = ctx.body();
+ logger.info("Received data for push: {}", data);
+
+ EntityManager entityManager = Data.getEntityManager();
+ try {
+ entityManager.getTransaction().begin();
+
+ PushData entity = new PushData(HOST + ":" + PORT, data);
+ entityManager.persist(entity);
+
+ entityManager.getTransaction().commit();
+ ctx.status(200).result("Data pushed successfully");
+ } catch (Exception e) {
+ logger.error("Error saving data to database", e);
+ entityManager.getTransaction().rollback();
+ ctx.status(500).result("Internal Server Error");
+ } finally {
+ entityManager.close();
}
}
}
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java
new file mode 100644
index 0000000..c17a8b3
--- /dev/null
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/PushData.java
@@ -0,0 +1,50 @@
+package io.github.lumijiez.data.entities;
+
+import jakarta.persistence.*;
+
+@Entity
+@Table(name = "push_data")
+public class PushData {
+ @Id
+ @GeneratedValue(strategy = GenerationType.IDENTITY)
+ private Long id;
+
+ @Column(nullable = false)
+ private String nodeAddress;
+
+ @Column(nullable = false)
+ private String data;
+
+ public PushData() {
+ }
+
+ public PushData(String nodeAddress, String data) {
+ this.nodeAddress = nodeAddress;
+ this.data = data;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public String getNodeAddress() {
+ return nodeAddress;
+ }
+
+ public void setNodeAddress(String nodeAddress) {
+ this.nodeAddress = nodeAddress;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+}
+
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java
deleted file mode 100644
index ca56251..0000000
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package io.github.lumijiez.data.entities;
-
-import jakarta.persistence.Entity;
-import jakarta.persistence.Id;
-import jakarta.persistence.Table;
-
-@Entity
-@Table(name="users")
-public class User {
-
- @Id
- private Long id;
- private String name;
- private String email;
-
- public User() {}
-
- public User(Long id, String name, String email) {
- this.id = id;
- this.name = name;
- this.email = email;
- }
-
- public Long getId() {
- return id;
- }
-
- public void setId(Long id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getEmail() {
- return email;
- }
-
- public void setEmail(String email) {
- this.email = email;
- }
-}
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java
index 53aa169..02c0155 100644
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java
@@ -135,7 +135,7 @@ public class Raft {
String jsonBody = String.format(
"{\"leaderHost\": \"%s\", \"leaderPort\": %d}",
- selfAddress.split(":")[0], selfPort
+ selfAddress.split(":")[0], Main.HTTP_PORT
);
try (OutputStream os = connection.getOutputStream()) {
diff --git a/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml b/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml
index b50a1e9..c01882b 100644
--- a/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml
+++ b/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml
@@ -7,7 +7,7 @@
org.hibernate.jpa.HibernatePersistenceProvider
- io.github.lumijiez.data.entities.User
+ io.github.lumijiez.data.entities.PushData
false
diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java
index b143257..746808e 100644
--- a/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java
+++ b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java
@@ -1,24 +1,22 @@
package io.github.lumijiez;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
+import static io.github.lumijiez.Main.logger;
+
public class AddressHolder {
private String host;
private int port;
private static AddressHolder INSTANCE;
- private AddressHolder() {
-// Timer timer = new Timer(true);
-// timer.scheduleAtFixedRate(new TimerTask() {
-// @Override
-// public void run() {
-// Main.logger.info("Host: {}, Port: {}", host, port);
-// }
-// }, 0, 1000);
- }
-
public static AddressHolder getInstance() {
if (INSTANCE == null) {
INSTANCE = new AddressHolder();
diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java
index e3c0e3b..cf9757c 100644
--- a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java
+++ b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java
@@ -7,13 +7,18 @@ import com.rabbitmq.client.DeliverCallback;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static io.github.lumijiez.Main.logger;
+
public class BrokerConnector {
- private static final String QUEUE_NAME = "random_json_queue";
+ private static final String QUEUE_NAME = "random_sha";
private static final String RABBITMQ_HOST = "rabbitmq";
private static final String RABBITMQ_USER = "symphony";
private static final String RABBITMQ_PASSWORD = "symphony";
@@ -49,11 +54,38 @@ public class BrokerConnector {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
logger.info("Connected to RabbitMQ and queue declared");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+ String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
+ System.out.println("Received message: " + message);
+
+ try {
+ sendPostRequest(AddressHolder.getInstance().getHost(), AddressHolder.getInstance().getPort(), message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ };
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
shutdownLatch.await();
}
}
+
+ private static void sendPostRequest(String host, int port, String data) throws Exception {
+ URL url = new URL("http://" + host + ":" + port + "/push");
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "text/plain");
+ connection.setDoOutput(true);
+
+ byte[] postData = data.getBytes(StandardCharsets.UTF_8);
+ connection.getOutputStream().write(postData);
+
+ int responseCode = connection.getResponseCode();
+ logger.info("POST to {}:{}/push with data: {}, Response Code: {}", host, port, data, responseCode);
+
+ connection.getInputStream().close();
+ }
+
}
\ No newline at end of file
diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java
index fe1b151..8b297f2 100644
--- a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java
+++ b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java
@@ -8,6 +8,8 @@ import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@@ -17,7 +19,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class BrokerConnector {
- private static final String QUEUE_NAME = "random_json_queue";
+ private static final String QUEUE_NAME = "random_sha";
private static final String RABBITMQ_HOST = "rabbitmq";
private static final String RABBITMQ_USER = "symphony";
private static final String RABBITMQ_PASSWORD = "symphony";
@@ -50,9 +52,9 @@ public class BrokerConnector {
scheduler.scheduleAtFixedRate(() -> {
try {
- String jsonMessage = generateRandomJson();
+ String jsonMessage = generateRandomSHA256();
channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8));
-// System.out.println("Sent: " + jsonMessage);
+ System.out.println("Sent: " + jsonMessage);
} catch (IOException e) {
logger.error("Failed to send message: {}", e.getMessage());
}
@@ -74,13 +76,26 @@ public class BrokerConnector {
return false;
}
- private static String generateRandomJson() {
- Random random = new Random();
- JsonObject jsonObject = new JsonObject();
- jsonObject.add("id", new JsonPrimitive(random.nextInt(1000)));
- jsonObject.add("name", new JsonPrimitive("Item_" + random.nextInt(100)));
- jsonObject.add("value", new JsonPrimitive(random.nextDouble() * 100));
+ private static String generateRandomSHA256() {
+ try {
+ Random random = new Random();
+ byte[] randomBytes = new byte[32];
+ random.nextBytes(randomBytes);
- return jsonObject.toString();
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] hash = digest.digest(randomBytes);
+
+ StringBuilder hexString = new StringBuilder();
+ for (byte b : hash) {
+ String hex = Integer.toHexString(0xff & b);
+ if (hex.length() == 1) {
+ hexString.append('0');
+ }
+ hexString.append(hex);
+ }
+ return hexString.toString();
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException("SHA-256 algorithm not available", e);
+ }
}
}