diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs
index cf4ad7216..4a45c13ba 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -18,20 +18,20 @@
 //! This module contains memory catalog implementation.
 
 use std::collections::HashMap;
+use std::str::FromStr;
 
 use async_trait::async_trait;
-use futures::lock::Mutex;
+use futures::lock::{Mutex, MutexGuard};
 use iceberg::io::FileIO;
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
     Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
-    TableIdent,
+    TableIdent, TableRequirement, TableUpdate,
 };
 use itertools::Itertools;
-use uuid::Uuid;
 
-use crate::namespace_state::NamespaceState;
+use crate::namespace_state::{MetadataLocation, NamespaceState};
 
 /// namespace `location` property
 const LOCATION: &str = "location";
@@ -53,6 +53,139 @@ impl MemoryCatalog {
             warehouse_location,
         }
     }
+
+    /// Loads a table from the locked namespace state.
+    async fn load_table_from_locked_state(
+        &self,
+        table_ident: &TableIdent,
+        root_namespace_state: &MutexGuard<'_, NamespaceState>,
+    ) -> Result<Table> {
+        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
+        let metadata = self.read_metadata(metadata_location).await?;
+
+        Table::builder()
+            .identifier(table_ident.clone())
+            .metadata(metadata)
+            .metadata_location(metadata_location.to_string())
+            .file_io(self.file_io.clone())
+            .build()
+    }
+
+    async fn update_table_in_locked_state(
+        &self,
+        mut commit: TableCommit,
+        locked_state: &MutexGuard<'_, NamespaceState>,
+    ) -> Result<(Table, MetadataLocation)> {
+        let current_table = self
+            .load_table_from_locked_state(commit.identifier(), locked_state)
+            .await?;
+
+        // Checks whether the commit's expectations are met by the current table state.
+        let location =
+            Self::check_current_table_state(&current_table, commit.take_requirements()).await?;
+
+        self.apply_table_updates_and_write_metadata(
+            &current_table,
+            &location,
+            commit.take_updates(),
+        )
+        .await
+    }
+
+    async fn apply_table_updates_and_write_metadata(
+        &self,
+        current_table: &Table,
+        current_location: &MetadataLocation,
+        updates: Vec<TableUpdate>,
+    ) -> Result<(Table, MetadataLocation)> {
+        let new_location = current_location.with_next_version();
+
+        // Build the new table metadata.
+        let new_metadata =
+            Self::apply_table_updates(current_table, current_location, &new_location, updates)?;
+
+        // Write the updated metadata to its new location.
+        self.write_metadata(&new_metadata, &new_location).await?;
+
+        // Return a table representing the updated version.
+        let new_table = Table::builder()
+            .identifier(current_table.identifier().clone())
+            .metadata(new_metadata)
+            .metadata_location(new_location.to_string())
+            .file_io(self.file_io.clone())
+            .build()?;
+
+        Ok((new_table, new_location))
+    }
+
+    async fn read_metadata(&self, location: &MetadataLocation) -> Result<TableMetadata> {
+        let input_file = self.file_io.new_input(location.to_string())?;
+        let metadata_content = input_file.read().await?;
+        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+
+        Ok(metadata)
+    }
+
+    async fn write_metadata(
+        &self,
+        metadata: &TableMetadata,
+        metadata_location: &MetadataLocation,
+    ) -> Result<()> {
+        self.file_io
+            .new_output(metadata_location.to_string())?
+            .write(serde_json::to_vec(metadata)?.into())
+            .await
+    }
+
+    /// Verifies that a TableCommit's requirements are met by the current table state.
+    /// If not, there's a conflict and the client should retry the commit.
+    async fn check_current_table_state(
+        current_table: &Table,
+        requirements: Vec<TableRequirement>,
+    ) -> Result<MetadataLocation> {
+        let location =
+            MetadataLocation::from_str(current_table.metadata_location().ok_or(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table metadata location is not set: {}",
+                    current_table.identifier()
+                ),
+            ))?)?;
+
+        // Check that the commit's point of view is still reflected by the current state of the table.
+        for requirement in requirements {
+            requirement
+                .check(Some(current_table.metadata()))
+                .map_err(|e| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Conflict: One or more requirements failed, the client may retry",
+                    )
+                    .with_source(e)
+                })?;
+        }
+
+        Ok(location)
+    }
+
+    fn apply_table_updates(
+        table: &Table,
+        current_location: &MetadataLocation,
+        new_location: &MetadataLocation,
+        updates: Vec<TableUpdate>,
+    ) -> Result<TableMetadata> {
+        let mut builder = TableMetadataBuilder::new_from_metadata(
+            table.metadata().clone(),
+            Some(current_location.to_string()),
+        )
+        .set_location(new_location.to_string());
+
+        for update in updates {
+            builder = update.apply(builder)?;
+        }
+
+        Ok(builder.build()?.metadata)
+    }
 }
 
 #[async_trait]
@@ -197,23 +330,15 @@ impl Catalog for MemoryCatalog {
         let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
             .build()?
             .metadata;
-        let metadata_location = format!(
-            "{}/metadata/{}-{}.metadata.json",
-            &location,
-            0,
-            Uuid::new_v4()
-        );
+        let metadata_location = MetadataLocation::new(&location);
 
-        self.file_io
-            .new_output(&metadata_location)?
-            .write(serde_json::to_vec(&metadata)?.into())
-            .await?;
+        self.write_metadata(&metadata, &metadata_location).await?;
 
         root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
 
         Table::builder()
             .file_io(self.file_io.clone())
-            .metadata_location(metadata_location)
+            .metadata_location(metadata_location.to_string())
             .metadata(metadata)
             .identifier(table_ident)
             .build()
@@ -223,17 +348,8 @@
     async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
         let root_namespace_state = self.root_namespace_state.lock().await;
 
-        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
-        let input_file = self.file_io.new_input(metadata_location)?;
-        let metadata_content = input_file.read().await?;
-        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
-
-        Table::builder()
-            .file_io(self.file_io.clone())
-            .metadata_location(metadata_location.clone())
-            .metadata(metadata)
-            .identifier(table_ident.clone())
-            .build()
+        self.load_table_from_locked_state(table_ident, &root_namespace_state)
+            .await
     }
 
     /// Drop a table from the catalog.
@@ -241,7 +357,7 @@ impl Catalog for MemoryCatalog {
         let mut root_namespace_state = self.root_namespace_state.lock().await;
 
         let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
-        self.file_io.delete(&metadata_location).await
+        self.file_io.delete(metadata_location.to_string()).await
     }
 
     /// Check if a table exists in the catalog.
@@ -270,12 +386,23 @@ impl Catalog for MemoryCatalog {
         Ok(())
     }
 
-    /// Update a table to the catalog.
-    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "MemoryCatalog does not currently support updating tables.",
-        ))
+    /// Update a table in the catalog.
+    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
+        let mut locked_namespace_state = self.root_namespace_state.lock().await;
+
+        // Updates the current table version and writes a new metadata file.
+        let (staged_updated_table, new_metadata_location) = self
+            .update_table_in_locked_state(commit, &locked_namespace_state)
+            .await?;
+
+        // Flip the pointer to reference the new metadata file.
+        locked_namespace_state
+            .commit_table_update(staged_updated_table.identifier(), new_metadata_location)?;
+
+        // After the update is committed, the table is now the current version.
+        let updated_table = staged_updated_table;
+
+        Ok(updated_table)
     }
 }
 
@@ -284,9 +411,13 @@ mod tests {
     use std::collections::HashSet;
     use std::hash::Hash;
     use std::iter::FromIterator;
+    use std::vec;
 
     use iceberg::io::FileIOBuilder;
-    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+    use iceberg::spec::{
+        NestedField, NullOrder, PartitionSpec, PrimitiveType, Schema, SortOrder, Type,
+    };
+    use iceberg::transaction::Transaction;
     use regex::Regex;
     use tempfile::TempDir;
 
@@ -332,8 +463,8 @@ mod tests {
         .unwrap()
     }
 
-    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) {
-        let _ = catalog
+    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
+        catalog
             .create_table(
                 &table_ident.namespace,
                 TableCreation::builder()
@@ -342,7 +473,7 @@ mod tests {
                 .build(),
             )
             .await
-            .unwrap();
+            .unwrap()
     }
 
     async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
@@ -351,6 +482,14 @@ mod tests {
         }
     }
 
+    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
+        let namespace_ident = NamespaceIdent::new("abc".into());
+        create_namespace(catalog, &namespace_ident).await;
+
+        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
+        create_table(catalog, &table_ident).await
+    }
+
     fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
         assert_eq!(table.identifier(), expected_table_ident);
 
@@ -1673,9 +1812,118 @@ mod tests {
                 .unwrap_err()
                 .to_string(),
             format!(
-                "TableAlreadyExists => Cannot create table {:? }. Table already exists.",
+                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
                 &dst_table_ident
             ),
         );
     }
+
+    #[tokio::test]
+    async fn test_update_table() {
+        let catalog = new_memory_catalog();
+
+        let table = create_table_with_namespace(&catalog).await;
+
+        // Assert the table doesn't contain the update yet
+        assert!(!table.metadata().properties().contains_key("key"));
+
+        // Update table metadata
+        let updated_table = Transaction::new(&table)
+            .set_properties(HashMap::from([("key".to_string(), "value".to_string())]))
+            .unwrap()
+            .commit(&catalog)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            updated_table.metadata().properties().get("key").unwrap(),
+            "value"
+        );
+
+        assert_eq!(table.identifier(), updated_table.identifier());
+        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
+        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
+        assert_ne!(table.metadata_location(), updated_table.metadata_location());
+        assert!(
+            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_update_table_fails_if_commit_conflicts() {
+        let catalog = new_memory_catalog();
+        let base_table = create_table_with_namespace(&catalog).await;
+
+        // Update the table by adding a new sort order.
+        let _sorted_table = Transaction::new(&base_table)
+            .replace_sort_order()
+            .asc("foo", NullOrder::First)
+            .unwrap()
+            .apply()
+            .unwrap()
+            .commit(&catalog)
+            .await
+            .unwrap();
+
+        // Try to update the now-outdated base table again with a different sort order.
+        let err = Transaction::new(&base_table)
+            .replace_sort_order()
+            .desc("foo", NullOrder::Last)
+            .unwrap()
+            .apply()
+            .unwrap()
+            .commit(&catalog)
+            .await
+            .unwrap_err();
+
+        // The second transaction should fail because it didn't take the new update
+        // into account.
+        assert_eq!(err.kind(), ErrorKind::Unexpected);
+        assert!(err.message().to_lowercase().contains("conflict"));
+    }
+
+    #[tokio::test]
+    async fn test_update_table_fails_if_table_doesnt_exist() {
+        let catalog = new_memory_catalog();
+
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        // This table is not known to the catalog.
+        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
+        let table = build_table(table_ident);
+
+        let err = Transaction::new(&table)
+            .set_properties(HashMap::from([("key".to_string(), "value".to_string())]))
+            .unwrap()
+            .commit(&catalog)
+            .await
+            .unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::TableNotFound);
+    }
+
+    fn build_table(ident: TableIdent) -> Table {
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        let temp_dir = TempDir::new().unwrap();
+        let location = temp_dir.path().to_str().unwrap().to_string();
+
+        let table_creation = TableCreation::builder()
+            .name(ident.name().to_string())
+            .schema(simple_table_schema())
+            .location(location)
+            .build();
+        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        Table::builder()
+            .identifier(ident)
+            .metadata(metadata)
+            .file_io(file_io)
+            .build()
+            .unwrap()
+    }
 }
diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs
index de1532203..c5eab995a 100644
--- a/crates/catalog/memory/src/namespace_state.rs
+++ b/crates/catalog/memory/src/namespace_state.rs
@@ -16,9 +16,12 @@
 // under the License.
 
 use std::collections::{hash_map, HashMap};
+use std::fmt::Display;
+use std::str::FromStr;
 
 use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use itertools::Itertools;
+use uuid::Uuid;
 
 // Represents the state of a namespace
 #[derive(Debug, Clone, Default)]
@@ -28,7 +31,7 @@ pub(crate) struct NamespaceState {
     // Namespaces nested inside this namespace
     namespaces: HashMap<String, NamespaceState>,
     // Mapping of tables to metadata locations in this namespace
-    table_metadata_locations: HashMap<String, String>,
+    table_metadata_locations: HashMap<String, MetadataLocation>,
 }
 
 fn no_such_namespace_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> {
@@ -253,7 +256,10 @@ impl NamespaceState {
     }
 
     // Returns the metadata location of the given table or an error if doesn't exist
-    pub(crate) fn get_existing_table_location(&self, table_ident: &TableIdent) -> Result<&String> {
+    pub(crate) fn get_existing_table_location(
+        &self,
+        table_ident: &TableIdent,
+    ) -> Result<&MetadataLocation> {
         let namespace = self.get_namespace(table_ident.namespace())?;
 
         match namespace.table_metadata_locations.get(table_ident.name()) {
@@ -266,7 +272,7 @@ impl NamespaceState {
     pub(crate) fn insert_new_table(
        &mut self,
         table_ident: &TableIdent,
-        metadata_location: String,
+        location: MetadataLocation,
     ) -> Result<()> {
         let namespace = self.get_mut_namespace(table_ident.namespace())?;
 
@@ -276,7 +282,7 @@ impl NamespaceState {
         {
             hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident),
            hash_map::Entry::Vacant(entry) => {
-                let _ = entry.insert(metadata_location);
+                let _ = entry.insert(location);
 
                 Ok(())
             }
@@ -284,7 +290,10 @@ impl NamespaceState {
     }
 
     // Removes the given table or returns an error if doesn't exist
-    pub(crate) fn remove_existing_table(&mut self, table_ident: &TableIdent) -> Result<String> {
+    pub(crate) fn remove_existing_table(
+        &mut self,
+        table_ident: &TableIdent,
+    ) -> Result<MetadataLocation> {
         let namespace = self.get_mut_namespace(table_ident.namespace())?;
 
         match namespace
@@ -295,4 +304,217 @@ impl NamespaceState {
             Some(metadata_location) => Ok(metadata_location),
         }
     }
+
+    /// Updates the metadata location of the given table or returns an error if it doesn't exist
+    pub(crate) fn commit_table_update(
+        &mut self,
+        table_ident: &TableIdent,
+        new_location: MetadataLocation,
+    ) -> Result<()> {
+        let namespace = self.get_mut_namespace(table_ident.namespace())?;
+
+        let _ = namespace
+            .table_metadata_locations
+            .insert(table_ident.name().to_string(), new_location)
+            .ok_or(Error::new(
+                ErrorKind::TableNotFound,
+                format!("No such table: {:?}", table_ident),
+            ))?;
+
+        Ok(())
+    }
+}
+
+/// Represents a location of the format: `<prefix>/metadata/<version>-<uuid>.metadata.json`
+#[derive(Clone, Debug, PartialEq)]
+pub(crate) struct MetadataLocation {
+    prefix: String,
+    version: i32,
+    id: Uuid,
+}
+
+impl MetadataLocation {
+    /// Creates a completely new metadata location starting at version 0.
+    /// Only used for creating a new table. For updates, see `with_next_version`.
+    pub(crate) fn new(prefix: &str) -> Self {
+        Self {
+            prefix: prefix.to_string(),
+            version: 0,
+            id: Uuid::new_v4(),
+        }
+    }
+
+    /// Creates a new metadata location for an updated metadata file.
+    pub(crate) fn with_next_version(&self) -> Self {
+        Self {
+            prefix: self.prefix.clone(),
+            version: self.version + 1,
+            id: Uuid::new_v4(),
+        }
+    }
+
+    fn parse_metadata_path_prefix(path: &str) -> Result<String> {
+        let prefix = path.strip_suffix("/metadata").ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!(
+                "Metadata location not under \"/metadata\" subdirectory: {}",
+                path
+            ),
+        ))?;
+
+        Ok(prefix.to_string())
+    }
+
+    /// Parses a file name of the format `<version>-<uuid>.metadata.json`.
+    fn parse_file_name(file_name: &str) -> Result<(i32, Uuid)> {
+        let (version, id) = file_name
+            .strip_suffix(".metadata.json")
+            .ok_or(Error::new(
+                ErrorKind::Unexpected,
+                format!("Invalid metadata file ending: {}", file_name),
+            ))?
+            .split_once('-')
+            .ok_or(Error::new(
+                ErrorKind::Unexpected,
+                format!("Invalid metadata file name format: {}", file_name),
+            ))?;
+
+        let version = version
+            .parse::<i32>()
+            .map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "Metadata version not a number").with_source(e)
+            })
+            .and_then(|v| {
+                if v < 0 {
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!("Negative metadata version: {}", version),
+                    ))
+                } else {
+                    Ok(v)
+                }
+            })?;
+
+        Ok((version, Uuid::parse_str(id)?))
+    }
+}
+
+impl Display for MetadataLocation {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}/metadata/{}-{}.metadata.json",
+            self.prefix, self.version, self.id
+        )
+    }
+}
+
+impl FromStr for MetadataLocation {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self> {
+        let (path, file_name) = s.rsplit_once('/').ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("Invalid metadata location: {}", s),
+        ))?;
+
+        let prefix = Self::parse_metadata_path_prefix(path)?;
+        let (version, id) = Self::parse_file_name(file_name)?;
+
+        Ok(MetadataLocation {
+            prefix,
+            version,
+            id,
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::str::FromStr;
+
+    use uuid::Uuid;
+
+    use super::MetadataLocation;
+
+    #[test]
+    fn test_metadata_location_from_string() {
+        let test_cases = vec![
+            // No prefix
+            ("/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{
+                prefix: "".to_string(),
+                version: 1234567,
+                id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+            })),
+            // Some prefix
+            ("/abc/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{
+                prefix: "/abc".to_string(),
+                version: 1234567,
+                id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+            })),
+            // Longer prefix
+            ("/abc/def/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{
+                prefix: "/abc/def".to_string(),
+                version: 1234567,
+                id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+            })),
+            // Prefix with special characters
+            ("https://127.0.0.1/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{
+                prefix: "https://127.0.0.1".to_string(),
+                version: 1234567,
+                id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+            })),
+            // Another id
+            ("/abc/metadata/1234567-81056704-ce5b-41c4-bb83-eb6408081af6.metadata.json", Ok(MetadataLocation{
+                prefix: "/abc".to_string(),
+                version: 1234567,
+                id: Uuid::from_str("81056704-ce5b-41c4-bb83-eb6408081af6").unwrap(),
+            })),
+            // Version 0
+            ("/abc/metadata/0-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Ok(MetadataLocation{
+                prefix: "/abc".to_string(),
+                version: 0,
+                id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+            })),
+            // Negative version
("/metadata/-123-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Err("".to_string())), + // Invalid uuid + ("/metadata/1234567-no-valid-id.metadata.json", Err("".to_string())), + // Non-numeric version + ("/metadata/noversion-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Err("".to_string())), + // No /metadata subdirectory + ("/wrongsubdir/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", Err("".to_string())), + // No .metadata.json suffix + ("/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata", Err("".to_string())), + ("/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.wrong.file", Err("".to_string())), + ]; + + for (input, expected) in test_cases { + match MetadataLocation::from_str(input) { + Ok(metadata_location) => { + assert!(expected.is_ok()); + assert_eq!(metadata_location, expected.unwrap()); + } + Err(_) => assert!(expected.is_err()), + } + } + } + + #[test] + fn test_metadata_location_with_next_version() { + let test_cases = vec![ + MetadataLocation::new("/abc"), + MetadataLocation::from_str( + "/abc/def/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json", + ) + .unwrap(), + ]; + + for input in test_cases { + let next = input.with_next_version(); + assert_eq!(next.prefix, input.prefix); + assert_eq!(next.version, input.version + 1); + assert_ne!(next.id, input.id); + } + } } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 1eef1bcc4..dd07bea57 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -282,6 +282,12 @@ define_from_err!( "handling invalid utf-8 characters" ); +define_from_err!( + core::num::ParseIntError, + ErrorKind::Unexpected, + "parsing integer from string" +); + define_from_err!( std::array::TryFromSliceError, ErrorKind::DataInvalid,