queues, and posting to db works

This commit is contained in:
Daniel
2024-12-13 00:03:02 +02:00
parent 8b5f755152
commit 20b7465d6b
9 changed files with 171 additions and 76 deletions

View File

@@ -15,7 +15,11 @@
<artifactId>gson</artifactId> <artifactId>gson</artifactId>
<version>2.11.0</version> <version>2.11.0</version>
</dependency> </dependency>
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>6.3.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.hibernate.orm</groupId> <groupId>org.hibernate.orm</groupId>
<artifactId>hibernate-core</artifactId> <artifactId>hibernate-core</artifactId>

View File

@@ -1,16 +1,59 @@
package io.github.lumijiez; package io.github.lumijiez;
import io.github.lumijiez.data.Data;
import io.github.lumijiez.data.entities.PushData;
import io.github.lumijiez.raft.Raft; import io.github.lumijiez.raft.Raft;
import io.javalin.Javalin;
import io.javalin.http.Context;
import jakarta.persistence.EntityManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Main { public class Main {
public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost"); public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost");
public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
public static final int HTTP_PORT = Integer.parseInt(System.getenv().getOrDefault("PORT", "8080"));
private static final Logger logger = LogManager.getLogger(Main.class);
public static void main(String[] args) { public static void main(String[] args) {
try { Thread raftThread = new Thread(() -> {
Raft raft = new Raft(); Raft raft = new Raft();
Thread.currentThread().join(); try {
} catch (InterruptedException e) { Thread.currentThread().join();
Thread.currentThread().interrupt(); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Raft thread interrupted", e);
}
});
raftThread.start();
Javalin app = Javalin.create().start(HTTP_PORT);
app.post("/push", Main::handlePush);
logger.info("HTTP server started on port {}", HTTP_PORT);
}
private static void handlePush(Context ctx) {
String data = ctx.body();
logger.info("Received data for push: {}", data);
EntityManager entityManager = Data.getEntityManager();
try {
entityManager.getTransaction().begin();
PushData entity = new PushData(HOST + ":" + PORT, data);
entityManager.persist(entity);
entityManager.getTransaction().commit();
ctx.status(200).result("Data pushed successfully");
} catch (Exception e) {
logger.error("Error saving data to database", e);
entityManager.getTransaction().rollback();
ctx.status(500).result("Internal Server Error");
} finally {
entityManager.close();
} }
} }
} }

View File

@@ -0,0 +1,50 @@
package io.github.lumijiez.data.entities;
import jakarta.persistence.*;
@Entity
@Table(name = "push_data")
public class PushData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String nodeAddress;
@Column(nullable = false)
private String data;
public PushData() {
}
public PushData(String nodeAddress, String data) {
this.nodeAddress = nodeAddress;
this.data = data;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getNodeAddress() {
return nodeAddress;
}
public void setNodeAddress(String nodeAddress) {
this.nodeAddress = nodeAddress;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}

View File

@@ -1,47 +0,0 @@
package io.github.lumijiez.data.entities;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
@Entity
@Table(name="users")
public class User {
@Id
private Long id;
private String name;
private String email;
public User() {}
public User(Long id, String name, String email) {
this.id = id;
this.name = name;
this.email = email;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}

View File

@@ -135,7 +135,7 @@ public class Raft {
String jsonBody = String.format( String jsonBody = String.format(
"{\"leaderHost\": \"%s\", \"leaderPort\": %d}", "{\"leaderHost\": \"%s\", \"leaderPort\": %d}",
selfAddress.split(":")[0], selfPort selfAddress.split(":")[0], Main.HTTP_PORT
); );
try (OutputStream os = connection.getOutputStream()) { try (OutputStream os = connection.getOutputStream()) {

View File

@@ -7,7 +7,7 @@
<persistence-unit name="mainUnit" transaction-type="RESOURCE_LOCAL"> <persistence-unit name="mainUnit" transaction-type="RESOURCE_LOCAL">
<provider>org.hibernate.jpa.HibernatePersistenceProvider</provider> <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<class>io.github.lumijiez.data.entities.User</class> <class>io.github.lumijiez.data.entities.PushData</class>
<exclude-unlisted-classes>false</exclude-unlisted-classes> <exclude-unlisted-classes>false</exclude-unlisted-classes>

View File

@@ -1,24 +1,22 @@
package io.github.lumijiez; package io.github.lumijiez;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import static io.github.lumijiez.Main.logger;
public class AddressHolder { public class AddressHolder {
private String host; private String host;
private int port; private int port;
private static AddressHolder INSTANCE; private static AddressHolder INSTANCE;
private AddressHolder() {
// Timer timer = new Timer(true);
// timer.scheduleAtFixedRate(new TimerTask() {
// @Override
// public void run() {
// Main.logger.info("Host: {}, Port: {}", host, port);
// }
// }, 0, 1000);
}
public static AddressHolder getInstance() { public static AddressHolder getInstance() {
if (INSTANCE == null) { if (INSTANCE == null) {
INSTANCE = new AddressHolder(); INSTANCE = new AddressHolder();

View File

@@ -7,13 +7,18 @@ import com.rabbitmq.client.DeliverCallback;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.github.lumijiez.Main.logger;
public class BrokerConnector { public class BrokerConnector {
private static final String QUEUE_NAME = "random_json_queue"; private static final String QUEUE_NAME = "random_sha";
private static final String RABBITMQ_HOST = "rabbitmq"; private static final String RABBITMQ_HOST = "rabbitmq";
private static final String RABBITMQ_USER = "symphony"; private static final String RABBITMQ_USER = "symphony";
private static final String RABBITMQ_PASSWORD = "symphony"; private static final String RABBITMQ_PASSWORD = "symphony";
@@ -49,11 +54,38 @@ public class BrokerConnector {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
logger.info("Connected to RabbitMQ and queue declared"); logger.info("Connected to RabbitMQ and queue declared");
DeliverCallback deliverCallback = (consumerTag, delivery) -> channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received message: " + message);
try {
sendPostRequest(AddressHolder.getInstance().getHost(), AddressHolder.getInstance().getPort(), message);
} catch (Exception e) {
throw new RuntimeException(e);
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
shutdownLatch.await(); shutdownLatch.await();
} }
} }
private static void sendPostRequest(String host, int port, String data) throws Exception {
URL url = new URL("http://" + host + ":" + port + "/push");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "text/plain");
connection.setDoOutput(true);
byte[] postData = data.getBytes(StandardCharsets.UTF_8);
connection.getOutputStream().write(postData);
int responseCode = connection.getResponseCode();
logger.info("POST to {}:{}/push with data: {}, Response Code: {}", host, port, data, responseCode);
connection.getInputStream().close();
}
} }

View File

@@ -8,6 +8,8 @@ import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -17,7 +19,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
public class BrokerConnector { public class BrokerConnector {
private static final String QUEUE_NAME = "random_json_queue"; private static final String QUEUE_NAME = "random_sha";
private static final String RABBITMQ_HOST = "rabbitmq"; private static final String RABBITMQ_HOST = "rabbitmq";
private static final String RABBITMQ_USER = "symphony"; private static final String RABBITMQ_USER = "symphony";
private static final String RABBITMQ_PASSWORD = "symphony"; private static final String RABBITMQ_PASSWORD = "symphony";
@@ -50,9 +52,9 @@ public class BrokerConnector {
scheduler.scheduleAtFixedRate(() -> { scheduler.scheduleAtFixedRate(() -> {
try { try {
String jsonMessage = generateRandomJson(); String jsonMessage = generateRandomSHA256();
channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8)); channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes(StandardCharsets.UTF_8));
// System.out.println("Sent: " + jsonMessage); System.out.println("Sent: " + jsonMessage);
} catch (IOException e) { } catch (IOException e) {
logger.error("Failed to send message: {}", e.getMessage()); logger.error("Failed to send message: {}", e.getMessage());
} }
@@ -74,13 +76,26 @@ public class BrokerConnector {
return false; return false;
} }
private static String generateRandomJson() { private static String generateRandomSHA256() {
Random random = new Random(); try {
JsonObject jsonObject = new JsonObject(); Random random = new Random();
jsonObject.add("id", new JsonPrimitive(random.nextInt(1000))); byte[] randomBytes = new byte[32];
jsonObject.add("name", new JsonPrimitive("Item_" + random.nextInt(100))); random.nextBytes(randomBytes);
jsonObject.add("value", new JsonPrimitive(random.nextDouble() * 100));
return jsonObject.toString(); MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(randomBytes);
StringBuilder hexString = new StringBuilder();
for (byte b : hash) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("SHA-256 algorithm not available", e);
}
} }
} }