Skip to content

chore: wire up x-goog-spanner-request-id for BatchCreateSessions #3677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
Expand All @@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;
private final int dbId;
private final AtomicInteger nthRequest;

final boolean useMultiplexedSessionBlindWrite;

Expand Down Expand Up @@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;

this.dbId = this.dbIdFromClientId(this.clientId);
this.nthRequest = new AtomicInteger(0);
}

private int dbIdFromClientId(String clientId) {
int i = clientId.indexOf("-");
String strWithValue = clientId.substring(i + 1);
if (Objects.equals(strWithValue, "")) {
strWithValue = "0";
}
return Integer.parseInt(strWithValue);
}

@VisibleForTesting
Expand Down Expand Up @@ -179,8 +197,21 @@ public CommitResponse writeAtLeastOnceWithOptions(
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}

int nthRequest = this.nextNthRequest();
int channelId = 1; /* TODO: infer the channelId from the gRPC channel of the session */
XGoogSpannerRequestId reqId = XGoogSpannerRequestId.of(this.dbId, channelId, nthRequest, 0);

return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
(session) -> {
reqId.incrementAttempt();
// TODO: Update the channelId depending on the session that is inferred.
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
System.out.println("\033[35msession.class: " + session.getClass() + "\033[00m");
allOptions.add(new Options.RequestIdOption(reqId));
return session.writeAtLeastOnceWithOptions(
mutations, allOptions.toArray(new TransactionOption[0]));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -189,6 +220,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private int nextNthRequest() {
return this.nthRequest.incrementAndGet();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ void appendToOptions(Options options) {
private RpcOrderBy orderBy;
private RpcLockHint lockHint;
private Boolean lastStatement;
private XGoogSpannerRequestId reqId;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -568,6 +569,14 @@ String pageToken() {
return pageToken;
}

boolean hasReqId() {
return reqId != null;
}

XGoogSpannerRequestId reqId() {
return reqId;
}

boolean hasFilter() {
return filter != null;
}
Expand Down Expand Up @@ -1018,4 +1027,30 @@ public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}

static final class RequestIdOption extends InternalOption
implements TransactionOption, UpdateOption {
private final XGoogSpannerRequestId reqId;

RequestIdOption(XGoogSpannerRequestId reqId) {
this.reqId = reqId;
}

@Override
void appendToOptions(Options options) {
options.reqId = this.reqId;
}

@Override
public int hashCode() {
return RequestIdOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
// TODO: Examine why the precedent for LastStatementUpdateOption
// does not check against the actual value.
return o instanceof RequestIdOption;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;

/** Client for creating single sessions and batches of sessions. */
class SessionClient implements AutoCloseable {
class SessionClient implements AutoCloseable, XGoogSpannerRequestId.RequestIdCreator {
static class SessionId {
private static final PathTemplate NAME_TEMPLATE =
PathTemplate.create(
Expand Down Expand Up @@ -174,6 +175,12 @@ interface SessionConsumer {
private final DatabaseId db;
private final Attributes commonAttributes;

// SessionClient is created long before a DatabaseClientImpl is created,
// as batch sessions are firstly created then later attached to each Client.
private static AtomicInteger NTH_ID = new AtomicInteger(0);
private final int nthId;
private final AtomicInteger nthRequest;

@GuardedBy("this")
private volatile long sessionChannelCounter;

Expand All @@ -186,6 +193,8 @@ interface SessionConsumer {
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
this.nthId = SessionClient.NTH_ID.incrementAndGet();
this.nthRequest = new AtomicInteger(0);
}

@Override
Expand All @@ -201,28 +210,38 @@ DatabaseId getDatabaseId() {
return db;
}

@Override
public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
return XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelId, 1);
}

/** Create a single session. */
SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
// which is also a valid channel hint.
final Map<SpannerRpc.Option, ?> options;
final long channelId;
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
channelId = sessionChannelCounter;
}
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId = this.nextRequestId(channelId, 1);
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
reqId.withOptions(options));
SessionReference sessionReference =
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
return new SessionImpl(spanner, sessionReference);
SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference);
sessionImpl.setRequestIdCreator(this);
return sessionImpl;
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -273,6 +292,7 @@ SessionImpl createMultiplexedSession() {
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
sessionImpl.setRequestIdCreator(this);
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
return sessionImpl;
Expand Down Expand Up @@ -387,6 +407,8 @@ private List<SessionImpl> internalBatchCreateSessions(
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
try (IScope s = spanner.getTracer().withSpan(span)) {
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
List<com.google.spanner.v1.Session> sessions =
spanner
.getRpc()
Expand All @@ -395,21 +417,20 @@ private List<SessionImpl> internalBatchCreateSessions(
sessionCount,
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
reqId.withOptions(options));
span.addAnnotation(
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
span.end();
List<SessionImpl> res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(
SessionImpl sessionImpl =
new SessionImpl(
spanner,
new SessionReference(
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
options)));
session.getName(), session.getCreateTime(), session.getMultiplexed(), options));
sessionImpl.setRequestIdCreator(this);
res.add(sessionImpl);
}
return res;
} catch (RuntimeException e) {
Expand All @@ -425,6 +446,8 @@ SessionImpl sessionWithId(String name) {
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
return new SessionImpl(spanner, new SessionReference(name, options));
SessionImpl sessionImpl = new SessionImpl(spanner, new SessionReference(name, options));
sessionImpl.setRequestIdCreator(this);
return sessionImpl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,31 @@ interface SessionTransaction {
private final Clock clock;
private final Map<SpannerRpc.Option, ?> options;
private final ErrorHandler errorHandler;
private XGoogSpannerRequestId.RequestIdCreator requestIdCreator;

SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
this(spanner, sessionReference, NO_CHANNEL_HINT);
}

SessionImpl(SpannerImpl spanner, SessionReference sessionReference, int channelHint) {
this(spanner, sessionReference, channelHint, new XGoogSpannerRequestId.NoopRequestIdCreator());
}

SessionImpl(
SpannerImpl spanner,
SessionReference sessionReference,
int channelHint,
XGoogSpannerRequestId.RequestIdCreator requestIdCreator) {
this.spanner = spanner;
this.tracer = spanner.getTracer();
this.sessionReference = sessionReference;
this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
this.options = createOptions(sessionReference, channelHint);
this.errorHandler = createErrorHandler(spanner.getOptions());
this.requestIdCreator = requestIdCreator;
if (this.requestIdCreator == null) {
this.requestIdCreator = new XGoogSpannerRequestId.NoopRequestIdCreator();
}
}

static Map<SpannerRpc.Option, ?> createOptions(
Expand Down Expand Up @@ -269,8 +282,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
CommitRequest request = requestBuilder.build();
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
try (IScope s = tracer.withSpan(span)) {
// TODO: Derive the channelId from the session being used currently.
XGoogSpannerRequestId reqId = this.requestIdCreator.nextRequestId(1 /* channelId */, 0);
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
() -> {
reqId.incrementAttempt();
return new CommitResponse(
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -530,4 +549,8 @@ void onTransactionDone() {}
TraceWrapper getTracer() {
return tracer;
}

public void setRequestIdCreator(XGoogSpannerRequestId.RequestIdCreator creator) {
this.requestIdCreator = creator;
}
}
Loading