Connect Worker basic configuration:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
plugin.path=/usr/share/java,/etc/kafka-connect/jars
rest.port=8083
# Distributed mode additionally requires a group id and three internal storage
# topics (the names below are examples); standalone mode instead requires
# offset.storage.file.filename.
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
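The converter settings above determine the wire format of message values. A sketch of the difference `value.converter.schemas.enable` makes for the JsonConverter (the `order_id`/`amount` fields are hypothetical examples for the `orders` topic used below):

```python
import json

# With value.converter.schemas.enable=false, the JsonConverter reads/writes the
# bare JSON payload as the message value:
plain = b'{"order_id": 42, "amount": 9.99}'

# With schemas.enable=true, every message must instead carry a
# schema/payload envelope, along these lines:
enveloped = {
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "order_id", "type": "int64", "optional": False},
            {"field": "amount", "type": "double", "optional": False},
        ],
    },
    "payload": {"order_id": 42, "amount": 9.99},
}

# The payload inside the envelope is the same object as the bare form:
assert json.loads(plain) == enveloped["payload"]
```

With `schemas.enable=false`, producers can write plain JSON, at the cost of losing schema information downstream.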
S3 Sink connector configuration (s3-sink.json):
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "aws.access.key.id": "AKIA...",
    "aws.secret.access.key": "SECRET",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "1000",
    "rotate.interval.ms": "600000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "yyyy/MM/dd/HH",
    "timezone": "UTC",
    "locale": "en",
    "timestamp.extractor": "Record",
    "schema.compatibility": "NONE"
  }
}
Note that TimeBasedPartitioner also requires partition.duration.ms; 3600000 ms (one hour) matches the hourly granularity of the path.format above.
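Two groups of settings in this config decide when files land in S3 and under which key: flush.size / rotate.interval.ms trigger commits, and the partitioner builds the object prefix. A simplified model of both (a sketch, not the connector's actual code, which tracks rotation per topic-partition using the configured timestamp.extractor and Joda-time patterns):

```python
from datetime import datetime, timezone

def should_commit(records_in_file: int, ms_since_file_opened: int,
                  flush_size: int = 1000, rotate_interval_ms: int = 600_000) -> bool:
    """A file is committed to S3 once it holds flush.size records
    or rotate.interval.ms has elapsed (simplified model)."""
    return records_in_file >= flush_size or ms_since_file_opened >= rotate_interval_ms

def partition_path(record_ts_ms: int) -> str:
    """UTC hourly prefix corresponding to path.format=yyyy/MM/dd/HH
    (simplified; the real partitioner also honors locale and timezone)."""
    ts = datetime.fromtimestamp(record_ts_ms / 1000, tz=timezone.utc)
    return ts.strftime("%Y/%m/%d/%H")

assert should_commit(1000, 0)            # flush.size reached
assert should_commit(1, 600_000)         # rotation interval elapsed
assert not should_commit(999, 599_999)   # neither threshold hit
print(partition_path(1_700_000_000_000)) # 2023/11/14/22
```

The resulting S3 object key is roughly topics/orders/&lt;prefix&gt;/orders+&lt;partition&gt;+&lt;startOffset&gt;.json, so each hour's directory holds the files committed for records stamped in that hour.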
Create the connector via REST and check its status:
curl -X POST -H "Content-Type: application/json" --data @s3-sink.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/s3-sink/status
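The status endpoint returns JSON describing the connector and each task. A sketch of a health check over that response (the sample below is illustrative, not captured from a real cluster):

```python
import json

# Typical shape of GET /connectors/<name>/status; values here are made up:
status = json.loads('''
{
  "name": "s3-sink",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083", "trace": "..."}
  ],
  "type": "sink"
}
''')

def is_healthy(s: dict) -> bool:
    """Healthy only if the connector and every task report RUNNING."""
    return (s["connector"]["state"] == "RUNNING"
            and all(t["state"] == "RUNNING" for t in s["tasks"]))

print(is_healthy(status))  # False: task 1 has failed
```

Checking task states individually matters because a connector can report RUNNING while some of its tasks have failed.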
