资源描述
本文档的Copyleft归yfydz所有,使用GPL发布,可以自由拷贝,转载,转载时请保持文档的完整性,
严禁用于任何商业用途。
msn: yfydz_no1@
来源:
1. 前言
linux内核中提供了流量控制的相关处理功能,相关代码在net/sched目录下;而应用层上的控制是通过iproute2软件包中的tc来实现,tc和sched的关系就好象iptables和netfilter的关系一样,一个是用户层接口,一个是具体实现,关于tc的使用方法可详将Linux Advanced Routing HOWTO,本文主要分析内核中的具体实现。
流控包括几个部分: 流控算法, 通常在net/sched/sch_*.c中实现, 缺省的是FIFO, 是比较典型的黑盒模式, 对外只看到入队和出对两个操作; 流控结构的操作处理; 和用户空间的控制接口, 是通过
rtnetlink实现的。
以下内核代码版本为2.6.19.2。
2. 控制入口
2.1 控制入口
linux流控功能反映为网卡设备的属性,表明是网络最底层的处理部分, 已经和上层的网络协议栈无关了:
/* include/linux/netdevice.h */
struct net_device
{
......
/*
* Cache line mostly used on queue transmit path (qdisc)
*/
/* device queue lock */
spinlock_t queue_lock ____cacheline_aligned_in_smp;
// 这是发送数据时的队列处理
struct Qdisc *qdisc;
// 网卡停止时保存网卡活动时的队列处理方法
struct Qdisc *qdisc_sleeping;
// 网卡处理的数据队列链表
struct list_head qdisc_list;
// 最大队列长度
unsigned long tx_queue_len; /* Max frames per queue allowed */
/* Partially transmitted GSO packet. */
struct sk_buff *gso_skb;
/* ingress path synchronizer */
// 输入流控锁
spinlock_t ingress_lock;
// 这是对于接收数据时的队列处理
struct Qdisc *qdisc_ingress;
......
2.1.2 输出流控
数据发出流控处理时,上层的所有处理已经完成,数据包已经交到网卡设备进行发送,在数据发送时进行相关的流控处理网络数据的出口函数为dev_queue_xmit(); 如果是接收流控, 数据只是刚从网卡设备中收到, 还未交到网络上层处理, 不过网卡的输入流控不是必须的, 缺省情况下并不进行流控,输入流控入口函数为ing_filter()函数,该函数被skb_receive_skb()调用:
/* net/core/dev.c */
int dev_queue_xmit(struct sk_buff *skb)
{
struct net_device *dev = skb->dev;
struct Qdisc *q;
int rc = -ENOMEM;
......
/* Updates of qdisc are serialized by queue_lock.
* The struct Qdisc which is pointed to by qdisc is now a
* rcu structure - it may be accessed without acquiring
* a lock (but the structure may be stale.) The freeing of the
* qdisc will be deferred until it's known that there are no
* more references to it.
*
* If the qdisc has an enqueue function, we still need to
* hold the queue_lock before calling it, since queue_lock
* also serializes access to the device queue.
*/
// 获取网卡的qdisc指针, 此出不需要锁, 是各个CPU的私有数据
q = rcu_dereference(dev->qdisc);
#ifdef CONFIG_NET_CLS_ACT
skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
#endif
// 如果队列输入非空, 将数据包入队
// 对于物理网卡设备, 缺省使用的是FIFO qdisc, 该成员函数非空, 只有逻辑网卡
// 才可能为空
if (q->enqueue) {
/* Grab device queue */
// 加锁
spin_lock(&dev->queue_lock);
// 可以直接访问dev->qdisc了
q = dev->qdisc;
if (q->enqueue) {
// 入队处理
rc = q->enqueue(skb, q);
// 运行流控, 出队列操作
qdisc_run(dev);
spin_unlock(&dev->queue_lock);
rc = rc == NET_XMIT_BYPASS ? NET_XMIT_SUCCESS : rc;
goto out;
}
spin_unlock(&dev->queue_lock);
}
......
}
// 出队操作
static inline void qdisc_run(struct net_device *dev)
{
if (!netif_queue_stopped(dev) &&
!test_and_set_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
__qdisc_run(dev);
}
/* net/sched/sch_generic.c */
void __qdisc_run(struct net_device *dev)
{
// 如果是noop_qdisc流控, 实际是丢包
if (unlikely(dev->qdisc == &noop_qdisc))
goto out;
while (qdisc_restart(dev) < 0 && !netif_queue_stopped(dev))
/* NOTHING */;
out:
clear_bit(__LINK_STATE_QDISC_RUNNING, &dev->state);
}
/* Kick device.
Note, that this procedure can be called by a watchdog timer, so that
we do not check dev->tbusy flag here.
Returns: 0 - queue is empty.
>0 - queue is not empty, but throttled.
<0 - queue is not empty. Device is throttled, if dev->tbusy != 0.
NOTE: Called under dev->queue_lock with locally disabled BH.
*/
static inline int qdisc_restart(struct net_device *dev)
{
struct Qdisc *q = dev->qdisc;
struct sk_buff *skb;
/* Dequeue packet */
// 数据包出队
if (((skb = dev->gso_skb)) || ((skb = q->dequeue(q)))) {
unsigned nolock = (dev->features & NETIF_F_LLTX);
dev->gso_skb = NULL;
......
}
2.1.3 输入流控
输入流控好象不是必须的,目前内核需要配置CONFIG_NET_CLS_ACT选项才起作用:
/* net/core/dev.c */
int netif_receive_skb(struct sk_buff *skb)
{
......
#ifdef CONFIG_NET_CLS_ACT
if (pt_prev) {
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL; /* noone else should process this after*/
} else {
skb->tc_verd = SET_TC_OK2MUNGE(skb->tc_verd);
}
ret = ing_filter(skb);
if (ret == TC_ACT_SHOT || (ret == TC_ACT_STOLEN)) {
kfree_skb(skb);
goto out;
}
skb->tc_verd = 0;
ncls:
#endif
......
}
static int ing_filter(struct sk_buff *skb)
{
struct Qdisc *q;
struct net_device *dev = skb->dev;
int result = TC_ACT_OK;
// 如果网卡设备有输入流控处理
if (dev->qdisc_ingress) {
__u32 ttl = (__u32) G_TC_RTTL(skb->tc_verd);
if (MAX_RED_LOOP < ttl++) {
printk(KERN_WARNING "Redir loop detected Dropping packet (%s->%
s)\n",
skb->input_dev->name, skb->dev->name);
return TC_ACT_SHOT;
}
// 设置数据包的TC参数
skb->tc_verd = SET_TC_RTTL(skb->tc_verd,ttl);
skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_INGRESS);
spin_lock(&dev->ingress_lock);
if ((q = dev->qdisc_ingress) != NULL)
// 数据入队
result = q->enqueue(skb, q);
spin_unlock(&dev->ingress_lock);
}
return result;
}
2.2 初始化
本节先跟踪一下网卡设备的qdisc指针是如何被赋值的, 缺省赋值为何值.
在网卡设备的初始化函数register_netdevice()函数中调用dev_init_scheduler()函数对网卡设备的流控队列处理进行了初始化, 也就是说每个网络网卡设备的qdisc指针都不会是空的:
/* net/sched/sch_gentric.c */
void dev_init_scheduler(struct net_device *dev)
{
qdisc_lock_tree(dev);
// 处理发出数据的qdisc是必须的,而处理输入数据的qdisc_ingress则不是必须的
// 缺省情况下的qdisc
dev->qdisc = &noop_qdisc;
dev->qdisc_sleeping = &noop_qdisc;
INIT_LIST_HEAD(&dev->qdisc_list);
qdisc_unlock_tree(dev);
dev_watchdog_init(dev);
}
当然noop_qdisc中的调度是不可用的, 只进行丢包处理;网卡在激活(dev_open)时会调用
dev_activate()函数重新对qdisc指针赋值,但未对qdisc_ingress赋值:
/* net/sched/sch_generic.c */
void dev_activate(struct net_device *dev)
{
/* No queueing discipline is attached to device;
create default one i.e. pfifo_fast for devices,
which need queueing and noqueue_qdisc for
virtual interfaces
*/
// 如果当前的qdisc_sleeping是noop_qdisc,重新找一个流控操作指针
if (dev->qdisc_sleeping == &noop_qdisc) {
struct Qdisc *qdisc;
// 前提条件是发送队列长度非0, 这正常情况肯定满足的
if (dev->tx_queue_len) {
// 对dev设备建立fifo处理, 只是缺省的网卡发送策略: FIFO, 先入先出
qdisc = qdisc_create_dflt(dev, &pfifo_fast_ops);
if (qdisc == NULL) {
printk(KERN_INFO "%s: activation failed\n", dev->name);
return;
}
write_lock(&qdisc_tree_lock);
list_add_tail(&qdisc->list, &dev->qdisc_list);
write_unlock(&qdisc_tree_lock);
} else {
qdisc = &noqueue_qdisc;
}
write_lock(&qdisc_tree_lock);
// 对qdisc_sleeping赋值
dev->qdisc_sleeping = qdisc;
write_unlock(&qdisc_tree_lock);
}
// 如果现在网线没插, 返回
if (!netif_carrier_ok(dev))
/* Delay activation until next carrier-on event */
return;
spin_lock_bh(&dev->queue_lock);
// 将网卡当前的qdisc赋值为qdisc_sleeping所指的qdisc
rcu_assign_pointer(dev->qdisc, dev->qdisc_sleeping);
if (dev->qdisc != &noqueue_qdisc) {
// 启动
dev->trans_start = jiffies;
dev_watchdog_up(dev);
}
spin_unlock_bh(&dev->queue_lock);
}
qdisc_sleeping用于保存在网卡起效时使用的qdisc, 因为在网卡down或网线拔除不可用时, 网卡设
备的qdisc指针会指向noqueue_qdisc, 该qdisc操作就只是丢包, 这就是为什么在没插网线情况下也
可以调用发包函数处理的原因, 结果就是直接丢包。
/* net/sched/sch_generic.c */
void dev_deactivate(struct net_device *dev)
{
struct Qdisc *qdisc;
spin_lock_bh(&dev->queue_lock);
qdisc = dev->qdisc;
// 将网卡当前qdisc设置为noop_qdisc
dev->qdisc = &noop_qdisc;
// 释放原来的qdisc
qdisc_reset(qdisc);
spin_unlock_bh(&dev->queue_lock);
dev_watchdog_down(dev);
/* Wait for outstanding dev_queue_xmit calls. */
synchronize_rcu();
/* Wait for outstanding qdisc_run calls. */
while (test_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
yield();
if (dev->gso_skb) {
kfree_skb(dev->gso_skb);
dev->gso_skb = NULL;
}
}
3. 数据结构
流控处理对外表现是一个黑盒,外部只能看到数据入队和出队,但内部队列是如何操作和管理外面是
不知道的;另外处理队列处理外,流控还有一个调度器,该调度器将数据进行分类,然后对不同类型
的数据采取不同的流控处理,所分的类型可能是多级的,形成一个树型的分类树。
流控的基本数据结构是struct Qdisc(queueing discipline,直译是“排队纪律”,意译为“流控”
),这是内核中为数不多的以大写字母开头结构名称之一:
/* include/net/sch_generic.h */
struct Qdisc
{
// 入队操作
int (*enqueue)(struct sk_buff *skb, struct Qdisc *dev);
// 出队操作
struct sk_buff * (*dequeue)(struct Qdisc *dev);
// 标志
unsigned flags;
#define TCQ_F_BUILTIN 1
#define TCQ_F_THROTTLED 2
#define TCQ_F_INGRESS 4
int padded;
// Qdisc的基本操作结构
struct Qdisc_ops *ops;
// 句柄
u32 handle;
u32 parent;
atomic_t refcnt;
// 数据包链表头
struct sk_buff_head q;
// 网卡设备
struct net_device *dev;
struct list_head list;
// 统计信息
struct gnet_stats_basic bstats;
struct gnet_stats_queue qstats;
// 速率估计
struct gnet_stats_rate_est rate_est;
// 流控锁
spinlock_t *stats_lock;
struct rcu_head q_rcu;
int (*reshape_fail)(struct sk_buff *skb,
struct Qdisc *q);
/* This field is deprecated, but it is still used by CBQ
* and it will live until better solution will be invented.
*/
// 父节点, 但基本已经被淘汰了
struct Qdisc *__parent;
};
流控队列的基本操作结构:
struct Qdisc_ops
{
// 链表中的下一个
struct Qdisc_ops *next;
// 类别操作结构
struct Qdisc_class_ops *cl_ops;
// Qdisc的名称, 从数组大小看应该就是网卡名称
char id[IFNAMSIZ];
// 私有数据大小
int priv_size;
// 入队
int (*enqueue)(struct sk_buff *, struct Qdisc *);
// 出队
struct sk_buff * (*dequeue)(struct Qdisc *);
// 将数据包重新排队
int (*requeue)(struct sk_buff *, struct Qdisc *);
// 丢弃
unsigned int (*drop)(struct Qdisc *);
// 初始化
int (*init)(struct Qdisc *, struct rtattr *arg);
// 复位为初始状态,释放缓冲,删除定时器,清空计数器
void (*reset)(struct Qdisc *);
// 释放
void (*destroy)(struct Qdisc *);
// 更改Qdisc参数
int (*change)(struct Qdisc *, struct rtattr *arg);
// 输出
int (*dump)(struct Qdisc *, struct sk_buff *);
int (*dump_stats)(struct Qdisc *, struct gnet_dump *);
struct module *owner;
};
流控队列类别操作结构:
struct Qdisc_class_ops
{
/* Child qdisc manipulation */
// 减子节点
int (*graft)(struct Qdisc *, unsigned long cl,
struct Qdisc *, struct Qdisc **);
// 增加子节点
struct Qdisc * (*leaf)(struct Qdisc *, unsigned long cl);
/* Class manipulation routines */
// 获取, 增加使用计数
unsigned long (*get)(struct Qdisc *, u32 classid);
// 释放, 减少使用计数
void (*put)(struct Qdisc *, unsigned long);
// 改变
int (*change)(struct Qdisc *, u32, u32,
struct rtattr **, unsigned long *);
// 删除
int (*delete)(struct Qdisc *, unsigned long);
// 遍历
void (*walk)(struct Qdisc *, struct qdisc_walker * arg);
/* Filter manipulation */
struct tcf_proto ** (*tcf_chain)(struct Qdisc *, unsigned long);
// tc捆绑
unsigned long (*bind_tcf)(struct Qdisc *, unsigned long,
u32 classid);
// tc解除
void (*unbind_tcf)(struct Qdisc *, unsigned long);
/* rtnetlink specific */
// 输出
int (*dump)(struct Qdisc *, unsigned long,
struct sk_buff *skb, struct tcmsg*);
int (*dump_stats)(struct Qdisc *, unsigned long,
struct gnet_dump *);
};
// 流控速率控制表结构
struct qdisc_rate_table
{
struct tc_ratespec rate;
u32 data[256];
struct qdisc_rate_table *next;
int refcnt;
};
4. 基本操作
各种流控算法是通过流控操作结构实现,然后这些操作结构登记到内核的流控链表,在使用时可为网
卡构造新的流控结构,将该结构和某种流控操作结构挂钩,这样就实现网卡采用某种策略发送数据进
行流控,所有操作在用户空间都可通过tc程序设置。
4.1 Qdisc的一些基本操作
4.1.1 分配新的流控结构
/* net/sched/sch_generic.c */
// 分配新的Qdisc结构, Qdisc的操作结构由函数参数指定
struct Qdisc *qdisc_alloc(struct net_device *dev, struct Qdisc_ops *ops)
{
void *p;
struct Qdisc *sch;
unsigned int size;
int err = -ENOBUFS;
/* ensure that the Qdisc and the private data are 32-byte aligned */
// Qdisc空间按32字节对齐
size = QDISC_ALIGN(sizeof(*sch));
// 增加私有数据空间
size += ops->priv_size + (QDISC_ALIGNTO - 1);
p = kzalloc(size, GFP_KERNEL);
if (!p)
goto errout;
// 确保从缓冲区中的sch到缓冲区结束点空间是32字节对齐的
sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p);
// 填充字节的数量
sch->padded = (char *) sch - (char *) p;
// +------------------------------------+
// |________|___________________________|
// ^ ^ ^
// | pad | <------ 32*N ------------>|
// p sch
// 初始化链表, 将用于挂接到dev的Qdisc链表
INIT_LIST_HEAD(&sch->list);
// 初始化数据包链表
skb_queue_head_init(&sch->q);
// Qdisc结构参数
sch->ops = ops;
sch->enqueue = ops->enqueue;
sch->dequeue = ops->dequeue;
sch->dev = dev;
// 网卡使用计数增加
dev_hold(dev);
sch->stats_lock = &dev->queue_lock;
atomic_set(&sch->refcnt, 1);
return sch;
errout:
return ERR_PTR(-err);
}
struct Qdisc * qdisc_create_dflt(struct net_device *dev, struct Qdisc_ops *ops)
{
struct Qdisc *sch;
// 分配Qdisc结构
sch = qdisc_alloc(dev, ops);
if (IS_ERR(sch))
goto errout;
// 如果没有初始化函数或者初始化成功, 返回Qdisc结构
if (!ops->init || ops->init(sch, NULL) == 0)
return sch;
// 初始化失败, 释放Qdisc
qdisc_destroy(sch);
errout:
return NULL;
}
/* Under dev->queue_lock and BH! */
// 调用Qdisc操作函数中的reset函数
void qdisc_reset(struct Qdisc *qdisc)
{
struct Qdisc_ops *ops = qdisc->ops;
if (ops->reset)
ops->reset(qdisc);
}
/* this is the rcu callback function to clean up a qdisc when there
* are no further references to it */
// 真正释放Qdisc缓冲区
static void __qdisc_destroy(struct rcu_head *head)
{
struct Qdisc *qdisc = container_of(head, struct Qdisc, q_rcu);
// qdisc-padded就是缓冲区头的位置
kfree((char *) qdisc - qdisc->padded);
}
/* Under dev->queue_lock and BH! */
// 释放Qdisc
void qdisc_destroy(struct Qdisc *qdisc)
{
struct Qdisc_ops *ops = qdisc->ops;
// 检查Qdisc的使用计数
if (qdisc->flags & TCQ_F_BUILTIN ||
!atomic_dec_and_test(&qdisc->refcnt))
return;
// 将Qdisc从网卡设备的Qdisc链表中断开
list_del(&qdisc->list);
#ifdef CONFIG_NET_ESTIMATOR
gen_kill_estimator(&qdisc->bstats, &qdisc->rate_est);
#endif
// 复位操作
if (ops->reset)
ops->reset(qdisc);
// 内部释放操作
if (ops->destroy)
ops->destroy(qdisc);
// 减少操作结构的模块计数
module_put(ops->owner);
// 减少网卡使用计数
dev_put(qdisc->dev);
// 对每个CPU的数据进行具体空间释放
call_rcu(&qdisc->q_rcu, __qdisc_destroy);
}
/* include/net/sch_generic.h */
// 将skb包添加到数据队列最后
static inline int __qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch,
struct sk_buff_head *list)
{
// 将数据包连接到数据包链表尾
__skb_queue_tail(list, skb);
// 更新统计信息
// 当前队列中数据包的数据长度增加
sch->qstats.backlog += skb->len;
// Qdisc处理的数据包数字节数增加
sch->bstats.bytes += skb->len;
sch->bstats.packets++;
return NET_XMIT_SUCCESS;
}
static inline int qdisc_enqueue_tail(struct sk_buff *skb, struct Qdisc *sch)
{
return __qdisc_enqueue_tail(skb, sch, &sch->q);
}
// 将队列头的数据包出队列
static inline struct sk_buff *__qdisc_dequeue_head(struct Qdisc *sch,
struct sk_buff_head *list)
{
// 从skb链表中头skb包出队列
struct sk_buff *skb = __skb_dequeue(list);
// 减少当前数据队列数据长度计数
if (likely(skb != NULL))
sch->qstats.backlog -= skb->len;
return skb;
}
static inline struct sk_buff *qdisc_dequeue_head(struct Qdisc *sch)
{
return __qdisc_dequeue_head(sch, &sch->q);
}
// 将队列尾的数据包出队列
static inline struct sk_buff *__qdisc_dequeue_tail(struct Qdisc *sch,
struct sk_buff_head *list)
{
// 从链表为提出数据包
struct sk_buff *skb = __skb_dequeue_tail(list);
if (likely(skb != NULL))
sch->qstats.backlog -= skb->len;
return skb;
}
static inline struct sk_buff *qdisc_deq
展开阅读全文