9
9
"math"
10
10
"net/url"
11
11
"sync"
12
+ "sync/atomic"
12
13
"time"
13
14
14
15
"github.com/gorilla/websocket"
@@ -28,7 +29,7 @@ type Ticker struct {
28
29
29
30
url url.URL
30
31
callbacks callbacks
31
- lastPingTime time. Time
32
+ lastPingTime atomicTime
32
33
autoReconnect bool
33
34
reconnectMaxRetries int
34
35
reconnectMaxDelay time.Duration
@@ -41,6 +42,22 @@ type Ticker struct {
41
42
cancel context.CancelFunc
42
43
}
43
44
45
+ // atomicTime is wrapper over time.Time to safely access
46
+ // an updating timestamp concurrently.
47
+ type atomicTime struct {
48
+ v atomic.Value
49
+ }
50
+
51
+ // Get returns the current timestamp.
52
+ func (b * atomicTime ) Get () time.Time {
53
+ return b .v .Load ().(time.Time )
54
+ }
55
+
56
+ // Set sets the current timestamp.
57
+ func (b * atomicTime ) Set (value time.Time ) {
58
+ b .v .Store (value )
59
+ }
60
+
44
61
// callbacks represents callbacks available in ticker.
45
62
type callbacks struct {
46
63
onTick func (models.Tick )
@@ -311,7 +328,7 @@ func (t *Ticker) ServeWithContext(ctx context.Context) {
311
328
t .reconnectAttempt = 0
312
329
313
330
// Set current time as last ping time
314
- t .lastPingTime = time .Now ()
331
+ t .lastPingTime . Set ( time .Now () )
315
332
316
333
// Set on close handler
317
334
t .Conn .SetCloseHandler (t .handleClose )
@@ -339,6 +356,7 @@ func (t *Ticker) handleClose(code int, reason string) error {
339
356
return nil
340
357
}
341
358
359
+
342
360
// Trigger callback methods
343
361
func (t * Ticker ) triggerError (err error ) {
344
362
if t .callbacks .onError != nil {
@@ -370,6 +388,7 @@ func (t *Ticker) triggerNoReconnect(attempt int) {
370
388
}
371
389
}
372
390
391
+
373
392
func (t * Ticker ) triggerMessage (messageType int , message []byte ) {
374
393
if t .callbacks .onMessage != nil {
375
394
t .callbacks .onMessage (messageType , message )
@@ -401,7 +420,7 @@ func (t *Ticker) checkConnection(ctx context.Context, wg *sync.WaitGroup) {
401
420
402
421
// If last ping time is greater then timeout interval then close the
403
422
// existing connection and reconnect
404
- if time .Since (t .lastPingTime ) > dataTimeoutInterval {
423
+ if time .Since (t .lastPingTime . Get () ) > dataTimeoutInterval {
405
424
// Close the current connection without waiting for close frame
406
425
if t .Conn != nil {
407
426
t .Conn .Close ()
@@ -431,7 +450,7 @@ func (t *Ticker) readMessage(ctx context.Context, wg *sync.WaitGroup) {
431
450
}
432
451
433
452
// Update last ping time to check for connection
434
- t .lastPingTime = time .Now ()
453
+ t .lastPingTime . Set ( time .Now () )
435
454
436
455
// Trigger message.
437
456
t .triggerMessage (mType , msg )
0 commit comments