This document describes how to configure the AutoMQ Table Topic feature, covering Schema Registry, Catalog, and AutoMQ server setup.

Schema Registry Configuration

The Schema Registry manages the Kafka Topic message Schema, ensuring data consistency and compatibility between producers and consumers. AutoMQ Table Topic utilizes the Schema Registry to parse Topic data and synchronize with the Iceberg table structure, supporting Schema evolution.

Constraints

Table Topic currently supports only the following Schema Registry implementations.
  • Confluent Schema Registry: Facilitates schema management for Kafka and offers a REST API.
  • Aiven Karapace: An open-source Schema Registry that is compatible with the Confluent Schema Registry’s REST API.
Table Topic does not currently support authentication configurations for Schema Registry (such as Basic Auth, SASL, mTLS). You must configure the Schema Registry to permit anonymous access for AutoMQ clusters and ensure network security.

Configure Schema Registry

Users specify the Schema Registry service address at the cluster level. Configuration Parameters:
  • automq.table.topic.schema.registry.url: The URL of the Schema Registry (e.g., http://schema-registry.example.com:8081). AutoMQ uses this URL to retrieve and synchronize the Schema.
Note:
  • Configure this when using the deprecated automq.table.topic.schema.type=schema value or when either automq.table.topic.convert.value.type or automq.table.topic.convert.key.type is set to by_schema_id or by_latest_schema.
Example:
automq.table.topic.schema.registry.url=https://schema-registry.example.com:8081
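Because AutoMQ connects to the registry without credentials, you may want to confirm that anonymous access works from the brokers' network before enabling Table Topic. Below is a minimal sketch using the Confluent Schema Registry client; the class name and registry URL are illustrative.

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // Same URL that will be set in automq.table.topic.schema.registry.url (illustrative host).
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://schema-registry.example.com:8081", 100);
        // Listing subjects only succeeds if the registry accepts unauthenticated requests.
        System.out.println("Subjects visible anonymously: " + client.getAllSubjects());
    }
}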

Catalog Configuration

In Apache Iceberg, the Catalog plays a crucial role in managing table metadata. Its primary responsibilities include:
  1. Tracking the current metadata pointer for each Iceberg table. This pointer indicates the location of the latest metadata file (e.g., vN.metadata.json).
  2. Providing atomic operations to update this metadata pointer. This is critical for ensuring data consistency during commit processes such as writing new data or evolving schemas.
  3. Organizing tables into namespaces and providing methods to list, create, delete, and rename tables.
Essentially, a Catalog serves as the entry point for any Iceberg operation, guiding the query engine on where to find authoritative information regarding a table’s schema, partitions, snapshots, and data files.

Configuring the Catalog

Table Topic utilizes an external Iceberg Catalog to manage the metadata of its “table topics.”
  • General Configuration Prefix: All Iceberg Catalog settings related to the AutoMQ Table Topic use the prefix automq.table.topic.catalog.*.
  • Primary Configuration Key: automq.table.topic.catalog.type specifies the type of Iceberg Catalog to use.
AutoMQ Table Topic supports the following catalog types: rest, glue, tablebucket, nessie, hive.

Supported Catalog Types and Configuration

1. REST Catalog

This method uses the standard Iceberg REST Catalog service.
  • Type Setting: automq.table.topic.catalog.type=rest
  • Configuration Property:
    • automq.table.topic.catalog.uri : The URI for the REST Catalog service (e.g., http://rest:8181).
    • automq.table.topic.catalog.warehouse : Defines the S3 path for the Iceberg data warehouse. If not set, it will default to the Iceberg directory of the DataBucket.
  • Required Permissions:
    • Credentials (if authentication is needed) that allow listing, creating, reading, and updating table metadata.
    • AutoMQ requires read/write/delete permissions for the S3 warehouse path.
Example:
automq.table.topic.catalog.type=rest
automq.table.topic.catalog.uri=http://rest:8181
automq.table.topic.catalog.warehouse=s3://automq-bucket/wh/

2. AWS Glue Data Catalog

Utilizing AWS Glue as the Iceberg Catalog.
  • Type Setting: automq.table.topic.catalog.type=glue
  • Configuration Property:
    • automq.table.topic.catalog.warehouse: Specifies the S3 path for the Iceberg data warehouse.
  • Required Permissions:
    • AutoMQ requires database and table management permissions for AWS Glue.
    • AutoMQ needs read/write/delete permissions on the S3 warehouse path.
Example:
automq.table.topic.catalog.type=glue
automq.table.topic.catalog.warehouse=s3://automq-bucket/glue/

3. TableBucket (S3Table)

Utilize AWS S3 Tables (table buckets) as the Iceberg Catalog.
  • Type Setting: automq.table.topic.catalog.type=tablebucket
  • Configuration Property:
    • automq.table.topic.catalog.warehouse: Specifies the TableBucket ARN (e.g., arn:aws:s3tables:::bucket-name).
  • Required Permissions:
    • AutoMQ requires read and write access to the S3Table.
Example:
automq.table.topic.catalog.type=tablebucket
automq.table.topic.catalog.warehouse=arn:aws:s3tables:us-east-1:xxxxxx:bucket/xxxxx

4. Nessie Catalog

Utilize Project Nessie, a transactional data lake catalog with Git-like semantics.
  • Type Setting: automq.table.topic.catalog.type=nessie
  • Configuration Property:
    • automq.table.topic.catalog.uri : Specifies the URI of the Nessie server (e.g., http://nessie-server:19120/api/v2 ).
    • automq.table.topic.catalog.warehouse : Defines the S3 path for the Iceberg data warehouse. If not set, it will default to the Iceberg directory of the DataBucket.
Example:
automq.table.topic.catalog.type=nessie
automq.table.topic.catalog.uri=http://nessie-server:19120/api/v2
automq.table.topic.catalog.warehouse=s3://automq-bucket/nessie/

5. Hive Metastore Catalog

Utilize an existing Hive Metastore as the Iceberg Catalog. When configuring Hive Metastore as the Catalog, refer to the documentation on enabling Iceberg in Hive.
  • Type Setting: automq.table.topic.catalog.type=hive
  • Configuration Property:
    • automq.table.topic.catalog.uri: Specifies the Hive Metastore URI (e.g., thrift://hostname:9083).
    • automq.table.topic.catalog.warehouse : Specifies the HDFS or S3 path for the Iceberg data warehouse. If not configured, the Iceberg directory of DataBucket will be used.
  • Authentication Configuration:
    • Simple Authentication: automq.table.topic.catalog.auth=simple://?username=xxx
    • Kerberos Authentication: automq.table.topic.catalog.auth=kerberos://?principal=base64(clientPrincipal)&keytab=base64(keytabFile)&krb5conf=base64(krb5confFile)
      • clientPrincipal: Base64-encoded client Kerberos principal.
      • keytabFile: Base64-encoded content of the client keytab file.
      • krb5confFile: Base64-encoded content of the krb5.conf file.
      • automq.table.topic.hadoop.metastore.kerberos.principal: Kerberos principal of the Hive Metastore server (e.g., hive/_HOST@REALM).
  • Required Permissions:
    • Create, alter, and drop table permissions for the Hive Metastore.
    • Kerberos authentication requires a valid principal and keytab.
    • AutoMQ needs read, write, and delete permissions on the warehouse path (HDFS or S3).
Example:
automq.table.topic.catalog.type=hive
automq.table.topic.catalog.uri=thrift://hostname:9083
automq.table.topic.catalog.warehouse=s3://automq-bucket/hive/
automq.table.topic.catalog.auth=simple://?username=user&password=pass
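For Kerberos, the client principal and the contents of the keytab and krb5.conf files must be Base64 encoded before they are placed in the auth URI. Below is a minimal sketch that builds the value; the principal and file paths are illustrative.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

public class KerberosAuthValue {
    public static void main(String[] args) throws Exception {
        Base64.Encoder b64 = Base64.getEncoder();
        // Illustrative principal and file locations; replace with your own.
        String principal = b64.encodeToString("client/host@EXAMPLE.REALM".getBytes(StandardCharsets.UTF_8));
        String keytab = b64.encodeToString(Files.readAllBytes(Paths.get("/etc/security/client.keytab")));
        String krb5conf = b64.encodeToString(Files.readAllBytes(Paths.get("/etc/krb5.conf")));
        // Resulting value for automq.table.topic.catalog.auth
        System.out.println("kerberos://?principal=" + principal
                + "&keytab=" + keytab + "&krb5conf=" + krb5conf);
    }
}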

TableTopic Configuration

The following configurations are topic-level settings for enabling TableTopic, adjusting the Iceberg commit interval, and configuring capabilities such as partitioning and upsert.

Enabling the TableTopic Feature

The AutoMQ TableTopic feature allows for the conversion of data from a specified Kafka Topic into an Apache Iceberg table to support structured queries and analysis. Configuration Parameters:
  • automq.table.topic.enable : Determines whether the TableTopic is enabled. Setting this to true will result in the creation of an Iceberg table for data storage.
  • automq.table.topic.namespace : Specifies the namespace for the Iceberg table under the Catalog, adhering to the naming conventions of the Catalog (such as a combination of letters, numbers, and underscores). This should be configured at the time of Topic creation and cannot be modified later.
  • automq.table.topic.schema.type (deprecated): The table topic schema type. This setting will be removed in a future release; migrate to the converter and transform configurations instead.
    • schemaless: Maps to automq.table.topic.convert.value.type=raw and automq.table.topic.transform.value.type=none.
    • schema: Maps to automq.table.topic.convert.value.type=by_schema_id and automq.table.topic.transform.value.type=flatten.
Example:
automq.table.topic.enable=true
automq.table.topic.namespace=default
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.transform.value.type=flatten
Compatibility Example (deprecated):
automq.table.topic.schema.type=schema
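Because these are topic-level settings, they are typically supplied when the topic is created. Below is a minimal sketch using the Kafka AdminClient; the broker address, topic name, and partition/replication settings are illustrative.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
        try (Admin admin = Admin.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("automq.table.topic.enable", "true");
            configs.put("automq.table.topic.namespace", "default");
            configs.put("automq.table.topic.convert.value.type", "by_schema_id");
            configs.put("automq.table.topic.transform.value.type", "flatten");
            // Illustrative topic name, partition count, and replication factor.
            NewTopic topic = new NewTopic("orders", 3, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}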

Record Conversion Settings

These settings decide how Table Topic parses Kafka records before they are written to Iceberg.
Value converter (automq.table.topic.convert.value.type):
  • raw: Pass the Kafka payload through without interpretation. Use this when the downstream query only needs the opaque bytes (for example, storing binary blobs). Schema Registry is not consulted.
  • string: Decode the payload as UTF-8 text and expose it as a string column. Useful for topics that store plain text or JSON strings but you still want Iceberg queries to treat them as text.
  • by_schema_id: Resolve the schema ID encoded in the Confluent wire format (leading magic byte plus 4-byte schema ID) and deserialize Avro/Protobuf data using Schema Registry. Choose this for producers that already register schemas and include the schema ID with every record, as illustrated by the producer sketch after this list.
  • by_latest_schema: Only supports Protobuf format. Fetch the most recent schema from Schema Registry and use it to decode raw Protobuf payloads that do not carry a schema ID. Configure the subject and the fully qualified message name as described below.
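by_schema_id expects records produced in the Confluent wire format, i.e., serialized by a Schema Registry-aware serializer. Below is a minimal Avro producer sketch for reference; the broker address, registry URL, topic name, and schema are illustrative and not part of the AutoMQ configuration itself.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaAvroSerializer registers the schema and prepends the magic byte + 4-byte schema ID.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://schema-registry.example.com:8081"); // illustrative URL

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
                        + "{\"name\":\"order_id\",\"type\":\"int\"},"
                        + "{\"name\":\"product_name\",\"type\":\"string\"}]}");
        GenericRecord value = new GenericData.Record(schema);
        value.put("order_id", 1);
        value.put("product_name", "str_1_xaji0y");

        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-1", value)).get();
        }
    }
}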
Key converter (automq.table.topic.convert.key.type):
The key converter accepts the same options (raw, string, by_schema_id, by_latest_schema). Configure it when the Kafka record key is meaningful for your table:
  • Use raw when the key should stay opaque (for example, binary hash keys).
  • Use string when the key is textual.
  • Use by_schema_id or by_latest_schema when keys are structured Avro/Protobuf objects and you need individual fields for partitioning or equality deletes.
Configuring latest-schema lookup:
When either converter uses by_latest_schema, the service needs hints to select the right schema version.
  • automq.table.topic.convert.value.by_latest_schema.subject (or the key variant) overrides the Schema Registry subject. If unset, it defaults to <topic>-value or <topic>-key.
  • automq.table.topic.convert.value.by_latest_schema.message.full.name (and the key equivalent) specifies the fully qualified Protobuf message name. If not configured, the system defaults to using the first message definition in the schema file; explicit specification is required when a single schema file contains multiple messages to avoid ambiguity.
  • The latest version of the subject is used, so align your Schema Registry retention and compatibility rules accordingly.
Example:
automq.table.topic.convert.value.type=by_latest_schema
automq.table.topic.convert.value.by_latest_schema.subject=product-value
automq.table.topic.convert.value.by_latest_schema.message.full.name=examples.clients.proto.ProductData

Value Transformation

After conversion, you can apply an optional transformation to reshape the data before writing to Iceberg.
  • none: Persist the converted value as-is. Choose this when you only need the raw payload or when downstream consumers will parse it later.
  • flatten: Expand fields from structured Avro/Protobuf objects into top-level Iceberg columns. This is the typical choice for analytics-friendly tables.
  • flatten_debezium: A specialized transformer that extracts the row state from the Debezium envelope. It adds a _cdc.op field to indicate the operation type (e.g., ‘c’ for create, ‘u’ for update, ‘d’ for delete). This requires a schema-aware converter (by_schema_id or by_latest_schema).
Example:
automq.table.topic.transform.value.type=flatten_debezium
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.cdc.field=_cdc.op
automq.table.topic.id.columns=[id]

Example: none vs flatten

The following shows a single Kafka record and how query results differ between the none and flatten value transformations.
Kafka record (value payload):
{
    "order_id": 1,
    "product_name": "str_1_xaji0y"
}
  • With none:
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.transform.value.type=none
Query result:
 _kafka_value                            | _kafka_key |                 _kafka_metadata                  | _kafka_header 
--------------------------------------------------------+------------+--------------------------------------------------+---------------
 {order_id=1, product_name=str_1_xaji0y} |            | {partition=0, offset=0, timestamp=1762138970865} | {}            
  • With flatten (expanded business columns + system columns):
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.transform.value.type=flatten
Query result (one row, illustrative):
 order_id | product_name  | _kafka_header | _kafka_key |                 _kafka_metadata                  
----------+---------------+---------------+------------+--------------------------------------------------
    1     | str_1_xaji0y  | {}            |            | {partition=0, offset=0, timestamp=1762138970865} 

Error Handling

The automq.table.topic.errors.tolerance property controls how the Table Topic pipeline behaves when it encounters errors during record conversion or transformation.
  • none: In this mode, any error encountered during processing will halt the pipeline for the affected topic partition. The system will continuously retry to process the problematic record, effectively blocking any further records from that partition until the issue is resolved. This mode guarantees that no data is skipped, but it can cause data processing to stall.
  • invalid_data (default): This mode skips records with content issues, such as a missing magic byte or an unresolvable schema ID (InvalidDataException). It continuously retries on RestClientException / SerializationException and on exceptions in later processing stages to prevent data loss. Other data exceptions are skipped. This is the recommended setting for most production environments.
  • all: This mode skips any record that causes an exception.
Example:
automq.table.topic.errors.tolerance=invalid_data
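Because this is a topic-level setting, it can also be changed on an existing topic. Below is a minimal sketch using the Kafka AdminClient; the broker address and topic name are illustrative.

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetErrorsTolerance {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders"); // illustrative topic
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("automq.table.topic.errors.tolerance", "invalid_data"),
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> changes = new HashMap<>();
            changes.put(topic, Collections.singleton(op));
            admin.incrementalAlterConfigs(changes).all().get();
        }
    }
}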

Configure the Commit Interval

To enhance real-time analytics, you can modify the commit frequency of an Iceberg table to improve data freshness. Configuration Parameters:
  • automq.table.topic.commit.interval.ms: Specifies the data commit interval in milliseconds. The default is 300,000 (5 minutes), with a maximum of 900,000 (15 minutes). Shorter intervals improve real-time capability but increase processing overhead.
Note:
  • Frequent commits may result in commit conflicts and MetadataFile bloat, which can raise storage and query costs.
  • AutoMQ automatically deletes snapshots that are older than one hour to prevent the metadata.json file from becoming excessively large over time.
  • Regular table maintenance is necessary through compaction to merge small files, optimizing the size of the ManifestFile and improving query performance.
Example:
automq.table.topic.commit.interval.ms=60000

Optimize Query Performance: Partition Configuration

To enhance the query performance of Iceberg tables, especially in scenarios involving selective filtering, partitioning rules can be configured. Configuration Parameters:
  • automq.table.topic.partition.by: Specifies partitioning rules, supporting partitioning by fields or functions. For example, [bucket(name, 3), month(timestamp)] means partitioning by hashing the name field into 3 buckets and partitioning by the month of the timestamp.
Supported Partition Strategies:
  • Bucket Partitioning: bucket(field, N), partitions by the hash value of the field.
  • Truncate Partitioning: truncate(field, N), partitions by the truncated value of the field.
  • Temporal Partitioning: year(timestamp), month(timestamp), day(timestamp), hour(timestamp).
For more details, refer to Iceberg Partitioning Documentation. Note:
  • Having too many partitions may increase metadata management overhead, leading to query planning delays and higher storage costs. It is recommended to set a reasonable number of partitions based on data volume and query patterns, and to regularly optimize small files through Compaction.
Example:
automq.table.topic.partition.by=[bucket(name, 3), month(timestamp)]

Upsert and CDC Mode Support

To facilitate Upsert or Change Data Capture (CDC) operations, you can enable the following features for dynamic data management. A primary key must be configured to support row-level operations. Configuration Parameters:
  • automq.table.topic.id.columns: Specify the primary key columns (composite keys are supported), such as [region, name].
  • automq.table.topic.upsert.enable: Indicate whether to enable Upsert mode. Set to true to have the system insert or update records based on the primary key.
  • automq.table.topic.cdc.field: Specifies the name of the field that carries the CDC operation type, whose value can be I (insert), U (update), or D (delete).
Note:
  • To enable Upsert or CDC mode, it is necessary to configure automq.table.topic.id.columns; otherwise, only append-only writes will be supported.
  • Iceberg V2 tables support row-level operations:
    • Creation Logic: Inserted or updated records are written to data files, while deleted records are marked via delete files (which contain the primary key and a deletion marker). An Upsert operation may create a new data file, and a CDC delete operation produces a delete file.
    • Query Logic: The Merge-on-Read (MOR) mechanism combines data files and delete files at query time, filtering out records marked for deletion via equality deletes.
  • Regularly merge data files and delete files through Compaction to keep query performance consistent.
Example:
  1. Enable Upsert Mode:
automq.table.topic.upsert.enable=true
automq.table.topic.id.columns=[id, name]
  2. Enable CDC Mode:
automq.table.topic.cdc.field=op_type
automq.table.topic.id.columns=[id, name]