24
24
import com .google .cloud .spanner .SpannerImpl .ClosedException ;
25
25
import com .google .cloud .spanner .Statement .StatementFactory ;
26
26
import com .google .common .annotations .VisibleForTesting ;
27
- import com .google .common .base .Function ;
28
27
import com .google .common .util .concurrent .ListenableFuture ;
29
28
import com .google .spanner .v1 .BatchWriteResponse ;
30
29
import io .opentelemetry .api .common .Attributes ;
30
+ import java .util .ArrayList ;
31
+ import java .util .Arrays ;
32
+ import java .util .Objects ;
31
33
import java .util .concurrent .ExecutionException ;
32
34
import java .util .concurrent .Future ;
33
35
import java .util .concurrent .TimeUnit ;
34
36
import java .util .concurrent .TimeoutException ;
37
+ import java .util .concurrent .atomic .AtomicInteger ;
38
+ import java .util .function .BiFunction ;
35
39
import javax .annotation .Nullable ;
36
40
37
41
class DatabaseClientImpl implements DatabaseClient {
@@ -45,6 +49,8 @@ class DatabaseClientImpl implements DatabaseClient {
45
49
@ VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ;
46
50
@ VisibleForTesting final boolean useMultiplexedSessionPartitionedOps ;
47
51
@ VisibleForTesting final boolean useMultiplexedSessionForRW ;
52
+ private final int dbId ;
53
+ private final AtomicInteger nthRequest ;
48
54
49
55
final boolean useMultiplexedSessionBlindWrite ;
50
56
@@ -91,6 +97,18 @@ class DatabaseClientImpl implements DatabaseClient {
91
97
this .tracer = tracer ;
92
98
this .useMultiplexedSessionForRW = useMultiplexedSessionForRW ;
93
99
this .commonAttributes = commonAttributes ;
100
+
101
+ this .dbId = this .dbIdFromClientId (this .clientId );
102
+ this .nthRequest = new AtomicInteger (0 );
103
+ }
104
+
105
+ private int dbIdFromClientId (String clientId ) {
106
+ int i = clientId .indexOf ("-" );
107
+ String strWithValue = clientId .substring (i + 1 );
108
+ if (Objects .equals (strWithValue , "" )) {
109
+ strWithValue = "0" ;
110
+ }
111
+ return Integer .parseInt (strWithValue );
94
112
}
95
113
96
114
@ VisibleForTesting
@@ -188,7 +206,11 @@ public CommitResponse writeWithOptions(
188
206
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
189
207
return getMultiplexedSessionDatabaseClient ().writeWithOptions (mutations , options );
190
208
}
191
- return runWithSessionRetry (session -> session .writeWithOptions (mutations , options ));
209
+
210
+ return runWithSessionRetry (
211
+ (session , reqId ) -> {
212
+ return session .writeWithOptions (mutations , withReqId (reqId , options ));
213
+ });
192
214
} catch (RuntimeException e ) {
193
215
span .setStatus (e );
194
216
throw e ;
@@ -213,7 +235,8 @@ public CommitResponse writeAtLeastOnceWithOptions(
213
235
.writeAtLeastOnceWithOptions (mutations , options );
214
236
}
215
237
return runWithSessionRetry (
216
- session -> session .writeAtLeastOnceWithOptions (mutations , options ));
238
+ (session , reqId ) ->
239
+ session .writeAtLeastOnceWithOptions (mutations , withReqId (reqId , options )));
217
240
} catch (RuntimeException e ) {
218
241
span .setStatus (e );
219
242
throw e ;
@@ -222,6 +245,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
222
245
}
223
246
}
224
247
248
+ private int nextNthRequest () {
249
+ return this .nthRequest .incrementAndGet ();
250
+ }
251
+
225
252
@ Override
226
253
public ServerStream <BatchWriteResponse > batchWriteAtLeastOnce (
227
254
final Iterable <MutationGroup > mutationGroups , final TransactionOption ... options )
@@ -231,7 +258,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
231
258
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
232
259
return getMultiplexedSessionDatabaseClient ().batchWriteAtLeastOnce (mutationGroups , options );
233
260
}
234
- return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
261
+ return runWithSessionRetry (
262
+ (session , reqId ) ->
263
+ session .batchWriteAtLeastOnce (mutationGroups , withReqId (reqId , options )));
235
264
} catch (RuntimeException e ) {
236
265
span .setStatus (e );
237
266
throw e ;
@@ -383,27 +412,57 @@ private Future<Dialect> getDialectAsync() {
383
412
return pool .getDialectAsync ();
384
413
}
385
414
415
+ private UpdateOption [] withReqId (
416
+ final XGoogSpannerRequestId reqId , final UpdateOption ... options ) {
417
+ if (reqId == null ) {
418
+ return options ;
419
+ }
420
+ ArrayList <UpdateOption > allOptions = new ArrayList (Arrays .asList (options ));
421
+ allOptions .add (new Options .RequestIdOption (reqId ));
422
+ return allOptions .toArray (new UpdateOption [0 ]);
423
+ }
424
+
425
+ private TransactionOption [] withReqId (
426
+ final XGoogSpannerRequestId reqId , final TransactionOption ... options ) {
427
+ if (reqId == null ) {
428
+ return options ;
429
+ }
430
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
431
+ allOptions .add (new Options .RequestIdOption (reqId ));
432
+ return allOptions .toArray (new TransactionOption [0 ]);
433
+ }
434
+
386
435
private long executePartitionedUpdateWithPooledSession (
387
436
final Statement stmt , final UpdateOption ... options ) {
388
437
ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
389
438
try (IScope s = tracer .withSpan (span )) {
390
- return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
439
+ return runWithSessionRetry (
440
+ (session , reqId ) -> {
441
+ return session .executePartitionedUpdate (stmt , withReqId (reqId , options ));
442
+ });
391
443
} catch (RuntimeException e ) {
392
444
span .setStatus (e );
393
445
span .end ();
394
446
throw e ;
395
447
}
396
448
}
397
449
398
- private <T > T runWithSessionRetry (Function <Session , T > callable ) {
450
+ private <T > T runWithSessionRetry (BiFunction <Session , XGoogSpannerRequestId , T > callable ) {
399
451
PooledSessionFuture session = getSession ();
452
+ XGoogSpannerRequestId reqId =
453
+ XGoogSpannerRequestId .of (
454
+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
400
455
while (true ) {
401
456
try {
402
- return callable .apply (session );
457
+ reqId .incrementAttempt ();
458
+ return callable .apply (session , reqId );
403
459
} catch (SessionNotFoundException e ) {
404
460
session =
405
461
(PooledSessionFuture )
406
462
pool .getPooledSessionReplacementHandler ().replaceSession (e , session );
463
+ reqId =
464
+ XGoogSpannerRequestId .of (
465
+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
407
466
}
408
467
}
409
468
}
0 commit comments