Correct node acknowledgement

This commit is contained in:
2024-11-24 00:27:14 +02:00
parent 03b1ea3b6d
commit 6464ea2986
6 changed files with 88 additions and 21 deletions

View File

@@ -1,36 +1,76 @@
package io.github.lumijiez; package io.github.lumijiez;
import io.github.lumijiez.data.Data;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient; import java.net.http.HttpClient;
import java.net.http.HttpRequest; import java.net.http.WebSocket;
import java.net.http.HttpResponse; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
public class Main { public class Main {
public static Logger logger = LogManager.getLogger(Main.class); public static Logger logger = LogManager.getLogger(Main.class);
private static final CountDownLatch waitForConnection = new CountDownLatch(1);
public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { public static void main(String[] args) {
HttpClient client = HttpClient.newHttpClient(); try {
logger.info("Node up.");
EntityManager em = Data.getEntityManager();
logger.info("Node up."); logger.info("Connected to database: << symphony >>");
EntityManager em = Data.getEntityManager(); em.close();
logger.info("Connected to database: << symphony >>"); try (HttpClient client = HttpClient.newHttpClient()) {
em.close(); CompletableFuture<WebSocket> wsFuture = client
.newWebSocketBuilder()
.buildAsync(new URI("ws://symphony-discovery:8083/discovery"), new WebSocket.Listener() {
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
try {
int nodeCount = Integer.parseInt(data.toString());
logger.info("Acknowledged system nodes: {}", nodeCount);
} catch (NumberFormatException e) {
logger.error("Received invalid node count: {}", data);
}
return WebSocket.Listener.super.onText(webSocket, data, last);
}
HttpRequest request = HttpRequest.newBuilder() @Override
.uri(new URI("http://symphony-discovery:8083/check")) public void onOpen(WebSocket webSocket) {
.build(); logger.info("Successfully registered to Discovery");
waitForConnection.countDown();
WebSocket.Listener.super.onOpen(webSocket);
}
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); @Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
logger.info("Unregistered from Discovery: {}", reason);
return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
}
if (response.statusCode() == 200) { @Override
logger.info("Node successfully registered to Discovery"); public void onError(WebSocket webSocket, Throwable error) {
logger.error("Error: {}", error.getMessage());
WebSocket.Listener.super.onError(webSocket, error);
}
});
WebSocket ws = wsFuture.join();
try {
waitForConnection.await();
Thread.currentThread().join();
} finally {
ws.sendClose(WebSocket.NORMAL_CLOSURE, "Node shutting down").join();
}
}
} catch (Exception e) {
logger.error("Error in main: {}", e.getMessage(), e);
} }
} }
} }

View File

@@ -1,4 +1,4 @@
package io.github.lumijiez; package io.github.lumijiez.data;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory; import jakarta.persistence.EntityManagerFactory;

View File

@@ -1,4 +1,4 @@
package io.github.lumijiez; package io.github.lumijiez.data.entities;
import jakarta.persistence.Entity; import jakarta.persistence.Entity;
import jakarta.persistence.Id; import jakarta.persistence.Id;

View File

@@ -7,7 +7,7 @@
<persistence-unit name="mainUnit" transaction-type="RESOURCE_LOCAL"> <persistence-unit name="mainUnit" transaction-type="RESOURCE_LOCAL">
<provider>org.hibernate.jpa.HibernatePersistenceProvider</provider> <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<class>io.github.lumijiez.User</class> <class>io.github.lumijiez.data.entities.User</class>
<exclude-unlisted-classes>false</exclude-unlisted-classes> <exclude-unlisted-classes>false</exclude-unlisted-classes>

View File

@@ -2,11 +2,34 @@ package io.github.lumijiez;
import com.google.gson.Gson; import com.google.gson.Gson;
import io.javalin.Javalin; import io.javalin.Javalin;
import io.javalin.json.JavalinGson;
import io.javalin.websocket.WsContext;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class JavalinConfig { public class JavalinConfig {
public static Gson gson = new Gson(); private static final Map<String, WsContext> users = new ConcurrentHashMap<>();
public static void setup(Javalin app) { public static void setup(Javalin app) {
app.get("/check", ctx -> ctx.result("OK")); app.get("/check", ctx -> ctx.result("OK"));
app.ws("/discovery", ws -> {
ws.onConnect(ctx -> {
String id = ctx.sessionId();
users.put(id, ctx);
broadcast("Discovery", "Join");
});
ws.onClose(ctx -> {
String id = ctx.sessionId();
users.remove(id);
broadcast("Discovery", "Leave");
});
});
}
private static void broadcast(String sender, String message) {
users.values().forEach(ctx -> ctx.send(users.size()));
} }
} }

View File

@@ -1,10 +1,14 @@
package io.github.lumijiez; package io.github.lumijiez;
import io.javalin.Javalin; import io.javalin.Javalin;
import io.javalin.json.JavalinGson;
public class Main { public class Main {
public static void main(String[] args) { public static void main(String[] args) {
Javalin app = Javalin.create().start(8083); Javalin app = Javalin.create(config -> {
config.jsonMapper(new JavalinGson());
}).start(8083);
JavalinConfig.setup(app); JavalinConfig.setup(app);
System.out.print("Discovery service up and running"); System.out.print("Discovery service up and running");