Skip to content

Commit eb13fb5

Browse files
committed
Fix reported progress around defrag
1 parent ac7ce8e commit eb13fb5

File tree

4 files changed

+66
-7
lines changed

4 files changed

+66
-7
lines changed

crates/core/src/sync/storage_adapter.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ impl StorageAdapter {
115115
}))
116116
}
117117

118+
pub fn reset_progress(&self) -> Result<(), ResultCode> {
119+
self.db
120+
.exec_safe("UPDATE ps_buckets SET count_since_last = 0, count_at_last = 0;")?;
121+
Ok(())
122+
}
123+
118124
pub fn lookup_bucket(&self, bucket: &str) -> Result<BucketInfo, ResultCode> {
119125
// We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
120126
// We can consider splitting this into separate SELECT and INSERT statements.

crates/core/src/sync/streaming_sync.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use super::{
2323
line::{BucketChecksum, Checkpoint, CheckpointDiff, SyncLine},
2424
operations::insert_bucket_operations,
2525
storage_adapter::{StorageAdapter, SyncLocalResult},
26-
sync_status::{SyncDownloadProgress, SyncStatusContainer},
26+
sync_status::{SyncDownloadProgress, SyncProgressFromCheckpoint, SyncStatusContainer},
2727
Checksum,
2828
};
2929

@@ -428,10 +428,16 @@ impl StreamingSyncIteration {
428428
checkpoint: &OwnedCheckpoint,
429429
) -> Result<SyncDownloadProgress, SQLiteError> {
430430
let local_progress = self.adapter.local_progress()?;
431-
Ok(SyncDownloadProgress::for_checkpoint(
432-
checkpoint,
433-
local_progress,
434-
)?)
431+
let SyncProgressFromCheckpoint {
432+
progress,
433+
needs_counter_reset,
434+
} = SyncDownloadProgress::for_checkpoint(checkpoint, local_progress)?;
435+
436+
if needs_counter_reset {
437+
self.adapter.reset_progress()?;
438+
}
439+
440+
Ok(progress)
435441
}
436442

437443
/// Prepares a sync iteration by handling the initial [SyncEvent::Initialize].

crates/core/src/sync/sync_status.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,20 @@ pub struct SyncDownloadProgress {
179179
buckets: BTreeMap<String, BucketProgress>,
180180
}
181181

182+
pub struct SyncProgressFromCheckpoint {
183+
pub progress: SyncDownloadProgress,
184+
pub needs_counter_reset: bool,
185+
}
186+
182187
impl SyncDownloadProgress {
183188
pub fn for_checkpoint<'a>(
184189
checkpoint: &OwnedCheckpoint,
185190
mut local_progress: impl StreamingIterator<
186191
Item = Result<PersistedBucketProgress<'a>, ResultCode>,
187192
>,
188-
) -> Result<Self, ResultCode> {
193+
) -> Result<SyncProgressFromCheckpoint, ResultCode> {
189194
let mut buckets = BTreeMap::<String, BucketProgress>::new();
195+
let mut needs_reset = false;
190196
for bucket in checkpoint.buckets.values() {
191197
buckets.insert(
192198
bucket.bucket.clone(),
@@ -212,9 +218,24 @@ impl SyncDownloadProgress {
212218

213219
progress.at_last = row.count_at_last;
214220
progress.since_last = row.count_since_last;
221+
222+
if progress.target_count < row.count_at_last + row.count_since_last {
223+
needs_reset = true;
224+
// Either due to a defrag / sync rule deploy or a compaction operation, the size
225+
// of the bucket shrank so much that the local ops exceed the ops in the updated
226+
// bucket. We can't possibly report progress in this case (it would overshoot 100%).
227+
for (_, progress) in &mut buckets {
228+
progress.at_last = 0;
229+
progress.since_last = 0;
230+
}
231+
break;
232+
}
215233
}
216234

217-
Ok(Self { buckets })
235+
Ok(SyncProgressFromCheckpoint {
236+
progress: Self { buckets },
237+
needs_counter_reset: needs_reset,
238+
})
218239
}
219240

220241
pub fn increment_download_count(&mut self, line: &DataLine) {

dart/test/sync_test.dart

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,32 @@ void _syncTests<T>({
543543
expect(progress, isNull);
544544
});
545545

546+
test('interrupt and defrag', () {
547+
applyInstructions(invokeControl('start', null));
548+
applyInstructions(pushCheckpoint(
549+
buckets: [bucketDescription('a', count: 10)], lastOpId: 10));
550+
expect(totalProgress(), (0, 10));
551+
552+
pushSyncData('a', 5);
553+
expect(totalProgress(), (5, 10));
554+
555+
// Emulate stream closing
556+
applyInstructions(invokeControl('stop', null));
557+
expect(progress, isNull);
558+
559+
applyInstructions(invokeControl('start', null));
560+
// A defrag in the meantime shrank the bucket.
561+
applyInstructions(pushCheckpoint(
562+
buckets: [bucketDescription('a', count: 4)], lastOpId: 14));
563+
// So we shouldn't report 5/4.
564+
expect(totalProgress(), (0, 4));
565+
566+
// This should also reset the persisted progress counters.
567+
final [bucket] = db.select('SELECT * FROM ps_buckets');
568+
expect(bucket, containsPair('count_since_last', 0));
569+
expect(bucket, containsPair('count_at_last', 0));
570+
});
571+
546572
test('different priorities', () {
547573
void expectProgress((int, int) prio0, (int, int) prio2) {
548574
expect(priorityProgress(0), prio0);

0 commit comments

Comments
 (0)