Skip to content

Commit 2730a0c

Browse files
committed
use weight in context in more throttlers
1 parent bcb2a1a commit 2730a0c

File tree

5 files changed

+44
-18
lines changed

5 files changed

+44
-18
lines changed

README.MD

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func WithPriority(ctx context.Context, priority uint8) context.Context
7373
func WithWeight(ctx context.Context, weight int64) context.Context
7474
// WithKey adds the provided key to the provided context
7575
// to add additional call identifier to context.
76-
// Resulted context is used by: `pattern` and `generator` throtttlers.
76+
// Resulted context is used by: `before`, `after`, `timed`, `adaptive`, `semaphore` and `cellrate` throtttlers.
7777
func WithKey(ctx context.Context, key string) context.Context
7878
// WithMessage adds the provided message to the provided context
7979
// to add additional message that need to be used to context.
@@ -141,21 +141,21 @@ You can find list of returning error types for all existing throttlers in thrott
141141
| context | `func NewThrottlerContext() Throttler` | Always throttless on *done* context.<br> - could return `ErrorInternal`; |
142142
| panic | `func NewThrottlerPanic() Throttler` | Always panics with `ErrorInternal`. |
143143
| each | `func NewThrottlerEach(threshold uint64) Throttler` | Throttles each periodic *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
144-
| before | `func NewThrottlerBefore(threshold uint64) Throttler` | Throttles each call below the *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
145-
| after | `func NewThrottlerAfter(threshold uint64) Throttler` | Throttles each call after the *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
144+
| before | `func NewThrottlerBefore(threshold uint64) Throttler` | Throttles each call below the *i-th* call defined by the specified threshold.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |
145+
| after | `func NewThrottlerAfter(threshold uint64) Throttler` | Throttles each call after the *i-th* call defined by the specified threshold.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |
146146
| past | `func NewThrottlerPast(threshold time.Time) Throttler` | Throttles each call befor timestamp defined by the specified UTC time threshold.<br> - could return `ErrorThreshold`; |
147147
| future | `func NewThrottlerFuture(threshold time.Time) Throttler` | Throttles each call after timestamp defined by the specified UTC time threshold.<br> - could return `ErrorThreshold`; |
148148
| chance | `func NewThrottlerChance(threshold float64) Throttler` | Throttles each call with the chance *p* defined by the specified threshold.<br> Chance value is normalized to *[0.0, 1.0]* range.<br> Implementation uses secure `crypto/rand` as PRNG function.<br> - could return `ErrorThreshold`; |
149149
| running | `func NewThrottlerRunning(threshold uint64) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold.<br> - could return `ErrorThreshold`; |
150150
| buffered | `func NewThrottlerBuffered(threshold uint64) Throttler` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until the running quota is available again. |
151151
| priority | `func NewThrottlerPriority(threshold uint64, levels uint8) Throttler` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until the running quota is available again.<br> Running quota is not equally distributed between *n* levels of priority defined by the specified levels.<br> Use `func WithPriority(ctx context.Context, priority uint8) context.Context` to override context call priority, *1* by default. |
152-
| timed | `func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br> - could return `ErrorThreshold`; |
152+
| timed | `func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |
153153
| latency | `func NewThrottlerLatency(threshold time.Duration, retention time.Duration) Throttler` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration.<br> Use `func WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*.<br> - could return `ErrorThreshold`; |
154154
| percentile | `func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) Throttler` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> Percentile values are kept in bounded buffer with capacity *c* defined by the specified capacity. <br> If retention is set then throttler state will be reseted after retention duration.<br> Use `func WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*.<br> - could return `ErrorThreshold`; |
155155
| monitor | `func NewThrottlerMonitor(mnt Monitor, threshold Stats) Throttler` | Throttles call if any of the stats returned by provided monitor exceeds any of the stats defined by the specified threshold or if any internal error occurred.<br> Builtin `Monitor` implementations come with stats caching by default.<br> Use builtin `NewMonitorSystem` to create go system monitor instance.<br> - could return `ErrorInternal`;<br> - could return `ErrorThreshold`; |
156156
| metric | `func NewThrottlerMetric(mtc Metric) Throttler` | Throttles call if boolean metric defined by the specified boolean metric is reached or if any internal error occurred.<br> Builtin `Metric` implementations come with boolean metric caching by default.<br> Use builtin `NewMetricPrometheus` to create Prometheus metric instance.<br> - could return `ErrorInternal`;<br> - could return `ErrorThreshold`; |
157157
| enqueuer | `func NewThrottlerEnqueue(enq Enqueuer) Throttler` | Always enqueues message to the specified queue throttles only if any internal error occurred.<br> Use `func WithMessage(ctx context.Context, message interface{}) context.Context` to specify context message for enqueued message and `func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context` to specify context message marshaler.<br> Builtin `Enqueuer` implementations come with connection reuse and retries by default.<br> Use builtin `func NewEnqueuerRabbit(url string, queue string, retries uint64) Enqueuer` to create RabbitMQ enqueuer instance or `func NewEnqueuerKafka(net string, url string, topic string, retries uint64) Enqueuer` to create Kafka enqueuer instance.<br> - could return `ErrorInternal`; |
158-
| adaptive | `func NewThrottlerAdaptive(threshold uint64, interval time.Duration, quantum time.Duration, step uint64, thr Throttler) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br> Provided adapted throttler adjusts the running quota of adapter throttler by changing the value by *d* defined by the specified step, it subtracts *d^2* from the running quota if adapted throttler throttles or adds *d* to the running quota if it doesn't.<br> - could return `ErrorThreshold`; |
158+
| adaptive | `func NewThrottlerAdaptive(threshold uint64, interval time.Duration, quantum time.Duration, step uint64, thr Throttler) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br> Provided adapted throttler adjusts the running quota of adapter throttler by changing the value by *d* defined by the specified step, it subtracts *d^2* from the running quota if adapted throttler throttles or adds *d* to the running quota if it doesn't.<br>Use `WithWeight` to override context call qunatity, 1 by default.<br> - could return `ErrorThreshold`; |
159159
| pattern | `func NewThrottlerPattern(patterns ...Pattern) Throttler` | Throttles if matching throttler from provided patterns throttles.<br> Use `func WithKey(ctx context.Context, key string) context.Context` to specify key for regexp pattern throttler matching.<br> `Pattern` defines a pair of regexp and related throttler.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
160160
| ring | `func NewThrottlerRing(thrs ...Throttler) Throttler` | Throttles if the *i-th* call throttler from provided list throttle.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
161161
| all | `func NewThrottlerAll(thrs ...Throttler) Throttler` | Throttles call if all provided throttlers throttle.<br> - could return `ErrorInternal`; |

atomic.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ import (
55
"sync/atomic"
66
)
77

8+
func atomicBSingAdd(number *uint64, delta int64) (res uint64) {
9+
if delta > 0 {
10+
return atomicBAdd(number, uint64(delta))
11+
}
12+
return atomicBSub(number, uint64(-delta))
13+
}
14+
815
func atomicBAdd(number *uint64, delta uint64) (res uint64) {
916
prev := atomic.LoadUint64(number)
1017
if res = atomic.AddUint64(number, delta); res < prev {

context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ func ctxPriority(ctx context.Context, limit uint8) uint8 {
5050

5151
// WithWeight adds the provided weight to the provided context
5252
// to differ `Acquire` weight levels.
53-
// Resulted context is used by: `semaphore` and `cellrate` throtttlers.
53+
// Resulted context is used by: `before`, `after`, `timed`, `adaptive`, `semaphore` and `cellrate` throtttlers.
5454
func WithWeight(ctx context.Context, weight int64) context.Context {
5555
return context.WithValue(ctx, ghctxweight, weight)
5656
}
5757

5858
func ctxWeight(ctx context.Context) int64 {
5959
if val := ctx.Value(ghctxweight); val != nil {
60-
if weight, ok := val.(int64); ok && weight > 0 {
60+
if weight, ok := val.(int64); ok {
6161
return weight
6262
}
6363
}

throttlers.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,13 +219,14 @@ type tbefore struct {
219219

220220
// NewThrottlerBefore creates new throttler instance that
221221
// throttles each call below the i-th call defined by the specified threshold.
222+
// Use `WithWeight` to override context call qunatity, 1 by default.
222223
// - could return `ErrorThreshold`;
223224
func NewThrottlerBefore(threshold uint64) Throttler {
224225
return &tbefore{threshold: threshold}
225226
}
226227

227-
func (thr *tbefore) Acquire(context.Context) error {
228-
if current := atomicBIncr(&thr.current); current <= thr.threshold {
228+
func (thr *tbefore) Acquire(ctx context.Context) error {
229+
if current := atomicBSingAdd(&thr.current, ctxWeight(ctx)); current <= thr.threshold {
229230
return ErrorThreshold{
230231
Throttler: "before",
231232
Threshold: strpair{current: current, threshold: thr.threshold},
@@ -245,13 +246,14 @@ type tafter struct {
245246

246247
// NewThrottlerAfter creates new throttler instance that
247248
// throttles each call after the i-th call defined by the specified threshold.
249+
// Use `WithWeight` to override context call qunatity, 1 by default.
248250
// - could return `ErrorThreshold`;
249251
func NewThrottlerAfter(threshold uint64) Throttler {
250252
return &tafter{threshold: threshold}
251253
}
252254

253-
func (thr *tafter) Acquire(context.Context) error {
254-
if current := atomicBIncr(&thr.current); current > thr.threshold {
255+
func (thr *tafter) Acquire(ctx context.Context) error {
256+
if current := atomicBSingAdd(&thr.current, ctxWeight(ctx)); current > thr.threshold {
255257
return ErrorThreshold{
256258
Throttler: "after",
257259
Threshold: strpair{current: current, threshold: thr.threshold},
@@ -453,6 +455,7 @@ type ttimed struct {
453455
// q defined by the specified threshold in the specified interval.
454456
// Periodically each specified interval the running quota number is reseted.
455457
// If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.
458+
// Use `WithWeight` to override context call qunatity, 1 by default.
456459
// - could return `ErrorThreshold`;
457460
func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) Throttler {
458461
tafter := NewThrottlerAfter(threshold).(*tafter)
@@ -725,6 +728,7 @@ type tadaptive struct {
725728
// Provided adapted throttler adjusts the running quota of adapter throttler by changing the value by d
726729
// defined by the specified step, it subtracts *d^2* from the running quota
727730
// if adapted throttler throttles or adds *d* to the running quota if it doesn't.
731+
// Use `WithWeight` to override context call qunatity, 1 by default.
728732
// - could return `ErrorThreshold`;
729733
func NewThrottlerAdaptive(
730734
threshold uint64,

throttlers_test.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,19 +258,27 @@ func TestThrottlers(t *testing.T) {
258258
},
259259
"Throttler before should throttle before threshold": {
260260
tms: 6,
261-
thr: NewThrottlerBefore(3),
261+
thr: NewThrottlerBefore(4),
262+
ctxs: []context.Context{
263+
context.TODO(),
264+
context.TODO(),
265+
context.TODO(),
266+
WithWeight(context.TODO(), 2),
267+
context.TODO(),
268+
context.TODO(),
269+
},
262270
errs: []error{
263271
ErrorThreshold{
264272
Throttler: "before",
265-
Threshold: strpair{current: 1, threshold: 3},
273+
Threshold: strpair{current: 1, threshold: 4},
266274
},
267275
ErrorThreshold{
268276
Throttler: "before",
269-
Threshold: strpair{current: 2, threshold: 3},
277+
Threshold: strpair{current: 2, threshold: 4},
270278
},
271279
ErrorThreshold{
272280
Throttler: "before",
273-
Threshold: strpair{current: 3, threshold: 3},
281+
Threshold: strpair{current: 3, threshold: 4},
274282
},
275283
nil,
276284
nil,
@@ -280,6 +288,13 @@ func TestThrottlers(t *testing.T) {
280288
"Throttler after should throttle after threshold": {
281289
tms: 6,
282290
thr: NewThrottlerAfter(3),
291+
ctxs: []context.Context{
292+
WithWeight(context.TODO(), 3),
293+
WithWeight(context.TODO(), -1),
294+
context.TODO(),
295+
context.TODO(),
296+
context.TODO(),
297+
},
283298
errs: []error{
284299
nil,
285300
nil,
@@ -388,9 +403,9 @@ func TestThrottlers(t *testing.T) {
388403
tms: 3,
389404
thr: NewThrottlerRunning(1),
390405
acts: []Runnable{
391-
delayed(ms1_0, nope),
392-
delayed(ms1_0, nope),
393-
delayed(ms1_0, nope),
406+
delayed(ms2_0, nope),
407+
delayed(ms2_0, nope),
408+
delayed(ms2_0, nope),
394409
},
395410
errs: []error{
396411
nil,

0 commit comments

Comments
 (0)