Schema Registry Configuration
The Schema Registry manages the schemas of Kafka Topic messages, ensuring data consistency and compatibility between producers and consumers. AutoMQ Table Topic uses the Schema Registry to parse Topic data and synchronize it with the Iceberg table structure, supporting Schema evolution.

Constraints
Table Topic currently supports only the following Schema Registry implementations:
- Confluent Schema Registry: Facilitates schema management for Kafka and offers a REST API.
- Aiven Karapace: An open-source Schema Registry that is compatible with the Confluent Schema Registry’s REST API.
Table Topic does not currently support authentication configurations for Schema Registry (such as Basic Auth, SASL, mTLS). You must configure the Schema Registry to permit anonymous access for AutoMQ clusters and ensure network security.
Configure Schema Registry
Users specify the Schema Registry service address via the following cluster-level configuration.

Configuration Parameters:
- `automq.table.topic.schema.registry.url`: The URL of the Schema Registry (e.g., http://schema-registry.example.com:8081). AutoMQ uses this URL to retrieve and synchronize schemas.
- Configure this when using the deprecated `automq.table.topic.schema.type=schema` value, or when either `automq.table.topic.convert.value.type` or `automq.table.topic.convert.key.type` is set to `by_schema_id` or `by_latest_schema`.
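A minimal sketch of the cluster-level setting (e.g., in the broker configuration file), using the example URL above:

```properties
# Cluster-level: point AutoMQ at the Schema Registry used to decode records
automq.table.topic.schema.registry.url=http://schema-registry.example.com:8081
```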
Catalog Configuration
In Apache Iceberg, the Catalog plays a crucial role in managing table metadata. Its primary responsibilities include:
- Tracking the current metadata pointer for each Iceberg table. This pointer indicates the location of the latest metadata file (e.g., vN.metadata.json).
- Providing atomic operations to update this metadata pointer. This is critical for ensuring data consistency during commit processes such as writing new data or evolving schemas.
- Organizing tables into namespaces and providing methods to list, create, delete, and rename tables.
Configuring the Catalog
Table Topic utilizes an external Iceberg Catalog to manage the metadata of its table topics.
- General Configuration Prefix: All Iceberg Catalog settings related to the AutoMQ Table Topic use the prefix `automq.table.topic.catalog.*`.
- Primary Configuration Key: `automq.table.topic.catalog.type` specifies the type of Iceberg Catalog. Supported values: `rest`, `glue`, `tablebucket`, `nessie`, `hive`.
Supported Catalog Types and Configuration
1. REST Catalog
This method uses the standard Iceberg REST Catalog service.
- Type Setting: `automq.table.topic.catalog.type=rest`
- Configuration Properties:
  - `automq.table.topic.catalog.uri`: The URI for the REST Catalog service (e.g., http://rest:8181).
  - `automq.table.topic.catalog.warehouse`: Defines the S3 path for the Iceberg data warehouse. If not set, it defaults to the Iceberg directory of the DataBucket.
- Required Permissions:
  - Credentials (if authentication is needed) that allow listing, creating, reading, and updating table metadata.
  - AutoMQ requires read/write/delete permissions for the S3 warehouse path.
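A minimal cluster-level sketch using the example URI above; the warehouse path is a placeholder:

```properties
automq.table.topic.catalog.type=rest
automq.table.topic.catalog.uri=http://rest:8181
# Optional: defaults to the Iceberg directory of the DataBucket when omitted
automq.table.topic.catalog.warehouse=s3://my-bucket/iceberg-warehouse
```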
2. AWS Glue Data Catalog
Uses AWS Glue as the Iceberg Catalog.
- Type Setting: `automq.table.topic.catalog.type=glue`
- Configuration Property:
  - `automq.table.topic.catalog.warehouse`: Specifies the S3 path for the Iceberg data warehouse.
- Required Permissions:
  - AutoMQ requires database and table management permissions for AWS Glue.
  - AutoMQ needs read/write/delete permissions on the S3 warehouse path.
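A minimal sketch with a placeholder warehouse path:

```properties
automq.table.topic.catalog.type=glue
automq.table.topic.catalog.warehouse=s3://my-bucket/iceberg-warehouse
```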
3. TableBucket (S3Table)
Uses AWS S3 Tables (TableBucket) as the Iceberg Catalog.
- Type Setting: `automq.table.topic.catalog.type=tablebucket`
- Configuration Property:
  - `automq.table.topic.catalog.warehouse`: Specifies the TableBucket ARN (e.g., arn:aws:s3tables:::bucket-name).
- Required Permissions:
  - AutoMQ requires read and write access to the S3Table.
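A minimal sketch using the ARN format above:

```properties
automq.table.topic.catalog.type=tablebucket
automq.table.topic.catalog.warehouse=arn:aws:s3tables:::bucket-name
```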
4. Nessie Catalog
Uses Project Nessie, a transactional data lake catalog with Git-like semantics.
- Type Setting: `automq.table.topic.catalog.type=nessie`
- Configuration Properties:
  - `automq.table.topic.catalog.uri`: Specifies the URI of the Nessie server (e.g., http://nessie-server:19120/api/v2).
  - `automq.table.topic.catalog.warehouse`: Defines the S3 path for the Iceberg data warehouse. If not set, it defaults to the Iceberg directory of the DataBucket.
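A minimal sketch using the example URI above; the warehouse path is a placeholder:

```properties
automq.table.topic.catalog.type=nessie
automq.table.topic.catalog.uri=http://nessie-server:19120/api/v2
automq.table.topic.catalog.warehouse=s3://my-bucket/iceberg-warehouse
```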
5. Hive Metastore Catalog
Uses an existing Hive Metastore as the Iceberg Catalog. When configuring Hive Metastore as the Catalog, refer to the documentation for enabling Iceberg in Hive.
- Type Setting: `automq.table.topic.catalog.type=hive`
- Configuration Properties:
  - `automq.table.topic.catalog.uri`: Specifies the Hive Metastore URI (e.g., thrift://hostname:9083).
  - `automq.table.topic.catalog.warehouse`: Specifies the HDFS or S3 path for the Iceberg data warehouse. If not configured, the Iceberg directory of the DataBucket is used.
- Authentication Configuration:
  - Simple Authentication: `automq.table.topic.catalog.auth=simple://?username=xxx`
  - Kerberos Authentication: supply the following parameters:
    - `clientPrincipal`: Base64-encoded client Kerberos principal.
    - `keytabFile`: Base64-encoded content of the client keytab file.
    - `krb5confFile`: Base64-encoded content of the krb5.conf file.
    - `automq.table.topic.hadoop.metastore.kerberos.principal`: Kerberos principal for the Hive Metastore server (e.g., hive/_HOST@REALM).
- Required Permissions:
  - Create, alter, and drop table permissions on the Hive Metastore.
  - Kerberos authentication requires a valid principal and keytab.
  - AutoMQ needs read, write, and delete permissions on the warehouse path (HDFS or S3).
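A minimal sketch combining the settings above with simple authentication; the URI and warehouse path are placeholders:

```properties
automq.table.topic.catalog.type=hive
automq.table.topic.catalog.uri=thrift://hostname:9083
automq.table.topic.catalog.warehouse=s3://my-bucket/iceberg-warehouse
automq.table.topic.catalog.auth=simple://?username=xxx
```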
TableTopic Configuration
The following configurations are all topic-level settings. They let you enable the TableTopic feature, adjust the Iceberg commit interval, and configure features such as partitioning and upsert.

Enabling the TableTopic Feature
The AutoMQ TableTopic feature converts data from a specified Kafka Topic into an Apache Iceberg table to support structured queries and analysis.

Configuration Parameters:
- `automq.table.topic.enable`: Determines whether TableTopic is enabled. Setting this to `true` results in the creation of an Iceberg table for data storage.
- `automq.table.topic.namespace`: Specifies the namespace for the Iceberg table under the Catalog, adhering to the naming conventions of the Catalog (such as a combination of letters, numbers, and underscores). This must be configured at Topic creation time and cannot be modified later.
- `automq.table.topic.schema.type` (deprecated): The table topic schema type configuration. This setting will be removed in a future release; migrate to the converter and transform configurations.
  - `schemaless`: Maps to `automq.table.topic.convert.value.type=raw` and `automq.table.topic.transform.value.type=none`.
  - `schema`: Maps to `automq.table.topic.convert.value.type=by_schema_id` and `automq.table.topic.transform.value.type=flatten`.
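A minimal sketch of these topic-level settings (typically supplied via --config when creating the topic); the namespace value is a placeholder:

```properties
automq.table.topic.enable=true
automq.table.topic.namespace=analytics
```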
Record Conversion Settings
Decide how Table Topic parses Kafka records before they are written to Iceberg.

Value converter (`automq.table.topic.convert.value.type`)
- `raw`: Pass the Kafka payload through without interpretation. Use this when the downstream query only needs the opaque bytes (for example, storing binary blobs). Schema Registry is not consulted.
- `string`: Decode the payload as UTF-8 text and expose it as a string column. Useful for topics that store plain text or JSON strings when you still want Iceberg queries to treat them as text.
- `by_schema_id`: Resolve the schema ID encoded in the Confluent wire format (leading magic byte plus 4-byte schema ID) and deserialize Avro/Protobuf data using Schema Registry. Choose this for producers that already register schemas and include the schema ID with every record.
- `by_latest_schema`: Supports Protobuf only. Fetch the most recent schema from Schema Registry and use it to decode raw Protobuf payloads that do not carry a schema ID. Configure the subject and the fully qualified message name as described below.
Key converter (`automq.table.topic.convert.key.type`)
The key converter accepts the same options (`raw`, `string`, `by_schema_id`, `by_latest_schema`). Configure it when the Kafka record key is meaningful for your table:
- Use `raw` when the key should stay opaque (for example, binary hash keys).
- Use `string` when the key is textual.
- Use `by_schema_id` or `by_latest_schema` when keys are structured Avro/Protobuf objects and you need individual fields for partitioning or equality deletes.
When using `by_latest_schema`, the service needs hints to select the right schema version:
- `automq.table.topic.convert.value.by_latest_schema.subject` (or the key variant) overrides the Schema Registry subject. If unset, it defaults to `<topic>-value` or `<topic>-key`.
- `automq.table.topic.convert.value.by_latest_schema.message.full.name` (and the key equivalent) specifies the fully qualified Protobuf message name. If not configured, the system defaults to the first message definition in the schema file; explicit specification is required when a single schema file contains multiple messages, to avoid ambiguity.
- The latest version of the subject is used, so align your Schema Registry retention and compatibility rules accordingly.
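A sketch for a raw Protobuf topic without embedded schema IDs; the subject and message name are placeholders:

```properties
automq.table.topic.convert.value.type=by_latest_schema
automq.table.topic.convert.value.by_latest_schema.subject=orders-value
automq.table.topic.convert.value.by_latest_schema.message.full.name=com.example.Order
```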
Value Transformation
After conversion, you can apply an optional transformation (selected with `automq.table.topic.transform.value.type`) to reshape the data before writing to Iceberg.
- `none`: Persist the converted value as-is. Choose this when you only need the raw payload or when downstream consumers will parse it later.
- `flatten`: Expand fields from structured Avro/Protobuf objects into top-level Iceberg columns. This is the typical choice for analytics-friendly tables.
- `flatten_debezium`: A specialized transformer that extracts the row state from the Debezium envelope. It adds a `_cdc.op` field to indicate the operation type (e.g., `c` for create, `u` for update, `d` for delete). This requires a schema-aware converter (`by_schema_id` or `by_latest_schema`).
Example: none vs flatten
The following shows a single Kafka record and how query results differ between `none` and `flatten` value transformations.
Kafka record (value payload):
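The payload below is hypothetical, chosen only to illustrate the difference:

```json
{"id": 1001, "name": "alice", "amount": 9.9, "ts": "2024-01-01T00:00:00Z"}
```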
- With `none`: see the sketch below.
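A hypothetical query result: the entire payload stays in a single value column (column naming is illustrative):

| value |
| --- |
| {"id": 1001, "name": "alice", "amount": 9.9, "ts": "2024-01-01T00:00:00Z"} |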
- With `flatten` (expanded business columns + system columns): see the sketch below.
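A hypothetical result: each business field becomes a top-level column, alongside system columns (Kafka metadata) whose names depend on the implementation:

| id | name | amount | ts |
| --- | --- | --- | --- |
| 1001 | alice | 9.9 | 2024-01-01T00:00:00Z |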
Error Handling
The `automq.table.topic.errors.tolerance` property controls how the Table Topic pipeline behaves when it encounters errors during record conversion or transformation.
- `none`: Any error encountered during processing halts the pipeline for the affected topic partition. The system continuously retries the problematic record, blocking any further records from that partition until the issue is resolved. This mode guarantees that no data is skipped, but it can cause data processing to stall.
- `invalid_data` (default): Skips records with content issues, such as a missing magic byte or an unresolvable schema ID (InvalidDataException). It continuously retries on RestClientException/SerializationException and on exceptions in later processing stages to prevent data loss; other data exceptions are skipped. This is the recommended setting for most production environments.
- `all`: Skips any record that causes an exception.
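For example, to skip any failing record instead of using the default (topic-level):

```properties
automq.table.topic.errors.tolerance=all
```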
Configure Commit Interval
To enhance real-time analytics, you can modify the commit frequency of an Iceberg table to improve data freshness.

Configuration Parameters:
- `automq.table.topic.commit.interval.ms`: Specifies the data commit interval in milliseconds. The default is 300,000 (5 minutes), with a maximum of 900,000 (15 minutes). Shorter intervals improve data freshness but increase processing overhead.
- Frequent commits may result in commit conflicts and MetadataFile bloat, which can raise storage and query costs.
- AutoMQ automatically deletes snapshots older than one hour to prevent the metadata.json file from becoming excessively large over time.
- Regular table maintenance through compaction is necessary to merge small files, optimizing the size of the ManifestFile and improving query performance.
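For example, to commit every 60 seconds (topic-level, value in milliseconds):

```properties
automq.table.topic.commit.interval.ms=60000
```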
Optimize Query Performance: Partition Configuration
To enhance the query performance of Iceberg tables, especially in scenarios involving selective filtering, partitioning rules can be configured.

Configuration Parameters:
- `automq.table.topic.partition.by`: Specifies partitioning rules, supporting partitioning by fields or functions. For example, `[bucket(name, 3), month(timestamp)]` hashes the `name` field into 3 buckets and partitions by the month of the `timestamp`.

Supported partition functions:
- Bucket Partitioning: `bucket(field, N)` partitions by the hash value of the field.
- Truncate Partitioning: `truncate(field, N)` partitions by the truncated value of the field.
- Temporal Partitioning: `year(timestamp)`, `month(timestamp)`, `day(timestamp)`, `hour(timestamp)`.
- Having too many partitions may increase metadata management overhead, leading to query planning delays and higher storage costs. It is recommended to set a reasonable number of partitions based on data volume and query patterns, and to regularly optimize small files through Compaction.
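A topic-level sketch using the example rule above:

```properties
automq.table.topic.partition.by=[bucket(name, 3), month(timestamp)]
```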
Upsert and CDC Mode Support
To facilitate Upsert or Change Data Capture (CDC) operations, you can enable the following features for dynamic data management. A primary key must be configured to support row-level operations.

Configuration Parameters:
- `automq.table.topic.id.columns`: Specifies the primary key columns (composite keys are supported), such as `[region, name]`.
- `automq.table.topic.upsert.enable`: Indicates whether to enable Upsert mode. Set to `true` to have the system insert or update records based on the primary key.
- `automq.table.topic.cdc.field`: Specifies the record field that indicates the CDC operation type, which can be `I` (insert), `U` (update), or `D` (delete).
- To enable Upsert or CDC mode, you must configure `automq.table.topic.id.columns`; otherwise, only append-only writes are supported.
- Iceberg V2 tables support row-level operations:
  - Creation Logic: Inserted or updated records are kept in data files, while deleted records are marked via delete files (which include the primary key and a deletion marker). An Upsert operation may create a new data file, and a CDC delete operation results in a delete file.
  - Query Logic: The Merge-on-Read (MOR) mechanism combines data files and delete files during queries, filtering out records marked for deletion using equality deletes.
  - Query performance can be kept consistent by merging data files and delete files through Compaction.
- Enable Upsert Mode: see the sketch below.
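A minimal sketch of topic-level settings for Upsert; the primary key columns reuse the example above:

```properties
automq.table.topic.id.columns=[region, name]
automq.table.topic.upsert.enable=true
```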
- Enable CDC Mode: see the sketch below.
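A minimal sketch for CDC; the field name `op` is a hypothetical example of a record field carrying the operation type:

```properties
automq.table.topic.id.columns=[region, name]
automq.table.topic.cdc.field=op
```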