From 03b8a2a1dfd1804439ffa7c061665e3d73c5c039 Mon Sep 17 00:00:00 2001 From: Martin Kaeser Date: Tue, 19 Mar 2024 15:00:53 +0100 Subject: [PATCH 1/3] Cleanup state after future has completed - Clean up request and promise maps whenever future completes - Enables timeout for Client/Server.send() (cherry picked from commit c801fbb221b4090405e58ed37d414fab06b16ce3) --- .../main/java/eu/chargetime/ocpp/Client.java | 17 ++++++++++------- .../main/java/eu/chargetime/ocpp/ISession.java | 2 ++ .../main/java/eu/chargetime/ocpp/Queue.java | 10 ++++++++++ .../main/java/eu/chargetime/ocpp/Server.java | 18 +++++++++++------- .../main/java/eu/chargetime/ocpp/Session.java | 10 ++++++++++ .../ocpp/TimeoutSessionDecorator.java | 5 +++++ 6 files changed, 48 insertions(+), 14 deletions(-) diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java index f0b50941f..238ab19d8 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Client.java @@ -78,7 +78,6 @@ public void handleConfirmation(String uniqueId, Confirmation confirmation) { promiseRepository.getPromise(uniqueId); if (promiseOptional.isPresent()) { promiseOptional.get().complete(confirmation); - promiseRepository.removePromise(uniqueId); } else { logger.debug("Promise not found for confirmation {}", confirmation); } @@ -105,11 +104,9 @@ public void handleError( Optional> promiseOptional = promiseRepository.getPromise(uniqueId); if (promiseOptional.isPresent()) { - promiseOptional - .get() + promiseOptional.get() .completeExceptionally( new CallErrorException(errorCode, errorDescription, payload)); - promiseRepository.removePromise(uniqueId); } else { logger.debug("Promise not found for error {}", errorDescription); } @@ -158,10 +155,16 @@ public CompletableFuture send(Request request) throw new OccurenceConstraintException(); } - String id = session.storeRequest(request); - CompletableFuture promise = promiseRepository.createPromise(id); + String requestUuid = session.storeRequest(request); + CompletableFuture promise = promiseRepository.createPromise(requestUuid); - session.sendRequest(featureOptional.get().getAction(), request, id); + // Clean up after the promise has completed, no matter if it was successful or had an error or a timeout. + promise.whenComplete((confirmation, throwable) -> { + session.removeRequest(requestUuid); + promiseRepository.removePromise(requestUuid); + }); + + session.sendRequest(featureOptional.get().getAction(), request, requestUuid); return promise; } diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java index 18ea99383..a2c89d883 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/ISession.java @@ -40,6 +40,8 @@ public interface ISession { String storeRequest(Request payload); + void removeRequest(String ticket); + void sendRequest(String action, Request payload, String uuid); boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException; diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java index 318c191da..9267efe9f 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Queue.java @@ -89,6 +89,16 @@ public Optional restoreRequest(String ticket) { return Optional.empty(); } + /** + * Remove a stored {@link Request} using a unique identifier. + * If no request is found for the identifier this method has no effect. + * + * @param ticket unique identifier returned when {@link Request} was initially stored. + */ + public void removeRequest(String ticket) { + requestQueue.remove(ticket); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("requestQueue", requestQueue).toString(); diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java index d62abe934..49d89e8a1 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Server.java @@ -97,7 +97,6 @@ public void handleConfirmation(String uniqueId, Confirmation confirmation) { promiseRepository.getPromise(uniqueId); if (promiseOptional.isPresent()) { promiseOptional.get().complete(confirmation); - promiseRepository.removePromise(uniqueId); } else { logger.debug("Promise not found for confirmation {}", confirmation); } @@ -135,11 +134,9 @@ public void handleError( Optional> promiseOptional = promiseRepository.getPromise(uniqueId); if (promiseOptional.isPresent()) { - promiseOptional - .get() + promiseOptional.get() .completeExceptionally( new CallErrorException(errorCode, errorDescription, payload)); - promiseRepository.removePromise(uniqueId); } else { logger.debug("Promise not found for error {}", errorDescription); } @@ -216,9 +213,16 @@ public CompletableFuture send(UUID sessionIndex, Request request) throw new OccurenceConstraintException(); } - String id = session.storeRequest(request); - CompletableFuture promise = promiseRepository.createPromise(id); - session.sendRequest(featureOptional.get().getAction(), request, id); + String requestUuid = session.storeRequest(request); + CompletableFuture promise = promiseRepository.createPromise(requestUuid); + + // Clean up after the promise has completed, no matter if it was successful or had an error or a timeout. + promise.whenComplete((confirmation, throwable) -> { + session.removeRequest(requestUuid); + promiseRepository.removePromise(requestUuid); + }); + + session.sendRequest(featureOptional.get().getAction(), request, requestUuid); return promise; } diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java index 1c3294b94..05ffc6182 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java @@ -114,6 +114,16 @@ public String storeRequest(Request payload) { return queue.store(payload); } + /** + * Remove a stored {@link Request} using a unique identifier. + * If no request is found for the identifier this method has no effect. + * + * @param ticket unique identifier returned when {@link Request} was initially stored. + */ + public void removeRequest(String ticket) { + queue.removeRequest(ticket); + } + /** * Send a {@link Confirmation} to a {@link Request} * diff --git a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java index bc27444b0..474d270c5 100644 --- a/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java +++ b/ocpp-v1_6/src/main/java/eu/chargetime/ocpp/TimeoutSessionDecorator.java @@ -92,6 +92,11 @@ public void sendRequest(String action, Request payload, String uuid) { this.session.sendRequest(action, payload, uuid); } + @Override + public void removeRequest(String ticket) { + this.session.removeRequest(ticket); + } + @Override public boolean completePendingPromise(String id, Confirmation confirmation) throws UnsupportedFeatureException, OccurenceConstraintException { return this.session.completePendingPromise(id, confirmation); From 92aabc5b6233cf81fe05e48166035fd99fe77fe2 Mon Sep 17 00:00:00 2001 From: Martin Kaeser Date: Tue, 19 Mar 2024 16:57:39 +0100 Subject: [PATCH 2/3] Cleanup state after future has completed - tests - Fix and improve tests (cherry picked from commit 6179a4e2530e432c867a28a32d58bcfe5d639a91) --- .../eu/chargetime/ocpp/test/ClientTest.java | 85 +++++++++++++++-- .../eu/chargetime/ocpp/test/ServerTest.java | 93 ++++++++++++++++++- 2 files changed, 167 insertions(+), 11 deletions(-) diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java index 4c6aebd08..151a2f3d8 100644 --- a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java @@ -26,23 +26,30 @@ of this software and associated documentation files (the "Software"), to deal SOFTWARE. */ -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.*; - import eu.chargetime.ocpp.*; import eu.chargetime.ocpp.feature.Feature; import eu.chargetime.ocpp.model.Confirmation; import eu.chargetime.ocpp.model.Request; import eu.chargetime.ocpp.model.TestConfirmation; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.*; + @RunWith(MockitoJUnitRunner.class) public class ClientTest { private Client client; @@ -63,8 +70,10 @@ public void setup() { .when(session) .open(any(), any()); + when(promiseRepository.createPromise(any())).then(invocation -> new CompletableFuture()); when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature)); when(session.getFeatureRepository()).thenReturn(featureRepository); + when(session.storeRequest(any())).then(invocation -> UUID.randomUUID().toString()); client = new Client(session, promiseRepository); } @@ -164,4 +173,68 @@ public void send_aMessage_validatesMessage() throws Exception { // Then verify(request, times(1)).validate(); } + + @Test + public void send_aMessage_promiseCompletes() throws Exception { + // Given + CompletableFuture internalFuture = new CompletableFuture<>(); + when(promiseRepository.createPromise(any())).thenReturn(internalFuture); + + // When + CompletableFuture returnedFuture = client.send(request); + TestConfirmation confirmation = new TestConfirmation(); + internalFuture.complete(confirmation); + + // Then + assertThat(returnedFuture, is(internalFuture)); + assertThat(returnedFuture.isDone(), is(true)); + assertThat(returnedFuture.get(), is(confirmation)); + verify(session, times(1)).removeRequest(any()); + verify(promiseRepository, times(1)).removePromise(any()); + } + + @Test + public void send_aMessage_promiseCompletesExceptionally() throws Exception { + // Given + CompletableFuture internalFuture = new CompletableFuture<>(); + when(promiseRepository.createPromise(any())).thenReturn(internalFuture); + + // When + CompletableFuture returnedFuture = client.send(request); + internalFuture.completeExceptionally(new IllegalStateException()); + + // Then + assertThat(returnedFuture, is(internalFuture)); + assertThat(returnedFuture.isDone(), is(true)); + assertThat(returnedFuture.isCompletedExceptionally(), is(true)); + ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get); + assertThat(executionException.getCause().getClass(), is(equalTo(IllegalStateException.class))); + verify(session, times(1)).removeRequest(any()); + verify(promiseRepository, times(1)).removePromise(any()); + } + + @Test + public void send_aMessage_promiseCompletesWithTimeout() throws Exception { + // Given + CompletableFuture internalFuture = new CompletableFuture<>(); + when(promiseRepository.createPromise(any())).thenReturn(internalFuture); + + // When + CompletableFuture returnedFuture = client.send(request); + // If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..) .. + returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS); + assertThat(returnedFuture.isDone(), is(false)); + Thread.sleep(100); + // .. alternatively, it can be implemented manually +// returnedFuture.completeExceptionally(new TimeoutException()); + + // Then + assertThat(returnedFuture, is(internalFuture)); + assertThat(returnedFuture.isDone(), is(true)); + assertThat(returnedFuture.isCompletedExceptionally(), is(true)); + ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get); + assertThat(executionException.getCause().getClass(), is(equalTo(TimeoutException.class))); + verify(session, times(1)).removeRequest(any()); + verify(promiseRepository, times(1)).removePromise(any()); + } } diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java index 1fedbe141..b6259f3c4 100644 --- a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java @@ -1,19 +1,30 @@ package eu.chargetime.ocpp.test; -import static org.mockito.Mockito.*; - import eu.chargetime.ocpp.*; import eu.chargetime.ocpp.feature.Feature; +import eu.chargetime.ocpp.model.Confirmation; import eu.chargetime.ocpp.model.Request; import eu.chargetime.ocpp.model.SessionInformation; -import java.util.Optional; -import java.util.UUID; +import eu.chargetime.ocpp.model.TestConfirmation; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.*; + /* ChargeTime.eu - Java-OCA-OCPP @@ -58,7 +69,7 @@ public class ServerTest { @Mock private Request request; @Mock private SessionInformation information; @Mock private IFeatureRepository featureRepository; - @Mock private IPromiseRepository promiseRepository; + @Mock IPromiseRepository promiseRepository; @Before public void setup() { @@ -75,8 +86,10 @@ public void setup() { .when(serverEvents) .newSession(any(), any()); + when(promiseRepository.createPromise(any())).then(invocation -> new CompletableFuture()); when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature)); when(session.getFeatureRepository()).thenReturn(featureRepository); + when(session.storeRequest(any())).thenAnswer(invocation -> UUID.randomUUID().toString()); server = new Server(listener, promiseRepository); } @@ -143,4 +156,74 @@ public void send_aMessage_validatesMessage() throws Exception { // Then verify(request, times(1)).validate(); } + + @Test + public void send_aMessage_promiseCompletes() throws Exception { + // Given + server.open(LOCALHOST, PORT, serverEvents); + listenerEvents.newSession(session, information); + CompletableFuture internalFuture = new CompletableFuture<>(); + when(promiseRepository.createPromise(any())).thenReturn(internalFuture); + + // When + CompletableFuture returnedFuture = server.send(sessionIndex, request); + TestConfirmation confirmation = new TestConfirmation(); + internalFuture.complete(confirmation); + + // Then + assertThat(returnedFuture, is(internalFuture)); + assertThat(returnedFuture.isDone(), is(true)); + assertThat(returnedFuture.get(), is(confirmation)); + verify(session, times(1)).removeRequest(any()); + verify(promiseRepository, times(1)).removePromise(any()); + } + + @Test + public void send_aMessage_promiseCompletesExceptionally() throws Exception { + // Given + server.open(LOCALHOST, PORT, serverEvents); + listenerEvents.newSession(session, information); + CompletableFuture internalFuture = new CompletableFuture<>(); + when(promiseRepository.createPromise(any())).thenReturn(internalFuture); + + // When + CompletableFuture returnedFuture = server.send(sessionIndex, request); + internalFuture.completeExceptionally(new IllegalStateException()); + + // Then + assertThat(returnedFuture, is(internalFuture)); + assertThat(returnedFuture.isDone(), is(true)); + assertThat(returnedFuture.isCompletedExceptionally(), is(true)); + ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get); + assertThat(executionException.getCause().getClass(), is(equalTo(IllegalStateException.class))); + verify(session, times(1)).removeRequest(any()); + verify(promiseRepository, times(1)).removePromise(any()); + } + + @Test + public void send_aMessage_promiseCompletesWithTimeout() throws Exception { + // Given + server.open(LOCALHOST, PORT, serverEvents); + listenerEvents.newSession(session, information); + CompletableFuture internalFuture = new CompletableFuture<>(); + when(promiseRepository.createPromise(any())).thenReturn(internalFuture); + + // When + CompletableFuture returnedFuture = server.send(sessionIndex, request); + // If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..). + returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS); + assertThat(returnedFuture.isDone(), is(false)); + Thread.sleep(100); + // .. alternatively, it can be implemented manually +// returnedFuture.completeExceptionally(new TimeoutException()); + + // Then + assertThat(returnedFuture, is(internalFuture)); + assertThat(returnedFuture.isDone(), is(true)); + assertThat(returnedFuture.isCompletedExceptionally(), is(true)); + ExecutionException executionException = assertThrows(ExecutionException.class, returnedFuture::get); + assertThat(executionException.getCause().getClass(), is(equalTo(TimeoutException.class))); + verify(session, times(1)).removeRequest(any()); + verify(promiseRepository, times(1)).removePromise(any()); + } } From f4c10a9cf1c3e072bf2c9f4093fbb8262ceff08d Mon Sep 17 00:00:00 2001 From: Martin Kaeser Date: Wed, 3 Apr 2024 09:15:23 +0200 Subject: [PATCH 3/3] Cleanup state after future has completed - adjust tests for Java 8 --- .../src/test/java/eu/chargetime/ocpp/test/ClientTest.java | 5 ++--- .../src/test/java/eu/chargetime/ocpp/test/ServerTest.java | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java index 151a2f3d8..cdb609592 100644 --- a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ClientTest.java @@ -41,7 +41,6 @@ of this software and associated documentation files (the "Software"), to deal import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.*; @@ -222,11 +221,11 @@ public void send_aMessage_promiseCompletesWithTimeout() throws Exception { // When CompletableFuture returnedFuture = client.send(request); // If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..) .. - returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS); +// returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS); assertThat(returnedFuture.isDone(), is(false)); Thread.sleep(100); // .. alternatively, it can be implemented manually -// returnedFuture.completeExceptionally(new TimeoutException()); + returnedFuture.completeExceptionally(new TimeoutException()); // Then assertThat(returnedFuture, is(internalFuture)); diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java index b6259f3c4..a20c0d26a 100644 --- a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/ServerTest.java @@ -16,7 +16,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.CoreMatchers.equalTo; @@ -211,11 +210,11 @@ public void send_aMessage_promiseCompletesWithTimeout() throws Exception { // When CompletableFuture returnedFuture = server.send(sessionIndex, request); // If the client uses at least Java 9, it could use CompletableFuture::orTimeout(..). - returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS); +// returnedFuture.orTimeout(50, TimeUnit.MILLISECONDS); assertThat(returnedFuture.isDone(), is(false)); Thread.sleep(100); // .. alternatively, it can be implemented manually -// returnedFuture.completeExceptionally(new TimeoutException()); + returnedFuture.completeExceptionally(new TimeoutException()); // Then assertThat(returnedFuture, is(internalFuture));