Skip to content

Commit 0dc2b40

Browse files
authored
Add retry logic of connection lifetime to cluster client and sentinel client (#833)
* Add retry logic of connection lifetime to cluster client * Add retry logic of connection lifetime to sentinel client * Fix retry on RedirectMove * Fix retry of asking of cluster client * Add test for asking of cluster client * Fix transaction block for cluster * Add test of transaction block when using connection lifetime * Fix transaction block with connection lifetime for sentinel
1 parent 439ca07 commit 0dc2b40

File tree

4 files changed

+888
-10
lines changed

4 files changed

+888
-10
lines changed

cluster.go

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type clusterClient struct {
3535
stop uint32
3636
cmd Builder
3737
retry bool
38+
hasLftm bool
3839
}
3940

4041
// NOTE: connrole and conn must be initialized at the same time
@@ -57,6 +58,7 @@ func newClusterClient(opt *ClientOption, connFn connFn, retryer retryHandler) (*
5758
retry: !opt.DisableRetry,
5859
retryHandler: retryer,
5960
stopCh: make(chan struct{}),
61+
hasLftm: opt.ConnLifetime > 0,
6062
}
6163

6264
if opt.ReplicaOnly && opt.SendToReplicas != nil {
@@ -514,14 +516,27 @@ retry:
514516
return newErrResult(err)
515517
}
516518
resp = cc.Do(ctx, cmd)
519+
if resp.Error() == errConnExpired {
520+
goto retry
521+
}
517522
process:
518523
switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
519524
case RedirectMove:
520-
resp = c.redirectOrNew(addr, cc, cmd.Slot(), mode).Do(ctx, cmd)
525+
ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
526+
recover1:
527+
resp = ncc.Do(ctx, cmd)
528+
if resp.Error() == errConnExpired {
529+
goto recover1
530+
}
521531
goto process
522532
case RedirectAsk:
523-
results := c.redirectOrNew(addr, cc, cmd.Slot(), mode).DoMulti(ctx, cmds.AskingCmd, cmd)
533+
ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
534+
recover2:
535+
results := ncc.DoMulti(ctx, cmds.AskingCmd, cmd)
524536
resp = results.s[1]
537+
if resp.Error() == errConnExpired {
538+
goto recover2
539+
}
525540
resultsp.Put(results)
526541
goto process
527542
case RedirectRetry:
@@ -755,11 +770,38 @@ func (c *clusterClient) doretry(
755770
clean := true
756771
if len(re.commands) != 0 {
757772
resps := cc.DoMulti(ctx, re.commands...)
773+
if c.hasLftm {
774+
var ml []Completed
775+
recover:
776+
ml = ml[:0]
777+
var txIdx int // check transaction block, if zero then not in transaction
778+
for i, resp := range resps.s {
779+
if resp.Error() == errConnExpired {
780+
if txIdx > 0 {
781+
ml = re.commands[txIdx:]
782+
} else {
783+
ml = re.commands[i:]
784+
}
785+
break
786+
}
787+
// if no error then check if transaction block
788+
if isMulti(re.commands[i]) {
789+
txIdx = i
790+
} else if isExec(re.commands[i]) {
791+
txIdx = 0
792+
}
793+
}
794+
if len(ml) > 0 {
795+
rs := cc.DoMulti(ctx, ml...).s
796+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
797+
goto recover
798+
}
799+
}
758800
clean = c.doresultfn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts, hasInit)
759801
resultsp.Put(resps)
760802
}
761803
if len(re.cAskings) != 0 {
762-
resps := askingMulti(cc, ctx, re.cAskings)
804+
resps := c.askingMulti(cc, ctx, re.cAskings)
763805
clean = c.doresultfn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts, hasInit) && clean
764806
resultsp.Put(resps)
765807
}
@@ -845,13 +887,21 @@ retry:
845887
return newErrResult(err)
846888
}
847889
resp = cc.DoCache(ctx, cmd, ttl)
890+
if resp.Error() == errConnExpired {
891+
goto retry
892+
}
848893
process:
849894
switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
850895
case RedirectMove:
851-
resp = c.redirectOrNew(addr, cc, cmd.Slot(), mode).DoCache(ctx, cmd, ttl)
896+
ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
897+
recover:
898+
resp = ncc.DoCache(ctx, cmd, ttl)
899+
if resp.Error() == errConnExpired {
900+
goto recover
901+
}
852902
goto process
853903
case RedirectAsk:
854-
results := askingMultiCache(c.redirectOrNew(addr, cc, cmd.Slot(), mode), ctx, []CacheableTTL{CT(cmd, ttl)})
904+
results := c.askingMultiCache(c.redirectOrNew(addr, cc, cmd.Slot(), mode), ctx, []CacheableTTL{CT(cmd, ttl)})
855905
resp = results.s[0]
856906
resultsp.Put(results)
857907
goto process
@@ -875,7 +925,7 @@ func (c *clusterClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dur
875925
return resp
876926
}
877927

878-
func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults {
928+
func (c *clusterClient) askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults {
879929
var inTx bool
880930
commands := make([]Completed, 0, len(multi)*2)
881931
for _, cmd := range multi {
@@ -889,6 +939,26 @@ func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults
889939
}
890940
results := resultsp.Get(0, len(multi))
891941
resps := cc.DoMulti(ctx, commands...)
942+
if c.hasLftm {
943+
var ml []Completed
944+
recover:
945+
ml = ml[:0]
946+
var askingIdx int
947+
for i, resp := range resps.s {
948+
if commands[i] == cmds.AskingCmd {
949+
askingIdx = i
950+
}
951+
if resp.Error() == errConnExpired {
952+
ml = commands[askingIdx:]
953+
break
954+
}
955+
}
956+
if len(ml) > 0 {
957+
rs := cc.DoMulti(ctx, ml...).s
958+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
959+
goto recover
960+
}
961+
}
892962
for i, resp := range resps.s {
893963
if commands[i] != cmds.AskingCmd {
894964
results.s = append(results.s, resp)
@@ -898,14 +968,30 @@ func askingMulti(cc conn, ctx context.Context, multi []Completed) *redisresults
898968
return results
899969
}
900970

901-
func askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *redisresults {
971+
func (c *clusterClient) askingMultiCache(cc conn, ctx context.Context, multi []CacheableTTL) *redisresults {
902972
commands := make([]Completed, 0, len(multi)*6)
903973
for _, cmd := range multi {
904974
ck, _ := cmds.CacheKey(cmd.Cmd)
905975
commands = append(commands, cc.OptInCmd(), cmds.AskingCmd, cmds.MultiCmd, cmds.NewCompleted([]string{"PTTL", ck}), Completed(cmd.Cmd), cmds.ExecCmd)
906976
}
907977
results := resultsp.Get(0, len(multi))
908978
resps := cc.DoMulti(ctx, commands...)
979+
if c.hasLftm {
980+
var ml []Completed
981+
recover:
982+
ml = ml[:0]
983+
for i := 5; i < len(resps.s); i += 6 { // check exec command error only
984+
if resps.s[i].Error() == errConnExpired {
985+
ml = commands[i-5:]
986+
break
987+
}
988+
}
989+
if len(ml) > 0 {
990+
rs := cc.DoMulti(ctx, ml...).s
991+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
992+
goto recover
993+
}
994+
}
909995
for i := 5; i < len(resps.s); i += 6 {
910996
if arr, err := resps.s[i].ToArray(); err != nil {
911997
if preErr := resps.s[i-1].Error(); preErr != nil { // if {Cmd} get a RedisError
@@ -1049,11 +1135,27 @@ func (c *clusterClient) doretrycache(
10491135
clean := true
10501136
if len(re.commands) != 0 {
10511137
resps := cc.DoMultiCache(ctx, re.commands...)
1138+
if c.hasLftm {
1139+
var ml []CacheableTTL
1140+
recover:
1141+
ml = ml[:0]
1142+
for i, resp := range resps.s {
1143+
if resp.Error() == errConnExpired {
1144+
ml = re.commands[i:]
1145+
break
1146+
}
1147+
}
1148+
if len(ml) > 0 {
1149+
rs := cc.DoMultiCache(ctx, ml...).s
1150+
resps.s = append(resps.s[:len(resps.s)-len(rs)], rs...)
1151+
goto recover
1152+
}
1153+
}
10521154
clean = c.resultcachefn(ctx, results, retries, mu, cc, re.cIndexes, re.commands, resps.s, attempts)
10531155
resultsp.Put(resps)
10541156
}
10551157
if len(re.cAskings) != 0 {
1056-
resps := askingMultiCache(cc, ctx, re.cAskings)
1158+
resps := c.askingMultiCache(cc, ctx, re.cAskings)
10571159
clean = c.resultcachefn(ctx, results, retries, mu, cc, re.aIndexes, re.cAskings, resps.s, attempts) && clean
10581160
resultsp.Put(resps)
10591161
}
@@ -1130,6 +1232,9 @@ retry:
11301232
goto ret
11311233
}
11321234
err = cc.Receive(ctx, subscribe, fn)
1235+
if err == errConnExpired {
1236+
goto retry
1237+
}
11331238
if _, mode := c.shouldRefreshRetry(err, ctx); c.retry && mode != RedirectNone {
11341239
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
11351240
if shouldRetry {

0 commit comments

Comments
 (0)