From 8b5f755152f2d2f6ebee4be8071e907dadd9158a Mon Sep 17 00:00:00 2001 From: Daniel <59575049+lumijiez@users.noreply.github.com> Date: Thu, 12 Dec 2024 23:26:47 +0200 Subject: [PATCH] huge rework, RAFT WORKING --- .idea/misc.xml | 1 + .../main/java/io/github/lumijiez/Main.java | 9 +- .../io/github/lumijiez/app/NodeManager.java | 60 ---- .../github/lumijiez/data/models/NodeInfo.java | 16 - .../github/lumijiez/network/UdpListener.java | 52 +++- .../lumijiez/network/UdpMessageSender.java | 31 -- .../io/github/lumijiez/network/UdpSender.java | 27 ++ .../lumijiez/network/WebSocketManager.java | 81 ----- .../java/io/github/lumijiez/raft/Raft.java | 289 +++++++++++------- .../io/github/lumijiez/raft/RaftStates.java | 7 - SymphonyDiscovery/Dockerfile | 24 -- SymphonyDiscovery/pom.xml | 94 ------ .../io/github/lumijiez/JavalinConfig.java | 49 --- .../main/java/io/github/lumijiez/Main.java | 25 -- .../models/requests/RegisterRequest.java | 19 -- .../src/main/resources/log4j2.xml | 21 -- .../main/resources/simplelogger.properties | 20 -- SymphonyManager/pom.xml | 47 ++- .../io/github/lumijiez/AddressHolder.java | 45 +++ .../io/github/lumijiez/JavalinHttpConfig.java | 20 ++ .../main/java/io/github/lumijiez/Main.java | 22 +- .../requests/UpdateLeaderRequest.java | 22 ++ config/logs/rabbitmq/rabbitmq.log | 1 + docker-compose.yml | 17 -- pom.xml | 1 - 25 files changed, 375 insertions(+), 625 deletions(-) delete mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java delete mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java delete mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java create mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java delete mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java delete mode 100644 SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java delete mode 100644 SymphonyDiscovery/Dockerfile delete mode 100644 SymphonyDiscovery/pom.xml delete mode 100644 SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java delete mode 100644 SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java delete mode 100644 SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java delete mode 100644 SymphonyDiscovery/src/main/resources/log4j2.xml delete mode 100644 SymphonyDiscovery/src/main/resources/simplelogger.properties create mode 100644 SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java create mode 100644 SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java create mode 100644 SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java diff --git a/.idea/misc.xml b/.idea/misc.xml index d4fe832..ec9392a 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -9,6 +9,7 @@ diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java index 0d06f91..02d93ac 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java @@ -1,14 +1,13 @@ package io.github.lumijiez; - -import io.github.lumijiez.app.NodeManager; +import io.github.lumijiez.raft.Raft; public class Main { - + public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost"); + public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); public static void main(String[] args) { - NodeManager manager = new NodeManager(); - try { + Raft raft = new Raft(); Thread.currentThread().join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java deleted file mode 100644 index c2dfc6b..0000000 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java +++ /dev/null @@ -1,60 +0,0 @@ -package io.github.lumijiez.app; - -import io.github.lumijiez.data.models.NodeInfo; -import io.github.lumijiez.network.UdpListener; -import io.github.lumijiez.network.UdpMessageSender; -import io.github.lumijiez.network.WebSocketManager; -import io.github.lumijiez.raft.Raft; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; - -public class NodeManager { - private final Logger logger = LogManager.getLogger(NodeManager.class); - - private final UdpListener listener; - private final UdpMessageSender sender; - private final WebSocketManager ws; - private final Raft raft; - private final List nodes = new ArrayList<>(); - public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost"); - public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); - - public NodeManager() { - this.listener = new UdpListener(this); - this.sender = new UdpMessageSender(this); - this.ws = new WebSocketManager(this); - this.raft = new Raft(this, sender); - - listener.startListening(); - ws.connectAndListen(); - raft.start(); - } - - public void handleMessage(String message) { - raft.processMessage(message); - } - - public List getNodes() { - return nodes; - } - - public void registerNode(NodeInfo node) { - if (!nodes.contains(node)) { - nodes.add(node); - sender.sendMessage(node, "NODE_REGISTERED"); - // logger.info("New node registered, updating Raft peers"); - } - } - - public void removeNode(NodeInfo node) { - nodes.remove(node); - // logger.info("Node removed, updating Raft peers"); - } - - public UdpMessageSender getSender() { - return sender; - } -} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java deleted file mode 100644 index 1cc6958..0000000 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.github.lumijiez.data.models; - -public record NodeInfo(String hostname, int port) { - public static NodeInfo fromString(String nodeString) { - String[] parts = nodeString.split(":"); - if (parts.length != 2) { - throw new IllegalArgumentException("Invalid node string format. Expected 'hostname:port'"); - } - return new NodeInfo(parts[0], Integer.parseInt(parts[1])); - } - - @Override - public String toString() { - return hostname + ":" + port; - } -} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java index 30b9aac..22e918e 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java @@ -1,6 +1,5 @@ package io.github.lumijiez.network; -import io.github.lumijiez.app.NodeManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -11,26 +10,31 @@ import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; +import java.util.function.Consumer; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; public class UdpListener { private static final Logger logger = LogManager.getLogger(UdpListener.class); + private final int port; + private final Consumer messageHandler; + private final Gson gson = new Gson(); - private final NodeManager nodeManager; - - public UdpListener(NodeManager nodeManager) { - this.nodeManager = nodeManager; + public UdpListener(int port, Consumer messageHandler) { + this.port = port; + this.messageHandler = messageHandler; } public void startListening() { - Thread udpListenerThread = new Thread(() -> { + Thread listenerThread = new Thread(() -> { try (Selector selector = Selector.open(); DatagramChannel channel = DatagramChannel.open()) { - channel.bind(new InetSocketAddress(NodeManager.PORT)); + channel.bind(new InetSocketAddress(port)); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); - logger.info("UDP listens on port {}", NodeManager.PORT); + logger.info("Listening for UDP messages on port {}", port); ByteBuffer buffer = ByteBuffer.allocate(1024); while (!Thread.currentThread().isInterrupted()) { @@ -48,15 +52,37 @@ public class UdpListener { buffer.flip(); String message = new String(buffer.array(), 0, buffer.limit()).trim(); - // logger.info("Received UDP {}:{}: {}", sender.getHostName(), sender.getPort(), message); - nodeManager.handleMessage(message); + logger.info("Received message from {}:{} - {}", sender.getHostName(), sender.getPort(), message); + try { + JsonMessage jsonMessage = gson.fromJson(message, JsonMessage.class); + messageHandler.accept(jsonMessage); + } catch (JsonSyntaxException e) { + logger.error("Invalid JSON received: {}", message, e); + } } } } } catch (IOException e) { - logger.error("Error in UDP listener: {}", e.getMessage()); + logger.error("Error in UDP listener: {}", e.getMessage(), e); } }); - udpListenerThread.start(); + + listenerThread.start(); } -} \ No newline at end of file + + public static class JsonMessage { + public String type; + public int term; + public String sender; + public String additionalData; + + public JsonMessage() {} + + public JsonMessage(String type, int term, String sender, String additionalData) { + this.type = type; + this.term = term; + this.sender = sender; + this.additionalData = additionalData; + } + } +} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java deleted file mode 100644 index 8db0e5d..0000000 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java +++ /dev/null @@ -1,31 +0,0 @@ -package io.github.lumijiez.network; - -import io.github.lumijiez.app.NodeManager; -import io.github.lumijiez.data.models.NodeInfo; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; - -public class UdpMessageSender { - private static final Logger logger = LogManager.getLogger(UdpMessageSender.class); - private final NodeManager nodeManager; - - public UdpMessageSender(NodeManager nodeManager) { - this.nodeManager = nodeManager; - } - - public void sendMessage(NodeInfo node, String message) { - try (DatagramChannel channel = DatagramChannel.open()) { - channel.configureBlocking(false); - ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); - channel.send(buffer, new InetSocketAddress(node.hostname(), node.port())); - // logger.info("Sent UDP to {}:{}: {}", node.hostname(), node.port(), message); - } catch (IOException e) { - logger.error("Error sending UDP message: {}", e.getMessage()); - } - } -} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java new file mode 100644 index 0000000..ee5c64a --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpSender.java @@ -0,0 +1,27 @@ +package io.github.lumijiez.network; + +import com.google.gson.Gson; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +public class UdpSender { + private static final Logger logger = LogManager.getLogger(UdpSender.class); + private final Gson gson = new Gson(); + + public void sendMessage(String hostname, int port, Object message) { + try (DatagramChannel channel = DatagramChannel.open()) { + channel.configureBlocking(false); + String jsonMessage = gson.toJson(message); + ByteBuffer buffer = ByteBuffer.wrap(jsonMessage.getBytes()); + channel.send(buffer, new InetSocketAddress(hostname, port)); + logger.info("Sent message to {}:{} - {}", hostname, port, jsonMessage); + } catch (IOException e) { + logger.error("Error sending UDP message: {}", e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java deleted file mode 100644 index 1720786..0000000 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java +++ /dev/null @@ -1,81 +0,0 @@ -package io.github.lumijiez.network; - -import io.github.lumijiez.app.NodeManager; -import io.github.lumijiez.data.models.NodeInfo; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.WebSocket; -import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CompletableFuture; -import java.lang.reflect.Type; - -public class WebSocketManager { - private static final Logger logger = LogManager.getLogger(WebSocketManager.class); - private static final Gson gson = new Gson(); - private static final CountDownLatch waitForConnection = new CountDownLatch(1); - - private WebSocket webSocket; - private final NodeManager nodeManager; - - public WebSocketManager(NodeManager nodeManager) { - this.nodeManager = nodeManager; - } - - public void connectAndListen() { - try (HttpClient client = HttpClient.newHttpClient()) { - CompletableFuture wsFuture = client - .newWebSocketBuilder() - .buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() { - @Override - public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { - try { - Type nodeListType = new TypeToken>() {}.getType(); - List nodes = gson.fromJson(data.toString(), nodeListType); - - for (NodeInfo newNode : nodes) { - nodeManager.registerNode(newNode); - } - - } catch (Exception e) { - logger.error("Error processing WebSocket data: {}", e.getMessage()); - } - return WebSocket.Listener.super.onText(webSocket, data, last); - } - - @Override - public void onOpen(WebSocket webSocket) { - NodeInfo nodeInfo = new NodeInfo(NodeManager.HOST, NodeManager.PORT); - webSocket.sendText(gson.toJson(nodeInfo), true); - logger.info("Successfully registered to Discovery"); - waitForConnection.countDown(); - WebSocket.Listener.super.onOpen(webSocket); - } - - @Override - public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { - logger.info("Unregistered from Discovery: {}", reason); - return WebSocket.Listener.super.onClose(webSocket, statusCode, reason); - } - - @Override - public void onError(WebSocket webSocket, Throwable error) { - logger.error("WebSocket error: {}", error.getMessage()); - WebSocket.Listener.super.onError(webSocket, error); - } - }); - - webSocket = wsFuture.join(); - waitForConnection.await(); - - } catch (Exception e) { - logger.error("Error in WebSocketManager: {}", e.getMessage()); - } - } -} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java index a001d21..53aa169 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java @@ -1,177 +1,234 @@ package io.github.lumijiez.raft; -import io.github.lumijiez.app.NodeManager; -import io.github.lumijiez.data.models.NodeInfo; -import io.github.lumijiez.network.UdpMessageSender; +import com.google.gson.Gson; +import io.github.lumijiez.Main; +import io.github.lumijiez.network.UdpListener; +import io.github.lumijiez.network.UdpSender; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.*; public class Raft { - private static final Random RANDOM = new Random(); - private final Logger logger = LogManager.getLogger(Raft.class); + private static final Logger logger = LogManager.getLogger(Raft.class); - // Configuration Parameters - private static final int MIN_ELECTION_TIMEOUT = 300; - private static final int MAX_ELECTION_TIMEOUT = 600; - private static final int HEARTBEAT_INTERVAL = 100; - private static final int QUORUM_FACTOR = 2; + private enum State { + FOLLOWER, CANDIDATE, LEADER + } - // State Variables - private RaftStates state = RaftStates.FOLLOWER; + private static final List NODES = Arrays.asList( + "node1:8105", "node2:8106", "node3:8107", "node4:8108", "node5:8109" + ); + + private final String selfAddress = Main.HOST; + private final int selfPort = Main.PORT; + + private State currentState = State.FOLLOWER; private int currentTerm = 0; private String votedFor = null; - private Set votesReceived = new HashSet<>(); + private String currentLeader = null; - private final NodeManager nodeManager; - private final UdpMessageSender sender; - private final ScheduledExecutorService electionExecutor = Executors.newSingleThreadScheduledExecutor(); + private final Set receivedVotes = ConcurrentHashMap.newKeySet(); + private final Random random = new Random(); + private long electionTimeout = generateElectionTimeout(); + private long lastHeartbeatTime = System.currentTimeMillis(); - public Raft(NodeManager nodeManager, UdpMessageSender sender) { - this.nodeManager = nodeManager; - this.sender = sender; - nodeManager.getNodes().removeIf(node -> node.hostname().equals(NodeManager.HOST) && node.port() == NodeManager.PORT); + private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + private final ExecutorService electionExecutor = Executors.newSingleThreadExecutor(); + + private final UdpListener listener; + private final UdpSender sender; + private final Gson gson = new Gson(); + + public Raft() { + this.listener = new UdpListener(selfPort, this::processMessage); + this.sender = new UdpSender(); start(); } - public void start() { - logger.info("Raft initialization. Total peers: {}", nodeManager.getNodes().size()); - becomeFollower(1); + private void start() { + executorService.scheduleAtFixedRate(this::checkElectionTimeout, 50, 50, TimeUnit.MILLISECONDS); + executorService.scheduleAtFixedRate(this::sendHeartbeats, 100, 100, TimeUnit.MILLISECONDS); + + listener.startListening(); + logger.info("Node {} started", selfAddress); } - private void becomeFollower(int term) { - if (term > currentTerm) { - state = RaftStates.FOLLOWER; - currentTerm = term; - votedFor = null; - votesReceived.clear(); - logger.debug("Transitioned to FOLLOWER. Term: {}", term); + private long generateElectionTimeout() { + return System.currentTimeMillis() + 150 + random.nextInt(150); + } + + private void checkElectionTimeout() { + long currentTime = System.currentTimeMillis(); + + if (currentState != State.LEADER && currentTime > electionTimeout) { + startElection(); } - scheduleElectionTimeout(); } - private void becomeCandidate() { - state = RaftStates.CANDIDATE; + private void startElection() { + currentState = State.CANDIDATE; currentTerm++; - votedFor = NodeManager.HOST + ":" + NodeManager.PORT; - votesReceived = new HashSet<>(Set.of(votedFor)); // Self vote + votedFor = selfAddress; + receivedVotes.clear(); + receivedVotes.add(selfAddress); + electionTimeout = generateElectionTimeout(); - // logger.info("Starting election in Term {}. Attempting to collect votes.", currentTerm); - sendVoteRequests(); + logger.info("Node {} starting election for term {}", selfAddress, currentTerm); + + electionExecutor.submit(() -> { + for (String node : NODES) { + if (!node.equals(selfAddress)) { + UdpListener.JsonMessage message = new UdpListener.JsonMessage("VOTE_REQUEST", currentTerm, selfAddress + ":" + selfPort, null); + sendMessage(node, message); + } + } + + try { + Thread.sleep(100); + tallyVotes(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + private void tallyVotes() { + long requiredVotes = (NODES.size() / 2) + 1; + + if (receivedVotes.size() >= requiredVotes) { + becomeLeader(); + } else { + currentState = State.FOLLOWER; + votedFor = null; + electionTimeout = generateElectionTimeout(); + logger.info("Node {} failed to become leader for term {}", selfAddress, currentTerm); + } } private void becomeLeader() { - if (state == RaftStates.CANDIDATE && - votesReceived.size() >= (nodeManager.getNodes().size() / QUORUM_FACTOR + 1)) { - state = RaftStates.LEADER; - logger.info("LEADER ELECTION SUCCESS: Becoming LEADER in Term {}", currentTerm); - votesReceived.clear(); + if (currentState == State.CANDIDATE) { + currentState = State.LEADER; + currentLeader = selfAddress; + logger.info("Node {} became leader for term {}", selfAddress, currentTerm); sendHeartbeats(); + notifyManager(); } } - private void sendVoteRequests() { - String voteRequest = String.format("REQUEST_VOTE|%d|%s:%d", - currentTerm, NodeManager.HOST, NodeManager.PORT); + private void notifyManager() { + try { + URL url = new URL("http://manager:8081/update_leader"); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); - for (NodeInfo peer : nodeManager.getNodes()) { - sender.sendMessage(peer, voteRequest); + String jsonBody = String.format( + "{\"leaderHost\": \"%s\", \"leaderPort\": %d}", + selfAddress.split(":")[0], selfPort + ); + + try (OutputStream os = connection.getOutputStream()) { + os.write(jsonBody.getBytes(StandardCharsets.UTF_8)); + os.flush(); + } + + int responseCode = connection.getResponseCode(); + } catch (Exception e) { + logger.error("Failed to notify manager of leadership: {}", e.getMessage(), e); } } private void sendHeartbeats() { - if (state != RaftStates.LEADER) return; - - String heartbeat = String.format("HEARTBEAT|%d|%s:%d", - currentTerm, NodeManager.HOST, NodeManager.PORT); - - for (NodeInfo peer : nodeManager.getNodes()) { - sender.sendMessage(peer, heartbeat); + if (currentState == State.LEADER) { + for (String node : NODES) { + if (!node.equals(selfAddress)) { + UdpListener.JsonMessage message = new UdpListener.JsonMessage("HEARTBEAT", currentTerm, selfAddress, null); + sendMessage(node, message); + } + } } - - // Reschedule heartbeats - electionExecutor.schedule(this::sendHeartbeats, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS); } - private void scheduleElectionTimeout() { - int timeout = MIN_ELECTION_TIMEOUT + RANDOM.nextInt(MAX_ELECTION_TIMEOUT - MIN_ELECTION_TIMEOUT); - - electionExecutor.schedule(() -> { - if (state != RaftStates.LEADER) { - logger.debug("Election timeout triggered. Starting new election."); - becomeCandidate(); - } - }, timeout, TimeUnit.MILLISECONDS); - } - - public void processMessage(String message) { - String[] parts = message.split("\\|"); - if (parts.length < 3) return; - + private void processMessage(UdpListener.JsonMessage message) { try { - String type = parts[0]; - int messageTerm = Integer.parseInt(parts[1]); - String sender = parts[2]; - - // Term comparison and potential state change - if (messageTerm > currentTerm) { - becomeFollower(messageTerm); - } - - switch (type) { - case "REQUEST_VOTE": - handleRequestVote(messageTerm, sender); + switch (message.type) { + case "VOTE_REQUEST": + handleVoteRequest(message.term, message.sender); + break; + case "VOTE_RESPONSE": + handleVoteResponse(message.term, message.sender, message.additionalData); break; case "HEARTBEAT": - handleHeartbeat(messageTerm, sender); - break; - case "VOTE_GRANTED": - handleVoteGranted(messageTerm, sender); + handleHeartbeat(message.term, message.sender); break; } - } catch (NumberFormatException e) { - logger.error("Error processing message: {}", message); + } catch (Exception e) { + logger.error("Error processing message: {}", gson.toJson(message), e); } } - private void handleRequestVote(int term, String candidate) { + private void handleVoteRequest(int term, String candidate) { boolean voteGranted = false; - if (term >= currentTerm && - (votedFor == null || votedFor.equals(candidate))) { - voteGranted = true; - votedFor = candidate; + if (term > currentTerm) { currentTerm = term; - scheduleElectionTimeout(); + currentState = State.FOLLOWER; + votedFor = null; } - String response = String.format("VOTE_GRANTED|%d|%s", term, voteGranted); - sender.sendMessage(NodeInfo.fromString(candidate), response); + if (term == currentTerm && (votedFor == null || votedFor.equals(candidate)) && currentState != State.LEADER) { + voteGranted = true; + votedFor = candidate; + electionTimeout = generateElectionTimeout(); + } + + UdpListener.JsonMessage response = new UdpListener.JsonMessage("VOTE_RESPONSE", term, selfAddress + ":" + selfPort, voteGranted ? "GRANTED" : "REJECTED"); + sendMessage(candidate, response); + } + + private void handleVoteResponse(int term, String sender, String response) { + if (currentState != State.CANDIDATE || term != currentTerm) return; + + if ("GRANTED".equals(response)) { + receivedVotes.add(sender); + } } private void handleHeartbeat(int term, String leader) { if (term >= currentTerm) { - scheduleElectionTimeout(); - becomeFollower(term); + currentTerm = term; + currentState = State.FOLLOWER; + currentLeader = leader; + lastHeartbeatTime = System.currentTimeMillis(); + electionTimeout = generateElectionTimeout(); } } - private void handleVoteGranted(int term, String candidate) { - if (state == RaftStates.CANDIDATE && term == currentTerm) { - votesReceived.add(candidate); - logger.debug("Vote received from {}. Total votes: {}/{}", - candidate, votesReceived.size(), (nodeManager.getNodes().size() / QUORUM_FACTOR + 1)); - - becomeLeader(); + private void sendMessage(String address, UdpListener.JsonMessage message) { + try { + String[] parts = address.split(":"); + if (parts.length != 2) { + logger.error("Invalid address format: {}", address); + return; + } + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + sender.sendMessage(host, port, message); + } catch (NumberFormatException e) { + logger.error("Invalid port in address: {}", address, e); + } catch (Exception e) { + logger.error("Error sending message to {}: {}", address, e.getMessage(), e); } } - - public void shutdown() { - electionExecutor.shutdownNow(); - } -} \ No newline at end of file +} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java deleted file mode 100644 index af2d050..0000000 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.github.lumijiez.raft; - -public enum RaftStates { - FOLLOWER, - CANDIDATE, - LEADER -} diff --git a/SymphonyDiscovery/Dockerfile b/SymphonyDiscovery/Dockerfile deleted file mode 100644 index abe6a7f..0000000 --- a/SymphonyDiscovery/Dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -FROM ubuntu:latest -LABEL authors="lumijiez" - -FROM maven:3.9.9-eclipse-temurin-21 AS build - -WORKDIR /app - -COPY pom.xml . - -RUN mvn dependency:go-offline - -COPY src /app/src - -RUN mvn clean package -DskipTests - -FROM openjdk:21 - -WORKDIR /app - -COPY --from=build /app/target/SymphonyDiscovery-1.0-SNAPSHOT.jar /app/SymphonyDiscovery.jar - -EXPOSE 8083 - -ENTRYPOINT ["java", "-jar", "SymphonyDiscovery.jar"] diff --git a/SymphonyDiscovery/pom.xml b/SymphonyDiscovery/pom.xml deleted file mode 100644 index 331d17b..0000000 --- a/SymphonyDiscovery/pom.xml +++ /dev/null @@ -1,94 +0,0 @@ - - - 4.0.0 - io.github.lumijiez - SymphonyDiscovery - 1.0-SNAPSHOT - - jar - - - - com.google.code.gson - gson - 2.11.0 - - - - org.apache.logging.log4j - log4j-api - 2.24.2 - - - org.apache.logging.log4j - log4j-core - 2.24.2 - - - - io.javalin - javalin - 6.3.0 - - - - org.slf4j - slf4j-simple - 2.0.16 - - - ch.qos.logback - logback-classic - 1.4.12 - - - - ch.qos.logback - logback-core - 1.4.14 - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - 21 - 21 - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - - package - - shade - - - - - - - org.apache.maven.plugins - maven-jar-plugin - 3.1.0 - - - - io.github.lumijiez.Main - - - - - - - diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java deleted file mode 100644 index 48583d0..0000000 --- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java +++ /dev/null @@ -1,49 +0,0 @@ -package io.github.lumijiez; - -import com.google.gson.Gson; -import io.javalin.Javalin; -import io.javalin.websocket.WsContext; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class JavalinConfig { - private static final Map registeredNodes = new ConcurrentHashMap<>(); - private static final Map nodes = new ConcurrentHashMap<>(); - private static final Gson gson = new Gson(); - private static final Logger logger = LogManager.getLogger(JavalinConfig.class); - - public static void setup(Javalin app) { - app.ws("/discovery", ws -> { - ws.onConnect(ctx -> { - // ToDo - // A general notification - nodes.put(ctx.sessionId(), ctx); - }); - - ws.onMessage(ctx -> { - String message = ctx.message(); - NodeInfo nodeInfo = gson.fromJson(message, NodeInfo.class); - registeredNodes.put(ctx.sessionId(), nodeInfo); - broadcastNodeCount(); - }); - - ws.onClose(ctx -> { - registeredNodes.remove(ctx.sessionId()); - broadcastNodeCount(); - }); - }); - } - - private static void broadcastNodeCount() { - List nodeInfoList = new ArrayList<>(registeredNodes.values()); - String nodesJson = gson.toJson(nodeInfoList); - nodes.values().forEach(ctx -> ctx.send(nodesJson)); - } - - public record NodeInfo(String hostname, int port) { } -} diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java deleted file mode 100644 index ed7f688..0000000 --- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java +++ /dev/null @@ -1,25 +0,0 @@ -package io.github.lumijiez; - -import io.javalin.Javalin; -import io.javalin.json.JavalinGson; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.time.Duration; - -public class Main { - public static void main(String[] args) { - Logger logger = LogManager.getLogger(Main.class); - - Javalin app = Javalin.create(config -> { - config.jsonMapper(new JavalinGson()); - config.jetty.modifyWebSocketServletFactory(wsFactoryConfig -> { - wsFactoryConfig.setIdleTimeout(Duration.ZERO); - }); - }).start(8083); - - JavalinConfig.setup(app); - - logger.info("Discovery service up and running"); - } -} \ No newline at end of file diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java deleted file mode 100644 index 823f748..0000000 --- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/models/requests/RegisterRequest.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.github.lumijiez.models.requests; - -public class RegisterRequest { - public String hostname; - public String port; - - public RegisterRequest(String hostname, String port) { - this.hostname = hostname; - this.port = port; - } - - public String getHostname() { - return hostname; - } - - public String getPort() { - return port; - } -} diff --git a/SymphonyDiscovery/src/main/resources/log4j2.xml b/SymphonyDiscovery/src/main/resources/log4j2.xml deleted file mode 100644 index 20f2b90..0000000 --- a/SymphonyDiscovery/src/main/resources/log4j2.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - %d{yyyy-MM-dd HH:mm:ss} %highlight{%-5level}{FATAL=red, ERROR=red, WARN=yellow, INFO=green, DEBUG=blue}: %highlight{%msg}{FATAL=red, ERROR=red}%n - - - - - - - - - - - - - - diff --git a/SymphonyDiscovery/src/main/resources/simplelogger.properties b/SymphonyDiscovery/src/main/resources/simplelogger.properties deleted file mode 100644 index b6c5c5b..0000000 --- a/SymphonyDiscovery/src/main/resources/simplelogger.properties +++ /dev/null @@ -1,20 +0,0 @@ -org.slf4j.simpleLogger.defaultLogLevel=off - -org.slf4j.simpleLogger.log.io.javalin=off - -org.slf4j.simpleLogger.log.org.eclipse.jetty=off -org.slf4j.simpleLogger.log.org.eclipse.jetty.server=off -org.slf4j.simpleLogger.log.org.eclipse.jetty.util=off - -# org.slf4j.simpleLogger.log.org.springframework=off -# org.slf4j.simpleLogger.log.org.hibernate=off - -org.slf4j.simpleLogger.showDateTime=true - -org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z - -org.slf4j.simpleLogger.showThreadName=true - -org.slf4j.simpleLogger.showLogName=true - -org.slf4j.simpleLogger.showShortLogName=false diff --git a/SymphonyManager/pom.xml b/SymphonyManager/pom.xml index cd08f82..f24c54e 100644 --- a/SymphonyManager/pom.xml +++ b/SymphonyManager/pom.xml @@ -22,30 +22,6 @@ 2.11.0 - - org.slf4j - slf4j-api - 1.7.32 - - - - ch.qos.logback - logback-classic - 1.5.12 - - - - ch.qos.logback - logback-core - 1.5.12 - - - - org.slf4j - slf4j-simple - 1.7.32 - - org.apache.logging.log4j log4j-api @@ -56,6 +32,29 @@ log4j-core 2.24.2 + + + io.javalin + javalin + 6.3.0 + + + + org.slf4j + slf4j-simple + 2.0.16 + + + ch.qos.logback + logback-classic + 1.4.12 + + + + ch.qos.logback + logback-core + 1.4.14 + diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java new file mode 100644 index 0000000..b143257 --- /dev/null +++ b/SymphonyManager/src/main/java/io/github/lumijiez/AddressHolder.java @@ -0,0 +1,45 @@ +package io.github.lumijiez; + +import java.util.Timer; +import java.util.TimerTask; + +public class AddressHolder { + private String host; + private int port; + + private static AddressHolder INSTANCE; + + private AddressHolder() { +// Timer timer = new Timer(true); +// timer.scheduleAtFixedRate(new TimerTask() { +// @Override +// public void run() { +// Main.logger.info("Host: {}, Port: {}", host, port); +// } +// }, 0, 1000); + } + + public static AddressHolder getInstance() { + if (INSTANCE == null) { + INSTANCE = new AddressHolder(); + } + return INSTANCE; + } + + public int getPort() { + return port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } +} + diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java b/SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java new file mode 100644 index 0000000..b473eee --- /dev/null +++ b/SymphonyManager/src/main/java/io/github/lumijiez/JavalinHttpConfig.java @@ -0,0 +1,20 @@ +package io.github.lumijiez; + +import io.github.lumijiez.requests.UpdateLeaderRequest; +import io.javalin.Javalin; +import io.javalin.http.Context; + +public class JavalinHttpConfig { + public static void setup(Javalin app) { + app.post("/update_leader", JavalinHttpConfig::handleUpdateLeader); + } + + public static void handleUpdateLeader(Context ctx) { + UpdateLeaderRequest request = ctx.bodyAsClass(UpdateLeaderRequest.class); + + AddressHolder.getInstance().setHost(request.getLeaderHost()); + AddressHolder.getInstance().setPort(request.getLeaderPort()); + + Main.logger.info("Changed host to leader: {}:{}", request.getLeaderHost(), request.getLeaderPort()); + } +} diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/Main.java b/SymphonyManager/src/main/java/io/github/lumijiez/Main.java index 07409b3..0ab2345 100644 --- a/SymphonyManager/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyManager/src/main/java/io/github/lumijiez/Main.java @@ -1,8 +1,26 @@ package io.github.lumijiez; -public class Main { +import io.javalin.Javalin; +import io.javalin.json.JavalinGson; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.time.Duration; + +public class Main { + public static Logger logger = LogManager.getLogger(Main.class); public static void main(String[] args) { - BrokerConnector.connect(); + new Thread(BrokerConnector::connect, "RabbitMQ-Connection").start(); + + Javalin app = Javalin.create(config -> { + config.jsonMapper(new JavalinGson()); + config.jetty.modifyWebSocketServletFactory(wsFactoryConfig -> { + wsFactoryConfig.setIdleTimeout(Duration.ZERO); + }); + }).start(8081); + + JavalinHttpConfig.setup(app); + + logger.info("Discovery service up and running"); } } \ No newline at end of file diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java b/SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java new file mode 100644 index 0000000..b9e18b7 --- /dev/null +++ b/SymphonyManager/src/main/java/io/github/lumijiez/requests/UpdateLeaderRequest.java @@ -0,0 +1,22 @@ +package io.github.lumijiez.requests; + +public class UpdateLeaderRequest { + private String leaderHost; + private int leaderPort; + + public void setLeaderHost(String leaderHost) { + this.leaderHost = leaderHost; + } + + public void setLeaderPort(int leaderPort) { + this.leaderPort = leaderPort; + } + + public int getLeaderPort() { + return leaderPort; + } + + public String getLeaderHost() { + return leaderHost; + } +} diff --git a/config/logs/rabbitmq/rabbitmq.log b/config/logs/rabbitmq/rabbitmq.log index e69de29..d6d2c45 100644 --- a/config/logs/rabbitmq/rabbitmq.log +++ b/config/logs/rabbitmq/rabbitmq.log @@ -0,0 +1 @@ +2024-12-12 20:43:45.086648+00:00 [error] <0.1439.0> closing AMQP 1.0 connection (172.18.0.1:54074 -> 172.18.0.3:5672, duration: '775ms'): RabbitMQ requires SASL security layer (expected protocol ID 3, but client sent protocol ID 2) diff --git a/docker-compose.yml b/docker-compose.yml index 86e7833..8af3b3f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,18 +23,6 @@ services: depends_on: - rabbitmq - symphony-discovery: - container_name: discovery - build: - context: ./SymphonyDiscovery - dockerfile: Dockerfile - ports: - - "8083:8083" - networks: - - symphony-network - depends_on: - - postgres_db - symphony-smtp: container_name: smtp build: @@ -59,7 +47,6 @@ services: - UDP_PORT=8105 depends_on: - postgres_db - - symphony-discovery node2: container_name: node2 @@ -77,7 +64,6 @@ services: - UDP_PORT=8106 depends_on: - postgres_db - - symphony-discovery node3: container_name: node3 @@ -95,7 +81,6 @@ services: - UDP_PORT=8107 depends_on: - postgres_db - - symphony-discovery node4: container_name: node4 @@ -113,7 +98,6 @@ services: - UDP_PORT=8108 depends_on: - postgres_db - - symphony-discovery node5: container_name: node5 @@ -131,7 +115,6 @@ services: - UDP_PORT=8109 depends_on: - postgres_db - - symphony-discovery postgres_db: image: postgres:latest diff --git a/pom.xml b/pom.xml index e6db36a..0cd743a 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,6 @@ SymphonyManager SymphonyProducer - SymphonyDiscovery SymphonyDatabaseNode SymphonySMTP