Skip to content

Commit 3a8abf2

Browse files
committed
add leaky bucket throttler
make generic cell rate throttler atomic
1 parent 2730a0c commit 3a8abf2

File tree

4 files changed

+135
-19
lines changed

4 files changed

+135
-19
lines changed

README.MD

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func WithPriority(ctx context.Context, priority uint8) context.Context
7373
func WithWeight(ctx context.Context, weight int64) context.Context
7474
// WithKey adds the provided key to the provided context
7575
// to add additional call identifier to context.
76-
// Resulted context is used by: `before`, `after`, `timed`, `adaptive`, `semaphore` and `cellrate` throtttlers.
76+
// Resulted context is used by: `before`, `after`, `timed`, `adaptive`, `semaphore`, `cellrate` and `bucket` throtttlers.
7777
func WithKey(ctx context.Context, key string) context.Context
7878
// WithMessage adds the provided message to the provided context
7979
// to add additional message that need to be used to context.
@@ -167,6 +167,7 @@ You can find list of returning error types for all existing throttlers in thrott
167167
| generator | `func NewThrottlerGenerator(gen Generator, capacity uint64, eviction float64) Throttler` | Creates new throttler instance that throttles if found key matching throttler throttles.<br> If no key matching throttler has been found generator used insted to provide new throttler that will be added to existing throttlers map.<br> Generated throttlers are kept in bounded map with capacity *c* defined by the specified capacity and eviction rate *e* defined by specified eviction value is normalized to [0.0, 1.0], where eviction rate affects number of throttlers that will be removed from the map after bounds overflow.<br> Use `WithKey` to specify key for throttler matching and generation.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
168168
| semaphore | `func NewThrottlerSemaphore(weight int64) Throttler` | Creates new throttler instance that throttles call if underlying semaphore throttles.<br>Use `WithWeight` to override context call weight, 1 by default.<br> - could return `ErrorThreshold`; |
169169
| cellrate | `func NewThrottlerCellRate(threshold uint64, interval time.Duration, monotone bool) Throttler` | Creates new throttler instance that uses generic cell rate algorithm to throttles call within provided interval and threshold.<br>If provided monotone flag is set class to release will have no effect on throttler.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |
170+
| bucket | `func NewThrottlerBucket(threshold uint64, interval time.Duration, monotone bool) Throttler` | Creates new throttler instance that leaky bucket algorithm to throttles call within provided interval and threshold.<br>If provided monotone flag is set class to release will have no effect on throttler.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |
170171

171172
## Integrations
172173

context.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func ctxPriority(ctx context.Context, limit uint8) uint8 {
5050

5151
// WithWeight adds the provided weight to the provided context
5252
// to differ `Acquire` weight levels.
53-
// Resulted context is used by: `before`, `after`, `timed`, `adaptive`, `semaphore` and `cellrate` throtttlers.
53+
// Resulted context is used by: `before`, `after`, `timed`, `adaptive`, `semaphore`, `cellrate` and `bucket` throtttlers.
5454
func WithWeight(ctx context.Context, weight int64) context.Context {
5555
return context.WithValue(ctx, ghctxweight, weight)
5656
}
@@ -64,6 +64,15 @@ func ctxWeight(ctx context.Context) int64 {
6464
return 1
6565
}
6666

67+
func ctxWeightMod(ctx context.Context) int64 {
68+
if val := ctx.Value(ghctxweight); val != nil {
69+
if weight, ok := val.(int64); ok && weight > 0 {
70+
return weight
71+
}
72+
}
73+
return 1
74+
}
75+
6776
// WithKey adds the provided key to the provided context
6877
// to add additional call identifier to context.
6978
// Resulted context is used by: `pattern` and `generator` throtttlers.

throttlers.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,7 @@ func NewThrottlerSemaphore(weight int64) Throttler {
10951095
}
10961096

10971097
func (thr tsemaphore) Acquire(ctx context.Context) error {
1098-
if ok := thr.sem.TryAcquire(ctxWeight(ctx)); !ok {
1098+
if ok := thr.sem.TryAcquire(ctxWeightMod(ctx)); !ok {
10991099
return ErrorThreshold{
11001100
Throttler: "semaphore",
11011101
Threshold: strbool(ok),
@@ -1107,7 +1107,7 @@ func (thr tsemaphore) Acquire(ctx context.Context) error {
11071107
func (thr tsemaphore) Release(ctx context.Context) error {
11081108
// prevent over releasing panic.
11091109
defer func() { _ = recover() }()
1110-
thr.sem.Release(ctxWeight(ctx))
1110+
thr.sem.Release(ctxWeightMod(ctx))
11111111
return nil
11121112
}
11131113

@@ -1129,21 +1129,20 @@ func NewThrottlerCellRate(threshold uint64, interval time.Duration, monotone boo
11291129
}
11301130

11311131
func (thr *tcellrate) Acquire(ctx context.Context) error {
1132-
current := atomicGet(&thr.current)
11331132
nowTs := uint64(time.Now().UTC().UnixNano())
1134-
if current < nowTs {
1135-
current = nowTs
1133+
delta := (uint64(thr.quantum) * uint64(ctxWeightMod(ctx)))
1134+
if current := atomicGet(&thr.current); current < nowTs {
1135+
delta += nowTs - current
11361136
}
1137-
updated := current + (uint64(thr.quantum) * uint64(ctxWeight(ctx)))
11381137
max := nowTs + (uint64(thr.quantum) * thr.threshold)
1139-
if updated > max {
1140-
current := uint64(math.Round(float64(updated-nowTs) / float64(thr.quantum)))
1138+
if current := atomicBAdd(&thr.current, delta); current > max {
1139+
atomicBSub(&thr.current, delta)
1140+
cur := thr.threshold + uint64(math.Ceil(float64(current-max)/float64(thr.quantum)))
11411141
return ErrorThreshold{
11421142
Throttler: "cellrate",
1143-
Threshold: strpair{current: current, threshold: thr.threshold},
1143+
Threshold: strpair{current: cur, threshold: thr.threshold},
11441144
}
11451145
}
1146-
atomicSet(&thr.current, updated)
11471146
return nil
11481147
}
11491148

@@ -1152,7 +1151,52 @@ func (thr *tcellrate) Release(ctx context.Context) error {
11521151
if thr.monotone {
11531152
return nil
11541153
}
1155-
updated := atomicGet(&thr.current) - (uint64(thr.quantum) * uint64(ctxWeight(ctx)))
1156-
atomicSet(&thr.current, updated)
1154+
atomicBSub(&thr.current, uint64(thr.quantum)*uint64(ctxWeightMod(ctx)))
1155+
return nil
1156+
}
1157+
1158+
type tbucket struct {
1159+
current uint64
1160+
lastTs uint64
1161+
threshold uint64
1162+
quantum time.Duration
1163+
monotone bool
1164+
}
1165+
1166+
// NewThrottlerBucket creates new throttler instance that
1167+
// uses leaky bucket algorithm to throttles call within provided interval and threshold.
1168+
// If provided monotone flag is set class to release will have no effect on throttler.
1169+
// Use `WithWeight` to override context call qunatity, 1 by default.
1170+
// - could return `ErrorThreshold`;
1171+
func NewThrottlerBucket(threshold uint64, interval time.Duration, monotone bool) Throttler {
1172+
quantum := time.Duration(math.Ceil(float64(interval) / float64(threshold)))
1173+
return &tbucket{threshold: threshold, quantum: quantum, monotone: monotone}
1174+
}
1175+
1176+
func (thr *tbucket) Acquire(ctx context.Context) error {
1177+
nowTs := uint64(time.Now().UTC().UnixNano())
1178+
var delta int64
1179+
if lastTs := atomicGet(&thr.lastTs); lastTs > 0 {
1180+
delta = int64(float64(nowTs-lastTs) / float64(thr.quantum))
1181+
nowTs = uint64(delta) * uint64(thr.quantum)
1182+
}
1183+
diff := ctxWeightMod(ctx) - delta
1184+
if current := atomicBSingAdd(&thr.current, diff); current > thr.threshold {
1185+
atomicBSingAdd(&thr.current, -diff)
1186+
return ErrorThreshold{
1187+
Throttler: "bucket",
1188+
Threshold: strpair{current: current, threshold: thr.threshold},
1189+
}
1190+
}
1191+
atomicBAdd(&thr.lastTs, nowTs)
1192+
return nil
1193+
}
1194+
1195+
func (thr *tbucket) Release(ctx context.Context) error {
1196+
// don't decrement calls for monotone cell.
1197+
if thr.monotone {
1198+
return nil
1199+
}
1200+
atomicBSub(&thr.current, uint64(ctxWeightMod(ctx)))
11571201
return nil
11581202
}

throttlers_test.go

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ const (
2020
ms3_0 time.Duration = 3 * time.Millisecond
2121
ms4_0 time.Duration = 4 * time.Millisecond
2222
ms5_0 time.Duration = 5 * time.Millisecond
23-
ms6_0 time.Duration = 6 * time.Millisecond
2423
ms7_0 time.Duration = 7 * time.Millisecond
2524
ms8_0 time.Duration = 8 * time.Millisecond
2625
ms9_0 time.Duration = 9 * time.Millisecond
2726
ms10_0 time.Duration = 10 * time.Millisecond
27+
ms12_0 time.Duration = 12 * time.Millisecond
2828
ms30_0 time.Duration = 30 * time.Millisecond
2929
)
3030

@@ -1255,7 +1255,7 @@ func TestThrottlers(t *testing.T) {
12551255
},
12561256
"Throttler monotone cellrate should throttle on threshold": {
12571257
tms: 5,
1258-
thr: NewThrottlerCellRate(2, ms6_0, true),
1258+
thr: NewThrottlerCellRate(2, ms12_0, true),
12591259
ctxs: []context.Context{
12601260
context.TODO(),
12611261
context.TODO(),
@@ -1267,8 +1267,8 @@ func TestThrottlers(t *testing.T) {
12671267
nope,
12681268
nope,
12691269
nope,
1270-
delayed(ms4_0, nope),
1271-
delayed(ms3_0, nope),
1270+
delayed(ms7_0, nope),
1271+
delayed(ms9_0, nope),
12721272
},
12731273
errs: []error{
12741274
nil,
@@ -1280,7 +1280,7 @@ func TestThrottlers(t *testing.T) {
12801280
nil,
12811281
ErrorThreshold{
12821282
Throttler: "cellrate",
1283-
Threshold: strpair{current: 3, threshold: 2},
1283+
Threshold: strpair{current: 4, threshold: 2},
12841284
},
12851285
},
12861286
},
@@ -1315,6 +1315,68 @@ func TestThrottlers(t *testing.T) {
13151315
nil,
13161316
},
13171317
},
1318+
"Throttler monotone bucket should throttle on threshold": {
1319+
tms: 5,
1320+
thr: NewThrottlerBucket(2, ms12_0, true),
1321+
ctxs: []context.Context{
1322+
context.TODO(),
1323+
context.TODO(),
1324+
context.TODO(),
1325+
context.TODO(),
1326+
WithWeight(context.TODO(), 2),
1327+
},
1328+
pres: []Runnable{
1329+
nope,
1330+
nope,
1331+
nope,
1332+
delayed(ms7_0, nope),
1333+
delayed(ms9_0, nope),
1334+
},
1335+
errs: []error{
1336+
nil,
1337+
nil,
1338+
ErrorThreshold{
1339+
Throttler: "bucket",
1340+
Threshold: strpair{current: 3, threshold: 2},
1341+
},
1342+
nil,
1343+
ErrorThreshold{
1344+
Throttler: "bucket",
1345+
Threshold: strpair{current: 4, threshold: 2},
1346+
},
1347+
},
1348+
},
1349+
"Throttler not monotone bucket should throttle on threshold": {
1350+
tms: 5,
1351+
thr: NewThrottlerBucket(2, ms9_0, false),
1352+
acts: []Runnable{
1353+
delayed(ms5_0, nope),
1354+
delayed(ms5_0, nope),
1355+
delayed(ms5_0, nope),
1356+
nope,
1357+
nope,
1358+
},
1359+
pres: []Runnable{
1360+
nope,
1361+
nope,
1362+
nope,
1363+
nope,
1364+
delayed(ms7_0, nope),
1365+
},
1366+
errs: []error{
1367+
nil,
1368+
nil,
1369+
ErrorThreshold{
1370+
Throttler: "bucket",
1371+
Threshold: strpair{current: 3, threshold: 2},
1372+
},
1373+
ErrorThreshold{
1374+
Throttler: "bucket",
1375+
Threshold: strpair{current: 3, threshold: 2},
1376+
},
1377+
nil,
1378+
},
1379+
},
13181380
}
13191381
for tname, ptrtcase := range table {
13201382
t.Run(tname, func(t *testing.T) {

0 commit comments

Comments
 (0)