1.问题背景
kafka是以高吞吐量著称的,但日前解决一个实际问题中,发现使用不当仍会无法充分利用起吞吐量。我们的场景如下:
有两个kafka集群,需要从上游kafka读一个topic的消息,做一些自定义处理,再写到下游kafka的特定topic(有人说用flume,确实可以,不过自定义处理比较复杂的时候用flume就有点麻烦了)
这里集中在写这一端(读没有问题),开始使用最简单的方式,配一个Producer的bean,然后Producer.send()写下游。压测的时候发现写出去的流量很低,单进程出口流量大概只有1-2Mbps,低的难以接受
2.配置项
开始以为是配置有问题,所以尝试修改一些Producer的配置项。我们用的是异步模式(producer.sync=false),设置了一些提高吞吐量的配置项(包括有些可能牺牲数据一致性的选项),主要包括下面这些项
queue.enqueue.timeout.ms=0 #异步队列满后不阻塞
batch.num.messages=500 #加大异步发送批次大小(减少连接次数)
compression.codec=snappy #使用消息压缩
request.required.acks=-1 #不要求接收端回复ack
修改配置后吞吐量确实有一些提升,出口流量到5Mbps左右,但是仍然远低于预期,说明配置不是主要问题
3.源码排查
配置无法解决,只好去查一下源码。看到异步模式下Producer实际发送是在一个独立的线程类ProducerSendThread中进行,然后关键来了:一个Producer实例只包含一个ProducerSendThread线程(1对1,相关源码如下)
class Producer[K,V](val config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // only for unit testing
extends Logging {
private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
private var sync: Boolean = true
private var producerSendThread: ProducerSendThread[K,V] = null
private val lock = new Object()
config.producerType match {
case "sync" =>
case "async" =>
sync = false
producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue, eventHandler, config.queueBufferingMaxMs, config.batchNumMessages, config.clientId)
producerSendThread.start() // 初始化发送线程
}
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
KafkaMetricsReporter.startReporters(config.props)
AppInfo.registerInfo()
def this(config: ProducerConfig) =
this(config,
new DefaultEventHandler[K,V](config,
CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),
CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),
CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),
new ProducerPool(config)))
/**
* Sends the data, partitioned by key to the topic using either the
* synchronous or the asynchronous producer
* @param messages the producer data object that encapsulates the topic, key and message data
*/
def send(messages: KeyedMessage[K,V]*) {
lock synchronized {
if (hasShutdown.get)
throw new ProducerClosedException
recordStats(messages)
sync match {
case true => eventHandler.handle(messages)
case false => asyncSend(messages)
}
}
}
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
for (message <- messages) {
val added = config.queueEnqueueTimeoutMs match {
case 0 =>
queue.offer(message)
case _ =>
try {
config.queueEnqueueTimeoutMs < 0 match {
case true =>
queue.put(message)
true
case _ =>
queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
}
}
catch {
case e: InterruptedException =>
false
}
}
if(!added) {
producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
}else {
trace("Added to send queue an event: " + message.toString)
trace("Remaining queue size: " + queue.remainingCapacity)
}
}
}
}
然后Spring的bean默认又是单例的,所以实际上每个进程只有一个线程在写kafka,而单线程的吞吐量显然是有限的(并没有完全利用kafka集群的高吞吐量)
4.解决方法
既然kafka Producer是单线程的,那么就在上层封装一个Producer的实例池,进行并发写。优化以后,使用10个线程写,出口流量显著提升到了60Mbps左右
<bean id="producerClient" class="com.halo.kafka.producer.client.ProducerClient" init-method="init"
destroy-method="close" scope="prototype">
<property name="brokerList" value="${borker.list}"/>
<property name="sync" value="false"/>
......
</bean>
<bean id="producerPool" class="com.halo.dc.support.KafkaProducerPool">
<property name="threadNum" value="${thread.num}"/>
</bean>
public class KafkaProducerPool implements ApplicationContextAware { private ProducerClient[] pool; private int threadNum; private int index = 0; // 轮循id private static final Logger logger = LoggerFactory.getLogger(KafkaProducerPool.class); @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { logger.info("Init DCKafkaProducerPool: threadNum=" + threadNum); pool = new ProducerClient[threadNum]; for (int i = 0; i < threadNum; i++) { pool[i] = ctx.getBean(ProducerClient.class); } } public void send(String topic, String message) { pool[index++ % threadNum].send(topic, message); } }
5.总结
kafka的高吞吐量是针对服务端(集群)而言,并不是针对单客户端。具体到Producer端,需要自己创造多线程并发环境才能提高客户端的出口吞吐量,kafka并没有提供类似线程池的api(也许有?)。kafka设计上是针对分布式的,实际场景通常是有很多客户端(多进程),这也许是没有提供多线程api的原因。但是某些情况下仍然需要自己实现多线程写,比如压测的压力端,或者类似上面提到的处理转发场景
相关推荐
Kafka Producer机制优化-提高发送消息可靠性
第12单元 Kafka producer拦截器与Kafka Streams1
kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。
kafka多线程顺序消费
由于它已经设计、优化并用于的服务器系统,每个流每秒产生超过 100 万个 I/O 密集型任务,其内部实现记录并发处理的实现高度优化,可以产生理想的吞吐量以最少的服务器数量,最大限度地提高资源利用率。入门/教程请...
包含两个Java工程文件,详细讲述了kafka单线程,多线程的,生产者,消费者,及多线程消费者的管理器。
kafka学习过程,maven工程,包含基础过程、提升过程。可供大家学习一下,里面有详细注释,一个groupid多个Consumer来消费消息和一个Consumer且有多个线程消费
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
jmeter连接kafka需要的连接器,可用于将静态测试数据通过jemter模拟高并发数据流发送到kafka中,可作为Kafka的生产者。
kettle kafka 消息者生产插件,用于集成到kettle,生产Kafka消息。亲测试可用。
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import * from collections import OrderedDict threads = []
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
多次整理精简,得出基于maven 的springmvc 框架搭载 :多线程(线程池式)和 kafka(集群下生产者消费者);demo 下载导入改下kafka集群IP地址即可使用。
Kafka技术内幕:图文详解Kafka源码设计与实现 有书签 有源码
python模拟kafka生产者, 读取配置文件, 根据配置文件的信息, 向kafka中写入数据信息。
封装抽取了一个kafka生产者的连接池,能很好的用池的方式对kafka生产者连接点进行有效的管理
Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载
基于新 Kafka Producer 的 Flume kafka sink,高性能且可配置。 它依赖于很少的项目/库,只有 Flume 1.5.2 kafka-clients-0.8.2.1 或更高版本,slf4j。 类似于 Flume 1.6 KafkaSink,但这里有一些不同: Flume 1.6 ...