diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java index b15016a..1af46e5 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java @@ -1,7 +1,6 @@ package io.github.lumijiez; import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import io.github.lumijiez.data.Data; import io.github.lumijiez.data.models.NodeInfo; @@ -9,10 +8,17 @@ import jakarta.persistence.EntityManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import java.io.IOException; import java.lang.reflect.Type; -import java.net.URI; +import java.net.*; import java.net.http.HttpClient; import java.net.http.WebSocket; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -22,15 +28,18 @@ public class Main { public static Logger logger = LogManager.getLogger(Main.class); private static final CountDownLatch waitForConnection = new CountDownLatch(1); private static final Gson gson = new Gson(); + private static final List knownNodes = new ArrayList<>(); + private static int nodeCount = 0; public static void main(String[] args) { try { String hostname = System.getenv().getOrDefault("HOSTNAME", "localhost"); - int port = Integer.parseInt(System.getenv().getOrDefault("PORT", "8080")); + int udpPort = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084")); + + startUdpListener(udpPort); logger.info("Node started"); EntityManager em = Data.getEntityManager(); - logger.info("Connected to database:\u001B[33m\033[1m symphony"); em.close(); @@ -43,22 +52,25 @@ public class Main { try { Type nodeListType = new TypeToken>() {}.getType(); List nodes = gson.fromJson(data.toString(), nodeListType); -//// logger.info("Discovered Nodes (JSON):\n{}", -//// new GsonBuilder() -//// .setPrettyPrinting() -//// .create() -//// .toJson(nodes) -// ); - logger.info("Acknowledged nodes:\u001B[33m\033[1m {}\u001B[0m", nodes.size()); - } catch (NumberFormatException e) { - logger.error("Received invalid node count: {}", data); + + for (NodeInfo newNode : nodes) { + if (!knownNodes.contains(newNode)) { + knownNodes.add(newNode); + nodeCount++; + sendUdpSignal(newNode, udpPort); + } + } + + logger.info("Acknowledged nodes:\u001B[33m\033[1m {}\u001B[0m", knownNodes.size()); + } catch (Exception e) { + logger.error("Error processing nodes: {}", e.getMessage()); } return WebSocket.Listener.super.onText(webSocket, data, last); } @Override public void onOpen(WebSocket webSocket) { - NodeInfo nodeInfo = new NodeInfo(hostname, port); + NodeInfo nodeInfo = new NodeInfo(hostname, udpPort); webSocket.sendText(gson.toJson(nodeInfo), true); logger.info("Successfully registered to \033[1mDiscovery"); waitForConnection.countDown(); @@ -92,4 +104,57 @@ public class Main { logger.error("Error in main: {}", e.getMessage(), e); } } -} \ No newline at end of file + + private static void sendUdpSignal(NodeInfo node, int udpPort) { + try (DatagramChannel channel = DatagramChannel.open()) { + channel.configureBlocking(false); + String message = String.valueOf(nodeCount); + ByteBuffer buffer = ByteBuffer.wrap(message.getBytes()); + channel.send(buffer, new InetSocketAddress(node.hostname(), node.port())); + logger.info("Sent UDP to: \033[1m{}, message: \u001B[33m\033[1m{}", node.hostname(), message); + } catch (IOException e) { + logger.error("Error sending UDP signal: {}", e.getMessage()); + } + } + + private static void startUdpListener(int udpPort) { + Thread udpListenerThread = new Thread(() -> { + try (Selector selector = Selector.open(); + DatagramChannel channel = DatagramChannel.open()) { + + channel.bind(new InetSocketAddress(udpPort)); + channel.configureBlocking(false); + channel.register(selector, SelectionKey.OP_READ); + + logger.info("UDP listener started on port \033[1m{}", udpPort); + ByteBuffer buffer = ByteBuffer.allocate(1024); + + while (!Thread.currentThread().isInterrupted()) { + selector.select(); + Iterator keys = selector.selectedKeys().iterator(); + + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + + if (key.isReadable()) { + DatagramChannel datagramChannel = (DatagramChannel) key.channel(); + buffer.clear(); + InetSocketAddress sender = (InetSocketAddress) datagramChannel.receive(buffer); + buffer.flip(); + + String message = new String(buffer.array(), 0, buffer.limit()).trim(); + logger.info("Received UDP from \033[1m{}:{}: \u001B[33m\033[1m{}", + sender.getHostName(), + sender.getPort(), + message); + } + } + } + } catch (IOException e) { + logger.error("UDP listener error: {}", e.getMessage()); + } + }); + udpListenerThread.start(); + } +} diff --git a/docker-compose.yml b/docker-compose.yml index baec2a5..86e7833 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,82 +43,92 @@ services: ports: - "8090:8090" - symphony-dbnode1: + node1: container_name: node1 build: context: ./SymphonyDatabaseNode dockerfile: Dockerfile ports: - "8100:8100" + - "8105:8105/udp" networks: - symphony-network environment: - HOSTNAME=node1 - PORT=8100 + - UDP_PORT=8105 depends_on: - postgres_db - symphony-discovery - symphony-dbnode2: + node2: container_name: node2 build: context: ./SymphonyDatabaseNode dockerfile: Dockerfile ports: - "8101:8101" + - "8106:8106/udp" networks: - symphony-network environment: - HOSTNAME=node2 - PORT=8101 + - UDP_PORT=8106 depends_on: - postgres_db - symphony-discovery - symphony-dbnode3: + node3: container_name: node3 build: context: ./SymphonyDatabaseNode dockerfile: Dockerfile ports: - "8102:8102" + - "8107:8107/udp" networks: - symphony-network environment: - HOSTNAME=node3 - PORT=8102 + - UDP_PORT=8107 depends_on: - postgres_db - symphony-discovery - symphony-dbnode4: + node4: container_name: node4 build: context: ./SymphonyDatabaseNode dockerfile: Dockerfile ports: - "8103:8103" + - "8108:8108/udp" networks: - symphony-network environment: - HOSTNAME=node4 - PORT=8103 + - UDP_PORT=8108 depends_on: - postgres_db - symphony-discovery - symphony-dbnode5: + node5: container_name: node5 build: context: ./SymphonyDatabaseNode dockerfile: Dockerfile ports: - "8104:8104" + - "8109:8109/udp" networks: - symphony-network environment: - HOSTNAME=node5 - PORT=8104 + - UDP_PORT=8109 depends_on: - postgres_db - symphony-discovery