23
23
import com .google .cloud .spanner .SessionPool .PooledSessionFuture ;
24
24
import com .google .cloud .spanner .SpannerImpl .ClosedException ;
25
25
import com .google .common .annotations .VisibleForTesting ;
26
- import com .google .common .base .Function ;
27
26
import com .google .common .util .concurrent .ListenableFuture ;
28
27
import com .google .spanner .v1 .BatchWriteResponse ;
29
28
import io .opentelemetry .api .common .Attributes ;
29
+ import java .util .ArrayList ;
30
+ import java .util .Arrays ;
31
+ import java .util .Objects ;
32
+ import java .util .concurrent .atomic .AtomicInteger ;
33
+ import java .util .function .BiFunction ;
30
34
import javax .annotation .Nullable ;
31
35
32
36
class DatabaseClientImpl implements DatabaseClient {
@@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
40
44
@ VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ;
41
45
@ VisibleForTesting final boolean useMultiplexedSessionPartitionedOps ;
42
46
@ VisibleForTesting final boolean useMultiplexedSessionForRW ;
47
+ private final int dbId ;
48
+ private final AtomicInteger nthRequest ;
43
49
44
50
final boolean useMultiplexedSessionBlindWrite ;
45
51
@@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
86
92
this .tracer = tracer ;
87
93
this .useMultiplexedSessionForRW = useMultiplexedSessionForRW ;
88
94
this .commonAttributes = commonAttributes ;
95
+
96
+ this .dbId = this .dbIdFromClientId (this .clientId );
97
+ this .nthRequest = new AtomicInteger (0 );
98
+ }
99
+
100
+ private int dbIdFromClientId (String clientId ) {
101
+ int i = clientId .indexOf ("-" );
102
+ String strWithValue = clientId .substring (i + 1 );
103
+ if (Objects .equals (strWithValue , "" )) {
104
+ strWithValue = "0" ;
105
+ }
106
+ return Integer .parseInt (strWithValue );
89
107
}
90
108
91
109
@ VisibleForTesting
@@ -159,7 +177,11 @@ public CommitResponse writeWithOptions(
159
177
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
160
178
return getMultiplexedSessionDatabaseClient ().writeWithOptions (mutations , options );
161
179
}
162
- return runWithSessionRetry (session -> session .writeWithOptions (mutations , options ));
180
+
181
+ return runWithSessionRetry (
182
+ (session , reqId ) -> {
183
+ return session .writeWithOptions (mutations , withReqId (reqId , options ));
184
+ });
163
185
} catch (RuntimeException e ) {
164
186
span .setStatus (e );
165
187
throw e ;
@@ -177,14 +199,23 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
177
199
public CommitResponse writeAtLeastOnceWithOptions (
178
200
final Iterable <Mutation > mutations , final TransactionOption ... options )
179
201
throws SpannerException {
202
+ return doWriteAtLeastOnceWithOptions (mutations , options );
203
+ }
204
+
205
+ private CommitResponse doWriteAtLeastOnceWithOptions (
206
+ final Iterable <Mutation > mutations , final TransactionOption ... options )
207
+ throws SpannerException {
180
208
ISpan span = tracer .spanBuilder (READ_WRITE_TRANSACTION , commonAttributes , options );
181
209
try (IScope s = tracer .withSpan (span )) {
182
210
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient () != null ) {
183
211
return getMultiplexedSessionDatabaseClient ()
184
212
.writeAtLeastOnceWithOptions (mutations , options );
185
213
}
214
+
186
215
return runWithSessionRetry (
187
- session -> session .writeAtLeastOnceWithOptions (mutations , options ));
216
+ (session , reqId ) -> {
217
+ return session .writeAtLeastOnceWithOptions (mutations , withReqId (reqId , options ));
218
+ });
188
219
} catch (RuntimeException e ) {
189
220
span .setStatus (e );
190
221
throw e ;
@@ -193,6 +224,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
193
224
}
194
225
}
195
226
227
+ private int nextNthRequest () {
228
+ return this .nthRequest .incrementAndGet ();
229
+ }
230
+
196
231
@ Override
197
232
public ServerStream <BatchWriteResponse > batchWriteAtLeastOnce (
198
233
final Iterable <MutationGroup > mutationGroups , final TransactionOption ... options )
@@ -202,7 +237,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
202
237
if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
203
238
return getMultiplexedSessionDatabaseClient ().batchWriteAtLeastOnce (mutationGroups , options );
204
239
}
205
- return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
240
+ return runWithSessionRetry (
241
+ (session , reqId ) ->
242
+ session .batchWriteAtLeastOnce (mutationGroups , withReqId (reqId , options )));
206
243
} catch (RuntimeException e ) {
207
244
span .setStatus (e );
208
245
throw e ;
@@ -346,27 +383,57 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
346
383
return executePartitionedUpdateWithPooledSession (stmt , options );
347
384
}
348
385
386
+ private UpdateOption [] withReqId (
387
+ final XGoogSpannerRequestId reqId , final UpdateOption ... options ) {
388
+ if (reqId == null ) {
389
+ return options ;
390
+ }
391
+ ArrayList <UpdateOption > allOptions = new ArrayList (Arrays .asList (options ));
392
+ allOptions .add (new Options .RequestIdOption (reqId ));
393
+ return allOptions .toArray (new UpdateOption [0 ]);
394
+ }
395
+
396
+ private TransactionOption [] withReqId (
397
+ final XGoogSpannerRequestId reqId , final TransactionOption ... options ) {
398
+ if (reqId == null ) {
399
+ return options ;
400
+ }
401
+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
402
+ allOptions .add (new Options .RequestIdOption (reqId ));
403
+ return allOptions .toArray (new TransactionOption [0 ]);
404
+ }
405
+
349
406
private long executePartitionedUpdateWithPooledSession (
350
407
final Statement stmt , final UpdateOption ... options ) {
351
408
ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
352
409
try (IScope s = tracer .withSpan (span )) {
353
- return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
410
+ return runWithSessionRetry (
411
+ (session , reqId ) -> {
412
+ return session .executePartitionedUpdate (stmt , withReqId (reqId , options ));
413
+ });
354
414
} catch (RuntimeException e ) {
355
415
span .setStatus (e );
356
416
span .end ();
357
417
throw e ;
358
418
}
359
419
}
360
420
361
- private <T > T runWithSessionRetry (Function <Session , T > callable ) {
421
+ private <T > T runWithSessionRetry (BiFunction <Session , XGoogSpannerRequestId , T > callable ) {
362
422
PooledSessionFuture session = getSession ();
423
+ XGoogSpannerRequestId reqId =
424
+ XGoogSpannerRequestId .of (
425
+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
363
426
while (true ) {
364
427
try {
365
- return callable .apply (session );
428
+ reqId .incrementAttempt ();
429
+ return callable .apply (session , reqId );
366
430
} catch (SessionNotFoundException e ) {
367
431
session =
368
432
(PooledSessionFuture )
369
433
pool .getPooledSessionReplacementHandler ().replaceSession (e , session );
434
+ reqId =
435
+ XGoogSpannerRequestId .of (
436
+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
370
437
}
371
438
}
372
439
}
0 commit comments