Producer发送消息步骤
Kafka producer的正常生产逻辑包含以下几个步骤:
- 配置生产者客户端参数常见生产者实例。
- 构建待发送的消息。
- 发送消息。
- 关闭生产者实例。
Producer发送消息的过程如下图所示,需要经过拦截器
,序列化器
和分区器
,最终由累加器
批量发送至 Broker。
Kafka Producer生产必备参数
- bootstrap.server:指定 Kafka 的 Broker 的地址
- key.serializer:key 序列化器
- value.serializer:value 序列化器
常见参数:
- batch.num.messages
默认值:200,每次批量消息的数量,只对 asyc 起作用。
- request.required.acks
默认值:0,0 表示 producer 毋须等待 leader 的确认,1 代表需要 leader 确认写入它的本地 log 并立即确认,-1 代表所有的备份都完成后确认。只对 async 模式起作用,这个参数的调整是数据不丢失和发送效率的 tradeoff,如果对数据丢失不敏感而在乎效率的场景可以考虑设置为 0,这样可以大大提高 producer 发送数据的效率。
- request.timeout.ms
默认值:10000,确认超时时间。
- partitioner.class
默认值:kafka.producer.DefaultPartitioner,必须实现 kafka.producer.Partitioner,根据 Key 提供一个分区策略。_有时候我们需要相同类型的消息必须顺序处理,这样我们就必须自定义分配策略,从而将相同类型的数据分配到同一个分区中。_
- producer.type
默认值:sync,指定消息发送是同步还是异步。异步 asyc 成批发送用 kafka.producer.AyncProducer, 同步 sync 用 kafka.producer.SyncProducer。同步和异步发送也会影响消息生产的效率。
- compression.topic
默认值:none,消息压缩,默认不压缩。其余压缩方式还有,”gzip”、”snappy”和”lz4″。对消息的压缩可以极大地减少网络传输量、降低网络 IO,从而提高整体性能。
- compressed.topics
默认值:null,在设置了压缩的情况下,可以指定特定的 topic 压缩,未指定则全部压缩。
- message.send.max.retries
默认值:3,消息发送最大尝试次数。
- retry.backoff.ms
默认值:300,每次尝试增加的额外的间隔时间。
- topic.metadata.refresh.interval.ms
默认值:600000,定期的获取元数据的时间。当分区丢失,leader 不可用时 producer 也会主动获取元数据,如果为 0,则每次发送完消息就获取元数据,不推荐。如果为负值,则只有在失败的情况下获取元数据。
- queue.buffering.max.ms
默认值:5000,在 producer queue 的缓存的数据最大时间,仅仅 for asyc。
- queue.buffering.max.message
默认值:10000,producer 缓存的消息的最大数量,仅仅 for asyc。
- queue.enqueue.timeout.ms
默认值:-1,0 当 queue 满时丢掉,负值是 queue 满时 block, 正值是 queue 满时 block 相应的时间,仅仅 for asyc。