前面我介绍过google对内核协议栈的patch,RPS,它主要是为了软中断的负载均衡,这次继续来介绍google 的对RPS的增强path RFS(receive flow steering),RPS是把软中断map到对应cpu,而这个时候还会有另外的性能影响,那就是如果应用程序所在的cpu和软中断处理的cpu不是同一个,此时对于cpu cache的影响会很大。 这里要注意,在kernel 的2.6.35中 这两个patch已经加入了。

ok,先来描述下它是怎么做的,其实这个补丁很简单,想对于rps来说就是添加了一个cpu的选择,也就是说我们需要根据应用程序的cpu来选择软中断需要被处理的cpu。这里做法是当调用recvmsg的时候,应用程序的cpu会被存储在一个hash table中,而索引是根据socket的rxhash进行计算的。而这个rxhash就是RPS中计算得出的那个skb的hash值.

可是这里会有一个问题,那就是当多个线程或者进程读取相同的socket的时候,此时就会导致cpu id不停的变化,从而导致大量的OOO的数据包(这是因为cpu id变化,导致下面软中断不停的切换到不同的cpu,此时就会导致大量的乱序的包).

而RFS是如何解决这个问题的呢,它做了两个表rps_sock_flow_table和rps_dev_flow_table,其中第一个rps_sock_flow_table是一个全局的hash表,这个表针对socket的,映射了socket对应的cpu,这里的cpu就是应用层期待软中断所在的cpu。

1
2
3
4
5
6
7
8
9
10
  
struct rps_sock_flow_table {

unsigned int mask;

//hash表

u16 ents[0];

};

可以看到它有两个域,其中第一个是掩码,用于来计算hash表的索引,而ents就是保存了对应socket的cpu。

然后是rps_dev_flow_table,这个是针对设备的,每个设备队列都含有一个rps_dev_flow_table(这个表主要是保存了上次处理相同链接上的skb所在的cpu),这个hash表中每一个元素包含了一个cpu id,一个tail queue的计数器,这个值是一个很关键的值,它主要就是用来解决上面大量OOO的数据包的问题的,它保存了当前的dev flow table需要处理的数据包的尾部计数。接下来我们会详细分析这个东西。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  
struct netdev_rx_queue {

struct rps_map *rps_map;

//每个设备的队列保存了一个rps_dev_flow_table

struct rps_dev_flow_table *rps_flow_table;

struct kobject kobj;

struct netdev_rx_queue *first;

atomic_t count;

} ____cacheline_aligned_in_smp;

struct rps_dev_flow_table {

unsigned int mask;

struct rcu_head rcu;

struct work_struct free_work;

//hash表

struct rps_dev_flow flows[0];

};

struct rps_dev_flow {

u16 cpu;

u16 fill;

//tail计数。

unsigned int last_qtail;

};

首先我们知道,大量的OOO的数据包的引起是因为多个进程同时请求相同的socket,而此时会导致这个socket对应的cpu id不停的切换,然后软中断如果不做处理,只是简单的调度软中断到不同的cpu,就会导致顺序的数据包被分发到不同的cpu,由于是smp,因此会导致大量的OOO的数据包,而在RFS中是这样解决这个问题的,在soft_net中添加了2个域,input_queue_head和input_queue_tail,然后在设备队列中添加了rps_flow_table,而rps_flow_table中的元素rps_dev_flow包含有一个last_qtail,RFS就通过这3个域来控制乱序的数据包。

这里为什么需要3个值呢,这是因为每个cpu上的队列的个数input_queue_tail是一直增加的,而设备每一个队列中的flow table对应的skb则是有可能会被调度到另外的cpu,而dev flow table的last_qtail表示当前的flow table所需要处理的数据包队列(backlog queue)的尾部队列计数,也就是说当input_queue_head大于等于它的时候说明当前的flow table可以切换了,否则的话不能切换到进程期待的cpu。

不过这里还要注意就是最好能够绑定进程到指定的cpu(配合rps和rfs的参数设置),这样的话,rfs和rps的效率会更好,所以我认为像erlang这种在rfs和rps下性能应该提高非常大的.

下面就是softnet_data 的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  
struct softnet_data {

struct Qdisc *output_queue;

struct Qdisc **output_queue_tailp;

struct list_head poll_list;

struct sk_buff *completion_queue;

struct sk_buff_head process_queue;

/\* stats \*/

unsigned int processed;

unsigned int time_squeeze;

unsigned int cpu_collision;

unsigned int received_rps;

#ifdef CONFIG_RPS

struct softnet_data *rps_ipi_list;

/\* Elements below can be accessed between CPUs for RPS \*/

struct call_single_data csd ____cacheline_aligned_in_smp;

struct softnet_data *rps_ipi_next;

unsigned int cpu;

//最关键的两个域

unsigned int input_queue_head;

unsigned int input_queue_tail;

#endif

unsigned dropped;

struct sk_buff_head input_pkt_queue;

struct napi_struct backlog;

};

接下来我们来看代码,来看内核是如何实现的,先来看inet_recvmsg,也就是调用rcvmsg时,内核会调用的函数,这个函数比较简单,就是多加了一行代码sock_rps_record_flow,这个函数主要是将本socket和cpu设置到rps_sock_flow_table这个hash表中。

首先要提一下,这里这两个flow table的初始化都是放在sys中初始化的,不过sys部分相关的代码我就不分析了,因为具体的逻辑和原理都是在协议栈部分实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  
int inet_recvmsg(struct kiocb \*iocb, struct socket \*sock, struct msghdr *msg,

size_t size, int flags)

{

struct sock *sk = sock->sk;

int addr_len = 0;

int err;

//设置hash表

sock_rps_record_flow(sk);

err = sk->sk_prot->recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,

flags & ~MSG_DONTWAIT, &addr_len);

if (err >= 0)

msg->msg_namelen = addr_len;

return err;

}

然后就是rps_record_sock_flow,这个函数主要是得到全局的rps_sock_flow_table,然后调用rps_record_sock_flow来对rps_sock_flow_table进行设置,这里会将socket的sk_rxhash传递进去当作hash的索引,而这个sk_rxhash其实就是skb里面的rxhash,skb的rxhash就是rps中设置的hash值,这个值是根据四元组进行hash的。这里用这个当索引一个是为了相同的socket都能落入一个index。而且下面的软中断上下文也比较容易存取这个hash表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  
struct rps_sock_flow_table *rps_sock_flow_table __read_mostly;

static inline void sock_rps_record_flow(const struct sock *sk)

{

#ifdef CONFIG_RPS

struct rps_sock_flow_table *sock_flow_table;

rcu_read_lock();

sock_flow_table = rcu_dereference(rps_sock_flow_table);

//设置hash表

rps_record_sock_flow(sock_flow_table, sk->sk_rxhash);

rcu_read_unlock();

#endif

}

其实所有的事情都是rps_record_sock_flow中做的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  
static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,

u32 hash)

{

if (table && hash) {

//获取索引。

unsigned int cpu, index = hash & table->mask;

/\* We only give a hint, preemption can change cpu under us \*/

//获取cpu

cpu = raw_smp_processor_id();

//保存对应的cpu,如果等于当前cpu,则说明已经设置过了。

if (table->ents[index] != cpu)

//否则设置cpu

table->ents[index] = cpu;

}

}

上面是进程上下文做的事情,也就是设置对应的进程所期待的cpu,它用的是rps_sock_flow_table,而接下来就是软中断上下文了,rfs这个patch主要的工作都是在软中断上下文做的。不过看这里的代码之前最好能够了解下RPS补丁,因为RFS就是对rps做了一点小改动。

主要是两个函数,第一个是enqueue_to_backlog,这个函数我们知道是用来将skb挂在到对应cpu的input queue上的,这里我们就关注他的一个函数就是input_queue_tail_incr_save,他就是更新设备的input_queue_tail以及softnet_data的input_queue_tail。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
		  
if (skb_queue_len(&sd->input_pkt_queue)) {

enqueue:

__skb_queue_tail(&sd->input_pkt_queue, skb);

//这个函数更新对应设备的rps_dev_flow_table中的input_queue_tail以及dev flow table的last_qtail

input_queue_tail_incr_save(sd, qtail);

rps_unlock(sd);

local_irq_restore(flags);

return NET_RX_SUCCESS;

}

第二个是get_rps_cpu,这个函数我们知道就是得到软中断应该运行的cpu,这里我们就看RFS添加的部分,这里它是这样计算的,首先会得到两个flow table,一个是sock_flow_table,另一个是设备的rps_flow_table(skb对应的设备队列中对应的flow table),这里的逻辑是这样子的取出来两个cpu,一个是根据rps计算数据包前一次被调度过的cpu(tcpu),一个是应用程序期望的cpu(next_cpu),然后比较这两个值,如果 1 tcpu未设置(等于RPS_NO_CPU) 2 tcpu是离线的 3 tcpu的input_queue_head大于rps_flow_table中的last_qtail 的话就调度这个skb到next_cpu.

而这里第三点input_queue_head大于rps_flow_table则说明在当前的dev flow table中的数据包已经发送完毕,否则的话为了避免乱序就还是继续使用tcpu.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
  
got_hash:

flow_table = rcu_dereference(rxqueue->rps_flow_table);

sock_flow_table = rcu_dereference(rps_sock_flow_table);

if (flow_table && sock_flow_table) {

u16 next_cpu;

struct rps_dev_flow *rflow;

//得到flow table

rflow = &flow_table->flows[skb->rxhash & flow_table->mask];

tcpu = rflow->cpu;

/得到next_cpu

next_cpu = sock_flow_table->ents[skb->rxhash &

sock_flow_table->mask];

//条件

if (unlikely(tcpu != next_cpu) &&

(tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||

((int)(per_cpu(softnet_data, tcpu).input_queue_head –

rflow->last_qtail)) >= 0)) {

//设置tcpu

tcpu = rflow->cpu = next_cpu;

if (tcpu != RPS_NO_CPU)

//更新last_qtail

rflow->last_qtail = per_cpu(softnet_data,

tcpu).input_queue_head;

}

if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {

*rflowp = rflow;

//设置返回cpu,以供软中断重新调度

cpu = tcpu;

goto done;

}

}

………………………………

最后我们来分析下第一次数据包到达协议栈而应用程序还没有调用rcvmsg读取数据包,此时会发生什么问题,当第一次进来时tcpu是RPS_NO_CPU,并且next_cpu也是RPS_NO_CPU,此时会导致跳过rfs处理,而是直接使用rps的处理,也就是上面代码的紧接着的部分,下面这段代码前面rps时已经分析过了,这里就不分析了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	  
map = rcu_dereference(rxqueue->rps_map);

if (map) {

tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];

if (cpu_online(tcpu)) {

cpu = tcpu;

goto done;

}

}