Skip to content

Commit ad628ef

Browse files
author
ZENOTME
committed
support rewrite manifest
1 parent 3e1d59d commit ad628ef

File tree

6 files changed

+1198
-40
lines changed

6 files changed

+1198
-40
lines changed

crates/iceberg/src/spec/manifest/entry.rs

+6
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ impl ManifestEntry {
136136
self.sequence_number
137137
}
138138

139+
/// File sequence number.
140+
#[inline]
141+
pub fn file_sequence_number(&self) -> Option<i64> {
142+
self.file_sequence_number
143+
}
144+
139145
/// File size in bytes.
140146
#[inline]
141147
pub fn file_size_in_bytes(&self) -> u64 {

crates/iceberg/src/transaction/mod.rs

+42
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818
//! This module contains transaction api.
1919
2020
mod append;
21+
mod rewrite_manifest;
22+
pub use rewrite_manifest::{
23+
CREATED_MANIFESTS_COUNT, KEPT_MANIFESTS_COUNT, PROCESSED_ENTRY_COUNT, REPLACED_MANIFESTS_COUNT,
24+
};
2125
mod snapshot;
2226
mod sort_order;
2327

2428
use std::cmp::Ordering;
2529
use std::collections::HashMap;
30+
use std::hash::Hash;
2631
use std::mem::discriminant;
2732
use std::sync::Arc;
2833

34+
use rewrite_manifest::{ClusterFunc, PredicateFunc, RewriteManifsetAction};
2935
use uuid::Uuid;
3036

3137
use crate::error::Result;
@@ -166,6 +172,42 @@ impl<'a> Transaction<'a> {
166172
)
167173
}
168174

175+
/// Rewrite manifest file.
176+
pub fn rewrite_manifest<T: Hash + Eq>(
177+
self,
178+
cluster_by_func: Option<ClusterFunc<T>>,
179+
manifest_predicate: Option<PredicateFunc>,
180+
snapshot_id: Option<i64>,
181+
commit_uuid: Option<Uuid>,
182+
key_metadata: Vec<u8>,
183+
) -> Result<RewriteManifsetAction<'a, T>> {
184+
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
185+
if self
186+
.current_table
187+
.metadata()
188+
.snapshots()
189+
.any(|s| s.snapshot_id() == snapshot_id)
190+
{
191+
return Err(Error::new(
192+
ErrorKind::DataInvalid,
193+
format!("Snapshot id {} already exists", snapshot_id),
194+
));
195+
}
196+
snapshot_id
197+
} else {
198+
self.generate_unique_snapshot_id()
199+
};
200+
RewriteManifsetAction::new(
201+
self,
202+
cluster_by_func,
203+
manifest_predicate,
204+
snapshot_id,
205+
commit_uuid.unwrap_or_else(Uuid::now_v7),
206+
key_metadata,
207+
HashMap::new(),
208+
)
209+
}
210+
169211
/// Creates replace sort order action.
170212
pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
171213
ReplaceSortOrderAction {

0 commit comments

Comments
 (0)