diff --git a/.idea/misc.xml b/.idea/misc.xml
index d4fe832..ec9392a 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -9,6 +9,7 @@
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java
index 0d06f91..02d93ac 100644
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java
@@ -1,14 +1,13 @@
package io.github.lumijiez;
-
-import io.github.lumijiez.app.NodeManager;
+import io.github.lumijiez.raft.Raft;
public class Main {
-
+ public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost");
+ public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
public static void main(String[] args) {
- NodeManager manager = new NodeManager();
-
try {
+ Raft raft = new Raft();
Thread.currentThread().join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java
deleted file mode 100644
index c2dfc6b..0000000
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package io.github.lumijiez.app;
-
-import io.github.lumijiez.data.models.NodeInfo;
-import io.github.lumijiez.network.UdpListener;
-import io.github.lumijiez.network.UdpMessageSender;
-import io.github.lumijiez.network.WebSocketManager;
-import io.github.lumijiez.raft.Raft;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class NodeManager {
- private final Logger logger = LogManager.getLogger(NodeManager.class);
-
- private final UdpListener listener;
- private final UdpMessageSender sender;
- private final WebSocketManager ws;
- private final Raft raft;
- private final List nodes = new ArrayList<>();
- public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost");
- public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
-
- public NodeManager() {
- this.listener = new UdpListener(this);
- this.sender = new UdpMessageSender(this);
- this.ws = new WebSocketManager(this);
- this.raft = new Raft(this, sender);
-
- listener.startListening();
- ws.connectAndListen();
- raft.start();
- }
-
- public void handleMessage(String message) {
- raft.processMessage(message);
- }
-
- public List getNodes() {
- return nodes;
- }
-
- public void registerNode(NodeInfo node) {
- if (!nodes.contains(node)) {
- nodes.add(node);
- sender.sendMessage(node, "NODE_REGISTERED");
- // logger.info("New node registered, updating Raft peers");
- }
- }
-
- public void removeNode(NodeInfo node) {
- nodes.remove(node);
- // logger.info("Node removed, updating Raft peers");
- }
-
- public UdpMessageSender getSender() {
- return sender;
- }
-}
\ No newline at end of file
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java
deleted file mode 100644
index 1cc6958..0000000
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package io.github.lumijiez.data.models;
-
-public record NodeInfo(String hostname, int port) {
- public static NodeInfo fromString(String nodeString) {
- String[] parts = nodeString.split(":");
- if (parts.length != 2) {
- throw new IllegalArgumentException("Invalid node string format. Expected 'hostname:port'");
- }
- return new NodeInfo(parts[0], Integer.parseInt(parts[1]));
- }
-
- @Override
- public String toString() {
- return hostname + ":" + port;
- }
-}
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java
index 30b9aac..22e918e 100644
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java
@@ -1,6 +1,5 @@
package io.github.lumijiez.network;
-import io.github.lumijiez.app.NodeManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -11,26 +10,31 @@ import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
+import java.util.function.Consumer;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
public class UdpListener {
private static final Logger logger = LogManager.getLogger(UdpListener.class);
+ private final int port;
+ private final Consumer messageHandler;
+ private final Gson gson = new Gson();
- private final NodeManager nodeManager;
-
- public UdpListener(NodeManager nodeManager) {
- this.nodeManager = nodeManager;
+ public UdpListener(int port, Consumer messageHandler) {
+ this.port = port;
+ this.messageHandler = messageHandler;
}
public void startListening() {
- Thread udpListenerThread = new Thread(() -> {
+ Thread listenerThread = new Thread(() -> {
try (Selector selector = Selector.open();
DatagramChannel channel = DatagramChannel.open()) {
- channel.bind(new InetSocketAddress(NodeManager.PORT));
+ channel.bind(new InetSocketAddress(port));
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
- logger.info("UDP listens on port {}", NodeManager.PORT);
+ logger.info("Listening for UDP messages on port {}", port);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (!Thread.currentThread().isInterrupted()) {
@@ -48,15 +52,37 @@ public class UdpListener {
buffer.flip();
String message = new String(buffer.array(), 0, buffer.limit()).trim();
- // logger.info("Received UDP {}:{}: {}", sender.getHostName(), sender.getPort(), message);
- nodeManager.handleMessage(message);
+ logger.info("Received message from {}:{} - {}", sender.getHostName(), sender.getPort(), message);
+ try {
+ JsonMessage jsonMessage = gson.fromJson(message, JsonMessage.class);
+ messageHandler.accept(jsonMessage);
+ } catch (JsonSyntaxException e) {
+ logger.error("Invalid JSON received: {}", message, e);
+ }
}
}
}
} catch (IOException e) {
- logger.error("Error in UDP listener: {}", e.getMessage());
+ logger.error("Error in UDP listener: {}", e.getMessage(), e);
}
});
- udpListenerThread.start();
+
+ listenerThread.start();
}
-}
\ No newline at end of file
+
+ public static class JsonMessage {
+ public String type;
+ public int term;
+ public String sender;
+ public String additionalData;
+
+ public JsonMessage() {}
+
+ public JsonMessage(String type, int term, String sender, String additionalData) {
+ this.type = type;
+ this.term = term;
+ this.sender = sender;
+ this.additionalData = additionalData;
+ }
+ }
+}
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java
deleted file mode 100644
index 8db0e5d..0000000
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package io.github.lumijiez.network;
-
-import io.github.lumijiez.app.NodeManager;
-import io.github.lumijiez.data.models.NodeInfo;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-
-public class UdpMessageSender {
- private static final Logger logger = LogManager.getLogger(UdpMessageSender.class);
- private final NodeManager nodeManager;
-
- public UdpMessageSender(NodeManager nodeManager) {
- this.nodeManager = nodeManager;
- }
-
- public void sendMessage(NodeInfo node, String message) {
- try (DatagramChannel channel = DatagramChannel.open()) {
- channel.configureBlocking(false);
- ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
- channel.send(buffer, new InetSocketAddress(node.hostname(), node.port()));
- // logger.info("Sent UDP to {}:{}: {}", node.hostname(), node.port(), message);
- } catch (IOException e) {
- logger.error("Error sending UDP message: {}", e.getMessage());
- }
- }
-}
\ No newline at end of file
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java
new file mode 100644
index 0000000..ee5c64a
--- /dev/null
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java
@@ -0,0 +1,27 @@
+package io.github.lumijiez.network;
+
+import com.google.gson.Gson;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+public class UdpSender {
+ private static final Logger logger = LogManager.getLogger(UdpSender.class);
+ private final Gson gson = new Gson();
+
+ public void sendMessage(String hostname, int port, Object message) {
+ try (DatagramChannel channel = DatagramChannel.open()) {
+ channel.configureBlocking(false);
+ String jsonMessage = gson.toJson(message);
+ ByteBuffer buffer = ByteBuffer.wrap(jsonMessage.getBytes());
+ channel.send(buffer, new InetSocketAddress(hostname, port));
+ logger.info("Sent message to {}:{} - {}", hostname, port, jsonMessage);
+ } catch (IOException e) {
+ logger.error("Error sending UDP message: {}", e.getMessage(), e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java
deleted file mode 100644
index 1720786..0000000
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package io.github.lumijiez.network;
-
-import io.github.lumijiez.app.NodeManager;
-import io.github.lumijiez.data.models.NodeInfo;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
-
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.WebSocket;
-import java.util.List;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CompletableFuture;
-import java.lang.reflect.Type;
-
-public class WebSocketManager {
- private static final Logger logger = LogManager.getLogger(WebSocketManager.class);
- private static final Gson gson = new Gson();
- private static final CountDownLatch waitForConnection = new CountDownLatch(1);
-
- private WebSocket webSocket;
- private final NodeManager nodeManager;
-
- public WebSocketManager(NodeManager nodeManager) {
- this.nodeManager = nodeManager;
- }
-
- public void connectAndListen() {
- try (HttpClient client = HttpClient.newHttpClient()) {
- CompletableFuture wsFuture = client
- .newWebSocketBuilder()
- .buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() {
- @Override
- public CompletionStage> onText(WebSocket webSocket, CharSequence data, boolean last) {
- try {
- Type nodeListType = new TypeToken>() {}.getType();
- List nodes = gson.fromJson(data.toString(), nodeListType);
-
- for (NodeInfo newNode : nodes) {
- nodeManager.registerNode(newNode);
- }
-
- } catch (Exception e) {
- logger.error("Error processing WebSocket data: {}", e.getMessage());
- }
- return WebSocket.Listener.super.onText(webSocket, data, last);
- }
-
- @Override
- public void onOpen(WebSocket webSocket) {
- NodeInfo nodeInfo = new NodeInfo(NodeManager.HOST, NodeManager.PORT);
- webSocket.sendText(gson.toJson(nodeInfo), true);
- logger.info("Successfully registered to Discovery");
- waitForConnection.countDown();
- WebSocket.Listener.super.onOpen(webSocket);
- }
-
- @Override
- public CompletionStage> onClose(WebSocket webSocket, int statusCode, String reason) {
- logger.info("Unregistered from Discovery: {}", reason);
- return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
- }
-
- @Override
- public void onError(WebSocket webSocket, Throwable error) {
- logger.error("WebSocket error: {}", error.getMessage());
- WebSocket.Listener.super.onError(webSocket, error);
- }
- });
-
- webSocket = wsFuture.join();
- waitForConnection.await();
-
- } catch (Exception e) {
- logger.error("Error in WebSocketManager: {}", e.getMessage());
- }
- }
-}
\ No newline at end of file
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java
index a001d21..53aa169 100644
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java
+++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java
@@ -1,177 +1,234 @@
package io.github.lumijiez.raft;
-import io.github.lumijiez.app.NodeManager;
-import io.github.lumijiez.data.models.NodeInfo;
-import io.github.lumijiez.network.UdpMessageSender;
+import com.google.gson.Gson;
+import io.github.lumijiez.Main;
+import io.github.lumijiez.network.UdpListener;
+import io.github.lumijiez.network.UdpSender;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.*;
public class Raft {
- private static final Random RANDOM = new Random();
- private final Logger logger = LogManager.getLogger(Raft.class);
+ private static final Logger logger = LogManager.getLogger(Raft.class);
- // Configuration Parameters
- private static final int MIN_ELECTION_TIMEOUT = 300;
- private static final int MAX_ELECTION_TIMEOUT = 600;
- private static final int HEARTBEAT_INTERVAL = 100;
- private static final int QUORUM_FACTOR = 2;
+ private enum State {
+ FOLLOWER, CANDIDATE, LEADER
+ }
- // State Variables
- private RaftStates state = RaftStates.FOLLOWER;
+ private static final List NODES = Arrays.asList(
+ "node1:8105", "node2:8106", "node3:8107", "node4:8108", "node5:8109"
+ );
+
+ private final String selfAddress = Main.HOST;
+ private final int selfPort = Main.PORT;
+
+ private State currentState = State.FOLLOWER;
private int currentTerm = 0;
private String votedFor = null;
- private Set votesReceived = new HashSet<>();
+ private String currentLeader = null;
- private final NodeManager nodeManager;
- private final UdpMessageSender sender;
- private final ScheduledExecutorService electionExecutor = Executors.newSingleThreadScheduledExecutor();
+ private final Set receivedVotes = ConcurrentHashMap.newKeySet();
+ private final Random random = new Random();
+ private long electionTimeout = generateElectionTimeout();
+ private long lastHeartbeatTime = System.currentTimeMillis();
- public Raft(NodeManager nodeManager, UdpMessageSender sender) {
- this.nodeManager = nodeManager;
- this.sender = sender;
- nodeManager.getNodes().removeIf(node -> node.hostname().equals(NodeManager.HOST) && node.port() == NodeManager.PORT);
+ private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
+ private final ExecutorService electionExecutor = Executors.newSingleThreadExecutor();
+
+ private final UdpListener listener;
+ private final UdpSender sender;
+ private final Gson gson = new Gson();
+
+ public Raft() {
+ this.listener = new UdpListener(selfPort, this::processMessage);
+ this.sender = new UdpSender();
start();
}
- public void start() {
- logger.info("Raft initialization. Total peers: {}", nodeManager.getNodes().size());
- becomeFollower(1);
+ private void start() {
+ executorService.scheduleAtFixedRate(this::checkElectionTimeout, 50, 50, TimeUnit.MILLISECONDS);
+ executorService.scheduleAtFixedRate(this::sendHeartbeats, 100, 100, TimeUnit.MILLISECONDS);
+
+ listener.startListening();
+ logger.info("Node {} started", selfAddress);
}
- private void becomeFollower(int term) {
- if (term > currentTerm) {
- state = RaftStates.FOLLOWER;
- currentTerm = term;
- votedFor = null;
- votesReceived.clear();
- logger.debug("Transitioned to FOLLOWER. Term: {}", term);
+ private long generateElectionTimeout() {
+ return System.currentTimeMillis() + 150 + random.nextInt(150);
+ }
+
+ private void checkElectionTimeout() {
+ long currentTime = System.currentTimeMillis();
+
+ if (currentState != State.LEADER && currentTime > electionTimeout) {
+ startElection();
}
- scheduleElectionTimeout();
}
- private void becomeCandidate() {
- state = RaftStates.CANDIDATE;
+ private void startElection() {
+ currentState = State.CANDIDATE;
currentTerm++;
- votedFor = NodeManager.HOST + ":" + NodeManager.PORT;
- votesReceived = new HashSet<>(Set.of(votedFor)); // Self vote
+ votedFor = selfAddress;
+ receivedVotes.clear();
+ receivedVotes.add(selfAddress);
+ electionTimeout = generateElectionTimeout();
- // logger.info("Starting election in Term {}. Attempting to collect votes.", currentTerm);
- sendVoteRequests();
+ logger.info("Node {} starting election for term {}", selfAddress, currentTerm);
+
+ electionExecutor.submit(() -> {
+ for (String node : NODES) {
+ if (!node.equals(selfAddress)) {
+ UdpListener.JsonMessage message = new UdpListener.JsonMessage("VOTE_REQUEST", currentTerm, selfAddress + ":" + selfPort, null);
+ sendMessage(node, message);
+ }
+ }
+
+ try {
+ Thread.sleep(100);
+ tallyVotes();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+
+ private void tallyVotes() {
+ long requiredVotes = (NODES.size() / 2) + 1;
+
+ if (receivedVotes.size() >= requiredVotes) {
+ becomeLeader();
+ } else {
+ currentState = State.FOLLOWER;
+ votedFor = null;
+ electionTimeout = generateElectionTimeout();
+ logger.info("Node {} failed to become leader for term {}", selfAddress, currentTerm);
+ }
}
private void becomeLeader() {
- if (state == RaftStates.CANDIDATE &&
- votesReceived.size() >= (nodeManager.getNodes().size() / QUORUM_FACTOR + 1)) {
- state = RaftStates.LEADER;
- logger.info("LEADER ELECTION SUCCESS: Becoming LEADER in Term {}", currentTerm);
- votesReceived.clear();
+ if (currentState == State.CANDIDATE) {
+ currentState = State.LEADER;
+ currentLeader = selfAddress;
+ logger.info("Node {} became leader for term {}", selfAddress, currentTerm);
sendHeartbeats();
+ notifyManager();
}
}
- private void sendVoteRequests() {
- String voteRequest = String.format("REQUEST_VOTE|%d|%s:%d",
- currentTerm, NodeManager.HOST, NodeManager.PORT);
+ private void notifyManager() {
+ try {
+ URL url = new URL("http://manager:8081/update_leader");
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setDoOutput(true);
- for (NodeInfo peer : nodeManager.getNodes()) {
- sender.sendMessage(peer, voteRequest);
+ String jsonBody = String.format(
+ "{\"leaderHost\": \"%s\", \"leaderPort\": %d}",
+ selfAddress.split(":")[0], selfPort
+ );
+
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
+ os.flush();
+ }
+
+ int responseCode = connection.getResponseCode();
+ } catch (Exception e) {
+ logger.error("Failed to notify manager of leadership: {}", e.getMessage(), e);
}
}
private void sendHeartbeats() {
- if (state != RaftStates.LEADER) return;
-
- String heartbeat = String.format("HEARTBEAT|%d|%s:%d",
- currentTerm, NodeManager.HOST, NodeManager.PORT);
-
- for (NodeInfo peer : nodeManager.getNodes()) {
- sender.sendMessage(peer, heartbeat);
+ if (currentState == State.LEADER) {
+ for (String node : NODES) {
+ if (!node.equals(selfAddress)) {
+ UdpListener.JsonMessage message = new UdpListener.JsonMessage("HEARTBEAT", currentTerm, selfAddress, null);
+ sendMessage(node, message);
+ }
+ }
}
-
- // Reschedule heartbeats
- electionExecutor.schedule(this::sendHeartbeats, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
- private void scheduleElectionTimeout() {
- int timeout = MIN_ELECTION_TIMEOUT + RANDOM.nextInt(MAX_ELECTION_TIMEOUT - MIN_ELECTION_TIMEOUT);
-
- electionExecutor.schedule(() -> {
- if (state != RaftStates.LEADER) {
- logger.debug("Election timeout triggered. Starting new election.");
- becomeCandidate();
- }
- }, timeout, TimeUnit.MILLISECONDS);
- }
-
- public void processMessage(String message) {
- String[] parts = message.split("\\|");
- if (parts.length < 3) return;
-
+ private void processMessage(UdpListener.JsonMessage message) {
try {
- String type = parts[0];
- int messageTerm = Integer.parseInt(parts[1]);
- String sender = parts[2];
-
- // Term comparison and potential state change
- if (messageTerm > currentTerm) {
- becomeFollower(messageTerm);
- }
-
- switch (type) {
- case "REQUEST_VOTE":
- handleRequestVote(messageTerm, sender);
+ switch (message.type) {
+ case "VOTE_REQUEST":
+ handleVoteRequest(message.term, message.sender);
+ break;
+ case "VOTE_RESPONSE":
+ handleVoteResponse(message.term, message.sender, message.additionalData);
break;
case "HEARTBEAT":
- handleHeartbeat(messageTerm, sender);
- break;
- case "VOTE_GRANTED":
- handleVoteGranted(messageTerm, sender);
+ handleHeartbeat(message.term, message.sender);
break;
}
- } catch (NumberFormatException e) {
- logger.error("Error processing message: {}", message);
+ } catch (Exception e) {
+ logger.error("Error processing message: {}", gson.toJson(message), e);
}
}
- private void handleRequestVote(int term, String candidate) {
+ private void handleVoteRequest(int term, String candidate) {
boolean voteGranted = false;
- if (term >= currentTerm &&
- (votedFor == null || votedFor.equals(candidate))) {
- voteGranted = true;
- votedFor = candidate;
+ if (term > currentTerm) {
currentTerm = term;
- scheduleElectionTimeout();
+ currentState = State.FOLLOWER;
+ votedFor = null;
}
- String response = String.format("VOTE_GRANTED|%d|%s", term, voteGranted);
- sender.sendMessage(NodeInfo.fromString(candidate), response);
+ if (term == currentTerm && (votedFor == null || votedFor.equals(candidate)) && currentState != State.LEADER) {
+ voteGranted = true;
+ votedFor = candidate;
+ electionTimeout = generateElectionTimeout();
+ }
+
+ UdpListener.JsonMessage response = new UdpListener.JsonMessage("VOTE_RESPONSE", term, selfAddress + ":" + selfPort, voteGranted ? "GRANTED" : "REJECTED");
+ sendMessage(candidate, response);
+ }
+
+ private void handleVoteResponse(int term, String sender, String response) {
+ if (currentState != State.CANDIDATE || term != currentTerm) return;
+
+ if ("GRANTED".equals(response)) {
+ receivedVotes.add(sender);
+ }
}
private void handleHeartbeat(int term, String leader) {
if (term >= currentTerm) {
- scheduleElectionTimeout();
- becomeFollower(term);
+ currentTerm = term;
+ currentState = State.FOLLOWER;
+ currentLeader = leader;
+ lastHeartbeatTime = System.currentTimeMillis();
+ electionTimeout = generateElectionTimeout();
}
}
- private void handleVoteGranted(int term, String candidate) {
- if (state == RaftStates.CANDIDATE && term == currentTerm) {
- votesReceived.add(candidate);
- logger.debug("Vote received from {}. Total votes: {}/{}",
- candidate, votesReceived.size(), (nodeManager.getNodes().size() / QUORUM_FACTOR + 1));
-
- becomeLeader();
+ private void sendMessage(String address, UdpListener.JsonMessage message) {
+ try {
+ String[] parts = address.split(":");
+ if (parts.length != 2) {
+ logger.error("Invalid address format: {}", address);
+ return;
+ }
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+ sender.sendMessage(host, port, message);
+ } catch (NumberFormatException e) {
+ logger.error("Invalid port in address: {}", address, e);
+ } catch (Exception e) {
+ logger.error("Error sending message to {}: {}", address, e.getMessage(), e);
}
}
-
- public void shutdown() {
- electionExecutor.shutdownNow();
- }
-}
\ No newline at end of file
+}
diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java
deleted file mode 100644
index af2d050..0000000
--- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package io.github.lumijiez.raft;
-
-public enum RaftStates {
- FOLLOWER,
- CANDIDATE,
- LEADER
-}
diff --git a/SymphonyDiscovery/Dockerfile b/SymphonyDiscovery/Dockerfile
deleted file mode 100644
index abe6a7f..0000000
--- a/SymphonyDiscovery/Dockerfile
+++ /dev/null
@@ -1,24 +0,0 @@
-FROM ubuntu:latest
-LABEL authors="lumijiez"
-
-FROM maven:3.9.9-eclipse-temurin-21 AS build
-
-WORKDIR /app
-
-COPY pom.xml .
-
-RUN mvn dependency:go-offline
-
-COPY src /app/src
-
-RUN mvn clean package -DskipTests
-
-FROM openjdk:21
-
-WORKDIR /app
-
-COPY --from=build /app/target/SymphonyDiscovery-1.0-SNAPSHOT.jar /app/SymphonyDiscovery.jar
-
-EXPOSE 8083
-
-ENTRYPOINT ["java", "-jar", "SymphonyDiscovery.jar"]
diff --git a/SymphonyDiscovery/pom.xml b/SymphonyDiscovery/pom.xml
deleted file mode 100644
index 331d17b..0000000
--- a/SymphonyDiscovery/pom.xml
+++ /dev/null
@@ -1,94 +0,0 @@
-
-
- 4.0.0
- io.github.lumijiez
- SymphonyDiscovery
- 1.0-SNAPSHOT
-
- jar
-
-
-
- com.google.code.gson
- gson
- 2.11.0
-
-
-
- org.apache.logging.log4j
- log4j-api
- 2.24.2
-
-
- org.apache.logging.log4j
- log4j-core
- 2.24.2
-
-
-
- io.javalin
- javalin
- 6.3.0
-
-
-
- org.slf4j
- slf4j-simple
- 2.0.16
-
-
- ch.qos.logback
- logback-classic
- 1.4.12
-
-
-
- ch.qos.logback
- logback-core
- 1.4.14
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.8.1
-
- 21
- 21
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.2.1
-
-
- package
-
- shade
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
- 3.1.0
-
-
-
- io.github.lumijiez.Main
-
-
-
-
-
-
-
diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java
deleted file mode 100644
index 48583d0..0000000
--- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package io.github.lumijiez;
-
-import com.google.gson.Gson;
-import io.javalin.Javalin;
-import io.javalin.websocket.WsContext;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class JavalinConfig {
- private static final Map registeredNodes = new ConcurrentHashMap<>();
- private static final Map nodes = new ConcurrentHashMap<>();
- private static final Gson gson = new Gson();
- private static final Logger logger = LogManager.getLogger(JavalinConfig.class);
-
- public static void setup(Javalin app) {
- app.ws("/discovery", ws -> {
- ws.onConnect(ctx -> {
- // ToDo
- // A general notification
- nodes.put(ctx.sessionId(), ctx);
- });
-
- ws.onMessage(ctx -> {
- String message = ctx.message();
- NodeInfo nodeInfo = gson.fromJson(message, NodeInfo.class);
- registeredNodes.put(ctx.sessionId(), nodeInfo);
- broadcastNodeCount();
- });
-
- ws.onClose(ctx -> {
- registeredNodes.remove(ctx.sessionId());
- broadcastNodeCount();
- });
- });
- }
-
- private static void broadcastNodeCount() {
- List nodeInfoList = new ArrayList<>(registeredNodes.values());
- String nodesJson = gson.toJson(nodeInfoList);
- nodes.values().forEach(ctx -> ctx.send(nodesJson));
- }
-
- public record NodeInfo(String hostname, int port) { }
-}
diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java
deleted file mode 100644
index ed7f688..0000000
--- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package io.github.lumijiez;
-
-import io.javalin.Javalin;
-import io.javalin.json.JavalinGson;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.time.Duration;
-
-public class Main {
- public static void main(String[] args) {
- Logger logger = LogManager.getLogger(Main.class);
-
- Javalin app = Javalin.create(config -> {
- config.jsonMapper(new JavalinGson());
- config.jetty.modifyWebSocketServletFactory(wsFactoryConfig -> {
- wsFactoryConfig.setIdleTimeout(Duration.ZERO);
- });
- }).start(8083);
-
- JavalinConfig.setup(app);
-
- logger.info("Discovery service up and running");
- }
-}
\ No newline at end of file
diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java
deleted file mode 100644
index 823f748..0000000
--- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package io.github.lumijiez.models.requests;
-
-public class RegisterRequest {
- public String hostname;
- public String port;
-
- public RegisterRequest(String hostname, String port) {
- this.hostname = hostname;
- this.port = port;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public String getPort() {
- return port;
- }
-}
diff --git a/SymphonyDiscovery/src/main/resources/log4j2.xml b/SymphonyDiscovery/src/main/resources/log4j2.xml
deleted file mode 100644
index 20f2b90..0000000
--- a/SymphonyDiscovery/src/main/resources/log4j2.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-
-
-
-
- %d{yyyy-MM-dd HH:mm:ss} %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow, INFO=green, DEBUG=blue}: %highlight{%msg}{FATAL=red, ERROR=red}%n
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/SymphonyDiscovery/src/main/resources/simplelogger.properties b/SymphonyDiscovery/src/main/resources/simplelogger.properties
deleted file mode 100644
index b6c5c5b..0000000
--- a/SymphonyDiscovery/src/main/resources/simplelogger.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-org.slf4j.simpleLogger.defaultLogLevel=off
-
-org.slf4j.simpleLogger.log.io.javalin=off
-
-org.slf4j.simpleLogger.log.org.eclipse.jetty=off
-org.slf4j.simpleLogger.log.org.eclipse.jetty.server=off
-org.slf4j.simpleLogger.log.org.eclipse.jetty.util=off
-
-# org.slf4j.simpleLogger.log.org.springframework=off
-# org.slf4j.simpleLogger.log.org.hibernate=off
-
-org.slf4j.simpleLogger.showDateTime=true
-
-org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z
-
-org.slf4j.simpleLogger.showThreadName=true
-
-org.slf4j.simpleLogger.showLogName=true
-
-org.slf4j.simpleLogger.showShortLogName=false
diff --git a/SymphonyManager/pom.xml b/SymphonyManager/pom.xml
index cd08f82..f24c54e 100644
--- a/SymphonyManager/pom.xml
+++ b/SymphonyManager/pom.xml
@@ -22,30 +22,6 @@
2.11.0
-
- org.slf4j
- slf4j-api
- 1.7.32
-
-
-
- ch.qos.logback
- logback-classic
- 1.5.12
-
-
-
- ch.qos.logback
- logback-core
- 1.5.12
-
-
-
- org.slf4j
- slf4j-simple
- 1.7.32
-
-
org.apache.logging.log4j
log4j-api
@@ -56,6 +32,29 @@
log4j-core
2.24.2
+
+
+ io.javalin
+ javalin
+ 6.3.0
+
+
+
+ org.slf4j
+ slf4j-simple
+ 2.0.16
+
+
+ ch.qos.logback
+ logback-classic
+ 1.4.12
+
+
+
+ ch.qos.logback
+ logback-core
+ 1.4.14
+
diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java
new file mode 100644
index 0000000..b143257
--- /dev/null
+++ b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java
@@ -0,0 +1,45 @@
+package io.github.lumijiez;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+public class AddressHolder {
+ private String host;
+ private int port;
+
+ private static AddressHolder INSTANCE;
+
+ private AddressHolder() {
+// Timer timer = new Timer(true);
+// timer.scheduleAtFixedRate(new TimerTask() {
+// @Override
+// public void run() {
+// Main.logger.info("Host: {}, Port: {}", host, port);
+// }
+// }, 0, 1000);
+ }
+
+ public static AddressHolder getInstance() {
+ if (INSTANCE == null) {
+ INSTANCE = new AddressHolder();
+ }
+ return INSTANCE;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+}
+
diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java b/SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java
new file mode 100644
index 0000000..b473eee
--- /dev/null
+++ b/SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java
@@ -0,0 +1,20 @@
+package io.github.lumijiez;
+
+import io.github.lumijiez.requests.UpdateLeaderRequest;
+import io.javalin.Javalin;
+import io.javalin.http.Context;
+
+public class JavalinHttpConfig {
+ public static void setup(Javalin app) {
+ app.post("/update_leader", JavalinHttpConfig::handleUpdateLeader);
+ }
+
+ public static void handleUpdateLeader(Context ctx) {
+ UpdateLeaderRequest request = ctx.bodyAsClass(UpdateLeaderRequest.class);
+
+ AddressHolder.getInstance().setHost(request.getLeaderHost());
+ AddressHolder.getInstance().setPort(request.getLeaderPort());
+
+ Main.logger.info("Changed host to leader: {}:{}", request.getLeaderHost(), request.getLeaderPort());
+ }
+}
diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/Main.java b/SymphonyManager/src/main/java/io/github/lumijiez/Main.java
index 07409b3..0ab2345 100644
--- a/SymphonyManager/src/main/java/io/github/lumijiez/Main.java
+++ b/SymphonyManager/src/main/java/io/github/lumijiez/Main.java
@@ -1,8 +1,26 @@
package io.github.lumijiez;
-public class Main {
+import io.javalin.Javalin;
+import io.javalin.json.JavalinGson;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.time.Duration;
+
+public class Main {
+ public static Logger logger = LogManager.getLogger(Main.class);
public static void main(String[] args) {
- BrokerConnector.connect();
+ new Thread(BrokerConnector::connect, "RabbitMQ-Connection").start();
+
+ Javalin app = Javalin.create(config -> {
+ config.jsonMapper(new JavalinGson());
+ config.jetty.modifyWebSocketServletFactory(wsFactoryConfig -> {
+ wsFactoryConfig.setIdleTimeout(Duration.ZERO);
+ });
+ }).start(8081);
+
+ JavalinHttpConfig.setup(app);
+
+ logger.info("Discovery service up and running");
}
}
\ No newline at end of file
diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java b/SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java
new file mode 100644
index 0000000..b9e18b7
--- /dev/null
+++ b/SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java
@@ -0,0 +1,22 @@
+package io.github.lumijiez.requests;
+
+public class UpdateLeaderRequest {
+ private String leaderHost;
+ private int leaderPort;
+
+ public void setLeaderHost(String leaderHost) {
+ this.leaderHost = leaderHost;
+ }
+
+ public void setLeaderPort(int leaderPort) {
+ this.leaderPort = leaderPort;
+ }
+
+ public int getLeaderPort() {
+ return leaderPort;
+ }
+
+ public String getLeaderHost() {
+ return leaderHost;
+ }
+}
diff --git a/config/logs/rabbitmq/rabbitmq.log b/config/logs/rabbitmq/rabbitmq.log
index e69de29..d6d2c45 100644
--- a/config/logs/rabbitmq/rabbitmq.log
+++ b/config/logs/rabbitmq/rabbitmq.log
@@ -0,0 +1 @@
+2024-12-12 20:43:45.086648+00:00 [error] <0.1439.0> closing AMQP 1.0 connection (172.18.0.1:54074 -> 172.18.0.3:5672, duration: '775ms'): RabbitMQ requires SASL security layer (expected protocol ID 3, but client sent protocol ID 2)
diff --git a/docker-compose.yml b/docker-compose.yml
index 86e7833..8af3b3f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -23,18 +23,6 @@ services:
depends_on:
- rabbitmq
- symphony-discovery:
- container_name: discovery
- build:
- context: ./SymphonyDiscovery
- dockerfile: Dockerfile
- ports:
- - "8083:8083"
- networks:
- - symphony-network
- depends_on:
- - postgres_db
-
symphony-smtp:
container_name: smtp
build:
@@ -59,7 +47,6 @@ services:
- UDP_PORT=8105
depends_on:
- postgres_db
- - symphony-discovery
node2:
container_name: node2
@@ -77,7 +64,6 @@ services:
- UDP_PORT=8106
depends_on:
- postgres_db
- - symphony-discovery
node3:
container_name: node3
@@ -95,7 +81,6 @@ services:
- UDP_PORT=8107
depends_on:
- postgres_db
- - symphony-discovery
node4:
container_name: node4
@@ -113,7 +98,6 @@ services:
- UDP_PORT=8108
depends_on:
- postgres_db
- - symphony-discovery
node5:
container_name: node5
@@ -131,7 +115,6 @@ services:
- UDP_PORT=8109
depends_on:
- postgres_db
- - symphony-discovery
postgres_db:
image: postgres:latest
diff --git a/pom.xml b/pom.xml
index e6db36a..0cd743a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,6 @@
SymphonyManager
SymphonyProducer
- SymphonyDiscovery
SymphonyDatabaseNode
SymphonySMTP