KafkaSpout
主要负责从kafka拉取消息后发射出去。每次调用nextTuple时,如果当前发射的数据为空则会从kafka拉取消息。每调用一次nextTuple只会发送一条数据。如果一次拉取多条数据,需要executor执行多次nextTuple。
public void nextTuple() { try { if (refreshAssignmentTimer.isExpiredResetOnTrue()) { refreshAssignment(); } if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) { if (isAtLeastOnceProcessing()) { commitOffsetsForAckedTuples(); } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) { Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = createFetchedOffsetsMetadata(consumer.assignment()); consumer.commitAsync(offsetsToCommit, null); LOG.debug("Committed offsets {} to Kafka", offsetsToCommit); } } // 获取可以拉取的分区信息,如果发射的数据未空才会去拉取 PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo(); if (pollablePartitionsInfo.shouldPoll()) { try { setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo)); } catch (RetriableException e) { LOG.error("Failed to poll from kafka.", e); } } // 发射数据 emitIfWaitingNotEmitted(); } catch (InterruptException e) { throwKafkaConsumerInterruptedException(); } }
讯享网
在默认情况下,拉取kafka的数据超时时间为200ms,每次拉取的最大记录数为500条。
KafkaSpout消费者的偏移量提交是由Spout自动提交的,在设置参数时不能设置为自动提交。
偏移量的处理保证有三种情况:
1. NO_GUARANTEE
消息可能被处理0次、1次或多次。等价于设置为自动提交,采用异步方式提交偏移量。
2. AT_MOST_ONCE
拉取到消息后立即提交,同步提交。
3. AT_LEAST_ONCE
定时提交,默认为500ms,如果消息发射后为收到ACK,消息会被重新发射。只有收到ACK的消息才会提交偏移量。
使用过程中存在的问题
情形1:
1个task由1个excutor处理,如果该task处理的topic的分区数为1时,不存在任何问题;如果处理的topic的分区数超过1,则可能产生以下问题:
如果某个分区的没有可消费的数据时,而另外一个分区有可消费的数据,当所有拉取的数据都发射完毕后,再次从kafka拉取数据时会被阻塞200ms,这样会导致另外一个分区的数据消费速度减慢。
情形2:
多个task由一个excutor处理,在storm中采用轮询的方式,如果一个分区没有可消费的数据且在200ms内无数据,则task执行时会被阻塞200ms,就会影响其他task的执行效率,导致kafka的消费速度降低。
针对上述问题可以解决的方式:
1. 拉取数据的超时时间设置更短,不要阻塞其他task任务的执行;
2. 在实际开发中,topic有多少个分区就设置多少个task,每个task都由一个excutor执行,即并行度设置为分区数。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/52913.html