huge rework, RAFT WORKING

This commit is contained in:
Daniel
2024-12-12 23:26:47 +02:00
parent 1ddec6a353
commit 8b5f755152
25 changed files with 375 additions and 625 deletions

View File

@@ -1,14 +1,13 @@
package io.github.lumijiez;
import io.github.lumijiez.app.NodeManager;
import io.github.lumijiez.raft.Raft;
public class Main {
public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost");
public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
public static void main(String[] args) {
NodeManager manager = new NodeManager();
try {
Raft raft = new Raft();
Thread.currentThread().join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

View File

@@ -1,60 +0,0 @@
package io.github.lumijiez.app;
import io.github.lumijiez.data.models.NodeInfo;
import io.github.lumijiez.network.UdpListener;
import io.github.lumijiez.network.UdpMessageSender;
import io.github.lumijiez.network.WebSocketManager;
import io.github.lumijiez.raft.Raft;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
public class NodeManager {
private final Logger logger = LogManager.getLogger(NodeManager.class);
private final UdpListener listener;
private final UdpMessageSender sender;
private final WebSocketManager ws;
private final Raft raft;
private final List<NodeInfo> nodes = new ArrayList<>();
public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost");
public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
public NodeManager() {
this.listener = new UdpListener(this);
this.sender = new UdpMessageSender(this);
this.ws = new WebSocketManager(this);
this.raft = new Raft(this, sender);
listener.startListening();
ws.connectAndListen();
raft.start();
}
public void handleMessage(String message) {
raft.processMessage(message);
}
public List<NodeInfo> getNodes() {
return nodes;
}
public void registerNode(NodeInfo node) {
if (!nodes.contains(node)) {
nodes.add(node);
sender.sendMessage(node, "NODE_REGISTERED");
// logger.info("New node registered, updating Raft peers");
}
}
public void removeNode(NodeInfo node) {
nodes.remove(node);
// logger.info("Node removed, updating Raft peers");
}
public UdpMessageSender getSender() {
return sender;
}
}

View File

@@ -1,16 +0,0 @@
package io.github.lumijiez.data.models;
public record NodeInfo(String hostname, int port) {
public static NodeInfo fromString(String nodeString) {
String[] parts = nodeString.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid node string format. Expected 'hostname:port'");
}
return new NodeInfo(parts[0], Integer.parseInt(parts[1]));
}
@Override
public String toString() {
return hostname + ":" + port;
}
}

View File

@@ -1,6 +1,5 @@
package io.github.lumijiez.network;
import io.github.lumijiez.app.NodeManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -11,26 +10,31 @@ import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.function.Consumer;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
public class UdpListener {
private static final Logger logger = LogManager.getLogger(UdpListener.class);
private final int port;
private final Consumer<JsonMessage> messageHandler;
private final Gson gson = new Gson();
private final NodeManager nodeManager;
public UdpListener(NodeManager nodeManager) {
this.nodeManager = nodeManager;
public UdpListener(int port, Consumer<JsonMessage> messageHandler) {
this.port = port;
this.messageHandler = messageHandler;
}
public void startListening() {
Thread udpListenerThread = new Thread(() -> {
Thread listenerThread = new Thread(() -> {
try (Selector selector = Selector.open();
DatagramChannel channel = DatagramChannel.open()) {
channel.bind(new InetSocketAddress(NodeManager.PORT));
channel.bind(new InetSocketAddress(port));
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
logger.info("UDP listens on port {}", NodeManager.PORT);
logger.info("Listening for UDP messages on port {}", port);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (!Thread.currentThread().isInterrupted()) {
@@ -48,15 +52,37 @@ public class UdpListener {
buffer.flip();
String message = new String(buffer.array(), 0, buffer.limit()).trim();
// logger.info("Received UDP {}:{}: {}", sender.getHostName(), sender.getPort(), message);
nodeManager.handleMessage(message);
logger.info("Received message from {}:{} - {}", sender.getHostName(), sender.getPort(), message);
try {
JsonMessage jsonMessage = gson.fromJson(message, JsonMessage.class);
messageHandler.accept(jsonMessage);
} catch (JsonSyntaxException e) {
logger.error("Invalid JSON received: {}", message, e);
}
}
}
}
} catch (IOException e) {
logger.error("Error in UDP listener: {}", e.getMessage());
logger.error("Error in UDP listener: {}", e.getMessage(), e);
}
});
udpListenerThread.start();
listenerThread.start();
}
}
public static class JsonMessage {
public String type;
public int term;
public String sender;
public String additionalData;
public JsonMessage() {}
public JsonMessage(String type, int term, String sender, String additionalData) {
this.type = type;
this.term = term;
this.sender = sender;
this.additionalData = additionalData;
}
}
}

View File

@@ -1,31 +0,0 @@
package io.github.lumijiez.network;
import io.github.lumijiez.app.NodeManager;
import io.github.lumijiez.data.models.NodeInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
public class UdpMessageSender {
private static final Logger logger = LogManager.getLogger(UdpMessageSender.class);
private final NodeManager nodeManager;
public UdpMessageSender(NodeManager nodeManager) {
this.nodeManager = nodeManager;
}
public void sendMessage(NodeInfo node, String message) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
channel.send(buffer, new InetSocketAddress(node.hostname(), node.port()));
// logger.info("Sent UDP to {}:{}: {}", node.hostname(), node.port(), message);
} catch (IOException e) {
logger.error("Error sending UDP message: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,27 @@
package io.github.lumijiez.network;
import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
public class UdpSender {
private static final Logger logger = LogManager.getLogger(UdpSender.class);
private final Gson gson = new Gson();
public void sendMessage(String hostname, int port, Object message) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.configureBlocking(false);
String jsonMessage = gson.toJson(message);
ByteBuffer buffer = ByteBuffer.wrap(jsonMessage.getBytes());
channel.send(buffer, new InetSocketAddress(hostname, port));
logger.info("Sent message to {}:{} - {}", hostname, port, jsonMessage);
} catch (IOException e) {
logger.error("Error sending UDP message: {}", e.getMessage(), e);
}
}
}

View File

@@ -1,81 +0,0 @@
package io.github.lumijiez.network;
import io.github.lumijiez.app.NodeManager;
import io.github.lumijiez.data.models.NodeInfo;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.lang.reflect.Type;
public class WebSocketManager {
private static final Logger logger = LogManager.getLogger(WebSocketManager.class);
private static final Gson gson = new Gson();
private static final CountDownLatch waitForConnection = new CountDownLatch(1);
private WebSocket webSocket;
private final NodeManager nodeManager;
public WebSocketManager(NodeManager nodeManager) {
this.nodeManager = nodeManager;
}
public void connectAndListen() {
try (HttpClient client = HttpClient.newHttpClient()) {
CompletableFuture<WebSocket> wsFuture = client
.newWebSocketBuilder()
.buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() {
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
try {
Type nodeListType = new TypeToken<List<NodeInfo>>() {}.getType();
List<NodeInfo> nodes = gson.fromJson(data.toString(), nodeListType);
for (NodeInfo newNode : nodes) {
nodeManager.registerNode(newNode);
}
} catch (Exception e) {
logger.error("Error processing WebSocket data: {}", e.getMessage());
}
return WebSocket.Listener.super.onText(webSocket, data, last);
}
@Override
public void onOpen(WebSocket webSocket) {
NodeInfo nodeInfo = new NodeInfo(NodeManager.HOST, NodeManager.PORT);
webSocket.sendText(gson.toJson(nodeInfo), true);
logger.info("Successfully registered to Discovery");
waitForConnection.countDown();
WebSocket.Listener.super.onOpen(webSocket);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
logger.info("Unregistered from Discovery: {}", reason);
return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
logger.error("WebSocket error: {}", error.getMessage());
WebSocket.Listener.super.onError(webSocket, error);
}
});
webSocket = wsFuture.join();
waitForConnection.await();
} catch (Exception e) {
logger.error("Error in WebSocketManager: {}", e.getMessage());
}
}
}

View File

@@ -1,177 +1,234 @@
package io.github.lumijiez.raft;
import io.github.lumijiez.app.NodeManager;
import io.github.lumijiez.data.models.NodeInfo;
import io.github.lumijiez.network.UdpMessageSender;
import com.google.gson.Gson;
import io.github.lumijiez.Main;
import io.github.lumijiez.network.UdpListener;
import io.github.lumijiez.network.UdpSender;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
public class Raft {
private static final Random RANDOM = new Random();
private final Logger logger = LogManager.getLogger(Raft.class);
private static final Logger logger = LogManager.getLogger(Raft.class);
// Configuration Parameters
private static final int MIN_ELECTION_TIMEOUT = 300;
private static final int MAX_ELECTION_TIMEOUT = 600;
private static final int HEARTBEAT_INTERVAL = 100;
private static final int QUORUM_FACTOR = 2;
private enum State {
FOLLOWER, CANDIDATE, LEADER
}
// State Variables
private RaftStates state = RaftStates.FOLLOWER;
private static final List<String> NODES = Arrays.asList(
"node1:8105", "node2:8106", "node3:8107", "node4:8108", "node5:8109"
);
private final String selfAddress = Main.HOST;
private final int selfPort = Main.PORT;
private State currentState = State.FOLLOWER;
private int currentTerm = 0;
private String votedFor = null;
private Set<String> votesReceived = new HashSet<>();
private String currentLeader = null;
private final NodeManager nodeManager;
private final UdpMessageSender sender;
private final ScheduledExecutorService electionExecutor = Executors.newSingleThreadScheduledExecutor();
private final Set<String> receivedVotes = ConcurrentHashMap.newKeySet();
private final Random random = new Random();
private long electionTimeout = generateElectionTimeout();
private long lastHeartbeatTime = System.currentTimeMillis();
public Raft(NodeManager nodeManager, UdpMessageSender sender) {
this.nodeManager = nodeManager;
this.sender = sender;
nodeManager.getNodes().removeIf(node -> node.hostname().equals(NodeManager.HOST) && node.port() == NodeManager.PORT);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final ExecutorService electionExecutor = Executors.newSingleThreadExecutor();
private final UdpListener listener;
private final UdpSender sender;
private final Gson gson = new Gson();
public Raft() {
this.listener = new UdpListener(selfPort, this::processMessage);
this.sender = new UdpSender();
start();
}
public void start() {
logger.info("Raft initialization. Total peers: {}", nodeManager.getNodes().size());
becomeFollower(1);
private void start() {
executorService.scheduleAtFixedRate(this::checkElectionTimeout, 50, 50, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(this::sendHeartbeats, 100, 100, TimeUnit.MILLISECONDS);
listener.startListening();
logger.info("Node {} started", selfAddress);
}
private void becomeFollower(int term) {
if (term > currentTerm) {
state = RaftStates.FOLLOWER;
currentTerm = term;
votedFor = null;
votesReceived.clear();
logger.debug("Transitioned to FOLLOWER. Term: {}", term);
private long generateElectionTimeout() {
return System.currentTimeMillis() + 150 + random.nextInt(150);
}
private void checkElectionTimeout() {
long currentTime = System.currentTimeMillis();
if (currentState != State.LEADER && currentTime > electionTimeout) {
startElection();
}
scheduleElectionTimeout();
}
private void becomeCandidate() {
state = RaftStates.CANDIDATE;
private void startElection() {
currentState = State.CANDIDATE;
currentTerm++;
votedFor = NodeManager.HOST + ":" + NodeManager.PORT;
votesReceived = new HashSet<>(Set.of(votedFor)); // Self vote
votedFor = selfAddress;
receivedVotes.clear();
receivedVotes.add(selfAddress);
electionTimeout = generateElectionTimeout();
// logger.info("Starting election in Term {}. Attempting to collect votes.", currentTerm);
sendVoteRequests();
logger.info("Node {} starting election for term {}", selfAddress, currentTerm);
electionExecutor.submit(() -> {
for (String node : NODES) {
if (!node.equals(selfAddress)) {
UdpListener.JsonMessage message = new UdpListener.JsonMessage("VOTE_REQUEST", currentTerm, selfAddress + ":" + selfPort, null);
sendMessage(node, message);
}
}
try {
Thread.sleep(100);
tallyVotes();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private void tallyVotes() {
long requiredVotes = (NODES.size() / 2) + 1;
if (receivedVotes.size() >= requiredVotes) {
becomeLeader();
} else {
currentState = State.FOLLOWER;
votedFor = null;
electionTimeout = generateElectionTimeout();
logger.info("Node {} failed to become leader for term {}", selfAddress, currentTerm);
}
}
private void becomeLeader() {
if (state == RaftStates.CANDIDATE &&
votesReceived.size() >= (nodeManager.getNodes().size() / QUORUM_FACTOR + 1)) {
state = RaftStates.LEADER;
logger.info("LEADER ELECTION SUCCESS: Becoming LEADER in Term {}", currentTerm);
votesReceived.clear();
if (currentState == State.CANDIDATE) {
currentState = State.LEADER;
currentLeader = selfAddress;
logger.info("Node {} became leader for term {}", selfAddress, currentTerm);
sendHeartbeats();
notifyManager();
}
}
private void sendVoteRequests() {
String voteRequest = String.format("REQUEST_VOTE|%d|%s:%d",
currentTerm, NodeManager.HOST, NodeManager.PORT);
private void notifyManager() {
try {
URL url = new URL("http://manager:8081/update_leader");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setDoOutput(true);
for (NodeInfo peer : nodeManager.getNodes()) {
sender.sendMessage(peer, voteRequest);
String jsonBody = String.format(
"{\"leaderHost\": \"%s\", \"leaderPort\": %d}",
selfAddress.split(":")[0], selfPort
);
try (OutputStream os = connection.getOutputStream()) {
os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
os.flush();
}
int responseCode = connection.getResponseCode();
} catch (Exception e) {
logger.error("Failed to notify manager of leadership: {}", e.getMessage(), e);
}
}
private void sendHeartbeats() {
if (state != RaftStates.LEADER) return;
String heartbeat = String.format("HEARTBEAT|%d|%s:%d",
currentTerm, NodeManager.HOST, NodeManager.PORT);
for (NodeInfo peer : nodeManager.getNodes()) {
sender.sendMessage(peer, heartbeat);
if (currentState == State.LEADER) {
for (String node : NODES) {
if (!node.equals(selfAddress)) {
UdpListener.JsonMessage message = new UdpListener.JsonMessage("HEARTBEAT", currentTerm, selfAddress, null);
sendMessage(node, message);
}
}
}
// Reschedule heartbeats
electionExecutor.schedule(this::sendHeartbeats, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
}
private void scheduleElectionTimeout() {
int timeout = MIN_ELECTION_TIMEOUT + RANDOM.nextInt(MAX_ELECTION_TIMEOUT - MIN_ELECTION_TIMEOUT);
electionExecutor.schedule(() -> {
if (state != RaftStates.LEADER) {
logger.debug("Election timeout triggered. Starting new election.");
becomeCandidate();
}
}, timeout, TimeUnit.MILLISECONDS);
}
public void processMessage(String message) {
String[] parts = message.split("\\|");
if (parts.length < 3) return;
private void processMessage(UdpListener.JsonMessage message) {
try {
String type = parts[0];
int messageTerm = Integer.parseInt(parts[1]);
String sender = parts[2];
// Term comparison and potential state change
if (messageTerm > currentTerm) {
becomeFollower(messageTerm);
}
switch (type) {
case "REQUEST_VOTE":
handleRequestVote(messageTerm, sender);
switch (message.type) {
case "VOTE_REQUEST":
handleVoteRequest(message.term, message.sender);
break;
case "VOTE_RESPONSE":
handleVoteResponse(message.term, message.sender, message.additionalData);
break;
case "HEARTBEAT":
handleHeartbeat(messageTerm, sender);
break;
case "VOTE_GRANTED":
handleVoteGranted(messageTerm, sender);
handleHeartbeat(message.term, message.sender);
break;
}
} catch (NumberFormatException e) {
logger.error("Error processing message: {}", message);
} catch (Exception e) {
logger.error("Error processing message: {}", gson.toJson(message), e);
}
}
private void handleRequestVote(int term, String candidate) {
private void handleVoteRequest(int term, String candidate) {
boolean voteGranted = false;
if (term >= currentTerm &&
(votedFor == null || votedFor.equals(candidate))) {
voteGranted = true;
votedFor = candidate;
if (term > currentTerm) {
currentTerm = term;
scheduleElectionTimeout();
currentState = State.FOLLOWER;
votedFor = null;
}
String response = String.format("VOTE_GRANTED|%d|%s", term, voteGranted);
sender.sendMessage(NodeInfo.fromString(candidate), response);
if (term == currentTerm && (votedFor == null || votedFor.equals(candidate)) && currentState != State.LEADER) {
voteGranted = true;
votedFor = candidate;
electionTimeout = generateElectionTimeout();
}
UdpListener.JsonMessage response = new UdpListener.JsonMessage("VOTE_RESPONSE", term, selfAddress + ":" + selfPort, voteGranted ? "GRANTED" : "REJECTED");
sendMessage(candidate, response);
}
private void handleVoteResponse(int term, String sender, String response) {
if (currentState != State.CANDIDATE || term != currentTerm) return;
if ("GRANTED".equals(response)) {
receivedVotes.add(sender);
}
}
private void handleHeartbeat(int term, String leader) {
if (term >= currentTerm) {
scheduleElectionTimeout();
becomeFollower(term);
currentTerm = term;
currentState = State.FOLLOWER;
currentLeader = leader;
lastHeartbeatTime = System.currentTimeMillis();
electionTimeout = generateElectionTimeout();
}
}
private void handleVoteGranted(int term, String candidate) {
if (state == RaftStates.CANDIDATE && term == currentTerm) {
votesReceived.add(candidate);
logger.debug("Vote received from {}. Total votes: {}/{}",
candidate, votesReceived.size(), (nodeManager.getNodes().size() / QUORUM_FACTOR + 1));
becomeLeader();
private void sendMessage(String address, UdpListener.JsonMessage message) {
try {
String[] parts = address.split(":");
if (parts.length != 2) {
logger.error("Invalid address format: {}", address);
return;
}
String host = parts[0];
int port = Integer.parseInt(parts[1]);
sender.sendMessage(host, port, message);
} catch (NumberFormatException e) {
logger.error("Invalid port in address: {}", address, e);
} catch (Exception e) {
logger.error("Error sending message to {}: {}", address, e.getMessage(), e);
}
}
public void shutdown() {
electionExecutor.shutdownNow();
}
}
}

View File

@@ -1,7 +0,0 @@
package io.github.lumijiez.raft;
public enum RaftStates {
FOLLOWER,
CANDIDATE,
LEADER
}