diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java index cf0b65a..0d06f91 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java @@ -1,160 +1,17 @@ package io.github.lumijiez; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import io.github.lumijiez.data.Data; -import io.github.lumijiez.data.models.NodeInfo; -import jakarta.persistence.EntityManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; -import java.io.IOException; -import java.lang.reflect.Type; -import java.net.*; -import java.net.http.HttpClient; -import java.net.http.WebSocket; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; +import io.github.lumijiez.app.NodeManager; public class Main { - public static Logger logger = LogManager.getLogger(Main.class); - private static final CountDownLatch waitForConnection = new CountDownLatch(1); - private static final Gson gson = new Gson(); - private static final List knownNodes = new ArrayList<>(); - private static int nodeCount = 0; public static void main(String[] args) { + NodeManager manager = new NodeManager(); + try { - String hostname = System.getenv().getOrDefault("HOSTNAME", "localhost"); - int udpPort = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); - - startUdpListener(udpPort); - - logger.info("Node started"); - EntityManager em = Data.getEntityManager(); - logger.info("Connected to database:\u001B[33m\033[1m symphony"); - em.close(); - - try (HttpClient client = HttpClient.newHttpClient()) { - CompletableFuture wsFuture = client - .newWebSocketBuilder() - .buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() { - @Override - public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { - try { - Type nodeListType = new TypeToken>() {}.getType(); - List nodes = gson.fromJson(data.toString(), nodeListType); - - for (NodeInfo newNode : nodes) { - if (!knownNodes.contains(newNode)) { - knownNodes.add(newNode); - nodeCount++; - sendUdpSignal(newNode); - } - } - - logger.info("Acknowledged nodes:\u001B[33m\033[1m {}\u001B[0m", knownNodes.size()); - } catch (Exception e) { - logger.error("Error processing nodes: {}", e.getMessage()); - } - return WebSocket.Listener.super.onText(webSocket, data, last); - } - - @Override - public void onOpen(WebSocket webSocket) { - NodeInfo nodeInfo = new NodeInfo(hostname, udpPort); - webSocket.sendText(gson.toJson(nodeInfo), true); - logger.info("Successfully registered to \033[1mDiscovery"); - waitForConnection.countDown(); - WebSocket.Listener.super.onOpen(webSocket); - } - - @Override - public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { - logger.info("Unregistered from Discovery: {}", reason); - return WebSocket.Listener.super.onClose(webSocket, statusCode, reason); - } - - @Override - public void onError(WebSocket webSocket, Throwable error) { - logger.error("Error: {}", error.getMessage()); - WebSocket.Listener.super.onError(webSocket, error); - } - }); - - WebSocket ws = wsFuture.join(); - - try { - waitForConnection.await(); - Thread.currentThread().join(); - } finally { - ws.sendClose(WebSocket.NORMAL_CLOSURE, "Node shutting down").join(); - } - } - - } catch (Exception e) { - logger.error("Error in main: {}", e.getMessage(), e); + Thread.currentThread().join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - - private static void sendUdpSignal(NodeInfo node) { - try (DatagramChannel channel = DatagramChannel.open()) { - channel.configureBlocking(false); - String message = String.valueOf(nodeCount); - ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); - channel.send(buffer, new InetSocketAddress(node.hostname(), node.port())); - logger.info("Sent UDP: \033[1m{}, message: \u001B[33m\033[1m{}", node.hostname() + ":" + node.port(), message); - } catch (IOException e) { - logger.error("Error sending UDP signal: {}", e.getMessage()); - } - } - - private static void startUdpListener(int udpPort) { - Thread udpListenerThread = new Thread(() -> { - try (Selector selector = Selector.open(); - DatagramChannel channel = DatagramChannel.open()) { - - channel.bind(new InetSocketAddress(udpPort)); - channel.configureBlocking(false); - channel.register(selector, SelectionKey.OP_READ); - - logger.info("UDP listens on port \033[1m{}", udpPort); - ByteBuffer buffer = ByteBuffer.allocate(1024); - - while (!Thread.currentThread().isInterrupted()) { - selector.select(); - Iterator keys = selector.selectedKeys().iterator(); - - while (keys.hasNext()) { - SelectionKey key = keys.next(); - keys.remove(); - - if (key.isReadable()) { - DatagramChannel datagramChannel = (DatagramChannel) key.channel(); - buffer.clear(); - InetSocketAddress sender = (InetSocketAddress) datagramChannel.receive(buffer); - buffer.flip(); - - String message = new String(buffer.array(), 0, buffer.limit()).trim(); - logger.info("Received UDP: \033[1m{}:{}: \u001B[33m\033[1m{}", - sender.getHostName(), - sender.getPort(), - message); - } - } - } - } catch (IOException e) { - logger.error("UDP listener error: {}", e.getMessage()); - } - }); - udpListenerThread.start(); - } } diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java new file mode 100644 index 0000000..4989eb9 --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/app/NodeManager.java @@ -0,0 +1,63 @@ +package io.github.lumijiez.app; + +import io.github.lumijiez.data.models.NodeInfo; +import io.github.lumijiez.network.UdpListener; +import io.github.lumijiez.network.UdpMessageSender; +import io.github.lumijiez.network.WebSocketManager; +import io.github.lumijiez.raft.Raft; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; + + +public class NodeManager { + private final Logger logger = LogManager.getLogger(NodeManager.class); + + private final UdpListener listener; + private final UdpMessageSender sender; + private final WebSocketManager ws; +// private final Raft raft; + + private final List nodes = new ArrayList<>(); + + public static final String HOST = System.getenv().getOrDefault("HOSTNAME", "localhost"); + public static final int PORT = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); + + public NodeManager() { + this.listener = new UdpListener(this); + this.sender = new UdpMessageSender(this); + this.ws = new WebSocketManager(this); +// this.raft = new Raft(this); + + listener.startListening(); + ws.connectAndListen(); + } + + public void handleMessage(String message) { +// raft.processMessage(message); + } + + public List getNodes() { + return nodes; + } + + public void registerNode(NodeInfo node) { + if (!nodes.contains(node)) { + nodes.add(node); + sender.sendMessage(node, "NODE_REGISTERED"); + + logger.info("New node registered, restarting Raft consensus"); +// raft.initializeRaft(); + } + } + + public void removeNode(NodeInfo node) { + nodes.remove(node); + } + + public UdpMessageSender getSender() { + return sender; + } +} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java index 350a7ad..1cc6958 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/models/NodeInfo.java @@ -1,3 +1,16 @@ package io.github.lumijiez.data.models; -public record NodeInfo(String hostname, int port) { } +public record NodeInfo(String hostname, int port) { + public static NodeInfo fromString(String nodeString) { + String[] parts = nodeString.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid node string format. Expected 'hostname:port'"); + } + return new NodeInfo(parts[0], Integer.parseInt(parts[1])); + } + + @Override + public String toString() { + return hostname + ":" + port; + } +} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java new file mode 100644 index 0000000..8c2ced8 --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpListener.java @@ -0,0 +1,62 @@ +package io.github.lumijiez.network; + +import io.github.lumijiez.app.NodeManager; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Iterator; + +public class UdpListener { + private static final Logger logger = LogManager.getLogger(UdpListener.class); + + private final NodeManager nodeManager; + + public UdpListener(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + public void startListening() { + Thread udpListenerThread = new Thread(() -> { + try (Selector selector = Selector.open(); + DatagramChannel channel = DatagramChannel.open()) { + + channel.bind(new InetSocketAddress(NodeManager.PORT)); + channel.configureBlocking(false); + channel.register(selector, SelectionKey.OP_READ); + + logger.info("UDP listens on port {}", NodeManager.PORT); + ByteBuffer buffer = ByteBuffer.allocate(1024); + + while (!Thread.currentThread().isInterrupted()) { + selector.select(); + Iterator keys = selector.selectedKeys().iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + + if (key.isReadable()) { + DatagramChannel datagramChannel = (DatagramChannel) key.channel(); + buffer.clear(); + InetSocketAddress sender = (InetSocketAddress) datagramChannel.receive(buffer); + buffer.flip(); + + String message = new String(buffer.array(), 0, buffer.limit()).trim(); + logger.info("Received UDP {}:{}: {}", sender.getHostName(), sender.getPort(), message); + nodeManager.handleMessage(message); + } + } + } + } catch (IOException e) { + logger.error("Error in UDP listener: {}", e.getMessage()); + } + }); + udpListenerThread.start(); + } +} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java new file mode 100644 index 0000000..8bcb71b --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/UdpMessageSender.java @@ -0,0 +1,35 @@ +package io.github.lumijiez.network; + +import io.github.lumijiez.app.NodeManager; +import io.github.lumijiez.data.models.NodeInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; + +public class UdpMessageSender { + private static final Logger logger = LogManager.getLogger(UdpMessageSender.class); + private final NodeManager nodeManager; + + public UdpMessageSender(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + public void sendMessage(NodeInfo node, String message) { + try (DatagramChannel channel = DatagramChannel.open()) { + channel.configureBlocking(false); + ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); + channel.send(buffer, new InetSocketAddress(node.hostname(), node.port())); + logger.info("Sent UDP to {}:{}: {}", node.hostname(), node.port(), message); + } catch (IOException e) { + logger.error("Error sending UDP message: {}", e.getMessage()); + } + } + + public void sendCountMessage(NodeInfo node, int count) { + sendMessage(node, String.valueOf(count)); + } +} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java new file mode 100644 index 0000000..6b37fe0 --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/network/WebSocketManager.java @@ -0,0 +1,88 @@ +package io.github.lumijiez.network; + +import io.github.lumijiez.app.NodeManager; +import io.github.lumijiez.data.models.NodeInfo; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; +import java.lang.reflect.Type; + +public class WebSocketManager { + private static final Logger logger = LogManager.getLogger(WebSocketManager.class); + private static final Gson gson = new Gson(); + private static final CountDownLatch waitForConnection = new CountDownLatch(1); + + private WebSocket webSocket; + private final NodeManager nodeManager; + + public WebSocketManager(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + public void connectAndListen() { + try (HttpClient client = HttpClient.newHttpClient()) { + CompletableFuture wsFuture = client + .newWebSocketBuilder() + .buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() { + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + try { + Type nodeListType = new TypeToken>() {}.getType(); + List nodes = gson.fromJson(data.toString(), nodeListType); + + for (NodeInfo newNode : nodes) { + nodeManager.registerNode(newNode); +// logger.info("Discovered node: {}:{}", newNode.hostname(), newNode.port()); + } + + } catch (Exception e) { + logger.error("Error processing WebSocket data: {}", e.getMessage()); + } + return WebSocket.Listener.super.onText(webSocket, data, last); + } + + @Override + public void onOpen(WebSocket webSocket) { + NodeInfo nodeInfo = new NodeInfo(NodeManager.HOST, NodeManager.PORT); + webSocket.sendText(gson.toJson(nodeInfo), true); + logger.info("Successfully registered to Discovery"); + waitForConnection.countDown(); + WebSocket.Listener.super.onOpen(webSocket); + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + logger.info("Unregistered from Discovery: {}", reason); + return WebSocket.Listener.super.onClose(webSocket, statusCode, reason); + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + logger.error("WebSocket error: {}", error.getMessage()); + WebSocket.Listener.super.onError(webSocket, error); + } + }); + + webSocket = wsFuture.join(); + waitForConnection.await(); + + } catch (Exception e) { + logger.error("Error in WebSocketManager: {}", e.getMessage()); + } + } + + public void send(NodeInfo nodeInfo) { + if (webSocket != null) { + webSocket.sendText(gson.toJson(nodeInfo), true); + } + } +} diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java new file mode 100644 index 0000000..9f0c4aa --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/Raft.java @@ -0,0 +1,20 @@ +package io.github.lumijiez.raft; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class Raft { + private static final Random RANDOM = new Random(); + + private static RaftStates STATE = RaftStates.FOLLOWER; + private static int CURRENT_TERM = 0; + private static String VOTED_FOR = null; + private static List LOG = new ArrayList<>(); + private static int COMMIT_INDEX = 0; + private static int LAST_APPLIED = 0; + + + private static final int ELECTION_TIMEOUT = 150 + RANDOM.nextInt(151); + +} \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java new file mode 100644 index 0000000..af2d050 --- /dev/null +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/raft/RaftStates.java @@ -0,0 +1,7 @@ +package io.github.lumijiez.raft; + +public enum RaftStates { + FOLLOWER, + CANDIDATE, + LEADER +} diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java index ebae711..48583d0 100644 --- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java +++ b/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java @@ -3,6 +3,8 @@ package io.github.lumijiez; import com.google.gson.Gson; import io.javalin.Javalin; import io.javalin.websocket.WsContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -13,6 +15,7 @@ public class JavalinConfig { private static final Map registeredNodes = new ConcurrentHashMap<>(); private static final Map nodes = new ConcurrentHashMap<>(); private static final Gson gson = new Gson(); + private static final Logger logger = LogManager.getLogger(JavalinConfig.class); public static void setup(Javalin app) { app.ws("/discovery", ws -> { diff --git a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java index 89555fb..e3c0e3b 100644 --- a/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java +++ b/SymphonyManager/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -26,7 +26,7 @@ public class BrokerConnector { try { connectToRabbitMQ(); } catch (Exception e) { - logger.error("Awaiting broker connection: {}", e.getMessage()); + logger.warn("Awaiting broker connection: {}", e.getMessage()); } }, 0, 5, TimeUnit.SECONDS); diff --git a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java index dcd40db..fe1b151 100644 --- a/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java +++ b/SymphonyProducer/src/main/java/io/github/lumijiez/BrokerConnector.java @@ -63,7 +63,7 @@ public class BrokerConnector { return scheduler.awaitTermination(5, TimeUnit.SECONDS); } catch (Exception e) { - logger.error("Awaiting broker connection: {}", e.getMessage()); + logger.warn("Awaiting broker connection: {}", e.getMessage()); try { Thread.sleep(5000); connectToRabbitMQ(latch);