Skip to content

Commit 74dc23d

Browse files
authored
Merge pull request #23 from PierreZ/watch_v2
Watch v2
2 parents 36a913b + cb46130 commit 74dc23d

28 files changed

+761
-624
lines changed

.idea/checkstyle-idea.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# fdb-etcd ![https://img.shields.io/badge/vert.x-3.8.5-purple.svg](https://img.shields.io/badge/vert.x-3.8.5-purple.svg) ![gradle build](https://github.com/PierreZ/fdb-etcd/workflows/gradle%20build/badge.svg)
1+
# fdb-etcd ![https://img.shields.io/badge/vert.x-4.0.3-purple.svg](https://img.shields.io/badge/vert.x-3.8.5-purple.svg) ![gradle build](https://github.com/PierreZ/fdb-etcd/workflows/gradle%20build/badge.svg)
22

33
An experiment to provide ETCD layer on top of FoundationDB, built with [Record-Layer](https://foundationdb.github.io/fdb-record-layer/) and [Vert.x](https://vertx.io/).
44

@@ -9,13 +9,13 @@ An experiment to provide ETCD layer on top of FoundationDB, built with [Record-L
99
* Integrations test using a real FDB spawned with testcontainers and official Java etcd client,
1010
* Tests are backported from jetcd test cases
1111
* Supported operations:
12-
* put,
13-
* get,
14-
* scan,
15-
* delete,
16-
* compact,
17-
* leases,
18-
* watches,
12+
* put,
13+
* get,
14+
* scan,
15+
* delete,
16+
* compact,
17+
* leases,
18+
* watches,
1919
* ETCD MVCC simulated using FDB's read version
2020
* multi-tenancy (soon back by the AuthService)
2121

@@ -30,20 +30,22 @@ For TODO's, please have a look to the [Github issues](https://github.com/pierrez
3030
* gradle 6.2.2
3131
* [FoundationDB Client Packages](https://www.foundationdb.org/download/)
3232

33-
3433
### Gradle cheat-sheet
3534

3635
To launch your tests:
36+
3737
```
3838
./gradlew clean test
3939
```
4040

4141
To package your application:
42+
4243
```
4344
./gradlew clean assemble
4445
```
4546

4647
To run your application:
48+
4749
```
4850
./gradlew clean run
4951
```

build.gradle

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ repositories {
2323
}
2424

2525
ext {
26-
vertxVersion = '3.8.5'
26+
vertxVersion = '4.0.3'
2727
junitJupiterEngineVersion = '5.4.0'
2828
}
2929

@@ -42,19 +42,20 @@ dependencies {
4242
implementation "io.vertx:vertx-grpc:$vertxVersion"
4343
implementation 'org.foundationdb:fdb-record-layer-core-pb3:2.8.91.0'
4444

45-
compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
45+
implementation group: 'io.grpc', name: 'grpc-alts', version: "1.35.0"
46+
47+
implementation group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
4648

4749
testImplementation "io.vertx:vertx-junit5:$vertxVersion"
48-
testCompile "org.testcontainers:testcontainers:1.13.0"
50+
testImplementation "org.testcontainers:testcontainers:1.15.3"
4951
testImplementation "io.etcd:jetcd-core:0.5.0"
5052

5153
// https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
52-
testCompile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
53-
54+
testImplementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
5455

5556
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitJupiterEngineVersion"
5657
testImplementation "org.junit.jupiter:junit-jupiter-api:$junitJupiterEngineVersion"
57-
testCompile("org.junit.jupiter:junit-jupiter-params:$junitJupiterEngineVersion")
58+
testImplementation("org.junit.jupiter:junit-jupiter-params:$junitJupiterEngineVersion")
5859
}
5960

6061

@@ -90,12 +91,16 @@ protobuf {
9091
}
9192
plugins {
9293
grpc {
93-
artifact = "io.vertx:protoc-gen-grpc-java:1.25.0"
94+
artifact = "io.grpc:protoc-gen-grpc-java:1.35.0"
95+
}
96+
vertx {
97+
artifact = "io.vertx:vertx-grpc-protoc-plugin:${vertxVersion}"
9498
}
9599
}
96100
generateProtoTasks {
97101
all()*.plugins {
98102
grpc
103+
vertx
99104
}
100105
}
101106
}

gradle/wrapper/gradle-wrapper.jar

3.42 KB
Binary file not shown.
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
#Sat Mar 21 15:26:57 CET 2020
2-
distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.2-all.zip
31
distributionBase=GRADLE_USER_HOME
42
distributionPath=wrapper/dists
5-
zipStorePath=wrapper/dists
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-all.zip
64
zipStoreBase=GRADLE_USER_HOME
5+
zipStorePath=wrapper/dists

gradlew

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
#!/usr/bin/env sh
22

3+
#
4+
# Copyright 2015 the original author or authors.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# https://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
319
##############################################################################
420
##
521
## Gradle start up script for UN*X
@@ -28,7 +44,7 @@ APP_NAME="Gradle"
2844
APP_BASE_NAME=`basename "$0"`
2945

3046
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
31-
DEFAULT_JVM_OPTS='"-Xmx64m"'
47+
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
3248

3349
# Use the maximum available, or set MAX_FD != -1 to use that value.
3450
MAX_FD="maximum"
@@ -109,8 +125,8 @@ if $darwin; then
109125
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
110126
fi
111127

112-
# For Cygwin, switch paths to Windows format before running java
113-
if $cygwin ; then
128+
# For Cygwin or MSYS, switch paths to Windows format before running java
129+
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
114130
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
115131
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
116132
JAVACMD=`cygpath --unix "$JAVACMD"`
@@ -138,19 +154,19 @@ if $cygwin ; then
138154
else
139155
eval `echo args$i`="\"$arg\""
140156
fi
141-
i=$((i+1))
157+
i=`expr $i + 1`
142158
done
143159
case $i in
144-
(0) set -- ;;
145-
(1) set -- "$args0" ;;
146-
(2) set -- "$args0" "$args1" ;;
147-
(3) set -- "$args0" "$args1" "$args2" ;;
148-
(4) set -- "$args0" "$args1" "$args2" "$args3" ;;
149-
(5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
150-
(6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
151-
(7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
152-
(8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
153-
(9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
160+
0) set -- ;;
161+
1) set -- "$args0" ;;
162+
2) set -- "$args0" "$args1" ;;
163+
3) set -- "$args0" "$args1" "$args2" ;;
164+
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
165+
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
166+
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
167+
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
168+
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
169+
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
154170
esac
155171
fi
156172

@@ -159,14 +175,9 @@ save () {
159175
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
160176
echo " "
161177
}
162-
APP_ARGS=$(save "$@")
178+
APP_ARGS=`save "$@"`
163179

164180
# Collect all arguments for the java command, following the shell quoting and substitution rules
165181
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
166182

167-
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
168-
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
169-
cd "$(dirname "$0")"
170-
fi
171-
172183
exec "$JAVACMD" "$@"

gradlew.bat

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
@rem
2+
@rem Copyright 2015 the original author or authors.
3+
@rem
4+
@rem Licensed under the Apache License, Version 2.0 (the "License");
5+
@rem you may not use this file except in compliance with the License.
6+
@rem You may obtain a copy of the License at
7+
@rem
8+
@rem https://www.apache.org/licenses/LICENSE-2.0
9+
@rem
10+
@rem Unless required by applicable law or agreed to in writing, software
11+
@rem distributed under the License is distributed on an "AS IS" BASIS,
12+
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
@rem See the License for the specific language governing permissions and
14+
@rem limitations under the License.
15+
@rem
16+
117
@if "%DEBUG%" == "" @echo off
218
@rem ##########################################################################
319
@rem
@@ -13,8 +29,11 @@ if "%DIRNAME%" == "" set DIRNAME=.
1329
set APP_BASE_NAME=%~n0
1430
set APP_HOME=%DIRNAME%
1531

32+
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
33+
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
34+
1635
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
17-
set DEFAULT_JVM_OPTS="-Xmx64m"
36+
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
1837

1938
@rem Find java.exe
2039
if defined JAVA_HOME goto findJavaFromJavaHome

src/main/java/fr/pierrezemb/fdb/layer/etcd/MainVerticle.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import fr.pierrezemb.fdb.layer.etcd.grpc.KVService;
66
import fr.pierrezemb.fdb.layer.etcd.grpc.LeaseService;
77
import fr.pierrezemb.fdb.layer.etcd.grpc.WatchService;
8-
import fr.pierrezemb.fdb.layer.etcd.notifier.Notifier;
9-
import fr.pierrezemb.fdb.layer.etcd.notifier.VertxNotifier;
108
import fr.pierrezemb.fdb.layer.etcd.store.EtcdRecordLayer;
119
import io.vertx.core.AbstractVerticle;
1210
import io.vertx.core.Promise;
@@ -24,23 +22,25 @@ public void start(Promise<Void> startPromise) throws Exception {
2422
System.out.println("connecting to fdb@" + clusterFilePath);
2523

2624
EtcdRecordLayer recordLayer = new EtcdRecordLayer(clusterFilePath);
27-
Notifier notifier = new VertxNotifier(vertx.eventBus());
2825

2926
VertxServerBuilder serverBuilder = VertxServerBuilder
3027
.forAddress(vertx,
3128
this.context.config().getString("listen-address", "localhost"),
3229
this.context.config().getInteger("listen-port", 8080))
3330
.intercept(new AuthInterceptor(authEnabled, defaultTenant))
34-
.addService(new KVService(recordLayer, notifier))
31+
.addService(new KVService(recordLayer))
3532
.addService(new LeaseService(recordLayer))
36-
.addService(new WatchService(recordLayer, notifier))
33+
.addService(new WatchService(recordLayer, vertx))
3734
.addService(new AuthService());
3835

3936
VertxServer server = serverBuilder.build();
4037

4138
server.start(ar -> {
4239
if (ar.succeeded()) {
43-
System.out.println("gRPC service started");
40+
System.out.println("gRPC service started on "
41+
+ this.context.config().getString("listen-address", "localhost")
42+
+ ":"
43+
+ this.context.config().getInteger("listen-port", 8080));
4444
startPromise.complete();
4545
} else {
4646
System.out.println("Could not start server " + ar.cause().getMessage());
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package fr.pierrezemb.fdb.layer.etcd;
2+
3+
import com.apple.foundationdb.record.query.expressions.Query;
4+
import com.apple.foundationdb.record.query.expressions.QueryComponent;
5+
import com.google.protobuf.AbstractMessageLite;
6+
import com.google.protobuf.ByteString;
7+
import fr.pierrezemb.fdb.layer.etcd.store.EtcdRecordLayer;
8+
import fr.pierrezemb.fdb.layer.etcd.store.LatestOperations;
9+
import io.vertx.core.AbstractVerticle;
10+
import io.vertx.core.Promise;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import java.util.concurrent.Semaphore;
15+
16+
public class WatchVerticle extends AbstractVerticle {
17+
18+
private static final Logger log = LoggerFactory.getLogger(WatchVerticle.class);
19+
20+
private final String tenantId;
21+
private final long watchID;
22+
private final EtcdRecordLayer recordLayer;
23+
private final QueryComponent keyQueryFilter;
24+
private final Semaphore mutex;
25+
private long timerID;
26+
private long lastCommitedVersion;
27+
28+
public WatchVerticle(String tenantId, long watchId, EtcdRecordLayer recordLayer, long commitVersion, ByteString rangeStart, ByteString rangeEnd) {
29+
this.tenantId = tenantId;
30+
this.watchID = watchId;
31+
this.recordLayer = recordLayer;
32+
this.lastCommitedVersion = commitVersion;
33+
this.mutex = new Semaphore(1);
34+
35+
this.keyQueryFilter = rangeEnd.size() == 0 ?
36+
Query.field("key").equalsValue(rangeStart.toByteArray()) :
37+
Query.and(
38+
Query.field("key").greaterThanOrEquals(rangeStart.toByteArray()),
39+
Query.field("key").lessThanOrEquals(rangeEnd.toByteArray())
40+
);
41+
}
42+
43+
@Override
44+
public void start(Promise<Void> startPromise) throws Exception {
45+
super.start(startPromise);
46+
log.debug("started WatchService {}/{}", tenantId, this.watchID);
47+
this.timerID = vertx.setTimer(10, id -> {
48+
this.poll();
49+
});
50+
}
51+
52+
private void poll() {
53+
long nextPollMS = 100;
54+
try {
55+
mutex.acquire();
56+
log.trace("poll started");
57+
long start = System.currentTimeMillis();
58+
LatestOperations ops = this.recordLayer.retrieveLatestOperations(tenantId, lastCommitedVersion, keyQueryFilter);
59+
long elapsed = System.currentTimeMillis() - start;
60+
log.trace("poll finished: found {} events in {}ms", ops.events.size(), elapsed);
61+
62+
if (elapsed > 100 && elapsed < 1000) {
63+
nextPollMS = elapsed * 2;
64+
}
65+
66+
ops.events.stream()
67+
.map(AbstractMessageLite::toByteArray)
68+
.forEach(e -> this.vertx.eventBus().publish(tenantId + watchID, e));
69+
log.trace("old readVersion: {}, new readVersion: {}", lastCommitedVersion, ops.readVersion);
70+
this.lastCommitedVersion = ops.readVersion;
71+
} catch (InterruptedException e) {
72+
// exception handling code
73+
} finally {
74+
mutex.release();
75+
}
76+
77+
log.trace("scheduling a poll in {}ms", nextPollMS);
78+
vertx.setTimer(nextPollMS, timerID -> poll());
79+
}
80+
81+
}

src/main/java/fr/pierrezemb/fdb/layer/etcd/grpc/AuthService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.slf4j.LoggerFactory;
88

99
public class AuthService extends AuthGrpc.AuthImplBase {
10-
private static Logger log = LoggerFactory.getLogger(AuthService.class);
10+
private static final Logger log = LoggerFactory.getLogger(AuthService.class);
1111

1212
/**
1313
* <pre>

0 commit comments

Comments
 (0)