Skip to content

Commit 5668c66

Browse files
authored
resolver/manual: allow calling UpdateState with an un-Built resolver (#8150)
1 parent 5199327 commit 5668c66

File tree

16 files changed

+59
-69
lines changed

16 files changed

+59
-69
lines changed

balancer/endpointsharding/endpointsharding_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
187187

188188
// When the resolver reports an error, the picker should get updated to
189189
// return the resolver error.
190-
mr.ReportError(errors.New("test error"))
190+
mr.CC().ReportError(errors.New("test error"))
191191
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
192192
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
193193
_, err := client.EmptyCall(ctx, &testpb.Empty{})

balancer/grpclb/grpclb_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ func (s) TestGRPCLB_Fallback(t *testing.T) {
859859
// Push another update to the resolver, this time with a valid balancer
860860
// address in the attributes field.
861861
rs = resolver.State{
862-
ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
862+
ServiceConfig: r.CC().ParseServiceConfig(grpclbConfig),
863863
Addresses: []resolver.Address{{Addr: beLis.Addr().String()}},
864864
}
865865
rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
@@ -1023,7 +1023,7 @@ func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
10231023
// fallback and use the fallback backend.
10241024
r.UpdateState(resolver.State{
10251025
Addresses: []resolver.Address{{Addr: beLis.Addr().String()}},
1026-
ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
1026+
ServiceConfig: r.CC().ParseServiceConfig(grpclbConfig),
10271027
})
10281028

10291029
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
@@ -1051,7 +1051,7 @@ func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
10511051
// be used.
10521052
rs := resolver.State{
10531053
Addresses: []resolver.Address{{Addr: beLis.Addr().String()}},
1054-
ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
1054+
ServiceConfig: r.CC().ParseServiceConfig(grpclbConfig),
10551055
}
10561056
rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
10571057
r.UpdateState(rs)
@@ -1112,7 +1112,7 @@ func (s) TestGRPCLB_PickFirst(t *testing.T) {
11121112

11131113
// Push a service config with grpclb as the load balancing policy and
11141114
// configure pick_first as its child policy.
1115-
rs := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)}
1115+
rs := resolver.State{ServiceConfig: r.CC().ParseServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)}
11161116

11171117
// Push a resolver update with the remote balancer address specified via
11181118
// attributes.
@@ -1152,7 +1152,7 @@ func (s) TestGRPCLB_PickFirst(t *testing.T) {
11521152
},
11531153
},
11541154
}
1155-
rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}, s)
1155+
rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(grpclbConfig)}, s)
11561156
r.UpdateState(rs)
11571157
testC := testgrpc.NewTestServiceClient(cc)
11581158
if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, beServerAddrs[1:]); err != nil {
@@ -1261,7 +1261,7 @@ func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
12611261
},
12621262
},
12631263
}
1264-
rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(svcfg)}, s)
1264+
rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(svcfg)}, s)
12651265
r.UpdateState(rs)
12661266
t.Log("Perform an initial RPC and expect it to succeed...")
12671267
if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
@@ -1329,7 +1329,7 @@ func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
13291329
// Push a resolver update with grpclb configuration which does not contain the
13301330
// target_name field. Our fake remote balancer is configured to always
13311331
// expect `beServerName` as the server name in the initial request.
1332-
rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)},
1332+
rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(grpclbConfig)},
13331333
&grpclbstate.State{BalancerAddresses: []resolver.Address{{
13341334
Addr: tss.lbAddr,
13351335
ServerName: lbServerName,
@@ -1366,7 +1366,7 @@ func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
13661366
},
13671367
},
13681368
}
1369-
rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(lbCfg)}, s)
1369+
rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(lbCfg)}, s)
13701370
r.UpdateState(rs)
13711371
select {
13721372
case <-ctx.Done():
@@ -1432,7 +1432,7 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,
14321432
cc.Connect()
14331433
defer cc.Close()
14341434

1435-
rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
1435+
rstate := resolver.State{ServiceConfig: r.CC().ParseServiceConfig(grpclbConfig)}
14361436
r.UpdateState(grpclbstate.Set(rstate, &grpclbstate.State{BalancerAddresses: []resolver.Address{{
14371437
Addr: tss.lbAddr,
14381438
ServerName: lbServerName,

balancer/lazy/lazy_ext_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (s) TestGoodUpdateThenResolverError(t *testing.T) {
271271
defer cc.Close()
272272
cc.Connect()
273273

274-
mr.ReportError(errors.New("test error"))
274+
mr.CC().ReportError(errors.New("test error"))
275275
// The channel should remain in IDLE as the ExitIdle calls are not
276276
// propagated to the lazy balancer from the stub balancer.
277277
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
@@ -367,7 +367,7 @@ func (s) TestResolverErrorThenGoodUpdate(t *testing.T) {
367367
cc.Connect()
368368

369369
// Send an error followed by a good update.
370-
mr.ReportError(errors.New("test error"))
370+
mr.CC().ReportError(errors.New("test error"))
371371
mr.UpdateState(resolver.State{
372372
Endpoints: []resolver.Endpoint{
373373
{Addresses: []resolver.Address{{Addr: backend.Address}}},

balancer/pickfirst/pickfirst_ext_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func Test(t *testing.T) {
7373
func parseServiceConfig(t *testing.T, r *manual.Resolver, sc string) *serviceconfig.ParseResult {
7474
t.Helper()
7575

76-
scpr := r.CC.ParseServiceConfig(sc)
76+
scpr := r.CC().ParseServiceConfig(sc)
7777
if scpr.Err != nil {
7878
t.Fatalf("Failed to parse service config %q: %v", sc, scpr.Err)
7979
}
@@ -756,7 +756,7 @@ func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) {
756756
cc, r, _ := setupPickFirst(t, 0)
757757

758758
nrErr := errors.New("error from name resolver")
759-
r.ReportError(nrErr)
759+
r.CC().ReportError(nrErr)
760760

761761
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
762762
defer cancel()
@@ -789,7 +789,7 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) {
789789
}
790790

791791
nrErr := errors.New("error from name resolver")
792-
r.ReportError(nrErr)
792+
r.CC().ReportError(nrErr)
793793

794794
// Ensure that RPCs continue to succeed for the next second.
795795
client := testgrpc.NewTestServiceClient(cc)
@@ -848,7 +848,7 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T)
848848
testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
849849

850850
nrErr := errors.New("error from name resolver")
851-
r.ReportError(nrErr)
851+
r.CC().ReportError(nrErr)
852852

853853
// RPCs should fail with deadline exceed error as long as they are in
854854
// CONNECTING and not the error returned by the name resolver.
@@ -909,7 +909,7 @@ func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *test
909909
// error instead of the old error that caused the channel to move to
910910
// TRANSIENT_FAILURE in the first place.
911911
nrErr := errors.New("error from name resolver")
912-
r.ReportError(nrErr)
912+
r.CC().ReportError(nrErr)
913913
client := testgrpc.NewTestServiceClient(cc)
914914
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
915915
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) {

balancer/weightedroundrobin/balancer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) {
449449

450450
// Update to per-call weights.
451451
c := svcConfig(t, perCallConfig)
452-
parsedCfg := srv1.R.CC.ParseServiceConfig(c)
452+
parsedCfg := srv1.R.CC().ParseServiceConfig(c)
453453
if parsedCfg.Err != nil {
454454
panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
455455
}
@@ -563,7 +563,7 @@ func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) {
563563
newCfg := oobConfig
564564
newCfg.ErrorUtilizationPenalty = float64p(0.9)
565565
c := svcConfig(t, newCfg)
566-
parsedCfg := srv1.R.CC.ParseServiceConfig(c)
566+
parsedCfg := srv1.R.CC().ParseServiceConfig(c)
567567
if parsedCfg.Err != nil {
568568
panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
569569
}

clientconn_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func init() {
6262
}
6363

6464
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
65-
scpr := r.CC.ParseServiceConfig(s)
65+
scpr := r.CC().ParseServiceConfig(s)
6666
if scpr.Err != nil {
6767
panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
6868
}
@@ -666,7 +666,7 @@ func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
666666
cc.Connect()
667667
// SwitchBalancer before NewAddress. There was no balancer created, this
668668
// makes sure we don't call close on nil balancerWrapper.
669-
r.UpdateState(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbServiceConfig)}) // This should not panic.
669+
r.UpdateState(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(grpclbServiceConfig)}) // This should not panic.
670670

671671
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
672672
}
@@ -681,7 +681,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
681681
cc.Connect()
682682
// Send a new service config while closing the ClientConn.
683683
go cc.Close()
684-
go r.UpdateState(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(rrServiceConfig)}) // This should not panic.
684+
go r.UpdateState(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(rrServiceConfig)}) // This should not panic.
685685
}
686686
}
687687

@@ -775,7 +775,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
775775
}
776776
defer cc.Close()
777777
cc.Connect()
778-
r.UpdateState(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(`{
778+
r.UpdateState(resolver.State{ServiceConfig: r.CC().ParseServiceConfig(`{
779779
"methodConfig": [
780780
{
781781
"name": [

internal/stubserver/stubserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (ss *StubServer) Stop() {
276276
}
277277

278278
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
279-
g := r.CC.ParseServiceConfig(s)
279+
g := r.CC().ParseServiceConfig(s)
280280
if g.Err != nil {
281281
panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
282282
}

resolver/manual/manual.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type Resolver struct {
6262
// Fields actually belong to the resolver.
6363
// Guards access to below fields.
6464
mu sync.Mutex
65-
CC resolver.ClientConn
65+
cc resolver.ClientConn
6666
// Storing the most recent state update makes this resolver resilient to
6767
// restarts, which is possible with channel idleness.
6868
lastSeenState *resolver.State
@@ -78,12 +78,12 @@ func (r *Resolver) InitialState(s resolver.State) {
7878
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
7979
r.mu.Lock()
8080
defer r.mu.Unlock()
81-
// Call BuildCallback after locking to avoid a race when UpdateState
82-
// or ReportError is called before Build returns.
81+
// Call BuildCallback after locking to avoid a race when UpdateState or CC
82+
// is called before Build returns.
8383
r.BuildCallback(target, cc, opts)
84-
r.CC = cc
84+
r.cc = cc
8585
if r.lastSeenState != nil {
86-
err := r.CC.UpdateState(*r.lastSeenState)
86+
err := r.cc.UpdateState(*r.lastSeenState)
8787
go r.UpdateStateCallback(err)
8888
}
8989
return r, nil
@@ -104,25 +104,27 @@ func (r *Resolver) Close() {
104104
r.CloseCallback()
105105
}
106106

107-
// UpdateState calls CC.UpdateState.
107+
// UpdateState calls UpdateState(s) on the channel. If the resolver has not
108+
// been Built before, this instead sets the initial state of the resolver, like
109+
// InitialState.
108110
func (r *Resolver) UpdateState(s resolver.State) {
109111
r.mu.Lock()
110112
defer r.mu.Unlock()
111-
var err error
112-
if r.CC == nil {
113-
panic("cannot update state as channel has not exited IDLE state")
114-
}
115-
err = r.CC.UpdateState(s)
116113
r.lastSeenState = &s
114+
if r.cc == nil {
115+
return
116+
}
117+
err := r.cc.UpdateState(s)
117118
r.UpdateStateCallback(err)
118119
}
119120

120-
// ReportError calls CC.ReportError.
121-
func (r *Resolver) ReportError(err error) {
121+
// CC returns r's ClientConn when r was last Built. Panics if the resolver has
122+
// not been Built before.
123+
func (r *Resolver) CC() resolver.ClientConn {
122124
r.mu.Lock()
123125
defer r.mu.Unlock()
124-
if r.CC == nil {
125-
panic("cannot report error as channel has not exited IDLE state")
126+
if r.cc == nil {
127+
panic("Manual resolver instance has not yet been built.")
126128
}
127-
r.CC.ReportError(err)
129+
return r.cc
128130
}

resolver/manual/manual_test.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,14 @@ func TestResolver(t *testing.T) {
3636
},
3737
})
3838

39-
t.Run("update_state_panics", func(t *testing.T) {
39+
t.Run("cc_panics", func(t *testing.T) {
4040
defer func() {
41-
want := "cannot update state as channel has not exited IDLE state"
41+
want := "Manual resolver instance has not yet been built."
4242
if r := recover(); r != want {
4343
t.Errorf("expected panic %q, got %q", want, r)
4444
}
4545
}()
46-
r.UpdateState(resolver.State{Addresses: []resolver.Address{
47-
{Addr: "address"},
48-
{Addr: "anotheraddress"},
49-
}})
50-
})
51-
t.Run("report_error_panics", func(t *testing.T) {
52-
defer func() {
53-
want := "cannot report error as channel has not exited IDLE state"
54-
if r := recover(); r != want {
55-
t.Errorf("expected panic %q, got %q", want, r)
56-
}
57-
}()
58-
r.ReportError(errors.New("example"))
46+
r.CC()
5947
})
6048

6149
t.Run("happy_path", func(t *testing.T) {
@@ -70,6 +58,6 @@ func TestResolver(t *testing.T) {
7058
r.UpdateState(resolver.State{Addresses: []resolver.Address{
7159
{Addr: "ok"},
7260
}})
73-
r.ReportError(errors.New("example"))
61+
r.CC().ReportError(errors.New("example"))
7462
})
7563
}

resolver_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func (s) TestResolverAddressesToEndpointsUsingNewAddresses(t *testing.T) {
180180
}
181181
cc.Connect()
182182
defer cc.Close()
183-
r.CC.NewAddress(addrs)
183+
r.CC().NewAddress(addrs)
184184

185185
select {
186186
case got := <-stateCh:

test/balancer_switching_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) {
510510
// report a ready picker until we ask it to do so.
511511
r.UpdateState(resolver.State{
512512
Addresses: addrs[:1],
513-
ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
513+
ServiceConfig: r.CC().ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
514514
})
515515
select {
516516
case <-ctx.Done():

test/balancer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ func (s) TestWaitForReady(t *testing.T) {
631631
client := testgrpc.NewTestServiceClient(cc)
632632

633633
// Report an error so non-WFR RPCs will give up early.
634-
r.CC.ReportError(errors.New("fake resolver error"))
634+
r.CC().ReportError(errors.New("fake resolver error"))
635635

636636
// Ensure the client is not connected to anything and fails non-WFR RPCs.
637637
if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Unavailable {

test/parse_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func parseServiceConfig(t *testing.T, r *manual.Resolver, sc string) *serviceconfig.ParseResult {
3131
t.Helper()
3232

33-
scpr := r.CC.ParseServiceConfig(sc)
33+
scpr := r.CC().ParseServiceConfig(sc)
3434
if scpr.Err != nil {
3535
t.Fatalf("Failed to parse service config %q: %v", sc, scpr.Err)
3636
}

test/resolver_update_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) {
119119
cc.Connect()
120120
defer cc.Close()
121121

122-
scpr := r.CC.ParseServiceConfig("bad json service config")
122+
scpr := r.CC().ParseServiceConfig("bad json service config")
123123
r.UpdateState(resolver.State{ServiceConfig: scpr})
124124

125125
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@@ -205,7 +205,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
205205
// Push a resolver update and verify that our balancer receives the update.
206206
addrs := []resolver.Address{{Addr: backend.Address}}
207207
const lbCfg = "wrapping balancer LB policy config"
208-
goodSC := r.CC.ParseServiceConfig(fmt.Sprintf(`
208+
goodSC := r.CC().ParseServiceConfig(fmt.Sprintf(`
209209
{
210210
"loadBalancingConfig": [
211211
{
@@ -244,7 +244,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
244244
// Push a bad resolver update and ensure that the update is propagated to our
245245
// stub balancer. But since the pushed update contains an invalid service
246246
// config, our balancer should continue to see the old loadBalancingConfig.
247-
badSC := r.CC.ParseServiceConfig("bad json service config")
247+
badSC := r.CC().ParseServiceConfig("bad json service config")
248248
wantCCS.ResolverState.ServiceConfig = badSC
249249
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: badSC})
250250
ccs, err = ccUpdateCh.Receive(ctx)

0 commit comments

Comments
 (0)