diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java index 0167d8e..2eb07b9 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Main.java @@ -1,36 +1,76 @@ package io.github.lumijiez; +import io.github.lumijiez.data.Data; import jakarta.persistence.EntityManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; -import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; +import java.net.http.WebSocket; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; public class Main { public static Logger logger = LogManager.getLogger(Main.class); + private static final CountDownLatch waitForConnection = new CountDownLatch(1); - public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { - HttpClient client = HttpClient.newHttpClient(); + public static void main(String[] args) { + try { + logger.info("Node up."); + EntityManager em = Data.getEntityManager(); - logger.info("Node up."); - EntityManager em = Data.getEntityManager(); + logger.info("Connected to database: << symphony >>"); + em.close(); - logger.info("Connected to database: << symphony >>"); - em.close(); + try (HttpClient client = HttpClient.newHttpClient()) { + CompletableFuture wsFuture = client + .newWebSocketBuilder() + .buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() { + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + try { + int nodeCount = Integer.parseInt(data.toString()); + logger.info("Acknowledged system nodes: {}", nodeCount); + } catch (NumberFormatException e) { + logger.error("Received invalid node count: {}", data); + } + return WebSocket.Listener.super.onText(webSocket, data, last); + } - HttpRequest request = HttpRequest.newBuilder() - .uri(new URI("http://symphony-discovery:8083/check")) - .build(); + @Override + public void onOpen(WebSocket webSocket) { + logger.info("Successfully registered to Discovery"); + waitForConnection.countDown(); + WebSocket.Listener.super.onOpen(webSocket); + } - HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + logger.info("Unregistered from Discovery: {}", reason); + return WebSocket.Listener.super.onClose(webSocket, statusCode, reason); + } - if (response.statusCode() == 200) { - logger.info("Node successfully registered to Discovery"); + @Override + public void onError(WebSocket webSocket, Throwable error) { + logger.error("Error: {}", error.getMessage()); + WebSocket.Listener.super.onError(webSocket, error); + } + }); + + WebSocket ws = wsFuture.join(); + + try { + waitForConnection.await(); + Thread.currentThread().join(); + } finally { + ws.sendClose(WebSocket.NORMAL_CLOSURE, "Node shutting down").join(); + } + } + + } catch (Exception e) { + logger.error("Error in main: {}", e.getMessage(), e); } } } \ No newline at end of file diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Data.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/Data.java similarity index 94% rename from SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Data.java rename to SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/Data.java index ebddbb2..b17b1ad 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/Data.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/Data.java @@ -1,4 +1,4 @@ -package io.github.lumijiez; +package io.github.lumijiez.data; import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManagerFactory; diff --git a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/User.java b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java similarity index 94% rename from SymphonyDatabaseNode/src/main/java/io/github/lumijiez/User.java rename to SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java index 1539429..ca56251 100644 --- a/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/User.java +++ b/SymphonyDatabaseNode/src/main/java/io/github/lumijiez/data/entities/User.java @@ -1,4 +1,4 @@ -package io.github.lumijiez; +package io.github.lumijiez.data.entities; import jakarta.persistence.Entity; import jakarta.persistence.Id; diff --git a/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml b/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml index 1eba1f6..b50a1e9 100644 --- a/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml +++ b/SymphonyDatabaseNode/src/main/resources/META-INF/persistence.xml @@ -7,7 +7,7 @@ org.hibernate.jpa.HibernatePersistenceProvider - io.github.lumijiez.User + io.github.lumijiez.data.entities.User false diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java index 77de4e8..376afdb 100644 --- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java +++ b/SymphonyDiscovery/src/main/java/io/github/lumijiez/JavalinConfig.java @@ -2,11 +2,34 @@ package io.github.lumijiez; import com.google.gson.Gson; import io.javalin.Javalin; +import io.javalin.json.JavalinGson; +import io.javalin.websocket.WsContext; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class JavalinConfig { - public static Gson gson = new Gson(); + private static final Map users = new ConcurrentHashMap<>(); public static void setup(Javalin app) { app.get("/check", ctx -> ctx.result("OK")); + + app.ws("/discovery", ws -> { + ws.onConnect(ctx -> { + String id = ctx.sessionId(); + users.put(id, ctx); + broadcast("Discovery", "Join"); + }); + + ws.onClose(ctx -> { + String id = ctx.sessionId(); + users.remove(id); + broadcast("Discovery", "Leave"); + }); + }); + } + + private static void broadcast(String sender, String message) { + users.values().forEach(ctx -> ctx.send(users.size())); } } diff --git a/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java b/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java index bb768b9..94f7bd7 100644 --- a/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java +++ b/SymphonyDiscovery/src/main/java/io/github/lumijiez/Main.java @@ -1,10 +1,14 @@ package io.github.lumijiez; import io.javalin.Javalin; +import io.javalin.json.JavalinGson; public class Main { public static void main(String[] args) { - Javalin app = Javalin.create().start(8083); + Javalin app = Javalin.create(config -> { + config.jsonMapper(new JavalinGson()); + }).start(8083); + JavalinConfig.setup(app); System.out.print("Discovery service up and running");