收藏 分销(赏)

Linux内核中流量控制.doc

上传人:xrp****65 文档编号:8993730 上传时间:2025-03-10 格式:DOC 页数:365 大小:1,003KB
下载 相关 举报
Linux内核中流量控制.doc_第1页
第1页 / 共365页
Linux内核中流量控制.doc_第2页
第2页 / 共365页
点击查看更多>>
资源描述
本文档的Copyleft归yfydz所有,使用GPL发布,可以自由拷贝,转载,转载时请保持文档的完整性, 严禁用于任何商业用途。 msn: yfydz_no1@ 来源: 1. 前言 linux内核中提供了流量控制的相关处理功能,相关代码在net/sched目录下;而应用层上的控制是通过iproute2软件包中的tc来实现,tc和sched的关系就好象iptables和netfilter的关系一样,一个是用户层接口,一个是具体实现,关于tc的使用方法可详将Linux Advanced Routing HOWTO,本文主要分析内核中的具体实现。 流控包括几个部分: 流控算法, 通常在net/sched/sch_*.c中实现, 缺省的是FIFO, 是比较典型的黑盒模式, 对外只看到入队和出对两个操作; 流控结构的操作处理; 和用户空间的控制接口, 是通过 rtnetlink实现的。 以下内核代码版本为2.6.19.2。 2. 控制入口 2.1 控制入口 linux流控功能反映为网卡设备的属性,表明是网络最底层的处理部分, 已经和上层的网络协议栈无关了: /* include/linux/netdevice.h */ struct net_device { ...... /*  * Cache line mostly used on queue transmit path (qdisc)  */  /* device queue lock */  spinlock_t  queue_lock ____cacheline_aligned_in_smp; // 这是发送数据时的队列处理  struct Qdisc  *qdisc; // 网卡停止时保存网卡活动时的队列处理方法  struct Qdisc  *qdisc_sleeping; // 网卡处理的数据队列链表  struct list_head qdisc_list; // 最大队列长度  unsigned long  tx_queue_len; /* Max frames per queue allowed */  /* Partially transmitted GSO packet. */  struct sk_buff  *gso_skb;  /* ingress path synchronizer */ // 输入流控锁  spinlock_t  ingress_lock; // 这是对于接收数据时的队列处理  struct Qdisc  *qdisc_ingress; ...... 2.1.2 输出流控 数据发出流控处理时,上层的所有处理已经完成,数据包已经交到网卡设备进行发送,在数据发送时进行相关的流控处理网络数据的出口函数为dev_queue_xmit(); 如果是接收流控, 数据只是刚从网卡设备中收到, 还未交到网络上层处理, 不过网卡的输入流控不是必须的, 缺省情况下并不进行流控,输入流控入口函数为ing_filter()函数,该函数被skb_receive_skb()调用: /* net/core/dev.c */ int dev_queue_xmit(struct sk_buff *skb) {  struct net_device *dev = skb->dev;  struct Qdisc *q;  int rc = -ENOMEM; ......  /* Updates of qdisc are serialized by queue_lock.   * The struct Qdisc which is pointed to by qdisc is now a   * rcu structure - it may be accessed without acquiring   * a lock (but the structure may be stale.) The freeing of the   * qdisc will be deferred until it's known that there are no   * more references to it.   *   * If the qdisc has an enqueue function, we still need to   * hold the queue_lock before calling it, since queue_lock   * also serializes access to the device queue.   */ // 获取网卡的qdisc指针, 此出不需要锁, 是各个CPU的私有数据  q = rcu_dereference(dev->qdisc); #ifdef CONFIG_NET_CLS_ACT  skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS); #endif // 如果队列输入非空, 将数据包入队 // 对于物理网卡设备, 缺省使用的是FIFO qdisc, 该成员函数非空, 只有逻辑网卡 // 才可能为空  if (q->enqueue) {   /* Grab device queue */ // 加锁   spin_lock(&dev->queue_lock); // 可以直接访问dev->qdisc了   q = dev->qdisc;   if (q->enqueue) { // 入队处理    rc = q->enqueue(skb, q); // 运行流控, 出队列操作    qdisc_run(dev);    spin_unlock(&dev->queue_lock);    rc = rc == NET_XMIT_BYPASS ? NET_XMIT_SUCCESS : rc;    goto out;   }   spin_unlock(&dev->queue_lock);  } ...... }   // 出队操作 static inline void qdisc_run(struct net_device *dev) {  if (!netif_queue_stopped(dev) &&      !test_and_set_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))   __qdisc_run(dev); } /* net/sched/sch_generic.c */ void __qdisc_run(struct net_device *dev) { // 如果是noop_qdisc流控, 实际是丢包  if (unlikely(dev->qdisc == &noop_qdisc))   goto out;  while (qdisc_restart(dev) < 0 && !netif_queue_stopped(dev))   /* NOTHING */; out:  clear_bit(__LINK_STATE_QDISC_RUNNING, &dev->state); } /* Kick device.    Note, that this procedure can be called by a watchdog timer, so that    we do not check dev->tbusy flag here.    Returns:  0  - queue is empty.             >0  - queue is not empty, but throttled.      <0  - queue is not empty. Device is throttled, if dev->tbusy != 0.    NOTE: Called under dev->queue_lock with locally disabled BH. */ static inline int qdisc_restart(struct net_device *dev) {  struct Qdisc *q = dev->qdisc;  struct sk_buff *skb;  /* Dequeue packet */ // 数据包出队  if (((skb = dev->gso_skb)) || ((skb = q->dequeue(q)))) {   unsigned nolock = (dev->features & NETIF_F_LLTX);   dev->gso_skb = NULL; ...... } 2.1.3 输入流控 输入流控好象不是必须的,目前内核需要配置CONFIG_NET_CLS_ACT选项才起作用: /* net/core/dev.c */ int netif_receive_skb(struct sk_buff *skb) { ...... #ifdef CONFIG_NET_CLS_ACT  if (pt_prev) {   ret = deliver_skb(skb, pt_prev, orig_dev);   pt_prev = NULL; /* noone else should process this after*/  } else {   skb->tc_verd = SET_TC_OK2MUNGE(skb->tc_verd);  }  ret = ing_filter(skb);  if (ret == TC_ACT_SHOT || (ret == TC_ACT_STOLEN)) {   kfree_skb(skb);   goto out;  }  skb->tc_verd = 0; ncls: #endif ...... } static int ing_filter(struct sk_buff *skb) {  struct Qdisc *q;  struct net_device *dev = skb->dev;  int result = TC_ACT_OK; // 如果网卡设备有输入流控处理  if (dev->qdisc_ingress) {   __u32 ttl = (__u32) G_TC_RTTL(skb->tc_verd);   if (MAX_RED_LOOP < ttl++) {    printk(KERN_WARNING "Redir loop detected Dropping packet (%s->% s)\n",     skb->input_dev->name, skb->dev->name);    return TC_ACT_SHOT;   } // 设置数据包的TC参数   skb->tc_verd = SET_TC_RTTL(skb->tc_verd,ttl);   skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_INGRESS);   spin_lock(&dev->ingress_lock);   if ((q = dev->qdisc_ingress) != NULL) // 数据入队    result = q->enqueue(skb, q);   spin_unlock(&dev->ingress_lock);  }  return result; } 2.2 初始化 本节先跟踪一下网卡设备的qdisc指针是如何被赋值的, 缺省赋值为何值. 在网卡设备的初始化函数register_netdevice()函数中调用dev_init_scheduler()函数对网卡设备的流控队列处理进行了初始化, 也就是说每个网络网卡设备的qdisc指针都不会是空的: /* net/sched/sch_gentric.c */ void dev_init_scheduler(struct net_device *dev) {  qdisc_lock_tree(dev); // 处理发出数据的qdisc是必须的,而处理输入数据的qdisc_ingress则不是必须的 // 缺省情况下的qdisc  dev->qdisc = &noop_qdisc;  dev->qdisc_sleeping = &noop_qdisc;  INIT_LIST_HEAD(&dev->qdisc_list);  qdisc_unlock_tree(dev);  dev_watchdog_init(dev); } 当然noop_qdisc中的调度是不可用的, 只进行丢包处理;网卡在激活(dev_open)时会调用 dev_activate()函数重新对qdisc指针赋值,但未对qdisc_ingress赋值: /* net/sched/sch_generic.c */ void dev_activate(struct net_device *dev) {  /* No queueing discipline is attached to device;     create default one i.e. pfifo_fast for devices,     which need queueing and noqueue_qdisc for     virtual interfaces   */ // 如果当前的qdisc_sleeping是noop_qdisc,重新找一个流控操作指针  if (dev->qdisc_sleeping == &noop_qdisc) {   struct Qdisc *qdisc; // 前提条件是发送队列长度非0, 这正常情况肯定满足的   if (dev->tx_queue_len) { // 对dev设备建立fifo处理, 只是缺省的网卡发送策略: FIFO, 先入先出    qdisc = qdisc_create_dflt(dev, &pfifo_fast_ops);    if (qdisc == NULL) {     printk(KERN_INFO "%s: activation failed\n", dev->name);     return;    }    write_lock(&qdisc_tree_lock);    list_add_tail(&qdisc->list, &dev->qdisc_list);    write_unlock(&qdisc_tree_lock);   } else {    qdisc =  &noqueue_qdisc;   }   write_lock(&qdisc_tree_lock); // 对qdisc_sleeping赋值   dev->qdisc_sleeping = qdisc;   write_unlock(&qdisc_tree_lock);  } // 如果现在网线没插, 返回  if (!netif_carrier_ok(dev))   /* Delay activation until next carrier-on event */   return;  spin_lock_bh(&dev->queue_lock); // 将网卡当前的qdisc赋值为qdisc_sleeping所指的qdisc  rcu_assign_pointer(dev->qdisc, dev->qdisc_sleeping);  if (dev->qdisc != &noqueue_qdisc) { // 启动   dev->trans_start = jiffies;   dev_watchdog_up(dev);  }  spin_unlock_bh(&dev->queue_lock); } qdisc_sleeping用于保存在网卡起效时使用的qdisc, 因为在网卡down或网线拔除不可用时, 网卡设 备的qdisc指针会指向noqueue_qdisc, 该qdisc操作就只是丢包, 这就是为什么在没插网线情况下也 可以调用发包函数处理的原因, 结果就是直接丢包。 /* net/sched/sch_generic.c */ void dev_deactivate(struct net_device *dev) {  struct Qdisc *qdisc;  spin_lock_bh(&dev->queue_lock);  qdisc = dev->qdisc; // 将网卡当前qdisc设置为noop_qdisc  dev->qdisc = &noop_qdisc; // 释放原来的qdisc  qdisc_reset(qdisc);  spin_unlock_bh(&dev->queue_lock);  dev_watchdog_down(dev);  /* Wait for outstanding dev_queue_xmit calls. */  synchronize_rcu();  /* Wait for outstanding qdisc_run calls. */  while (test_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))   yield();  if (dev->gso_skb) {   kfree_skb(dev->gso_skb);   dev->gso_skb = NULL;  } }   3. 数据结构 流控处理对外表现是一个黑盒,外部只能看到数据入队和出队,但内部队列是如何操作和管理外面是 不知道的;另外处理队列处理外,流控还有一个调度器,该调度器将数据进行分类,然后对不同类型 的数据采取不同的流控处理,所分的类型可能是多级的,形成一个树型的分类树。 流控的基本数据结构是struct Qdisc(queueing discipline,直译是“排队纪律”,意译为“流控” ),这是内核中为数不多的以大写字母开头结构名称之一: /* include/net/sch_generic.h */ struct Qdisc { // 入队操作  int    (*enqueue)(struct sk_buff *skb, struct Qdisc *dev); // 出队操作  struct sk_buff * (*dequeue)(struct Qdisc *dev); // 标志  unsigned  flags; #define TCQ_F_BUILTIN 1 #define TCQ_F_THROTTLED 2 #define TCQ_F_INGRESS 4  int   padded; // Qdisc的基本操作结构  struct Qdisc_ops *ops; // 句柄  u32   handle;  u32   parent;  atomic_t  refcnt; // 数据包链表头  struct sk_buff_head q; // 网卡设备  struct net_device *dev;  struct list_head list; // 统计信息  struct gnet_stats_basic bstats;  struct gnet_stats_queue qstats; // 速率估计  struct gnet_stats_rate_est rate_est; // 流控锁  spinlock_t  *stats_lock;  struct rcu_head  q_rcu;  int   (*reshape_fail)(struct sk_buff *skb,      struct Qdisc *q);  /* This field is deprecated, but it is still used by CBQ   * and it will live until better solution will be invented.   */ // 父节点, 但基本已经被淘汰了  struct Qdisc  *__parent; }; 流控队列的基本操作结构: struct Qdisc_ops { // 链表中的下一个  struct Qdisc_ops *next; // 类别操作结构  struct Qdisc_class_ops *cl_ops; // Qdisc的名称, 从数组大小看应该就是网卡名称  char   id[IFNAMSIZ]; // 私有数据大小  int   priv_size; // 入队  int    (*enqueue)(struct sk_buff *, struct Qdisc *); // 出队  struct sk_buff * (*dequeue)(struct Qdisc *); // 将数据包重新排队  int    (*requeue)(struct sk_buff *, struct Qdisc *); // 丢弃  unsigned int  (*drop)(struct Qdisc *); // 初始化  int   (*init)(struct Qdisc *, struct rtattr *arg); // 复位为初始状态,释放缓冲,删除定时器,清空计数器  void   (*reset)(struct Qdisc *); // 释放  void   (*destroy)(struct Qdisc *); // 更改Qdisc参数  int   (*change)(struct Qdisc *, struct rtattr *arg); // 输出  int   (*dump)(struct Qdisc *, struct sk_buff *);  int   (*dump_stats)(struct Qdisc *, struct gnet_dump *);  struct module  *owner; }; 流控队列类别操作结构: struct Qdisc_class_ops {  /* Child qdisc manipulation */ // 减子节点  int   (*graft)(struct Qdisc *, unsigned long cl,      struct Qdisc *, struct Qdisc **); // 增加子节点  struct Qdisc *  (*leaf)(struct Qdisc *, unsigned long cl);  /* Class manipulation routines */ // 获取, 增加使用计数  unsigned long  (*get)(struct Qdisc *, u32 classid); // 释放, 减少使用计数  void   (*put)(struct Qdisc *, unsigned long); // 改变  int   (*change)(struct Qdisc *, u32, u32,      struct rtattr **, unsigned long *); // 删除  int   (*delete)(struct Qdisc *, unsigned long); // 遍历  void   (*walk)(struct Qdisc *, struct qdisc_walker * arg);  /* Filter manipulation */  struct tcf_proto ** (*tcf_chain)(struct Qdisc *, unsigned long); // tc捆绑  unsigned long  (*bind_tcf)(struct Qdisc *, unsigned long,      u32 classid); // tc解除  void   (*unbind_tcf)(struct Qdisc *, unsigned long);  /* rtnetlink specific */ // 输出  int   (*dump)(struct Qdisc *, unsigned long,      struct sk_buff *skb, struct tcmsg*);  int   (*dump_stats)(struct Qdisc *, unsigned long,      struct gnet_dump *); }; // 流控速率控制表结构 struct qdisc_rate_table {  struct tc_ratespec rate;  u32  data[256];  struct qdisc_rate_table *next;  int  refcnt; }; 4. 基本操作 各种流控算法是通过流控操作结构实现,然后这些操作结构登记到内核的流控链表,在使用时可为网 卡构造新的流控结构,将该结构和某种流控操作结构挂钩,这样就实现网卡采用某种策略发送数据进 行流控,所有操作在用户空间都可通过tc程序设置。 4.1 Qdisc的一些基本操作 4.1.1 分配新的流控结构 /* net/sched/sch_generic.c */ // 分配新的Qdisc结构, Qdisc的操作结构由函数参数指定 struct Qdisc *qdisc_alloc(struct net_device *dev, struct Qdisc_ops *ops) {  void *p;  struct Qdisc *sch;  unsigned int size;  int err = -ENOBUFS;  /* ensure that the Qdisc and the private data are 32-byte aligned */ // Qdisc空间按32字节对齐  size = QDISC_ALIGN(sizeof(*sch)); // 增加私有数据空间  size += ops->priv_size + (QDISC_ALIGNTO - 1);  p = kzalloc(size, GFP_KERNEL);  if (!p)   goto errout; // 确保从缓冲区中的sch到缓冲区结束点空间是32字节对齐的  sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p); // 填充字节的数量  sch->padded = (char *) sch - (char *) p; //  +------------------------------------+ //  |________|___________________________| //  ^        ^                           ^ //  |  pad   | <------ 32*N ------------>| //  p        sch                          // 初始化链表, 将用于挂接到dev的Qdisc链表  INIT_LIST_HEAD(&sch->list); // 初始化数据包链表  skb_queue_head_init(&sch->q); // Qdisc结构参数  sch->ops = ops;  sch->enqueue = ops->enqueue;  sch->dequeue = ops->dequeue;  sch->dev = dev; // 网卡使用计数增加  dev_hold(dev);  sch->stats_lock = &dev->queue_lock;  atomic_set(&sch->refcnt, 1);  return sch; errout:  return ERR_PTR(-err); } struct Qdisc * qdisc_create_dflt(struct net_device *dev, struct Qdisc_ops *ops) {  struct Qdisc *sch; // 分配Qdisc结构   sch = qdisc_alloc(dev, ops);  if (IS_ERR(sch))   goto errout; // 如果没有初始化函数或者初始化成功, 返回Qdisc结构  if (!ops->init || ops->init(sch, NULL) == 0)   return sch; // 初始化失败, 释放Qdisc  qdisc_destroy(sch); errout:  return NULL; } /* Under dev->queue_lock and BH! */ // 调用Qdisc操作函数中的reset函数 void qdisc_reset(struct Qdisc *qdisc) {  struct Qdisc_ops *ops = qdisc->ops;  if (ops->reset)   ops->reset(qdisc); } /* this is the rcu callback function to clean up a qdisc when there  * are no further references to it */ // 真正释放Qdisc缓冲区 static void __qdisc_destroy(struct rcu_head *head) {  struct Qdisc *qdisc = container_of(head, struct Qdisc, q_rcu); // qdisc-padded就是缓冲区头的位置  kfree((char *) qdisc - qdisc->padded); } /* Under dev->queue_lock and BH! */ // 释放Qdisc void qdisc_destroy(struct Qdisc *qdisc) {  struct Qdisc_ops  *ops = qdisc->ops; // 检查Qdisc的使用计数  if (qdisc->flags & TCQ_F_BUILTIN ||      !atomic_dec_and_test(&qdisc->refcnt))   return; // 将Qdisc从网卡设备的Qdisc链表中断开  list_del(&qdisc->list); #ifdef CONFIG_NET_ESTIMATOR  gen_kill_estimator(&qdisc->bstats, &qdisc->rate_est); #endif // 复位操作  if (ops->reset)   ops->reset(qdisc); // 内部释放操作  if (ops->destroy)   ops->destroy(qdisc); // 减少操作结构的模块计数  module_put(ops->owner); // 减少网卡使用计数  dev_put(qdisc->dev); // 对每个CPU的数据进行具体空间释放  call_rcu(&qdisc->q_rcu, __qdisc_destroy); } /* include/net/sch_generic.h */ // 将skb包添加到数据队列最后 static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,            struct sk_buff_head *list) { // 将数据包连接到数据包链表尾  __skb_queue_tail(list, skb); // 更新统计信息 // 当前队列中数据包的数据长度增加  sch->qstats.backlog += skb->len; // Qdisc处理的数据包数字节数增加  sch->bstats.bytes += skb->len;  sch->bstats.packets++;  return NET_XMIT_SUCCESS; } static inline int qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch) {  return __qdisc_enqueue_tail(skb, sch, &sch->q); } // 将队列头的数据包出队列 static inline struct sk_buff *__qdisc_dequeue_head(struct Qdisc *sch,          struct sk_buff_head *list) { // 从skb链表中头skb包出队列  struct sk_buff *skb = __skb_dequeue(list); // 减少当前数据队列数据长度计数  if (likely(skb != NULL))   sch->qstats.backlog -= skb->len;  return skb; } static inline struct sk_buff *qdisc_dequeue_head(struct Qdisc *sch) {  return __qdisc_dequeue_head(sch, &sch->q); } // 将队列尾的数据包出队列 static inline struct sk_buff *__qdisc_dequeue_tail(struct Qdisc *sch,          struct sk_buff_head *list) { // 从链表为提出数据包  struct sk_buff *skb = __skb_dequeue_tail(list);  if (likely(skb != NULL))   sch->qstats.backlog -= skb->len;  return skb; } static inline struct sk_buff *qdisc_deq
展开阅读全文

开通  VIP会员、SVIP会员  优惠大
下载10份以上建议开通VIP会员
下载20份以上建议开通SVIP会员


开通VIP      成为共赢上传
相似文档                                   自信AI助手自信AI助手

当前位置:首页 > 包罗万象 > 大杂烩

移动网页_全站_页脚广告1

关于我们      便捷服务       自信AI       AI导航        抽奖活动

©2010-2025 宁波自信网络信息技术有限公司  版权所有

客服电话:4009-655-100  投诉/维权电话:18658249818

gongan.png浙公网安备33021202000488号   

icp.png浙ICP备2021020529号-1  |  浙B2-20240490  

关注我们 :微信公众号    抖音    微博    LOFTER 

客服