---
title: Kafka Connect S3 Sink Connector Configuration and Object Storage Sink Practice
keywords: Kafka Connect, S3 Sink, TimeBasedPartitioner, JsonFormat, flush.size, plugin.path, REST API
description: Configure and deploy the S3 Sink connector to write Kafka topics to object storage with time-based partitioning, controlling flush granularity and file rotation intervals.
tags:
- JsonFormat
- Kafka
- Kafka Connect
- REST API
- S3
- S3 Sink
- TimeBasedPartitioner
- flush.size
- plugin.path
- Data streaming
categories:
- Articles & News
- Technical Tutorials
---
Basic Connect worker configuration:

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
plugin.path=/usr/share/java,/etc/kafka-connect/jars
rest.port=8083
# Note: a complete worker config also needs offset storage settings --
# offset.storage.file.filename in standalone mode, or group.id plus the
# offset/config/status storage topics in distributed mode.
```
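With `schemas.enable=false`, the JsonConverter reads and writes the raw JSON payload; with `true`, it wraps every value in a schema/payload envelope. A minimal sketch of the difference (Python purely for illustration; the converter itself is Java, and the example field schema is an assumption):

```python
import json

record = {"order_id": 42, "amount": 9.99}

# schemas.enable=false: the record value is serialized as plain JSON
plain = json.dumps(record)

# schemas.enable=true: JsonConverter adds a schema/payload envelope
enveloped = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "order_id", "type": "int64"},
            {"field": "amount", "type": "double"},
        ],
        "optional": False,
    },
    "payload": record,
})

print(plain)  # {"order_id": 42, "amount": 9.99}
```

With `schemas.enable=false`, the files the S3 sink writes contain just the payload, which is usually what downstream query engines expect.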
S3 Sink connector configuration (s3-sink.json):
```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "aws.access.key.id": "AKIA...",
    "aws.secret.access.key": "SECRET",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "yyyy/MM/dd/HH",
    "timezone": "UTC",
    "locale": "en",
    "timestamp.extractor": "Record",
    "schema.compatibility": "NONE"
  }
}
```

Note that `TimeBasedPartitioner` requires `partition.duration.ms`; here it is set to one hour to match the hourly `path.format`. In production, prefer omitting `aws.access.key.id`/`aws.secret.access.key` so the connector falls back to the default AWS credentials provider chain (environment variables, instance profiles, etc.).
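The TimeBasedPartitioner derives each object's key prefix from the record timestamp and `path.format`. A rough sketch of the resulting S3 key layout (Python, illustrative only; it assumes the connector's default `topics.dir=topics` and default file naming `<topic>+<partition>+<startOffset>.<ext>`):

```python
from datetime import datetime, timezone

def s3_key(topic: str, kafka_partition: int, start_offset: int, ts_ms: int) -> str:
    # path.format "yyyy/MM/dd/HH" rendered in UTC, as configured above
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    prefix = dt.strftime("%Y/%m/%d/%H")
    # Assumed defaults: topics.dir "topics", file name <topic>+<partition>+<startOffset>.json
    return f"topics/{topic}/{prefix}/{topic}+{kafka_partition}+{start_offset:010d}.json"

print(s3_key("orders", 0, 0, 1700000000000))
# topics/orders/2023/11/14/22/orders+0+0000000000.json
```

Because `timestamp.extractor` is `Record`, the hour in the key comes from the Kafka record timestamp, not the wall clock at write time, so late-arriving records land in the partition of the hour they were produced.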
Create the connector and check its status via the REST API:

```shell
curl -X POST -H "Content-Type: application/json" --data @s3-sink.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/s3-sink/status
```
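Once the connector is running, `flush.size` and `rotate.interval.ms` jointly control when a file is committed to S3: whichever threshold is crossed first closes the current file. A simplified decision sketch (Python, illustrative only; the real connector measures `rotate.interval.ms` against the timestamps produced by `timestamp.extractor`, and also offers `rotate.schedule.interval.ms` for wall-clock rotation):

```python
FLUSH_SIZE = 1000             # flush.size from the connector config
ROTATE_INTERVAL_MS = 600_000  # rotate.interval.ms (10 minutes)

def should_commit(buffered_records: int, ts_span_ms: int) -> bool:
    """Close and upload the current file when either threshold is reached.

    ts_span_ms: time spanned by record timestamps since the file was opened.
    """
    return buffered_records >= FLUSH_SIZE or ts_span_ms >= ROTATE_INTERVAL_MS

print(should_commit(1000, 5_000))   # True: flush.size reached
print(should_commit(10, 600_000))   # True: rotation interval elapsed
print(should_commit(10, 5_000))     # False: keep buffering
```

Tuning these two values trades off object count against latency: a small `flush.size` creates many small objects, while a large one delays data visibility until the rotation interval forces a commit.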
