跳转到主要内容
本文档介绍 AutoMQ Table Topic 功能的详细配置,包括 Schema Registry、Catalog 以及 AutoMQ 服务端的配置。

Schema Registry 配置

Schema Registry 管理 Kafka Topic 消息 Schema,确保生产者和消费者之间的数据一致性和兼容性。AutoMQ TableTopic 利用 Schema Registry 解析 Topic 数据并同步 Iceberg 表结构,支持 Schema 演进。

约束限制

Table Topic 当前仅支持以下 Schema Registry 实现。
  • Confluent Schema Registry :支持 Kafka 的 Schema 管理,提供 REST API。
  • Aiven Karapace :开源 Schema Registry,兼容 Confluent Schema Registry 的 REST API。
Table Topic 当前不支持 Schema Registry 的鉴权配置(如 Basic Auth、SASL、mTLS)。需将 Schema Registry 配置为允许 AutoMQ 集群的匿名访问,并确保网络安全。

配置 Schema Registry

用户通过以下参数指定 Schema Registry 服务地址(集群级配置)。 配置参数
  • automq.table.topic.schema.registry.url :Schema Registry 的 URL(如 http://schema-registry.example.com:8081 )。AutoMQ 使用此 URL 获取和同步 Schema。
注意事项
  • 当使用已弃用的 automq.table.topic.schema.type=schema,或将 automq.table.topic.convert.value.type/automq.table.topic.convert.key.type 设置为 by_schema_idby_latest_schema 时,需要配置该参数。
示例

automq.table.topic.schema.registry.url=https://schema-registry.example.com:8081

Catalog 配置

在 Apache Iceberg 中,Catalog 在管理表元数据方面扮演着至关重要的角色。其主要职责包括:
  1. 追踪每个 Iceberg 表的当前元数据指针 。该指针指示了最新元数据文件(例如 vN.metadata.json )的位置。
  2. 为更新此元数据指针提供原子操作 。这对于在提交过程中(例如,写入新数据或演进 Schema 时)确保数据一致性至关重要。
  3. 将表组织到命名空间中 ,并提供列出、创建、删除和重命名表的方法。
从本质上讲,Catalog 是任何 Iceberg 操作的入口点,它告诉查询引擎在哪里可以找到关于表的 Schema、分区、快照和数据文件的权威信息。

配置 Catalog

Table Topic 利用外部 Iceberg Catalog 来管理其 “table topics” 的元数据。
  • 通用配置前缀 :所有与 AutoMQ Table Topic 相关的 Iceberg Catalog 设置都使用前缀 automq.table.topic.catalog.*
  • 主要配置键 :指定 Iceberg Catalog 类型,由 automq.table.topic.catalog.type 属性定义。
AutoMQ Table Topic 支持以下 Catalog 类型: restgluetablebucketnessiehive

支持的 Catalog 类型及配置

1. REST Catalog

此方式使用标准的 Iceberg REST Catalog 服务。
  • 类型设置automq.table.topic.catalog.type=rest
  • 配置属性
    • automq.table.topic.catalog.uri :REST Catalog 服务的 URI(如 http://rest:8181 )。
    • automq.table.topic.catalog.warehouse :定义 Iceberg 数据仓库的 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
  • 所需权限
    • 凭证(若需认证)允许列出、创建、读取和更新表元数据。
    • AutoMQ 在 S3 仓库路径的读/写/删除权限。
示例

automq.table.topic.catalog.type=rest
automq.table.topic.catalog.uri=http://rest:8181
automq.table.topic.catalog.warehouse=s3://automq-bucket/wh/

2. AWS Glue Data Catalog

利用 AWS Glue 作为 Iceberg Catalog。
  • 类型设置automq.table.topic.catalog.type=glue
  • 配置属性
    • automq.table.topic.catalog.warehouse :定义 Iceberg 数据仓库的 S3 路径。
  • 所需权限
  • AutoMQ 对 AWS Glue 的数据库和表管理权限。
  • AutoMQ 在 S3 仓库路径的读/写/删除权限。
示例:

automq.table.topic.catalog.type=glue
automq.table.topic.catalog.warehouse=s3://automq-bucket/glue/

3. TableBucket (S3Table)

  • 类型设置automq.table.topic.catalog.type=tablebucket
  • 配置属性
    • automq.table.topic.catalog.warehouse :指定 TableBucket ARN (例如: arn:aws:s3tables:::bucket-name )。
  • 所需权限
    • AutoMQ 对 S3Table 的读写权限。
示例

automq.table.topic.catalog.type=tablebucket
automq.table.topic.catalog.warehouse=arn:aws:s3tables:us-east-1:xxxxxx:bucket/xxxxx

4. Nessie Catalog

使用 Project Nessie,这是一个具有类 Git 语义的事务性数据湖 Catalog。
  • 类型设置automq.table.topic.catalog.type=nessie
  • 配置属性
    • automq.table.topic.catalog.uri :指定 Nessie 服务器 URI (例如: http://nessie-server:19120/api/v2 )。
    • automq.table.topic.catalog.warehouse :定义 Iceberg 数据仓库的 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
示例

automq.table.topic.catalog.type=nessie
automq.table.topic.catalog.uri=http://nessie-server:19120/api/v2
automq.table.topic.catalog.warehouse=s3://automq-bucket/nessie/

5. Hive Metastore Catalog

使用现有的 Hive Metastore 作为 Iceberg Catalog。配置 Hive Metastore 作为 Catalog 时,可以查看文档在 Hive 中启用 Iceberg。
  • 类型设置automq.table.topic.catalog.type=hive
  • 配置属性
    • automq.table.topic.catalog.uri :指定 Hive Metastore URI (例如: thrift://hostname:9083 )。
    • automq.table.topic.catalog.warehouse :定义 Iceberg 数据仓库的 HDFS 或 S3 路径。若未设置,则会使用 DataBucket 的 iceberg目录。
  • 认证配置
    • 简单认证
      • automq.table.topic.catalog.auth=simple://?username=xxx .
    • Kerberos 认证
    
    automq.table.topic.catalog.auth=kerberos://?principal=base64(clientPrincipal)&keytab=base64(keytabFile)&krb5conf=base64(krb5confFile)
    
    
    • clientPrincipal :Base64 编码的客户端 Kerberos principal。
    • keytabFile :Base64 编码的客户端 keytab 文件内容。
    • krb5confFile :Base64 编码的 krb5.conf 文件内容。
    • automq.table.topic.hadoop.metastore.kerberos.principal :Hive Metastore 服务器的 Kerberos principal (例如: hive/_HOST@REALM )。
  • 所需权限
    • Hive Metastore 的创建、更改、删除表权限。
    • Kerberos 认证需有效 principal 和 keytab。
    • AutoMQ 在仓库路径(HDFS 或 S3)的读/写/删除权限。
示例

automq.table.topic.catalog.type=hive
automq.table.topic.catalog.uri=thrift://hostname:9083
automq.table.topic.catalog.warehouse=s3://automq-bucket/hive/
automq.table.topic.catalog.auth=simple://?username=user&password=pass

TableTopic 配置

以下配置均为 Topic 级别配置,通过此配置项,可进行 TableTopic 的功能启用设置,iceberg 提交间隔的调整,以及分区,upsert,分区等特性的配置。

启用 TableTopic 功能

AutoMQ TableTopic 功能可将指定 Kafka Topic 的数据转换为 Apache Iceberg 表以支持结构化查询和分析。 配置参数
  • automq.table.topic.enable : 是否启用 TableTopic。设为 `true` 后,将创建 Iceberg 表存储数据。
  • automq.table.topic.namespace : 指定 Iceberg 表在 Catalog 下的命名空间,需符合 Catalog 的命名规范(例如,字母、数字、下划线组合),在 Topic 创建时配置,不可后续修改。
  • automq.table.topic.schema.type (已弃用):该配置将在未来版本移除,请迁移至新的 converter 与 transform 配置。
    • schemaless :等价于 automq.table.topic.convert.value.type=rawautomq.table.topic.transform.value.type=none
    • schema :等价于 automq.table.topic.convert.value.type=by_schema_idautomq.table.topic.transform.value.type=flatten
automq.table.topic.enable=true
automq.table.topic.namespace=default
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.transform.value.type=flatten
兼容示例(已弃用)
automq.table.topic.schema.type=schema

记录转换配置

决定写入 Iceberg 之前如何解析 Kafka record。 Value 转换器(automq.table.topic.convert.value.type
  • raw:绕过解析,直接保留 Kafka payload 字节。适用于仅需保存原始二进制的场景,Schema Registry 不会被访问。
  • string:将 payload 按 UTF-8 解码为字符串列,适合保存纯文本或 JSON 字符串并在 Iceberg 中以文本形式查询。
  • by_schema_id:解析 Confluent wire format(开头 magic byte + 4 字节 schema ID),并通过 Schema Registry 反序列化 Avro/Protobuf 数据。适用于已经在生产端注册 schema 且消息携带 schema ID 的 Topic。
  • by_latest_schema仅支持 Protobuf 格式。针对未携带 schema ID 的原始 Protobuf payload,从 Schema Registry 拉取最新 schema 进行解析。需按照下文配置 subject 以及完整消息名。
Key 转换器(automq.table.topic.convert.key.type Key 转换器支持相同的取值(rawstringby_schema_idby_latest_schema)。当 Kafka record key 与 Iceberg 表相关时进行配置:
  • key 为不可见的二进制(如哈希)时可使用 raw
  • 若键为文本时可使用 string
  • key 为结构化 Avro/Protobuf 对象并希望在分区或主键中引用字段时,选择 by_schema_idby_latest_schema
配置 latest-schema 查找 当某个转换器使用 by_latest_schema 时,需要提供额外信息帮助服务定位正确的 schema 版本。
  • automq.table.topic.convert.value.by_latest_schema.subject(以及 key 端对应参数)可覆盖 Schema Registry subject,未设置时默认 <topic>-value / <topic>-key
  • automq.table.topic.convert.value.by_latest_schema.message.full.name(以及 key 端对应参数)用于指定 Protobuf 消息的完全限定名称。未配置时默认使用 schema 文件中的第一个 message 定义;当单个 schema 文件包含多个 message 时需显式指定以避免歧义。
  • 系统始终使用该 subject 的最新版本,请确保 Schema Registry 的保留与兼容策略满足要求。
automq.table.topic.convert.value.type=by_latest_schema
automq.table.topic.convert.value.by_latest_schema.subject=product-value
automq.table.topic.convert.value.by_latest_schema.message.full.name=examples.clients.proto.ProductData

Value 转换

完成解析后,可选择性地对 value 进行变换再写入 Iceberg。
  • none:直接写入解析结果,适用于仅需保留原始内容或后续由自定义查询逻辑解析的场景。
  • flatten:将结构化 Avro/Protobuf 对象展开成表的顶层列,是分析型表的常用选择。
  • flatten_debezium:用于从 Debezium envelope 中提取行记录,并添加 _cdc.op 字段以标记操作类型(例如,‘c’ 代表创建,‘u’ 代表更新,‘d’ 代表删除)。此转换器需要配合基于 schema 的解析方式(by_schema_idby_latest_schema)。
automq.table.topic.transform.value.type=flatten_debezium
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.cdc.field=_cdc.op
automq.table.topic.id.columns=[id]
示例:none 与 flatten 的差异
以下用一个简单示例展示同一条 Kafka 记录在 noneflatten 两种 Value 转换下的查询结果差异。 Kafka 记录(value 部分):
{
    "order_id": 1,
    "product_name": "str_1_xaji0y"
}
  • 使用 none 时(仅系统列):
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.transform.value.type=none
查询结果:
 _kafka_value                            | _kafka_key |                 _kafka_metadata                  | _kafka_header 
--------------------------------------------------------+------------+--------------------------------------------------+---------------
 {order_id=1, product_name=str_1_xaji0y} |            | {partition=0, offset=0, timestamp=1762138970865} | {}            
  • 使用 flatten 时(展开业务列 + 系统列):
automq.table.topic.convert.value.type=by_schema_id
automq.table.topic.transform.value.type=flatten
查询结果:
 order_id | product_name  | _kafka_header | _kafka_key |                 _kafka_metadata                  
----------+---------------+---------------+------------+--------------------------------------------------
    1     | str_1_xaji0y  | {}            |            | {partition=0, offset=0, timestamp=1762138970865} 

错误处理

automq.table.topic.errors.tolerance 用于配置 Table Topic 在处理记录的转换或变换过程中遇到错误时的行为。
  • none:在此模式下,处理过程中遇到的任何错误都将导致相关 Topic 分区的管道暂停。系统将持续重试处理有问题的记录,这会阻塞该分区后续记录的处理,直到问题解决。此模式确保没有数据被跳过,但可能导致数据处理停滞。
  • invalid_data(默认值):此模式跳过内容存在问题的记录,例如缺少 magic code 或无法解析的 schema ID(InvalidDataException)。对于 RestClientException / SerializationException 以及后续处理阶段的异常,它会持续重试以防止数据丢失。其他数据异常则会被跳过。这是大多数生产环境的推荐设置。
  • all:此模式跳过任何导致异常的记录。
automq.table.topic.errors.tolerance=invalid_data

配置提交间隔

为支持实时分析,可调整 Iceberg 表提交频率以提升数据新鲜度。 配置参数
  • automq.table.topic.commit.interval.ms :数据提交间隔(毫秒)。默认 300000(5 分钟),最大 900000(15 分钟)。较短间隔提升实时性,但增加处理开销。
注意事项
  • 高频率提交可能导致 Commit 冲突和 MetadataFile 膨胀,增加存储和查询成本。
  • 当前AutoMQ会自动删除1个小时前的Snapshot,避免随着时间的推移,metadata.json 文件变得过度臃肿。
  • 需定期维护表,通过 Compaction 合并小文件,以优化 ManifestFile 体积和查询性能。
示例

automq.table.topic.commit.interval.ms=60000

优化查询性能:分区配置

为提升 Iceberg 表查询性能(尤其在选择性过滤场景),可配置分区规则。 配置参数
  • automq.table.topic.partition.by :定义分区规则,支持基于字段或函数分区。例如, [bucket(name, 3), month(timestamp)] 表示按 name 哈希分桶(3 个桶)并按 timestamp 的月份分区。
支持的分区策略
  • 桶分区: bucket(field, N) ,按字段哈希值分区。
  • 截断分区: truncate(field, N) ,按字段截断值分区。
  • 时间分区: year(timestamp)month(timestamp)day(timestamp)hour(timestamp)
详情可参考 Iceberg 分区文档 注意事项
  • 分区过多可能增加元数据管理开销,导致查询计划生成延迟和存储成本上升。建议根据数据量和查询模式合理设置分区数,并定期通过 Compaction 优化小文件。
示例

automq.table.topic.partition.by=[bucket(name, 3), month(timestamp)]

支持 Upsert 和 CDC 模式

为支持更新(Upsert)或变更数据捕获(CDC)操作,可启用以下功能以实现动态数据管理。需配置主键以支持行级操作。 配置参数
  • automq.table.topic.id.columns :指定主键列(支持复合主键),如 [region, name]
  • automq.table.topic.upsert.enable :是否启用 Upsert 模式。设为 true 后,系统根据主键插入或更新记录。
  • automq.table.topic.cdc.field :指定 CDC 操作类型字段,值包括 I (插入)、 U (更新)、 D (删除)。
注意事项
  • 需配置 automq.table.topic.id.columns 以启用 Upsert 或 CDC 模式,否则仅支持追加写入。
  • Iceberg V2 表支持行级操作:
    • 创建逻辑 :插入或更新记录存储在 datafile 中,删除记录通过 deletefile 标记(包含主键和删除标记)。Upsert 操作可能生成新的 datafile,CDC 的删除操作生成 deletefile。
    • 查询逻辑 :使用 Merge-on-Read(MOR)机制,查询时合并 datafile 和 deletefile,通过 equality delete 过滤已标记删除的记录。
  • 定期通过 Compaction 合并 datafile 和 deletefile,以优化查询性能。
示例
  1. 启用 Upsert 模式:

automq.table.topic.upsert.enable=true
automq.table.topic.id.columns=[id, name]

  1. 启用 CDC 模式

automq.table.topic.cdc.field=op_type
automq.table.topic.id.columns=[id, name]