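Background
In a previous article, "Analysis of Flink's Backpressure Mechanism", we examined Flink's early TCP-based backpressure and its current credit-based backpressure mechanism. This article walks through the Flink source code to show how the Credit & Backlog based backpressure mechanism is actually implemented.
Problems with TCP-based backpressure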

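- TaskManagers exchange data over a TCP connection, and all Tasks of a TaskManager multiplex that single connection. As a result, when one downstream Task cannot keep up and causes backpressure, the entire TCP connection is blocked, and the other Tasks cannot receive data even if they still have free Buffers.
- The upstream ResultPartition can only perceive the downstream processing capacity passively, through the state of the TCP connection. It cannot adjust its sending rate in advance, nor can it use its current data backlog to adjust how much data the downstream node can take in.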
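A toy model makes the first problem concrete. It is not Flink code: a single FIFO queue stands in for the shared TCP connection, and the classes `Frame`, `SharedConnection` and `HolBlockingDemo` are hypothetical. Frames are delivered strictly in order, so once the frame at the head belongs to a Task with no free Buffer, frames for a Task that still has free Buffers are stuck behind it.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Flink code): a single FIFO multiplexes frames of several
// Tasks, the way all Tasks of a TaskManager share one TCP connection.
final class Frame {
    final String task;

    Frame(String task) { this.task = task; }
}

final class SharedConnection {
    private final ArrayDeque<Frame> wire = new ArrayDeque<>();
    private final Map<String, Integer> freeBuffers = new HashMap<>(); // per receiving Task

    SharedConnection(Map<String, Integer> freeBuffers) {
        this.freeBuffers.putAll(freeBuffers);
    }

    void send(Frame frame) { wire.addLast(frame); }

    // Delivers frames strictly in FIFO order and stops at the first frame whose
    // Task has no free Buffer: every frame behind it waits (head-of-line blocking).
    int deliver() {
        int delivered = 0;
        while (!wire.isEmpty()) {
            Frame head = wire.peekFirst();
            int free = freeBuffers.getOrDefault(head.task, 0);
            if (free == 0) {
                break;
            }
            freeBuffers.put(head.task, free - 1);
            wire.removeFirst();
            delivered++;
        }
        return delivered;
    }

    int pending() { return wire.size(); }
}

final class HolBlockingDemo {
    public static void main(String[] args) {
        // Task A is slow (no free Buffers); Task B has plenty of room.
        SharedConnection conn = new SharedConnection(Map.of("A", 0, "B", 10));
        conn.send(new Frame("A"));
        conn.send(new Frame("B"));
        conn.send(new Frame("B"));
        System.out.println("delivered=" + conn.deliver() + ", pending=" + conn.pending());
        // delivered=0, pending=3
    }
}
```

Running `main` prints `delivered=0, pending=3`: Task B has ten free Buffers but receives nothing, because its frames sit behind Task A's.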
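Overview of credit-based backpressure
The basic idea of credit-based backpressure is to introduce an upstream Backlog, which reflects how much data is piled up upstream (i.e., how many data BufferConsumers are queued in the ResultSubPartition), and a downstream Credit, which reflects the downstream processing capacity (i.e., the number of available Buffers in the InputChannel's Buffer queue), and to use the two to dynamically regulate how fast data is produced upstream and processed downstream.
The process works as follows:
- While a RemoteInputChannel is being set up, it requests ExclusiveBuffers from the NetworkBufferPool; how many is determined by configuration.
- Once started, the RemoteInputChannel sends a PartitionRequest to the ResultPartition carrying an InitCredit value equal to the number of its ExclusiveBuffers. The Credit is recorded in the CreditBaseViewReader of the corresponding upstream ResultSubPartition, and the ViewReader consumes Credit to read Buffers.
- Whenever a new BufferConsumer is added to the upstream ResultSubPartition's queue, the Backlog is incremented accordingly. The Backlog is later packed together with a Buffer into a BufferResponse and sent to the RemoteInputChannel.
- Based on the Backlog, the RemoteInputChannel checks whether its ExclusiveBuffers are sufficient. If not, it requests FloatingBuffers from the InputGate's LocalBufferPool (which is backed by the NetworkBufferPool) and increases its UnannouncedCredit.
- Once the RemoteInputChannel has obtained the extra Buffers, it sends the UnannouncedCredit to the ResultPartition, increasing the Credit held for that RemoteInputChannel.
- When the CreditBaseViewReader has Credit (and data to send), it is added to the AvailableReader queue and then reads Buffer data from the ResultSubPartition.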
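The steps above can be simulated in a single thread. This is a simplified sketch under stated assumptions, not Flink's implementation: `ToySender` and `ToyReceiver` are hypothetical, integers stand in for Buffers, every consumed Buffer stays with the channel and is immediately announced as one Credit, and Buffers are never returned to the shared pool. It shows the key property: the sender never sends more Buffers than it has Credit for, and each Buffer carries the remaining Backlog so the receiver can request FloatingBuffers in advance.

```java
import java.util.ArrayDeque;

// Toy, single-threaded model of credit-based flow control (not Flink code).
// Integers stand in for Buffers; all names are hypothetical.
final class ToyReceiver {
    final int initialCredit;      // = number of exclusive Buffers
    int freeBuffers;              // empty Buffers owned by this channel
    int floatingPool;             // shared pool of floating Buffers
    final ArrayDeque<Integer> received = new ArrayDeque<>();
    ToySender sender;             // where Credit is announced

    ToyReceiver(int exclusiveBuffers, int floatingPool) {
        this.initialCredit = exclusiveBuffers;
        this.freeBuffers = exclusiveBuffers;
        this.floatingPool = floatingPool;
    }

    // onBuffer + onSenderBacklog: store the data, then top up to backlog + initialCredit.
    void onBuffer(int data, int backlog) {
        freeBuffers--;            // this Buffer is now occupied
        received.add(data);
        int required = backlog + initialCredit;
        int newCredit = 0;
        while (freeBuffers < required && floatingPool > 0) {
            floatingPool--;
            freeBuffers++;
            newCredit++;
        }
        if (newCredit > 0) {
            sender.addCredit(newCredit);   // announce the new Buffers as Credit
        }
    }

    // The task processes one record; its Buffer is freed and announced as Credit.
    void consumeOne() {
        if (received.poll() != null) {
            freeBuffers++;
            sender.addCredit(1);
        }
    }
}

final class ToySender {
    final ArrayDeque<Integer> queue = new ArrayDeque<>(); // queued Buffers = Backlog
    int credit;

    ToySender(int initialCredit) { this.credit = initialCredit; }

    void addCredit(int c) { credit += c; }

    // Sends while Credit lasts; each Buffer carries the Backlog left behind it.
    int pump(ToyReceiver receiver) {
        int sent = 0;
        while (credit > 0 && !queue.isEmpty()) {
            credit--;
            int data = queue.poll();
            receiver.onBuffer(data, queue.size());
            sent++;
        }
        return sent;
    }
}

final class CreditFlowDemo {
    public static void main(String[] args) {
        ToyReceiver receiver = new ToyReceiver(2, 1);   // 2 exclusive, 1 floating Buffer
        ToySender sender = new ToySender(receiver.initialCredit);
        receiver.sender = sender;
        for (int i = 1; i <= 5; i++) sender.queue.add(i);

        int sent = sender.pump(receiver);
        System.out.println("sent=" + sent + " backlog=" + sender.queue.size()); // sent=3 backlog=2
        receiver.consumeOne();
        sent = sender.pump(receiver);
        System.out.println("sent=" + sent + " backlog=" + sender.queue.size()); // sent=1 backlog=1
    }
}
```

With 2 exclusive Buffers and 1 floating Buffer, the first `pump` sends 3 of the 5 queued Buffers and stops with a Backlog of 2; consuming one record frees a Buffer, and the second `pump` sends exactly one more.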
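Credit-based backpressure in detail
ResultPartition sends the Backlog
After an operator processes data, it writes the results through the RecordWriter into the Buffer queue of a ResultSubPartition and updates the Backlog.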
```java
class PipelinedSubpartition extends ResultSubpartition {

    private boolean add(BufferConsumer bufferConsumer, boolean finish) {
        checkNotNull(bufferConsumer);

        final boolean notifyDataAvailable;
        synchronized (buffers) {
            if (isFinished || isReleased) {
                bufferConsumer.close();
                return false;
            }

            // Append the BufferConsumer to the queue
            buffers.add(bufferConsumer);
            updateStatistics(bufferConsumer);
            // Increase the Backlog
            increaseBuffersInBacklog(bufferConsumer);
            notifyDataAvailable = shouldNotifyDataAvailable() || finish;

            isFinished |= finish;
        }

        if (notifyDataAvailable) {
            notifyDataAvailable();
        }

        return true;
    }

    // Increase the Backlog while holding the buffers lock; only data Buffers count
    private void increaseBuffersInBacklog(BufferConsumer buffer) {
        assert Thread.holdsLock(buffers);

        if (buffer != null && buffer.isBuffer()) {
            buffersInBacklog++;
        }
    }

    // After the ViewReader consumes a Buffer, decrease the Backlog (caller holds the lock)
    private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
        assert Thread.holdsLock(buffers);

        if (isBuffer) {
            buffersInBacklog--;
        }
    }
}
```
The Backlog and the Buffer are read out together, wrapped into a BufferResponse, and sent to the downstream RemoteInputChannel.
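The Backlog bookkeeping above can be mirrored in a few lines to show which value ends up in a BufferResponse. This is a sketch under assumptions: `QueuedItem`, `ToyBufferResponse` and `ToySubpartition` are hypothetical stand-ins for `BufferConsumer`, `BufferResponse` and `PipelinedSubpartition`. As in `increaseBuffersInBacklog`, only data Buffers (not events) are counted, and the sketch reports the Backlog remaining after the polled Buffer has been removed.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a BufferConsumer: a data Buffer or an event.
final class QueuedItem {
    final boolean isBuffer;
    final String payload;

    QueuedItem(boolean isBuffer, String payload) {
        this.isBuffer = isBuffer;
        this.payload = payload;
    }
}

// Hypothetical stand-in for a BufferResponse: payload plus piggybacked Backlog.
final class ToyBufferResponse {
    final String payload;
    final int backlog;

    ToyBufferResponse(String payload, int backlog) {
        this.payload = payload;
        this.backlog = backlog;
    }
}

final class ToySubpartition {
    private final ArrayDeque<QueuedItem> buffers = new ArrayDeque<>();
    private int buffersInBacklog;            // guarded by buffers

    // Like add(): enqueue under the lock; only data Buffers raise the Backlog.
    void add(QueuedItem item) {
        synchronized (buffers) {
            buffers.add(item);
            if (item.isBuffer) {
                buffersInBacklog++;
            }
        }
    }

    // Reader side: dequeue, lower the Backlog for data Buffers, and attach
    // the Backlog that remains to the response.
    ToyBufferResponse poll() {
        synchronized (buffers) {
            QueuedItem item = buffers.poll();
            if (item == null) {
                return null;
            }
            if (item.isBuffer) {
                buffersInBacklog--;
            }
            return new ToyBufferResponse(item.payload, buffersInBacklog);
        }
    }
}

final class SubpartitionDemo {
    public static void main(String[] args) {
        ToySubpartition sp = new ToySubpartition();
        sp.add(new QueuedItem(true, "r1"));
        sp.add(new QueuedItem(false, "barrier"));
        sp.add(new QueuedItem(true, "r2"));
        ToyBufferResponse first = sp.poll();
        System.out.println(first.payload + " backlog=" + first.backlog); // r1 backlog=1
    }
}
```

Adding `r1`, an event and `r2` gives a Backlog of 2; the first poll returns `r1` with Backlog 1, and polling the event afterwards leaves the Backlog unchanged.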
InputChannel processes the Backlog
The Handler of the downstream NettyClient calls InputChannel.onBuffer, which writes the data into the InputChannel's Buffer queue; along the way, onSenderBacklog is called to process the Backlog.

```java
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    void onSenderBacklog(int backlog) throws IOException {
        int numRequestedBuffers = 0;

        synchronized (bufferQueue) {
            if (isReleased.get()) {
                return;
            }

            // Required Buffers = Backlog + initialCredit (numRequiredBuffers is a field of the class)
            numRequiredBuffers = backlog + initialCredit;
            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                // Not enough available Buffers in the InputChannel: request a FloatingBuffer from the LocalBufferPool
                Buffer buffer = inputGate.getBufferPool().requestBuffer();
                if (buffer != null) {
                    bufferQueue.addFloatingBuffer(buffer);
                    numRequestedBuffers++;
                } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                    // No Buffer available: register this InputChannel as a listener and wait for more FloatingBuffers
                    isWaitingForFloatingBuffers = true;
                    break;
                }
            }
        }

        // Add the number of newly requested FloatingBuffers to unannouncedCredit;
        // notify upstream only if no Credit was pending before
        if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
            notifyCreditAvailable();
        }
    }
}
```
The size of the Backlog determines how many FloatingBuffers the RemoteInputChannel requests, so the Backlog is what lets the RemoteInputChannel scale its capacity to receive and process data.
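The floating-Buffer logic of `onSenderBacklog` can be exercised in isolation. In this sketch, `ToyBufferPool` and `ToyRemoteChannel` are hypothetical stand-ins for the InputGate's LocalBufferPool and `RemoteInputChannel`, a Buffer is just a counter, and the toy pool always accepts a listener (the excerpt instead goes round the loop again if `addBufferListener` returns false). The channel also counts how often a Credit notification would be sent, to show that only the transition of `unannouncedCredit` away from 0 triggers one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the InputGate's LocalBufferPool; a Buffer is just a count.
final class ToyBufferPool {
    int free;
    final List<Object> listeners = new ArrayList<>();

    ToyBufferPool(int free) { this.free = free; }

    // Returns true if a Buffer was handed out.
    boolean requestBuffer() {
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }

    // Simplification: always accepts the listener.
    void addBufferListener(Object listener) { listeners.add(listener); }
}

// Hypothetical stand-in for RemoteInputChannel, reduced to onSenderBacklog.
final class ToyRemoteChannel {
    final int initialCredit;                  // number of exclusive Buffers
    int availableBuffers;                     // free Buffers held by this channel
    boolean isWaitingForFloatingBuffers;
    final AtomicInteger unannouncedCredit = new AtomicInteger();
    int creditNotifications;                  // times notifyCreditAvailable() would run
    private final ToyBufferPool pool;

    ToyRemoteChannel(int exclusiveBuffers, ToyBufferPool pool) {
        this.initialCredit = exclusiveBuffers;
        this.availableBuffers = exclusiveBuffers;
        this.pool = pool;
    }

    void onSenderBacklog(int backlog) {
        int numRequestedBuffers = 0;
        int numRequiredBuffers = backlog + initialCredit;
        while (availableBuffers < numRequiredBuffers && !isWaitingForFloatingBuffers) {
            if (pool.requestBuffer()) {
                availableBuffers++;           // got a FloatingBuffer
                numRequestedBuffers++;
            } else {
                pool.addBufferListener(this); // wait for a recycled Buffer
                isWaitingForFloatingBuffers = true;
            }
        }
        // Notify only when pending Credit goes from 0 to positive; otherwise an
        // announcement is already pending and will read the larger value later.
        if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
            creditNotifications++;
        }
    }
}

final class FloatingBufferDemo {
    public static void main(String[] args) {
        ToyRemoteChannel channel = new ToyRemoteChannel(2, new ToyBufferPool(10));
        channel.onSenderBacklog(1);   // needs 3 Buffers: requests 1
        channel.onSenderBacklog(3);   // needs 5 Buffers: requests 2 more
        System.out.println(channel.unannouncedCredit.get() + " credits, "
                + channel.creditNotifications + " notification(s)"); // 3 credits, 1 notification(s)
    }
}
```

With 2 exclusive Buffers, a Backlog of 1 requests one FloatingBuffer and triggers a notification; a following Backlog of 3 requests two more, but since `unannouncedCredit` was already non-zero, no second notification is produced.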
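InputChannel sends Credit to the ResultPartition
Credit in a RemoteInputChannel comes in two kinds:
- InitCredit: the initial Credit of the RemoteInputChannel, always equal to the number of ExclusiveBuffers in the RemoteInputChannel.
- UnannouncedCredit: corresponds to the newly obtained FloatingBuffers. It must be reported to the ResultPartition promptly, to tell it that the RemoteInputChannel has more Credit for receiving data.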
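The two kinds of Credit can be modelled with an `AtomicInteger`, which matters because Buffers can become available on task or pool threads while the Netty thread reads the counter. This is a sketch with a hypothetical `ChannelCredit` class; implementing the read-and-clear as `getAndSet(0)` is an assumption about how `getAndResetUnannouncedCredit` behaves, consistent with its name and with how the client handler uses it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical Credit bookkeeping for one input channel (not Flink code).
final class ChannelCredit {
    final int initialCredit;   // = exclusive Buffers, sent once with the PartitionRequest
    private final AtomicInteger unannouncedCredit = new AtomicInteger();

    ChannelCredit(int exclusiveBuffers) { this.initialCredit = exclusiveBuffers; }

    // New Buffers became available. Returns true if the caller should schedule
    // an announcement, i.e. only when the counter was 0 before this call.
    boolean addUnannounced(int buffers) {
        return unannouncedCredit.getAndAdd(buffers) == 0;
    }

    // Read and clear in one atomic step, so concurrent additions are never lost.
    int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }
}

final class CreditDemo {
    public static void main(String[] args) throws InterruptedException {
        ChannelCredit credit = new ChannelCredit(2);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                credit.addUnannounced(1);
            }
        });
        producer.start();
        int announced = 0;
        while (producer.isAlive()) {
            announced += credit.getAndResetUnannouncedCredit();  // concurrent drains
        }
        producer.join();
        announced += credit.getAndResetUnannouncedCredit();      // pick up the remainder
        System.out.println("initial=" + credit.initialCredit + " announced=" + announced);
        // initial=2 announced=10000
    }
}
```

The demo adds 10,000 Credits from one thread while the main thread repeatedly drains them; because each drain is a single atomic `getAndSet(0)`, the total announced is exactly 10,000, with no Credit lost or counted twice.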
Whenever the Credit of a RemoteInputChannel changes because its available Buffers change, the RemoteInputChannel calls notifyCreditAvailable() to notify the upstream ResultPartition.
```java
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    private void notifyCreditAvailable() {
        checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");

        // Via partitionRequestClient, register this InputChannel with the
        // NettyClient's CreditBasedPartitionRequestClientHandler
        partitionRequestClient.notifyCreditAvailable(this);
    }
}
```
The InputChannel is fired into the Handler's pipeline as a user event; the pipeline then calls back userEventTriggered, which puts the InputChannel into a queue.
```java
class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {

    // Step 1
    @Override
    public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
        // Fire the InputChannel into the pipeline as a user event, on the channel's event loop
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
    }

    // Step 2: invoked when the handler receives the user event
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RemoteInputChannel) {
            boolean triggerWrite = inputChannelsWithCredit.isEmpty();

            // Add the InputChannel to the inputChannelsWithCredit queue
            inputChannelsWithCredit.add((RemoteInputChannel) msg);

            // Start writing only if the queue was empty before;
            // otherwise the ongoing write calls will handle it
            if (triggerWrite) {
                writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    // Step 3: send the InputChannel's new Credit to the upstream ResultSubPartition
    private void writeAndFlushNextMessageIfPossible(Channel channel) {
        // Stop if an error occurred or the channel is not writable
        if (channelError.get() != null || !channel.isWritable()) {
            return;
        }

        // Poll InputChannels from the inputChannelsWithCredit queue
        while (true) {
            RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();

            if (inputChannel == null) {
                return;
            }

            // No Credit needs to be sent for a released InputChannel
            if (!inputChannel.isReleased()) {
                // Build an AddCredit message
                AddCredit msg = new AddCredit(
                    inputChannel.getPartitionId(),
                    inputChannel.getAndResetUnannouncedCredit(), // read UnannouncedCredit and reset it to 0
                    inputChannel.getInputChannelId());

                // Write the AddCredit message to the TCP channel;
                // writeListener is invoked once the write completes
                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    }
}
```
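The client-side pattern (hand the channel to the event loop, queue it, start writing only if the queue was idle) can be sketched without Netty. Assumptions: a single-thread `ExecutorService` plays the role of the channel's event loop, `CreditChannel` and `ToyClientHandler` are hypothetical, and a list of strings replaces the TCP channel. Because the queue is only touched on that one thread, it needs no lock. Unlike the excerpt, which writes one message and returns (its `writeListener` is notified when that write completes), this sketch drains the whole queue in one loop, since there is no real socket to wait on.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RemoteInputChannel.
final class CreditChannel {
    final String id;
    final AtomicInteger unannouncedCredit = new AtomicInteger();
    volatile boolean released;

    CreditChannel(String id) { this.id = id; }
}

// Hypothetical stand-in for the client handler; not Netty code.
final class ToyClientHandler {
    // One daemon thread plays the role of the channel's event loop.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-event-loop");
        t.setDaemon(true);
        return t;
    });
    // Only accessed on the event-loop thread, so it needs no locking.
    private final ArrayDeque<CreditChannel> inputChannelsWithCredit = new ArrayDeque<>();
    // Messages "written to the wire"; thread-safe so other threads can read it.
    final List<String> wire = new CopyOnWriteArrayList<>();

    // Step 1: callable from any thread; hands the channel over to the event loop.
    void notifyCreditAvailable(CreditChannel channel) {
        eventLoop.execute(() -> userEventTriggered(channel));
    }

    // Step 2: runs on the event loop; starts writing only if the queue was idle.
    private void userEventTriggered(CreditChannel channel) {
        boolean triggerWrite = inputChannelsWithCredit.isEmpty();
        inputChannelsWithCredit.add(channel);
        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible();
        }
    }

    // Step 3: one AddCredit per queued, unreleased channel.
    private void writeAndFlushNextMessageIfPossible() {
        CreditChannel channel;
        while ((channel = inputChannelsWithCredit.poll()) != null) {
            if (!channel.released) {
                int credit = channel.unannouncedCredit.getAndSet(0); // read and reset
                wire.add("AddCredit(" + channel.id + ", " + credit + ")");
            }
        }
    }

    // Waits until all queued events have been processed.
    void shutdown() {
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class ClientHandlerDemo {
    public static void main(String[] args) {
        ToyClientHandler handler = new ToyClientHandler();
        CreditChannel a = new CreditChannel("a");
        CreditChannel b = new CreditChannel("b");
        a.unannouncedCredit.set(3);
        b.unannouncedCredit.set(1);
        handler.notifyCreditAvailable(a);
        handler.notifyCreditAvailable(b);
        handler.shutdown();
        System.out.println(handler.wire); // [AddCredit(a, 3), AddCredit(b, 1)]
    }
}
```

Notifying channel `a` (3 Credits) and then `b` (1 Credit) produces `AddCredit(a, 3)` followed by `AddCredit(b, 1)`, and each channel's unannounced Credit is reset to 0.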
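ResultPartition processes the Credit
When the upstream NettyServer receives a message, it first passes through PartitionRequestServerHandler, which dispatches by message type. For an AddCredit message, it calls the addCredit method of the PartitionRequestQueue handler.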
```java
// Methods of PartitionRequestQueue

// Step 1
void addCredit(InputChannelID receiverId, int credit) throws Exception {
    if (fatalError) {
        return;
    }

    // Look up the ViewReader for this InputChannel
    NetworkSequenceViewReader reader = allReaders.get(receiverId);
    if (reader != null) {
        // Increase the ViewReader's Credit
        reader.addCredit(credit);

        // Add the ViewReader to the available queue; readers in that queue are
        // later used to read Buffer data from the ResultSubPartition
        enqueueAvailableReader(reader);
    } else {
        throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
    }
}

// Step 2
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
    if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
        return;
    }

    // Register the ViewReader in the available queue
    boolean triggerWrite = availableReaders.isEmpty();
    registerAvailableReader(reader);

    // If the queue was empty before, start reading Buffers through the queued ViewReaders again
    if (triggerWrite) {
        writeAndFlushNextMessageIfPossible(ctx.channel());
    }
}

// Step 3
// Poll ViewReaders from the available queue and read Buffer data
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
    if (fatalError || !channel.isWritable()) {
        return;
    }

    BufferAndAvailability next = null;
    try {
        while (true) {
            NetworkSequenceViewReader reader = pollAvailableReader();

            // No available ViewReader: return and wait until new data is written
            // to a ResultSubPartition or Credit is updated
            if (reader == null) {
                return;
            }

            // Read the next Buffer through the ViewReader
            next = reader.getNextBuffer();
            if (next == null) {
                if (!reader.isReleased()) {
                    continue;
                }

                Throwable cause = reader.getFailureCause();
                if (cause != null) {
                    ErrorResponse msg = new ErrorResponse(
                        new ProducerFailedException(cause),
                        reader.getReceiverId());

                    ctx.writeAndFlush(msg);
                }
            } else {
                // If the ResultSubPartition has more readable Buffers, register the ViewReader again
                if (next.moreAvailable()) {
                    registerAvailableReader(reader);
                }

                // Build a BufferResponse message
                BufferResponse msg = new BufferResponse(
                    next.buffer(),
                    reader.getSequenceNumber(), // sequence number of this message
                    reader.getReceiverId(),
                    next.buffersInBacklog());   // Backlog

                // Send the BufferResponse to the InputChannel over the TCP channel
                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    } catch (Throwable t) {
        if (next != null) {
            next.buffer().recycleBuffer();
        }

        throw new IOException(t.getMessage(), t);
    }
}
```
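The server side can be sketched the same way. Assumptions: `ToyViewReader` and `ToyPartitionRequestQueue` are hypothetical, only data Buffers exist (events are left out entirely), each Buffer costs one Credit, and `onWriteComplete` stands in for the `writeListener` callback. As in the excerpt, one message is written per call, and a reader is re-queued only while it still has both Credit and data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader: per-channel Credit plus the subpartition's queued data.
final class ToyViewReader {
    final String receiverId;
    final ArrayDeque<String> subpartition = new ArrayDeque<>();
    int credit;
    boolean registeredAsAvailable;

    ToyViewReader(String receiverId) { this.receiverId = receiverId; }

    // Simplification: only data Buffers exist, and each one costs one Credit.
    boolean isAvailable() { return credit > 0 && !subpartition.isEmpty(); }

    String getNextBuffer() {
        if (!isAvailable()) {
            return null;
        }
        credit--;
        return subpartition.poll();
    }
}

final class ToyPartitionRequestQueue {
    private final ArrayDeque<ToyViewReader> availableReaders = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();   // stands in for the TCP channel

    void addCredit(ToyViewReader reader, int credit) {
        reader.credit += credit;
        enqueueAvailableReader(reader);
    }

    // Also the entry point when new data lands in the subpartition.
    void enqueueAvailableReader(ToyViewReader reader) {
        if (reader.registeredAsAvailable || !reader.isAvailable()) {
            return;
        }
        boolean triggerWrite = availableReaders.isEmpty();
        reader.registeredAsAvailable = true;
        availableReaders.add(reader);
        if (triggerWrite) {
            writeAndFlushNextMessageIfPossible();
        }
    }

    // Plays the role of writeListener: a completed write triggers the next one.
    void onWriteComplete() { writeAndFlushNextMessageIfPossible(); }

    private void writeAndFlushNextMessageIfPossible() {
        while (true) {
            ToyViewReader reader = availableReaders.poll();
            if (reader == null) {
                return;                        // wait for new Credit or data
            }
            reader.registeredAsAvailable = false;
            String buffer = reader.getNextBuffer();
            if (buffer == null) {
                continue;
            }
            if (reader.isAvailable()) {       // more data and Credit: queue it again
                reader.registeredAsAvailable = true;
                availableReaders.add(reader);
            }
            sent.add(reader.receiverId + ":" + buffer + " backlog=" + reader.subpartition.size());
            return;                            // one message per write, as in the excerpt
        }
    }
}

final class ServerQueueDemo {
    public static void main(String[] args) {
        ToyPartitionRequestQueue queue = new ToyPartitionRequestQueue();
        ToyViewReader reader = new ToyViewReader("r");
        reader.subpartition.add("b1");
        reader.subpartition.add("b2");
        reader.subpartition.add("b3");
        queue.enqueueAvailableReader(reader); // no Credit yet: nothing is sent
        queue.addCredit(reader, 2);           // sends b1
        queue.onWriteComplete();              // sends b2; the reader is now out of Credit
        queue.onWriteComplete();              // nothing left to send
        System.out.println(queue.sent);       // [r:b1 backlog=2, r:b2 backlog=1]
    }
}
```

With three queued Buffers and no Credit, nothing is sent. Granting 2 Credits sends `b1` and, after one write completion, `b2`; the reader then drops out of the available queue with `b3` still pending until one more Credit arrives.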
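Summary
The credit-based backpressure mechanism is driven mainly by two indicators: the upstream Backlog and the downstream Credit.
- Backlog indicates how much data is piled up upstream. It determines how many FloatingBuffers the downstream RemoteInputChannel requests, raising the InputChannel's capacity to receive data.
- Credit reflects the downstream processing capacity. When the number of available Buffers in the downstream RemoteInputChannel changes, the increase is sent upstream as Credit, telling the upstream that more Buffers are ready to receive data.
- If the downstream cannot keep up, the upstream keeps reporting its Backlog, prompting the RemoteInputChannel to request more FloatingBuffers. If the RemoteInputChannel cannot obtain any more FloatingBuffers, it stops sending Credit upstream. Once the reader's Credit is used up, the upstream Handler takes the ViewReader out of the AvailableReader queue without re-registering it, so Buffers in the ResultSubPartition are no longer pushed downstream. Only when the downstream has Credit again is the ViewReader's read triggered again.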
If you repost this article, please keep the source: https://51itzy.com/kjqy/32923.html