UDP communication between nodes
This commit is contained in:
@@ -1,7 +1,6 @@
|
|||||||
package io.github.lumijiez;
|
package io.github.lumijiez;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.GsonBuilder;
|
|
||||||
import com.google.gson.reflect.TypeToken;
|
import com.google.gson.reflect.TypeToken;
|
||||||
import io.github.lumijiez.data.Data;
|
import io.github.lumijiez.data.Data;
|
||||||
import io.github.lumijiez.data.models.NodeInfo;
|
import io.github.lumijiez.data.models.NodeInfo;
|
||||||
@@ -9,10 +8,17 @@ import jakarta.persistence.EntityManager;
|
|||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
import java.net.URI;
|
import java.net.*;
|
||||||
import java.net.http.HttpClient;
|
import java.net.http.HttpClient;
|
||||||
import java.net.http.WebSocket;
|
import java.net.http.WebSocket;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.DatagramChannel;
|
||||||
|
import java.nio.channels.SelectionKey;
|
||||||
|
import java.nio.channels.Selector;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
@@ -22,15 +28,18 @@ public class Main {
|
|||||||
public static Logger logger = LogManager.getLogger(Main.class);
|
public static Logger logger = LogManager.getLogger(Main.class);
|
||||||
private static final CountDownLatch waitForConnection = new CountDownLatch(1);
|
private static final CountDownLatch waitForConnection = new CountDownLatch(1);
|
||||||
private static final Gson gson = new Gson();
|
private static final Gson gson = new Gson();
|
||||||
|
private static final List<NodeInfo> knownNodes = new ArrayList<>();
|
||||||
|
private static int nodeCount = 0;
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
try {
|
try {
|
||||||
String hostname = System.getenv().getOrDefault("HOSTNAME", "localhost");
|
String hostname = System.getenv().getOrDefault("HOSTNAME", "localhost");
|
||||||
int port = Integer.parseInt(System.getenv().getOrDefault("PORT", "8080"));
|
int udpPort = Integer.parseInt(System.getenv().getOrDefault("UDP_PORT", "8084"));
|
||||||
|
|
||||||
|
startUdpListener(udpPort);
|
||||||
|
|
||||||
logger.info("Node started");
|
logger.info("Node started");
|
||||||
EntityManager em = Data.getEntityManager();
|
EntityManager em = Data.getEntityManager();
|
||||||
|
|
||||||
logger.info("Connected to database:\u001B[33m\033[1m symphony");
|
logger.info("Connected to database:\u001B[33m\033[1m symphony");
|
||||||
em.close();
|
em.close();
|
||||||
|
|
||||||
@@ -43,22 +52,25 @@ public class Main {
|
|||||||
try {
|
try {
|
||||||
Type nodeListType = new TypeToken<List<NodeInfo>>() {}.getType();
|
Type nodeListType = new TypeToken<List<NodeInfo>>() {}.getType();
|
||||||
List<NodeInfo> nodes = gson.fromJson(data.toString(), nodeListType);
|
List<NodeInfo> nodes = gson.fromJson(data.toString(), nodeListType);
|
||||||
//// logger.info("Discovered Nodes (JSON):\n{}",
|
|
||||||
//// new GsonBuilder()
|
for (NodeInfo newNode : nodes) {
|
||||||
//// .setPrettyPrinting()
|
if (!knownNodes.contains(newNode)) {
|
||||||
//// .create()
|
knownNodes.add(newNode);
|
||||||
//// .toJson(nodes)
|
nodeCount++;
|
||||||
// );
|
sendUdpSignal(newNode, udpPort);
|
||||||
logger.info("Acknowledged nodes:\u001B[33m\033[1m {}\u001B[0m", nodes.size());
|
}
|
||||||
} catch (NumberFormatException e) {
|
}
|
||||||
logger.error("Received invalid node count: {}", data);
|
|
||||||
|
logger.info("Acknowledged nodes:\u001B[33m\033[1m {}\u001B[0m", knownNodes.size());
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Error processing nodes: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
return WebSocket.Listener.super.onText(webSocket, data, last);
|
return WebSocket.Listener.super.onText(webSocket, data, last);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onOpen(WebSocket webSocket) {
|
public void onOpen(WebSocket webSocket) {
|
||||||
NodeInfo nodeInfo = new NodeInfo(hostname, port);
|
NodeInfo nodeInfo = new NodeInfo(hostname, udpPort);
|
||||||
webSocket.sendText(gson.toJson(nodeInfo), true);
|
webSocket.sendText(gson.toJson(nodeInfo), true);
|
||||||
logger.info("Successfully registered to \033[1mDiscovery");
|
logger.info("Successfully registered to \033[1mDiscovery");
|
||||||
waitForConnection.countDown();
|
waitForConnection.countDown();
|
||||||
@@ -92,4 +104,57 @@ public class Main {
|
|||||||
logger.error("Error in main: {}", e.getMessage(), e);
|
logger.error("Error in main: {}", e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
private static void sendUdpSignal(NodeInfo node, int udpPort) {
|
||||||
|
try (DatagramChannel channel = DatagramChannel.open()) {
|
||||||
|
channel.configureBlocking(false);
|
||||||
|
String message = String.valueOf(nodeCount);
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
|
||||||
|
channel.send(buffer, new InetSocketAddress(node.hostname(), node.port()));
|
||||||
|
logger.info("Sent UDP to: \033[1m{}, message: \u001B[33m\033[1m{}", node.hostname(), message);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("Error sending UDP signal: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void startUdpListener(int udpPort) {
|
||||||
|
Thread udpListenerThread = new Thread(() -> {
|
||||||
|
try (Selector selector = Selector.open();
|
||||||
|
DatagramChannel channel = DatagramChannel.open()) {
|
||||||
|
|
||||||
|
channel.bind(new InetSocketAddress(udpPort));
|
||||||
|
channel.configureBlocking(false);
|
||||||
|
channel.register(selector, SelectionKey.OP_READ);
|
||||||
|
|
||||||
|
logger.info("UDP listener started on port \033[1m{}", udpPort);
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
|
|
||||||
|
while (!Thread.currentThread().isInterrupted()) {
|
||||||
|
selector.select();
|
||||||
|
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
|
||||||
|
|
||||||
|
while (keys.hasNext()) {
|
||||||
|
SelectionKey key = keys.next();
|
||||||
|
keys.remove();
|
||||||
|
|
||||||
|
if (key.isReadable()) {
|
||||||
|
DatagramChannel datagramChannel = (DatagramChannel) key.channel();
|
||||||
|
buffer.clear();
|
||||||
|
InetSocketAddress sender = (InetSocketAddress) datagramChannel.receive(buffer);
|
||||||
|
buffer.flip();
|
||||||
|
|
||||||
|
String message = new String(buffer.array(), 0, buffer.limit()).trim();
|
||||||
|
logger.info("Received UDP from \033[1m{}:{}: \u001B[33m\033[1m{}",
|
||||||
|
sender.getHostName(),
|
||||||
|
sender.getPort(),
|
||||||
|
message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("UDP listener error: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
udpListenerThread.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -43,82 +43,92 @@ services:
|
|||||||
ports:
|
ports:
|
||||||
- "8090:8090"
|
- "8090:8090"
|
||||||
|
|
||||||
symphony-dbnode1:
|
node1:
|
||||||
container_name: node1
|
container_name: node1
|
||||||
build:
|
build:
|
||||||
context: ./SymphonyDatabaseNode
|
context: ./SymphonyDatabaseNode
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
ports:
|
ports:
|
||||||
- "8100:8100"
|
- "8100:8100"
|
||||||
|
- "8105:8105/udp"
|
||||||
networks:
|
networks:
|
||||||
- symphony-network
|
- symphony-network
|
||||||
environment:
|
environment:
|
||||||
- HOSTNAME=node1
|
- HOSTNAME=node1
|
||||||
- PORT=8100
|
- PORT=8100
|
||||||
|
- UDP_PORT=8105
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
- symphony-discovery
|
- symphony-discovery
|
||||||
|
|
||||||
symphony-dbnode2:
|
node2:
|
||||||
container_name: node2
|
container_name: node2
|
||||||
build:
|
build:
|
||||||
context: ./SymphonyDatabaseNode
|
context: ./SymphonyDatabaseNode
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
ports:
|
ports:
|
||||||
- "8101:8101"
|
- "8101:8101"
|
||||||
|
- "8106:8106/udp"
|
||||||
networks:
|
networks:
|
||||||
- symphony-network
|
- symphony-network
|
||||||
environment:
|
environment:
|
||||||
- HOSTNAME=node2
|
- HOSTNAME=node2
|
||||||
- PORT=8101
|
- PORT=8101
|
||||||
|
- UDP_PORT=8106
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
- symphony-discovery
|
- symphony-discovery
|
||||||
|
|
||||||
symphony-dbnode3:
|
node3:
|
||||||
container_name: node3
|
container_name: node3
|
||||||
build:
|
build:
|
||||||
context: ./SymphonyDatabaseNode
|
context: ./SymphonyDatabaseNode
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
ports:
|
ports:
|
||||||
- "8102:8102"
|
- "8102:8102"
|
||||||
|
- "8107:8107/udp"
|
||||||
networks:
|
networks:
|
||||||
- symphony-network
|
- symphony-network
|
||||||
environment:
|
environment:
|
||||||
- HOSTNAME=node3
|
- HOSTNAME=node3
|
||||||
- PORT=8102
|
- PORT=8102
|
||||||
|
- UDP_PORT=8107
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
- symphony-discovery
|
- symphony-discovery
|
||||||
|
|
||||||
symphony-dbnode4:
|
node4:
|
||||||
container_name: node4
|
container_name: node4
|
||||||
build:
|
build:
|
||||||
context: ./SymphonyDatabaseNode
|
context: ./SymphonyDatabaseNode
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
ports:
|
ports:
|
||||||
- "8103:8103"
|
- "8103:8103"
|
||||||
|
- "8108:8108/udp"
|
||||||
networks:
|
networks:
|
||||||
- symphony-network
|
- symphony-network
|
||||||
environment:
|
environment:
|
||||||
- HOSTNAME=node4
|
- HOSTNAME=node4
|
||||||
- PORT=8103
|
- PORT=8103
|
||||||
|
- UDP_PORT=8108
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
- symphony-discovery
|
- symphony-discovery
|
||||||
|
|
||||||
symphony-dbnode5:
|
node5:
|
||||||
container_name: node5
|
container_name: node5
|
||||||
build:
|
build:
|
||||||
context: ./SymphonyDatabaseNode
|
context: ./SymphonyDatabaseNode
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
ports:
|
ports:
|
||||||
- "8104:8104"
|
- "8104:8104"
|
||||||
|
- "8109:8109/udp"
|
||||||
networks:
|
networks:
|
||||||
- symphony-network
|
- symphony-network
|
||||||
environment:
|
environment:
|
||||||
- HOSTNAME=node5
|
- HOSTNAME=node5
|
||||||
- PORT=8104
|
- PORT=8104
|
||||||
|
- UDP_PORT=8109
|
||||||
depends_on:
|
depends_on:
|
||||||
- postgres_db
|
- postgres_db
|
||||||
- symphony-discovery
|
- symphony-discovery
|
||||||
|
|||||||
Reference in New Issue
Block a user