Connect Worker 基本配置:bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false plugin.path=/usr/share/java,/etc/kafka-connect/jars rest.port=8083 S3 Sink 连接器配置(s3-sink.json):{ "name": "s3-sink", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "2", "topics": "orders", "s3.bucket.name": "my-bucket", "s3.region": "us-east-1", "aws.access.key.id": "AKIA...", "aws.secret.access.key": "SECRET", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "flush.size": "1000", "rotate.interval.ms": "600000", "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", "path.format": "yyyy/MM/dd/HH", "timezone": "UTC", "locale": "en", "timestamp.extractor": "Record", "schema.compatibility": "NONE" } } 通过 REST 创建并查看状态:curl -X POST -H "Content-Type: application/json" --data @s3-sink.json http://localhost:8083/connectors curl http://localhost:8083/connectors/s3-sink/status

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论
立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部