2025年Linux GRO流程分析

Linux GRO流程分析1 概述 GRO 是针对报文接收方向的 是指设备链路层在接收报文处理的时候 将多个小包合并成一个大包一起上送协议栈 减少数据包在协议栈间交互的机制 可以通过 ethtool K eth0 gro on off 来打开或关闭 GRO 功能 GRO 虽然可以提升吞吐 但同时也会带来一定的时延增加

大家好,我是讯享网,很高兴认识大家。

1、概述

GRO是针对报文接收方向的,是指设备链路层在接收报文处理的时候,将多个小包合并成一个大包一起上送协议栈,减少数据包在协议栈间交互的机制。可以通过ethtool -K eth0 gro on/off来打开或关闭GRO功能,GRO虽然可以提升吞吐,但同时也会带来一定的时延增加。GRO需要网卡有NAPI的能力,驱动通过NAPI收上来包后,判断如果有启用GRO功能,则将包按流的方式先存放在napi->gro_list链表里,等NAPI收完包或GRO链表里的skb超时,或者GRO合并过程中判断需要上送协议栈处理时,将对应的gro链表的skb上送协议栈。

struct napi_struct {
	/* The poll_list must only be managed by the entity which
	 * changes the state of the NAPI_STATE_SCHED bit. This means
	 * whoever atomically sets that bit can add this napi_struct
	 * to the per-cpu poll_list, and whoever clears that bit
	 * can remove from the list right before clearing the bit.
	 */
	struct list_head	poll_list;

	unsigned long		state;
	int			weight;
	/* number of flows currently held on gro_list (capped at 8 flows,
	 * see MAX_GRO_SKBS check in dev_gro_receive) */
	unsigned int		gro_count;
	int			(*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
	spinlock_t		poll_lock;
	int			poll_owner;
#endif
	struct net_device	*dev;
	/* list of skbs being aggregated by GRO, one chain per flow */
	struct sk_buff		*gro_list;
	struct sk_buff		*skb;
	struct list_head	dev_list;
	struct hlist_node	napi_hash_node;
	unsigned int		napi_id;
	RH_KABI_EXTEND(size_t size)
	RH_KABI_EXTEND(struct hrtimer timer)
};

讯享网

2、流程分析

ixgbe_rx_skb

网卡驱动从rx ring里收到包后,调用ixgbe_rx_skb上送协议栈,ixgbe_rx_skb判断上层socket是否正在对该队列做busy polling,如果没有,则进入gro合并入口函数napi_gro_receive;

/* Hand a received skb up the stack: deliver it directly when the queue
 * is being busy-polled by a socket, otherwise go through the GRO entry
 * point so the skb can be aggregated with other packets of its flow.
 */
static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
			 struct sk_buff *skb)
{
	skb_mark_napi_id(skb, &q_vector->napi);
	if (ixgbe_qv_busy_polling(q_vector))
		netif_receive_skb(skb);	/* busy polling: bypass GRO */
	else
		napi_gro_receive(&q_vector->napi, skb);
}

dev_gro_receive

gro入口函数进一步调用dev_gro_receive,在dev_gro_receive里,先重置下skb的mac层信息,然后调用ip层提供的GRO回调函数,上层回调函数判断napi->gro_list链表里是否有跟skb是同一条流的,如果存在,则将skb合并到对应的skb里,如果不存在,返回到dev_gro_receive函数后,将新的skb插入到napi->gro_list的末尾,作为这条流的首包。

static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb) { struct sk_buff pp = NULL; struct packet_offload *ptype; __be16 type = skb->protocol; struct list_head *head = &offload_base; int same_flow; enum gro_result ret; int grow; if (!(skb->dev->features & NETIF_F_GRO)) goto normal; if (skb_is_gso(skb) || skb_has_frag_list(skb) || skb->csum_bad) goto normal; gro_list_prepare(napi, skb); rcu_read_lock(); list_for_each_entry_rcu(ptype, head, list) { if (ptype->type != type || !ptype->callbacks.gro_receive) continue; skb_set_network_header(skb, skb_gro_offset(skb)); skb_reset_mac_len(skb); //先将same_flow清零 NAPI_GRO_CB(skb)->same_flow = 0; NAPI_GRO_CB(skb)->flush = 0; NAPI_GRO_CB(skb)->free = 0; NAPI_GRO_CB(skb)->encap_mark = 0; NAPI_GRO_CB(skb)->recursion_counter = 0; NAPI_GRO_CB(skb)->is_atomic = 1; NAPI_GRO_CB(skb)->gro_remcsum_start = 0; /* Setup for GRO checksum validation */ switch (skb->ip_summed) { case CHECKSUM_COMPLETE: NAPI_GRO_CB(skb)->csum = skb->csum; NAPI_GRO_CB(skb)->csum_valid = 1; NAPI_GRO_CB(skb)->csum_cnt = 0; break; case CHECKSUM_UNNECESSARY: NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1; NAPI_GRO_CB(skb)->csum_valid = 0; break; default: NAPI_GRO_CB(skb)->csum_cnt = 0; NAPI_GRO_CB(skb)->csum_valid = 0; } pp = ptype->callbacks.gro_receive(&napi->gro_list, skb); break; } rcu_read_unlock(); if (&ptype->list == head) goto normal; //在回调网络层、传输层的gro合并回调函数时,会判断已有的gro链表是否存在相同流的 //如果存在,same_flow为置1,因此这里判断same_flow的值,如果为0,说明是流首包 //如果非0,说明skb已经被合并到gro_list里了 same_flow = NAPI_GRO_CB(skb)->same_flow; ret = NAPI_GRO_CB(skb)->free ? 
GRO_MERGED_FREE : GRO_MERGED; //pp为非空,说明需要flush if (pp) { struct sk_buff *nskb = *pp; *pp = nskb->next; nskb->next = NULL; napi_gro_complete(nskb); napi->gro_count--; } //如果存在同一条流的, 说明在gro_receive流程里已经将skb合入到gro_list里了,因此这里不需要再处理了 if (same_flow) goto ok; //这个skb需要直接上送协议栈,不能添加到gro_list if (NAPI_GRO_CB(skb)->flush) goto normal; //gro链表上一共有8条流了,则再添加新的一条流前,把链表里最老的那条流的skb先发送出去 if (unlikely(napi->gro_count >= MAX_GRO_SKBS)) { struct sk_buff *nskb = napi->gro_list; /* locate the end of the list to select the 'oldest' flow */ while (nskb->next) { pp = &nskb->next; nskb = *pp; } *pp = NULL; nskb->next = NULL; napi_gro_complete(nskb); } else { napi->gro_count++; } //走到这里说明,待合入的skb是这条流的首包,因此将其挂到gro_list里, //并将NAPI_GRO_CB(skb)->last指向自己 //并等待后续同一条流的skb到来 NAPI_GRO_CB(skb)->count = 1; NAPI_GRO_CB(skb)->age = jiffies; NAPI_GRO_CB(skb)->last = skb; skb_shinfo(skb)->gso_size = skb_gro_len(skb); skb->next = napi->gro_list; napi->gro_list = skb; ret = GRO_HELD; pull: grow = skb_gro_offset(skb) - skb_headlen(skb); if (grow > 0) gro_pull_from_frag0(skb, grow); ok: return ret; normal: ret = GRO_NORMAL; goto pull; }

inet_gro_receive

GRO合并消息进入到ip层后,首先根据ip头的信息(源、宿ip)进一步找到skb_list里相同的流,然后判断待GRO合并的skb是否是分片数据包,分片数据包不能做GRO,最后重置下待GRO合并的skb的网络层信息后,进一步调用传输层的GRO回调函数;

讯享网static struct sk_buff inet_gro_receive(struct sk_buff head, struct sk_buff *skb) { const struct net_offload *ops; struct sk_buff pp = NULL; struct sk_buff *p; const struct iphdr *iph; unsigned int hlen; unsigned int off; unsigned int id; int flush = 1; int proto; off = skb_gro_offset(skb); hlen = off + sizeof(*iph); iph = skb_gro_header_fast(skb, off); if (skb_gro_header_hard(skb, hlen)) { iph = skb_gro_header_slow(skb, hlen, off); if (unlikely(!iph)) goto out; } proto = iph->protocol; rcu_read_lock(); ops = rcu_dereference(inet_offloads[proto]); if (!ops || !ops->callbacks.gro_receive) goto out_unlock; if (*(u8 *)iph != 0x45) goto out_unlock; if (unlikely(ip_fast_csum((u8 *)iph, 5))) goto out_unlock; id = ntohl(*(__be32 *)&iph->id); flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF)); id >>= 16; for (p = *head; p; p = p->next) { struct iphdr *iph2; u16 flush_id; //不是相同流的,跳过 if (!NAPI_GRO_CB(p)->same_flow) continue; //off为skb的data偏移,因为驱动就已经把mac头剥离了,所以这里的p->data是指向ip头 iph2 = (struct iphdr *)(p->data + off); /* The above works because, with the exception of the top * (inner most) layer, we only aggregate pkts with the same * hdr length so all the hdrs we'll need to verify will start * at the same offset. */ //再次判断ip头,确认是同一条流 if ((iph->protocol ^ iph2->protocol) | ((__force u32)iph->saddr ^ (__force u32)iph2->saddr) | ((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) { NAPI_GRO_CB(p)->same_flow = 0; continue; } /* All fields must match except length and checksum. */ //分片数据包不能gro NAPI_GRO_CB(p)->flush |= (iph->ttl ^ iph2->ttl) | (iph->tos ^ iph2->tos) | (__force int)((iph->frag_off ^ iph2->frag_off) & htons(IP_DF)); NAPI_GRO_CB(p)->flush |= flush; /* We need to store of the IP ID check to be included later * when we can verify that this packet does in fact belong * to a given flow. 
*/ flush_id = (u16)(id - ntohs(iph2->id)); /* This bit of code makes it much easier for us to identify * the cases where we are doing atomic vs non-atomic IP ID * checks. Specifically an atomic check can return IP ID * values 0 - 0xFFFF, while a non-atomic check can only * return 0 or 0xFFFF. */ if (!NAPI_GRO_CB(p)->is_atomic || !(iph->frag_off & htons(IP_DF))) { flush_id ^= NAPI_GRO_CB(p)->count; flush_id = flush_id ? 0xFFFF : 0; } /* If the previous IP ID value was based on an atomic * datagram we can overwrite the value and ignore it. */ if (NAPI_GRO_CB(skb)->is_atomic) NAPI_GRO_CB(p)->flush_id = flush_id; else NAPI_GRO_CB(p)->flush_id |= flush_id; } NAPI_GRO_CB(skb)->is_atomic = !!(iph->frag_off & htons(IP_DF)); NAPI_GRO_CB(skb)->flush |= flush; //设置ip头信息 skb_set_network_header(skb, off); /* The above will be needed by the transport layer if there is one * immediately following this IP hdr. */ //data_offset偏移增加ip头偏移 skb_gro_pull(skb, sizeof(*iph)); //设置传输层信息 skb_set_transport_header(skb, skb_gro_offset(skb)); pp = call_gro_receive(ops->callbacks.gro_receive, head, skb); out_unlock: rcu_read_unlock(); out: NAPI_GRO_CB(skb)->flush |= flush; return pp; }

tcp4_gro_receive

进入到传输层的GRO处理函数后,首先对待合并的skb做checksum校验;


讯享网

static struct sk_buff tcp4_gro_receive(struct sk_buff head, struct sk_buff *skb) { /* Don't bother verifying checksum if we're going to flush anyway. */ //先对skb做checksum校验,检验通过后csum_valid if (!NAPI_GRO_CB(skb)->flush && skb_gro_checksum_validate(skb, IPPROTO_TCP, inet_gro_compute_pseudo)) { NAPI_GRO_CB(skb)->flush = 1; return NULL; } return tcp_gro_receive(head, skb); }

校验通过后进一步调用tcp_gro_receive,在tcp_gro_receive里进一步根据tcp头部信息找到skb_list里相同的流,然后调用skb_gro_receive,skb_gro_receive为真正做GRO合并的处理函数,在skb_gro_receive将新的skb的线性区或非线性区合入到gro_skb的非线性区,合并完成后,同步更新gro_skb的data_len和len长度。如果合并过程发现gro_skb的非线性区域个数已经超过最大值(MAX_SKB_FRAGS个),则将skb作为一个新的数据包挂到gro_skb的frag_list/next链表里。

讯享网int skb_gro_receive(struct sk_buff head, struct sk_buff *skb) { //走到这里说明head的skb与待合并的skb是同一条流 struct skb_shared_info *pinfo, *skbinfo = skb_shinfo(skb); //skb->data基于skb->head的偏移(此时skb->data指向tcp头) unsigned int offset = skb_gro_offset(skb); //线性区长度 unsigned int headlen = skb_headlen(skb); //skb的data数据长度(包括线性区和非线性区) unsigned int len = skb_gro_len(skb); struct sk_buff *lp, *p = *head; unsigned int delta_truesize; if (unlikely(p->len + len >= 65536)) return -E2BIG; lp = NAPI_GRO_CB(p)->last; pinfo = skb_shinfo(lp); //skb的线性区长度不超过offset,说明skb的线性区没有data数据,因此从skb的非线性区拷贝数据 //拷贝的数据放到gro_skb->last的非线性区 if (headlen <= offset) { skb_frag_t *frag; skb_frag_t *frag2; int i = skbinfo->nr_frags; int nr_frags = pinfo->nr_frags + i; //如果这个gro_skb->last的frags已经超标,则将新加入的skb挂到gro_skb->last里 if (nr_frags > MAX_SKB_FRAGS) goto merge; offset -= headlen; pinfo->nr_frags = nr_frags; skbinfo->nr_frags = 0; frag = pinfo->frags + nr_frags; frag2 = skbinfo->frags + i; do { *--frag = *--frag2; } while (--i); frag->page_offset += offset; skb_frag_size_sub(frag, offset); /* all fragments truesize : remove (head size + sk_buff) */ delta_truesize = skb->truesize - SKB_TRUESIZE(skb_end_offset(skb)); skb->truesize -= skb->data_len; skb->len -= skb->data_len; skb->data_len = 0; NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE; goto done; } else if (skb->head_frag) { //将skb的线性区拷贝到拷贝到gro_skb->last的非线性区 int nr_frags = pinfo->nr_frags; skb_frag_t *frag = pinfo->frags + nr_frags; struct page *page = virt_to_head_page(skb->head); unsigned int first_size = headlen - offset; unsigned int first_offset; if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS) goto merge; first_offset = skb->data - (unsigned char *)page_address(page) + offset; pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags; frag->page.p = page; frag->page_offset = first_offset; skb_frag_size_set(frag, first_size); memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags); /* We dont need to clear skbinfo->nr_frags here */ delta_truesize = 
skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff)); NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD; goto done; } merge: //gro->last的空间已满(frags个数已经达到最多的16个),将待合并的skb挂到gro_skb->last里 delta_truesize = skb->truesize; if (offset > headlen) { unsigned int eat = offset - headlen; skbinfo->frags[0].page_offset += eat; skb_frag_size_sub(&skbinfo->frags[0], eat); skb->data_len -= eat; skb->len -= eat; offset = headlen; } __skb_pull(skb, offset); if (NAPI_GRO_CB(p)->last == p) skb_shinfo(p)->frag_list = skb; else NAPI_GRO_CB(p)->last->next = skb; NAPI_GRO_CB(p)->last = skb; __skb_header_release(skb); lp = p; done: //合并完一个skb后,count计数加1 NAPI_GRO_CB(p)->count++; //data_len长度加len,len为新合并的skb的长度,因为新合并的skb都是放在p的非线性区,所以data_len要增加 p->data_len += len; p->truesize += delta_truesize; //整个skb长度增加len p->len += len; if (lp != p) { lp->data_len += len; lp->truesize += delta_truesize; lp->len += len; } NAPI_GRO_CB(skb)->same_flow = 1; return 0; } EXPORT_SYMBOL_GPL(skb_gro_receive);

napi_gro_complete

当GRO合并过程中判断需要刷新gro_list或者gro_list的流个数超过8个,再或者napi_poll过程判断需要刷新gro_list时,会调用napi_gro_complete处理函数,然后进一步调用ip层的complete处理函数inet_gro_complete;

inet_gro_complete

在ip层回调函数里,根据最新的skb->len,更新ip头的checksum,然后进一步调用传输层的complete函数tcp4_gro_complete;在tcp4_gro_complete更新一下tcp的伪头部checksum,然后最终调用netif_receive_skb_internal将gro skb上送协议栈。

static int inet_gro_complete(struct sk_buff *skb, int nhoff)
{
	__be16 newlen = htons(skb->len - nhoff);
	struct iphdr *iph = (struct iphdr *)(skb->data + nhoff);
	const struct net_offload *ops;
	int proto = iph->protocol;
	int err = -ENOSYS;

	if (skb->encapsulation) {
		skb_set_inner_protocol(skb, cpu_to_be16(ETH_P_IP));
		skb_set_inner_network_header(skb, nhoff);
	}

	/* patch the IP header checksum for the new total length; newlen is
	 * the skb's length after GRO aggregation */
	csum_replace2(&iph->check, iph->tot_len, newlen);
	iph->tot_len = newlen;

	rcu_read_lock();
	ops = rcu_dereference(inet_offloads[proto]);
	if (WARN_ON(!ops || !ops->callbacks.gro_complete))
		goto out_unlock;

	/* Only need to add sizeof(*iph) to get to the next hdr below
	 * because any hdr with option will have been flushed in
	 * inet_gro_receive().
	 */
	err = ops->callbacks.gro_complete(skb, nhoff + sizeof(*iph));

out_unlock:
	rcu_read_unlock();

	return err;
}

netif_receive_skb_internal

在netif_receive_skb_internal里,判断是否有开启rps,如果有,则通过enqueue_to_backlog将报文放入对应cpu的softnet_data的input_pkt_queue队列,如果不需要rps,则通过__netif_receive_skb进一步上送协议栈,最后通过ip层注册的回调函数ip_rcv进入ip层。

/* Final delivery step: timestamp the skb, then either steer it to
 * another cpu's backlog queue (RPS) or push it up the protocol stack
 * directly via __netif_receive_skb.
 */
static int netif_receive_skb_internal(struct sk_buff *skb)
{
	int ret;

	net_timestamp_check(netdev_tstamp_prequeue, skb);

	if (skb_defer_rx_timestamp(skb))
		return NET_RX_SUCCESS;

	rcu_read_lock();
#ifdef CONFIG_RPS
	/* When RPS applies, enqueue the skb on the target cpu's softnet
	 * queue and raise a softirq there; process_backlog later dequeues
	 * it on that cpu and delivers it to the protocol stack. */
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
	/* no RPS: deliver straight up the stack */
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}

 

小讯
上一篇 2025-04-06 11:34
下一篇 2025-04-10 13:43

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/28884.html