From 63f06b218ac2ea0984f1357d8cc5fa10a7bbea56 Mon Sep 17 00:00:00 2001 From: samliok Date: Tue, 29 Apr 2025 13:21:39 -0700 Subject: [PATCH 1/9] rebroadcast empty votes working --- epoch.go | 32 ++++++++++-- epoch_failover_test.go | 113 ++++++++++++++++++++++++++++++++++++++++ epoch_multinode_test.go | 23 ++++++++ replication.go | 32 ++++++++++-- timeout_handler.go | 25 +++------ timeout_handler_test.go | 26 +++++---- 6 files changed, 215 insertions(+), 36 deletions(-) diff --git a/epoch.go b/epoch.go index e77e88b3..992a8ef0 100644 --- a/epoch.go +++ b/epoch.go @@ -24,8 +24,10 @@ const ( DefaultMaxRoundWindow = 10 DefaultMaxPendingBlocks = 20 - DefaultMaxProposalWaitTime = 5 * time.Second - DefaultReplicationRequestTimeout = 5 * time.Second + DefaultMaxProposalWaitTime = 5 * time.Second + DefaultReplicationRequestTimeout = 5 * time.Second + DefaultEmptyVoteRebroadcastTimeout = 5 * time.Second + EmptyVoteTimeoutID = "rebroadcast_empty_vote" ) type EmptyVoteSet struct { @@ -54,6 +56,7 @@ func NewRound(block VerifiedBlock) *Round { type EpochConfig struct { MaxProposalWait time.Duration + MaxRebrodcastWait time.Duration QCDeserializer QCDeserializer Logger Logger ID NodeID @@ -92,8 +95,8 @@ type Epoch struct { monitor *Monitor haltedError error cancelWaitForBlockNotarization context.CancelFunc - - replicationState *ReplicationState + timeoutHandler *TimeoutHandler + replicationState *ReplicationState } func NewEpoch(conf EpochConfig) (*Epoch, error) { @@ -107,6 +110,7 @@ func NewEpoch(conf EpochConfig) (*Epoch, error) { func (e *Epoch) AdvanceTime(t time.Time) { e.monitor.AdvanceTime(t) e.replicationState.AdvanceTime(t) + e.timeoutHandler.Tick(t) } // HandleMessage notifies the engine about a reception of a message. @@ -176,6 +180,7 @@ func (e *Epoch) init() error { e.eligibleNodeIDs = make(map[string]struct{}, len(e.nodes)) e.futureMessages = make(messagesFromNode, len(e.nodes)) e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.maxRoundWindow, e.ReplicationEnabled, e.StartTime) + e.timeoutHandler = NewTimeoutHandler(e.Logger, e.StartTime, e.nodes) for _, node := range e.nodes { e.futureMessages[string(node)] = make(map[uint64]*messagesForRound) @@ -1955,12 +1960,29 @@ func (e *Epoch) triggerProposalWaitTimeExpired(round uint64) { e.Comm.Broadcast(&Message{EmptyVoteMessage: &signedEV}) + e.addEmptyVoteRebroadcastTimeout(&signedEV) + if err := e.maybeAssembleEmptyNotarization(); err != nil { e.Logger.Error("Failed assembling empty notarization", zap.Error(err)) e.haltedError = err } } +func (e *Epoch) addEmptyVoteRebroadcastTimeout(vote *EmptyVote) { + task := &TimeoutTask{ + NodeID: e.ID, + TaskID: EmptyVoteTimeoutID, + Deadline: e.timeoutHandler.GetTime().Add(e.EpochConfig.MaxRebrodcastWait), + Task: func() { + e.Logger.Debug("Rebroadcasting empty vote because round has not advanced", zap.Uint64("round", vote.Vote.Round)) + e.Comm.Broadcast(&Message{EmptyVoteMessage: vote}) + e.addEmptyVoteRebroadcastTimeout(vote) + }, + } + + e.timeoutHandler.AddTask(task) +} + func (e *Epoch) monitorProgress(round uint64) { e.Logger.Debug("Monitoring progress", zap.Uint64("round", round)) ctx, cancelContext := context.WithCancel(context.Background()) @@ -2103,6 +2125,8 @@ func (e *Epoch) increaseRound() { // we advanced to the next round. 
e.cancelWaitForBlockNotarization()
+	// remove the rebroadcast empty vote task
+	e.timeoutHandler.RemoveTask(e.ID, EmptyVoteTimeoutID)
 	e.deleteEmptyVoteForPreviousRound()
 	leader := LeaderForRound(e.nodes, e.round)
diff --git a/epoch_failover_test.go b/epoch_failover_test.go
index db53f761..0e581df3 100644
--- a/epoch_failover_test.go
+++ b/epoch_failover_test.go
@@ -6,6 +6,7 @@ package simplex_test
 import (
 	"context"
 	"fmt"
+	"simplex"
 	. "simplex"
 	"simplex/testutil"
 	"sync/atomic"
@@ -765,6 +766,118 @@ func TestEpochLeaderFailoverNotNeeded(t *testing.T) {
 	require.False(t, timedOut.Load())
 }
 
+type rebroadcastCounterComm struct {
+	nodes           []NodeID
+	count           atomic.Uint64
+	mostRecentRound atomic.Uint64
+}
+
+func (r *rebroadcastCounterComm) ListNodes() []NodeID {
+	return r.nodes
+}
+
+func (r *rebroadcastCounterComm) SendMessage(*Message, NodeID) {
+
+}
+
+func (r *rebroadcastCounterComm) Broadcast(msg *Message) {
+	if msg.EmptyVoteMessage != nil {
+		r.mostRecentRound.Store(msg.EmptyVoteMessage.Vote.Round)
+		r.count.Add(1)
+	}
+}
+
+func TestEpochRebroadcastsEmptyVote(t *testing.T) {
+	l := testutil.MakeLogger(t, 2)
+	bb := &testBlockBuilder{out: make(chan *testBlock, 1), blockShouldBeBuilt: make(chan struct{}, 1)}
+	storage := newInMemStorage()
+
+	nodes := []NodeID{{1}, {2}, {3}, {4}}
+
+	wal := newTestWAL(t)
+
+	epochTime := time.Now()
+	comm := &rebroadcastCounterComm{
+		nodes: nodes,
+	}
+	conf := EpochConfig{
+		MaxProposalWait: DefaultMaxProposalWaitTime,
+		MaxRebrodcastWait: 500 * time.Millisecond,
+		StartTime: epochTime,
+		Logger: l,
+		ID: nodes[3], // so we are not the leader
+		Signer: &testSigner{},
+		WAL: wal,
+		Verifier: &testVerifier{},
+		Storage: storage,
+		Comm: comm,
+		BlockBuilder: bb,
+		SignatureAggregator: &testSignatureAggregator{},
+	}
+
+	e, err := NewEpoch(conf)
+	require.NoError(t, err)
+
+	require.NoError(t, e.Start())
+	require.Equal(t, uint64(0), e.Metadata().Round)
+	require.Equal(t, uint64(0), e.Metadata().Round)
+	require.False(t, wal.containsEmptyVote(0))
+
+	bb.blockShouldBeBuilt <- struct{}{}
+	time.Sleep(10 * time.Millisecond)
+	epochTime = epochTime.Add(DefaultMaxProposalWaitTime)
+	e.AdvanceTime(epochTime)
+	wal.assertEmptyVote(0)
+	wal.assertWALSize(1)
+	require.Equal(t, uint64(1), comm.count.Load())
+
+	// reset to get rebroadcast count
+	comm.count.Store(0)
+	for i := range 10 {
+		epochTime = epochTime.Add(e.MaxRebrodcastWait)
+		e.AdvanceTime(epochTime)
+		time.Sleep(10 * time.Millisecond)
+		require.Equal(t, uint64(i + 1), comm.count.Load())
+		require.Equal(t, uint64(0), comm.mostRecentRound.Load())
+		wal.assertWALSize(1)
+	}
+
+	emptyNotarization := newEmptyNotarization(nodes, 0, 0)
+	e.HandleMessage(&simplex.Message{
+		EmptyNotarization: emptyNotarization,
+	}, nodes[2])
+
+	wal.assertNotarization(0)
+
+	comm.count.Store(0)
+	// ensure the rebroadcast was canceled
+	for range 10 {
+		epochTime = epochTime.Add(e.MaxRebrodcastWait)
+		e.AdvanceTime(epochTime)
+		time.Sleep(10 * time.Millisecond)
+		require.Equal(t, uint64(0), comm.count.Load())
+	}
+
+	// assert that we continue to rebroadcast, but for a different round now
+	bb.blockShouldBeBuilt <- struct{}{}
+	time.Sleep(10 * time.Millisecond)
+	epochTime = epochTime.Add(DefaultMaxProposalWaitTime)
+	e.AdvanceTime(epochTime)
+	wal.assertEmptyVote(1)
+	wal.assertWALSize(3)
+
+	// reset to get rebroadcast count
+	comm.count.Store(0)
+	for i := range 10 {
+		epochTime = epochTime.Add(e.MaxRebrodcastWait)
+		e.AdvanceTime(epochTime)
+		time.Sleep(10 * time.Millisecond)
+		require.Equal(t, uint64(i + 1), 
comm.count.Load()) + require.Equal(t, uint64(1), comm.mostRecentRound.Load()) + wal.assertWALSize(3) + } +} + func runCrashAndRestartExecution(t *testing.T, e *Epoch, bb *testBlockBuilder, wal *testWAL, storage *InMemStorage, f epochExecution) { // Split the test into two scenarios: // 1) The node proceeds as usual. diff --git a/epoch_multinode_test.go b/epoch_multinode_test.go index 5231701d..6219c8c1 100644 --- a/epoch_multinode_test.go +++ b/epoch_multinode_test.go @@ -268,6 +268,29 @@ func (tw *testWAL) assertNotarizationOrFinalization(round uint64, qc QCDeseriali } +func (tw *testWAL) assertEmptyVote(round uint64) { + tw.lock.Lock() + defer tw.lock.Unlock() + + for { + rawRecords, err := tw.WriteAheadLog.ReadAll() + require.NoError(tw.t, err) + + for _, rawRecord := range rawRecords { + if binary.BigEndian.Uint16(rawRecord[:2]) == record.EmptyVoteRecordType { + vote, err := ParseEmptyVoteRecord(rawRecord) + require.NoError(tw.t, err) + + if vote.Round == round { + return + } + } + } + + tw.signal.Wait() + } +} + func (tw *testWAL) containsEmptyVote(round uint64) bool { tw.lock.Lock() defer tw.lock.Unlock() diff --git a/replication.go b/replication.go index f1070c9f..22d13d23 100644 --- a/replication.go +++ b/replication.go @@ -170,8 +170,10 @@ func (r *ReplicationState) createReplicationTimeoutTask(start, end uint64, nodes r.sendRequestToNode(start, end, nodes, (index+1)%len(nodes)) } timeoutTask := &TimeoutTask{ - Start: start, - End: end, + Data: &ReplicationTimeoutData{ + Start: start, + End: end, + }, NodeID: nodes[index], TaskID: getTimeoutID(start, end), Task: taskFunc, @@ -193,15 +195,17 @@ func (r *ReplicationState) receivedReplicationResponse(data []QuorumRound, node slices.Sort(seqs) - task := r.timeoutHandler.FindTask(node, seqs) + task := FindTask(r.timeoutHandler, node, seqs) if task == nil { r.logger.Debug("Could not find a timeout task associated with the replication response", zap.Stringer("from", node)) return } r.timeoutHandler.RemoveTask(node, task.TaskID) + taskData := task.Data + replicationData := taskData.(*ReplicationTimeoutData) // we found the timeout, now make sure all seqs were returned - missing := findMissingNumbersInRange(task.Start, task.End, seqs) + missing := findMissingNumbersInRange(replicationData.Start, replicationData.End, seqs) if len(missing) == 0 { return } @@ -322,3 +326,23 @@ func (r *ReplicationState) GetQuroumRoundWithSeq(seq uint64) *QuorumRound { } return nil } + +// FindTask returns the first TimeoutTask assigned to [node] that contains any sequence in [seqs]. +// A sequence is considered "contained" if it falls between a task's Start (inclusive) and End (inclusive). 
+// func (t *TimeoutHandler) FindTask(node NodeID, seqs []uint64) *TimeoutTask { +func FindTask(t *TimeoutHandler, node NodeID, seqs []uint64) *TimeoutTask { + t.lock.Lock() + defer t.lock.Unlock() + + for _, task := range t.tasks[string(node)] { + data := task.Data + replicationData := data.(*ReplicationTimeoutData) + for _, seq := range seqs { + if seq >= replicationData.Start && seq <= replicationData.End { + return task + } + } + } + + return nil +} diff --git a/timeout_handler.go b/timeout_handler.go index b8e13b96..ba9d5c0a 100644 --- a/timeout_handler.go +++ b/timeout_handler.go @@ -12,11 +12,15 @@ import ( "go.uber.org/zap" ) +type ReplicationTimeoutData struct { + Start uint64 + End uint64 +} + type TimeoutTask struct { NodeID NodeID TaskID string - Start uint64 - End uint64 + Data any Task func() Deadline time.Time @@ -168,23 +172,6 @@ func (t *TimeoutHandler) Close() { } } -// FindTask returns the first TimeoutTask assigned to [node] that contains any sequence in [seqs]. -// A sequence is considered "contained" if it falls between a task's Start (inclusive) and End (inclusive). -func (t *TimeoutHandler) FindTask(node NodeID, seqs []uint64) *TimeoutTask { - t.lock.Lock() - defer t.lock.Unlock() - - for _, seq := range seqs { - for _, t := range t.tasks[string(node)] { - if seq >= t.Start && seq <= t.End { - return t - } - } - } - - return nil -} - const delimiter = "_" func getTimeoutID(start, end uint64) string { diff --git a/timeout_handler_test.go b/timeout_handler_test.go index 509aa0ab..671fd42f 100644 --- a/timeout_handler_test.go +++ b/timeout_handler_test.go @@ -231,29 +231,37 @@ func TestFindTask(t *testing.T) { task1 := &simplex.TimeoutTask{ TaskID: "task1", NodeID: nodes[0], - Start: 5, - End: 10, + Data: &simplex.ReplicationTimeoutData{ + Start: 5, + End: 10, + }, } taskSameRangeDiffNode := &simplex.TimeoutTask{ TaskID: "taskSameDiff", NodeID: nodes[1], - Start: 5, - End: 10, + Data: &simplex.ReplicationTimeoutData{ + Start: 5, + End: 10, + }, } task3 := &simplex.TimeoutTask{ TaskID: "task3", NodeID: nodes[1], - Start: 25, - End: 30, + Data: &simplex.ReplicationTimeoutData{ + Start: 25, + End: 30, + }, } task4 := &simplex.TimeoutTask{ TaskID: "task4", NodeID: nodes[1], - Start: 31, - End: 36, + Data: &simplex.ReplicationTimeoutData{ + Start: 31, + End: 36, + }, } // Add tasks to handler @@ -320,7 +328,7 @@ func TestFindTask(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := handler.FindTask(tt.node, tt.seqs) + result := simplex.FindTask(handler, tt.node, tt.seqs) require.Equal(t, tt.expected, result) }) } From f5f9f1f4f40e4262d1bafe89bd1148c69200d74b Mon Sep 17 00:00:00 2001 From: samliok Date: Tue, 29 Apr 2025 14:17:47 -0700 Subject: [PATCH 2/9] remove time.sleep from test --- epoch.go | 2 +- epoch_failover_test.go | 128 ++++++++++++++++++++++------------------ epoch_multinode_test.go | 23 -------- replication.go | 15 +++-- timeout_handler.go | 5 -- timeout_handler_test.go | 2 +- 6 files changed, 82 insertions(+), 93 deletions(-) diff --git a/epoch.go b/epoch.go index 992a8ef0..6b33b8ef 100644 --- a/epoch.go +++ b/epoch.go @@ -56,7 +56,7 @@ func NewRound(block VerifiedBlock) *Round { type EpochConfig struct { MaxProposalWait time.Duration - MaxRebrodcastWait time.Duration + MaxRebrodcastWait time.Duration QCDeserializer QCDeserializer Logger Logger ID NodeID diff --git a/epoch_failover_test.go b/epoch_failover_test.go index 0e581df3..1a2e17eb 100644 --- a/epoch_failover_test.go +++ b/epoch_failover_test.go @@ -766,24 +766,29 @@ 
func TestEpochLeaderFailoverNotNeeded(t *testing.T) { require.False(t, timedOut.Load()) } -type rebroadcastCounterComm struct { - nodes []NodeID - count atomic.Uint64 - mostRecentRound atomic.Uint64 +type rebroadcastComm struct { + nodes []NodeID + emptyVotes chan *EmptyVote } -func (r *rebroadcastCounterComm) ListNodes() []NodeID { +func newRebroadcastComm(nodes []NodeID) *rebroadcastComm { + return &rebroadcastComm{ + nodes: nodes, + emptyVotes: make(chan *EmptyVote), + } +} + +func (r *rebroadcastComm) ListNodes() []NodeID { return r.nodes } -func (r *rebroadcastCounterComm) SendMessage(*Message, NodeID) { +func (r *rebroadcastComm) SendMessage(*Message, NodeID) { } -func (r *rebroadcastCounterComm) Broadcast(msg *Message) { +func (r *rebroadcastComm) Broadcast(msg *Message) { if msg.EmptyVoteMessage != nil { - r.mostRecentRound.Store(msg.EmptyVoteMessage.Vote.Round) - r.count.Add(1) + r.emptyVotes <- msg.EmptyVoteMessage } } @@ -797,20 +802,18 @@ func TestEpochRebroadcastsEmptyVote(t *testing.T) { wal := newTestWAL(t) epochTime := time.Now() - comm := &rebroadcastCounterComm{ - nodes: nodes, - } + comm := newRebroadcastComm(nodes) conf := EpochConfig{ - MaxProposalWait: DefaultMaxProposalWaitTime, - MaxRebrodcastWait: 500 * time.Millisecond, - StartTime: epochTime, - Logger: l, - ID: nodes[3], // so we are not the leader - Signer: &testSigner{}, - WAL: wal, - Verifier: &testVerifier{}, - Storage: storage, - Comm: comm, + MaxProposalWait: DefaultMaxProposalWaitTime, + MaxRebrodcastWait: DefaultEmptyVoteRebroadcastTimeout, + StartTime: epochTime, + Logger: l, + ID: nodes[3], // so we are not the leader + Signer: &testSigner{}, + WAL: wal, + Verifier: &testVerifier{}, + Storage: storage, + Comm: comm, BlockBuilder: bb, SignatureAggregator: &testSignatureAggregator{}, } @@ -824,56 +827,67 @@ func TestEpochRebroadcastsEmptyVote(t *testing.T) { require.False(t, wal.containsEmptyVote(0)) bb.blockShouldBeBuilt <- struct{}{} - time.Sleep(10 * time.Millisecond) - epochTime = epochTime.Add(DefaultMaxProposalWaitTime) - e.AdvanceTime(epochTime) - wal.assertEmptyVote(0) - wal.assertWALSize(1) - require.Equal(t, uint64(1), comm.count.Load()) - - // reset to get rebroadcast count - comm.count.Store(0) - for i := range 10 { + + // wait for the initial empty vote broadcast + done := false + for !done { + select { + case emptyVote := <-comm.emptyVotes: + require.True(t, wal.containsEmptyVote(0)) + require.Equal(t, uint64(0), emptyVote.Vote.Round) + done = true + case <-time.After(10 * time.Millisecond): + epochTime = epochTime.Add(e.MaxRebrodcastWait) + e.AdvanceTime(epochTime) + } + } + + // since the round does not advance, continue to rebroadcast + for range 10 { epochTime = epochTime.Add(e.MaxRebrodcastWait) e.AdvanceTime(epochTime) - time.Sleep(10 * time.Millisecond) - require.Equal(t, uint64(i + 1), comm.count.Load()) - require.Equal(t, uint64(0), comm.mostRecentRound.Load()) + emptyVote := <-comm.emptyVotes + require.Equal(t, uint64(0), emptyVote.Vote.Round) wal.assertWALSize(1) } - + emptyNotarization := newEmptyNotarization(nodes, 0, 0) e.HandleMessage(&simplex.Message{ EmptyNotarization: emptyNotarization, - }, nodes[2]) - - wal.assertNotarization(0) - - comm.count.Store(0) + }, nodes[2]) + + wal.assertNotarization(0) + // ensure the rebroadcast was canceled - for range 10 { - epochTime = epochTime.Add(e.MaxRebrodcastWait) - e.AdvanceTime(epochTime) - time.Sleep(10 * time.Millisecond) - require.Equal(t, uint64(0), comm.count.Load()) - } + epochTime = epochTime.Add(e.MaxRebrodcastWait * 2) + 
e.AdvanceTime(epochTime)
+
+	// ensure the timeout was canceled
+	require.Len(t, comm.emptyVotes, 0)
 
 	// assert that we continue to rebroadcast, but for a different round now
 	bb.blockShouldBeBuilt <- struct{}{}
+
+	// wait for the initial empty vote broadcast
+	done = false
+	for !done {
+		select {
+		case emptyVote := <-comm.emptyVotes:
+			require.True(t, wal.containsEmptyVote(1))
+			require.Equal(t, uint64(1), emptyVote.Vote.Round)
+			done = true
+		case <-time.After(10 * time.Millisecond):
+			epochTime = epochTime.Add(e.MaxRebrodcastWait)
+			e.AdvanceTime(epochTime)
+		}
+	}
 	wal.assertWALSize(3)
 
-	// reset to get rebroadcast count
-	comm.count.Store(0)
-	for i := range 10 {
+	for range 10 {
 		epochTime = epochTime.Add(e.MaxRebrodcastWait)
 		e.AdvanceTime(epochTime)
-		time.Sleep(10 * time.Millisecond)
-		require.Equal(t, uint64(i + 1), comm.count.Load())
-		require.Equal(t, uint64(1), comm.mostRecentRound.Load())
+		emptyVote := <-comm.emptyVotes
+		require.Equal(t, uint64(1), emptyVote.Vote.Round)
 		wal.assertWALSize(3)
 	}
 }
diff --git a/epoch_multinode_test.go b/epoch_multinode_test.go
index 6219c8c1..5231701d 100644
--- a/epoch_multinode_test.go
+++ b/epoch_multinode_test.go
@@ -268,29 +268,6 @@ func (tw *testWAL) assertNotarizationOrFinalization(round uint64, qc QCDeseriali
 
 }
 
-func (tw *testWAL) assertEmptyVote(round uint64) {
-	tw.lock.Lock()
-	defer tw.lock.Unlock()
-
-	for {
-		rawRecords, err := tw.WriteAheadLog.ReadAll()
-		require.NoError(tw.t, err)
-
-		for _, rawRecord := range rawRecords {
-			if binary.BigEndian.Uint16(rawRecord[:2]) == record.EmptyVoteRecordType {
-				vote, err := ParseEmptyVoteRecord(rawRecord)
-				require.NoError(tw.t, err)
-
-				if vote.Round == round {
-					return
-				}
-			}
-		}
-
-		tw.signal.Wait()
-	}
-}
-
 func (tw *testWAL) containsEmptyVote(round uint64) bool {
 	tw.lock.Lock()
 	defer tw.lock.Unlock()
diff --git a/replication.go b/replication.go
index 22d13d23..43796ab0 100644
--- a/replication.go
+++ b/replication.go
@@ -195,15 +195,14 @@ func (r *ReplicationState) receivedReplicationResponse(data []QuorumRound, node
 
 	slices.Sort(seqs)
 
-	task := FindTask(r.timeoutHandler, node, seqs)
+	task := FindReplicationTask(r.timeoutHandler, node, seqs)
 	if task == nil {
 		r.logger.Debug("Could not find a timeout task associated with the replication response", zap.Stringer("from", node))
 		return
 	}
 	r.timeoutHandler.RemoveTask(node, task.TaskID)
 
-	taskData := task.Data
-	replicationData := taskData.(*ReplicationTimeoutData)
+	replicationData := task.Data.(*ReplicationTimeoutData)
 	// we found the timeout, now make sure all seqs were returned
 	missing := findMissingNumbersInRange(replicationData.Start, replicationData.End, seqs)
 	if len(missing) == 0 {
@@ -327,10 +326,14 @@ func (r *ReplicationState) GetQuroumRoundWithSeq(seq uint64) *QuorumRound {
 	return nil
 }
 
-// FindTask returns the first TimeoutTask assigned to [node] that contains any sequence in [seqs].
+type ReplicationTimeoutData struct {
+	Start uint64
+	End   uint64
+}
+
+// FindReplicationTask returns the first TimeoutTask assigned to [node] that contains any sequence in [seqs].
 // A sequence is considered "contained" if it falls between a task's Start (inclusive) and End (inclusive). 
-// func (t *TimeoutHandler) FindTask(node NodeID, seqs []uint64) *TimeoutTask {
-func FindTask(t *TimeoutHandler, node NodeID, seqs []uint64) *TimeoutTask {
+func FindReplicationTask(t *TimeoutHandler, node NodeID, seqs []uint64) *TimeoutTask {
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
diff --git a/timeout_handler.go b/timeout_handler.go
index ba9d5c0a..5f697527 100644
--- a/timeout_handler.go
+++ b/timeout_handler.go
@@ -12,11 +12,6 @@ import (
 	"go.uber.org/zap"
 )
 
-type ReplicationTimeoutData struct {
-	Start uint64
-	End   uint64
-}
-
 type TimeoutTask struct {
 	NodeID NodeID
 	TaskID string
diff --git a/timeout_handler_test.go b/timeout_handler_test.go
index 671fd42f..1e6a988d 100644
--- a/timeout_handler_test.go
+++ b/timeout_handler_test.go
@@ -328,7 +328,7 @@ func TestFindTask(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			result := simplex.FindTask(handler, tt.node, tt.seqs)
+			result := simplex.FindReplicationTask(handler, tt.node, tt.seqs)
 			require.Equal(t, tt.expected, result)
 		})
 	}

From ef3a765755fb80d863db5b5b4c6586899544a6ab Mon Sep 17 00:00:00 2001
From: samliok
Date: Wed, 30 Apr 2025 14:25:37 -0700
Subject: [PATCH 3/9] testing

---
 .github/workflows/ci.yaml  | 19 +++++++++----------
 fuzz.sh => scripts/fuzz.sh |  0
 scripts/test.sh            | 13 +++++++++++++
 3 files changed, 22 insertions(+), 10 deletions(-)
 rename fuzz.sh => scripts/fuzz.sh (100%)
 create mode 100644 scripts/test.sh

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 82525c35..2b350544 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -8,18 +8,17 @@ on:
     branches: ['**']
 
 jobs:
-
   build:
     runs-on: ubuntu-latest
 
     steps:
-    - uses: actions/checkout@v4
+      - uses: actions/checkout@v4
 
-    - name: Set up Go
-      uses: actions/setup-go@v4
-      with:
-        go-version: '1.23'
+      - name: Set up Go
+        uses: actions/setup-go@v4
+        with:
+          go-version: '1.23'
 
-    - name: Test
-      run: go test -race ./...
-    - name: Fuzz
-      run: bash fuzz.sh
+      - name: Test
+        run: bash ./scripts/test.sh
+      - name: Fuzz
+        run: bash ./scripts/fuzz.sh
diff --git a/fuzz.sh b/scripts/fuzz.sh
similarity index 100%
rename from fuzz.sh
rename to scripts/fuzz.sh
diff --git a/scripts/test.sh b/scripts/test.sh
new file mode 100644
index 00000000..7373ad0f
--- /dev/null
+++ b/scripts/test.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+set -euxo pipefail
+
+function dots() {
+    while :; do
+        echo "."
+        sleep 1
+    done
+}
+
+dots &
+
+go test -race ./...
\ No newline at end of file
From 6b873c6292f05f688cd512823cc53fabce9a40e4 Mon Sep 17 00:00:00 2001
From: samliok
Date: Wed, 30 Apr 2025 14:32:57 -0700
Subject: [PATCH 4/9] newline at end of test.sh

---
 scripts/test.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/test.sh b/scripts/test.sh
index 7373ad0f..014d5e74 100644
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -10,4 +10,4 @@ function dots() {
 
 dots &
 
-go test -race ./...
\ No newline at end of file
+go test -race ./...
From d35a738385a64b146a01fa9e323bebd30817f764 Mon Sep 17 00:00:00 2001
From: samliok
Date: Wed, 30 Apr 2025 14:40:16 -0700
Subject: [PATCH 5/9] add dmesg

---
 scripts/test.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/test.sh b/scripts/test.sh
index 014d5e74..0795d1b9 100644
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -10,4 +10,4 @@ function dots() {
 
 dots &
 
-go test -race ./...
+go test -race ./... 
|| dmesg
From 1071b4517bf591602da4c50d5e2974145cf103b1 Mon Sep 17 00:00:00 2001
From: samliok
Date: Thu, 1 May 2025 21:44:18 -0700
Subject: [PATCH 6/9] remove -race from ci

---
 scripts/test.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/test.sh b/scripts/test.sh
index 0795d1b9..785710df 100644
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -10,4 +10,4 @@ function dots() {
 
 dots &
 
-go test -race ./... || dmesg
+go test ./... || dmesg
From e6861aa89cb58154ba2287422295413307c64c76 Mon Sep 17 00:00:00 2001
From: samliok
Date: Thu, 1 May 2025 22:20:44 -0700
Subject: [PATCH 7/9] find bug because maps don't iterate in order

---
 replication.go          | 10 ++++++++--
 scripts/test.sh         |  2 +-
 timeout_handler_test.go |  3 +++
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/replication.go b/replication.go
index 43796ab0..df2d813c 100644
--- a/replication.go
+++ b/replication.go
@@ -337,15 +337,21 @@ func FindReplicationTask(t *TimeoutHandler, node NodeID, seqs []uint64) *Timeout
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
+	lowestSeqStart := uint64(math.MaxUint64)
+	var lowestTask *TimeoutTask
 	for _, task := range t.tasks[string(node)] {
 		data := task.Data
 		replicationData := data.(*ReplicationTimeoutData)
 		for _, seq := range seqs {
 			if seq >= replicationData.Start && seq <= replicationData.End {
-				return task
+				if replicationData.Start < lowestSeqStart {
+					lowestTask = task
+					lowestSeqStart = seq
+					break
+				}
 			}
 		}
 	}
 
-	return nil
+	return lowestTask
 }
diff --git a/scripts/test.sh b/scripts/test.sh
index 785710df..bb593193 100644
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -10,4 +10,4 @@ function dots() {
 
 dots &
 
-go test ./... || dmesg
+go test ./...
diff --git a/timeout_handler_test.go b/timeout_handler_test.go
index 1e6a988d..0256c100 100644
--- a/timeout_handler_test.go
+++ b/timeout_handler_test.go
@@ -329,6 +329,9 @@ func TestFindTask(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			result := simplex.FindReplicationTask(handler, tt.node, tt.seqs)
+			if tt.expected != result {
+				require.Fail(t, "not equal")
+			}
 			require.Equal(t, tt.expected, result)
 		})
 	}
From f70590b59cfaa5e5f3bcc8b8743f3ba0a29cada1 Mon Sep 17 00:00:00 2001
From: samliok
Date: Fri, 2 May 2025 09:33:32 -0700
Subject: [PATCH 8/9] re-add race

---
 scripts/test.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/test.sh b/scripts/test.sh
index bb593193..4f893194 100644
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -10,4 +10,4 @@ function dots() {
 
 dots &
 
-go test ./...
+go test ./... -race
From 244912338a18412436895ab358cdf0779dcef35e Mon Sep 17 00:00:00 2001
From: samliok
Date: Fri, 2 May 2025 09:38:33 -0700
Subject: [PATCH 9/9] remove variable

---
 replication.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/replication.go b/replication.go
index df2d813c..af81d489 100644
--- a/replication.go
+++ b/replication.go
@@ -337,16 +337,14 @@ func FindReplicationTask(t *TimeoutHandler, node NodeID, seqs []uint64) *Timeout
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
-	lowestSeqStart := uint64(math.MaxUint64)
 	var lowestTask *TimeoutTask
 	for _, task := range t.tasks[string(node)] {
 		data := task.Data
 		replicationData := data.(*ReplicationTimeoutData)
 		for _, seq := range seqs {
 			if seq >= replicationData.Start && seq <= replicationData.End {
-			if replicationData.Start < lowestSeqStart {
+			if lowestTask == nil || replicationData.Start < lowestTask.Data.(*ReplicationTimeoutData).Start {
 				lowestTask = task
-				lowestSeqStart = seq
 				break
 			}
 		}
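
The mechanism introduced in [PATCH 1/9] is a timeout task whose callback re-registers itself: triggerProposalWaitTimeExpired broadcasts the empty vote once, addEmptyVoteRebroadcastTimeout schedules the next broadcast, and increaseRound removes the task once the round advances. A minimal, self-contained sketch of that shape follows; the handler and task types here are simplified stand-ins for simplex's TimeoutHandler and TimeoutTask, not the library's actual API:

// Sketch of the self-rearming rebroadcast pattern from PATCH 1/9
// (simplified stand-ins, not simplex's real types).
package main

import (
	"fmt"
	"time"
)

type task struct {
	id       string
	deadline time.Time
	run      func()
}

type handler struct {
	now   time.Time
	tasks map[string]*task
}

func newHandler(start time.Time) *handler {
	return &handler{now: start, tasks: make(map[string]*task)}
}

func (h *handler) add(t *task)      { h.tasks[t.id] = t }
func (h *handler) remove(id string) { delete(h.tasks, id) }

// tick mirrors TimeoutHandler.Tick: fire every task whose deadline has passed.
func (h *handler) tick(now time.Time) {
	h.now = now
	for id, t := range h.tasks {
		if !t.deadline.After(now) {
			delete(h.tasks, id)
			t.run() // the callback may re-add itself with a fresh deadline
		}
	}
}

const rebroadcastWait = 5 * time.Second

// scheduleRebroadcast mirrors addEmptyVoteRebroadcastTimeout: the task's body
// broadcasts, then schedules the next rebroadcast of the same vote.
func scheduleRebroadcast(h *handler, round uint64) {
	h.add(&task{
		id:       "rebroadcast_empty_vote",
		deadline: h.now.Add(rebroadcastWait),
		run: func() {
			fmt.Printf("rebroadcasting empty vote for round %d\n", round)
			scheduleRebroadcast(h, round)
		},
	})
}

func main() {
	start := time.Now()
	h := newHandler(start)
	scheduleRebroadcast(h, 0)

	// Each tick past the deadline fires one rebroadcast and re-arms the task.
	for i := 1; i <= 3; i++ {
		h.tick(start.Add(time.Duration(i) * rebroadcastWait))
	}

	// Advancing the round removes the task, as increaseRound does in the patch.
	h.remove("rebroadcast_empty_vote")
	h.tick(start.Add(100 * rebroadcastWait)) // prints nothing
}

Driving the handler from AdvanceTime ticks rather than a wall-clock timer is also what lets the tests in this series advance virtual time deterministically instead of sleeping.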
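[PATCH 7/9] chases a flake that only exists because Go randomizes map iteration order: FindReplicationTask scans t.tasks, a map, so "return the first match" can return a different task on each run when ranges overlap. The illustration below is not simplex code; span, firstMatch, and lowestStartMatch are hypothetical names used only to show why the fix selects the task with the lowest Start:

// Illustration of why map iteration order made FindReplicationTask flaky
// (hypothetical types, not simplex code).
package main

import "fmt"

type span struct{ start, end uint64 }

// firstMatch returns whichever matching entry the map yields first,
// which varies from run to run when two spans contain the same seq.
func firstMatch(tasks map[string]span, seq uint64) string {
	for id, s := range tasks {
		if seq >= s.start && seq <= s.end {
			return id
		}
	}
	return ""
}

// lowestStartMatch is deterministic: among all matches, pick the
// entry with the smallest start, regardless of iteration order.
func lowestStartMatch(tasks map[string]span, seq uint64) string {
	best := ""
	for id, s := range tasks {
		if seq < s.start || seq > s.end {
			continue
		}
		if best == "" || s.start < tasks[best].start {
			best = id
		}
	}
	return best
}

func main() {
	// Two tasks whose ranges both contain seq 7.
	tasks := map[string]span{
		"5_10": {5, 10},
		"7_12": {7, 12},
	}
	fmt.Println(firstMatch(tasks, 7), lowestStartMatch(tasks, 7))
}

Run it a few times: firstMatch flips between "5_10" and "7_12", while lowestStartMatch always answers "5_10". That is the determinism [PATCH 9/9] keeps after dropping the redundant lowestSeqStart variable and comparing against the current lowest task's Start directly.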