随笔 - 137  文章 - 1  trackbacks - 0
<2024年3月>
252627282912
3456789
10111213141516
17181920212223
24252627282930
31123456

常用链接

留言簿

随笔分类

随笔档案

收藏夹

调试技巧

搜索

  •  

最新评论

阅读排行榜

评论排行榜

     摘要: 关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术。下面开始正文。目录关于CAS等原子操作无锁队列的链表实现CAS的ABA问题解决ABA的问题用数组实现无锁队列 小结关于CAS等原子操作在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare ...  阅读全文
posted @ 2020-04-23 17:49 长戟十三千 阅读(1989) | 评论 (0)编辑 收藏
Linux kernel里面从来就不缺少简洁,优雅和高效的代码,只是我们缺少发现和品味的眼光。在Linux kernel里面,简洁并不表示代码使用神出鬼没的超然技巧,相反,它使用的不过是大家非常熟悉的基础数据结构,但是kernel开发者能从基础的数据结构中,提炼出优美的特性。 
kfifo就是这样的一类优美代码,它十分简洁,绝无多余的一行代码,却非常高效。 
关于kfifo信息如下:
本文分析的原代码版本: 2.6.24.4
kfifo的定义文件: kernel/kfifo.c
kfifo的头文件: include/linux/kfifo.h
kfifo概述
kfifo是内核里面的一个First In First Out数据结构,它采用环形循环队列的数据结构来实现;它提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发操作,而不需要任何加锁行为,就可以保证kfifo的线程安全。 
kfifo代码既然肩负着这么多特性,那我们先一敝它的代码:
struct kfifo {
    unsigned char *buffer;    /* the buffer holding the data */
    unsigned int size;    /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;    /* data is extracted from off. (out % size) */
    spinlock_t *lock;    /* protects concurrent modifications */
};
1
2
3
4
5
6
7
这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:
buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它向上扩展成2的幂
lock: 如果使用不能保证任何时间最多只有一个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一起构成一个循环队列。 in指向buffer中队头,而且out指向buffer中的队尾,它的结构如示图如下:
+--------------------------------------------------------------+
|            |<----------data---------->|                      |
+--------------------------------------------------------------+
             ^                          ^                      ^
             |                          |                      |
            out                        in                     size
1
2
3
4
5
6
当然,内核开发者使用了一种更好的技术处理了in, out和buffer的关系,我们将在下面进行详细分析。
kfifo功能描述
kfifo提供如下对外功能规格
只支持一个读者和一个读者并发操作
无阻塞的读写操作,如果空间不够,则返回实际访问空间
kfifo_alloc 分配kfifo内存和初始化工作
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;
    /*
     * round up to the next power of 2, since our 'let the indices
     * wrap' tachnique works only in this case.
     */
    if (size & (size - 1)) {
        BUG_ON(size > 0x80000000);
        size = roundup_pow_of_two(size);
    }
    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);
    ret = kfifo_init(buffer, size, gfp_mask, lock);
    if (IS_ERR(ret))
        kfree(buffer);
    return ret;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这里值得一提的是,kfifo->size的值总是在调用者传进来的size参数的基础上向2的幂扩展,这是内核一贯的做法。这样的好处不言而喻——对kfifo->size取模运算可以转化为与运算,如下:
kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1)
在kfifo_alloc函数中,使用size & (size – 1)来判断size 是否为2幂,如果条件为真,则表示size不是2的幂,然后调用roundup_pow_of_two将之向上扩展为2的幂。
这都是常用的技巧,只不过大家没有将它们结合起来使用而已,下面要分析的__kfifo_put和__kfifo_get则是将kfifo->size的特点发挥到了极致。
__kfifo_put和__kfifo_get巧妙的入队和出队
__kfifo_put是入队操作,它先将数据放入buffer里面,最后才修改in参数;__kfifo_get是出队操作,它先将数据从buffer中移走,最后才修改out。你会发现in和out两者各司其职。
下面是__kfifo_put和__kfifo_get的代码
unsigned int __kfifo_put(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    len = min(len, fifo->size - fifo->in + fifo->out);
    /*
     * Ensure that we sample the fifo->out index -before- we
     * start putting bytes into the kfifo.
     */
    smp_mb();
    /* first put the data starting from fifo->in to buffer end */
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    /* then put the rest (if any) at the beginning of the buffer */
    memcpy(fifo->buffer, buffer + l, len - l);
    /*
     * Ensure that we add the bytes to the kfifo -before-
     * we update the fifo->in index.
     */
    smp_wmb();
    fifo->in += len;
    return len;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
奇怪吗?代码完全是线性结构,没有任何if-else分支来判断是否有足够的空间存放数据。内核在这里的代码非常简洁,没有一行多余的代码。
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
这个表达式计算当前写入的空间,换成人可理解的语言就是:
l = kfifo可写空间和预期写入空间的最小值
使用min宏来代if-else分支
__kfifo_get也应用了同样技巧,代码如下:
unsigned int __kfifo_get(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    len = min(len, fifo->in - fifo->out);
    /*
     * Ensure that we sample the fifo->in index -before- we
     * start removing bytes from the kfifo.
     */
    smp_rmb();
    /* first get the data from fifo->out until the end of the buffer */
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    /* then get the rest (if any) from the beginning of the buffer */
    memcpy(buffer + l, fifo->buffer, len - l);
    /*
     * Ensure that we remove the bytes from the kfifo -before-
     * we update the fifo->out index.
     */
    smp_mb();
    fifo->out += len;
    return len;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
认真读两遍吧,我也读了多次,每次总是有新发现,因为in, out和size的关系太巧妙了,竟然能利用上unsigned int回绕的特性。
原来,kfifo每次入队或出队,kfifo->in或kfifo->out只是简单地kfifo->in/kfifo->out += len,并没有对kfifo->size 进行取模运算。因此kfifo->in和kfifo->out总是一直增大,直到unsigned in最大值时,又会绕回到0这一起始端。但始终满足:
kfifo->in - kfifo->out <= kfifo->size
即使kfifo->in回绕到了0的那一端,这个性质仍然是保持的。
对于给定的kfifo:
数据空间长度为:kfifo->in - kfifo->out
而剩余空间(可写入空间)长度为:kfifo->size - (kfifo->in - kfifo->out)
尽管kfifo->in和kfofo->out一直超过kfifo->size进行增长,但它对应在kfifo->buffer空间的下标却是如下:
kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))
kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
往kfifo里面写一块数据时,数据空间、写入空间和kfifo->size的关系如果满足:
kfifo->in % size + len > size
那就要做写拆分了,见下图:
                                                    kfifo_put(写)空间开始地址
                                                    |
                                                   \_/
                                                    |XXXXXXXXXX
XXXXXXXX|                                                    
+--------------------------------------------------------------+
|                        |<----------data---------->|          |
+--------------------------------------------------------------+
                         ^                          ^          ^
                         |                          |          |
                       out%size                   in%size     size
        ^
        |
      写空间结束地址                      
1
2
3
4
5
6
7
8
9
10
11
12
13
14
第一块当然是: [kfifo->in % kfifo->size, kfifo->size] 
第二块当然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
下面是代码,细细体味吧:
/* first put the data starting from fifo->in to buffer end */   
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));   
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);   
/* then put the rest (if any) at the beginning of the buffer */   
memcpy(fifo->buffer, buffer + l, len - l);  
1
2
3
4
5
6
对于kfifo_get过程,也是类似的,请各位自行分析。
kfifo_get和kfifo_put无锁并发操作
计算机科学家已经证明,当只有一个读经程和一个写线程并发操作时,不需要任何额外的锁,就可以确保是线程安全的,也即kfifo使用了无锁编程技术,以提高kernel的并发。
kfifo使用in和out两个指针来描述写入和读取游标,对于写入操作,只更新in指针,而读取操作,只更新out指针,可谓井水不犯河水,示意图如下:
                                               |<--写入-->|
+--------------------------------------------------------------+
|                        |<----------data----->|               |
+--------------------------------------------------------------+
                         |<--读取-->|
                         ^                     ^               ^
                         |                     |               |
                        out                   in              size
1
2
3
4
5
6
7
8
为了避免读者看到写者预计写入,但实际没有写入数据的空间,写者必须保证以下的写入顺序:
往[kfifo->in, kfifo->in + len]空间写入数据
更新kfifo->in指针为 kfifo->in + len
在操作1完成时,读者是还没有看到写入的信息的,因为kfifo->in没有变化,认为读者还没有开始写操作,只有更新kfifo->in之后,读者才能看到。
那么如何保证1必须在2之前完成,秘密就是使用内存屏障:smp_mb(),smp_rmb(), smp_wmb(),来保证对方观察到的内存操作顺序。
总结
读完kfifo代码,令我想起那首诗“众里寻他千百度,默然回首,那人正在灯火阑珊处”。不知你是否和我一样,总想追求简洁,高质量和可读性的代码,当用尽各种方法,江郞才尽之时,才发现Linux kernel里面的代码就是我们寻找和学习的对象
————————————————
版权声明:本文为CSDN博主「海枫」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/linyt/article/details/53355355
posted @ 2020-04-23 16:38 长戟十三千 阅读(266) | 评论 (0)编辑 收藏
     摘要: 何为C++对象模型?C++对象模型可以概括为以下2部分:1.        语言中直接支持面向对象程序设计的部分2.        对于各种支持的底层实现机制语言中直接支持面向对象程序设计的部分,如构造函数、析构函数、虚函数、继承(单继承、...  阅读全文
posted @ 2020-04-23 15:56 长戟十三千 阅读(120) | 评论 (0)编辑 收藏
posted @ 2020-04-23 15:55 长戟十三千 阅读(262) | 评论 (0)编辑 收藏
#define EAGAIN 11 /* Try again */
 
#define EINTR 4 /* Interrupted system call */
 
#define EWOULDBLOCK EAGAIN /* Operation would block */
EWOULDBLOCK用于非阻塞模式,不需要重新读或者写
EINTR指被中断唤醒,需要重新读/写
 
在Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。
从字面上来看,是提示在试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。例如,以O_NONBLOCK的标记打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。
又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其在调用一次(也许下次就能成功)。
linux-非阻塞socket编程处理EAGAIN错误
在linux进行非阻塞的socket接受数据时经常出现Resource temporarily unavailable,errno代码为11(EAGAIN),这是什么意思?
这表明你在非阻塞模式下调用了阻塞操作,在该操作没有完成就返回这个错误,这个错误不会破坏socket的同步,不用管它,下次循环接着recv就可以。对非阻塞socket而言,EAGAIN不是一种错误。在VxWorks和Windows上,EAGAIN的名字叫做EWOULDBLOCK。

EAGAIN、EWOULDBLOCK、EINTR与非阻塞 长连接
EWOULDBLOCK用于非阻塞模式,不需要重新读或者写

EINTR指操作被中断唤醒,需要重新读/写

在Linux环境下开发经常会碰到很多错误(设置errno),其中EAGAIN是其中比较常见的一个错误(比如用在非阻塞操作中)。
从字面上来看,是提示再试一次。这个错误经常出现在当应用程序进行一些非阻塞(non-blocking)操作(对文件或socket)的时候。例如,以 O_NONBLOCK的标志打开文件/socket/FIFO,如果你连续做read操作而没有数据可读。此时程序不会阻塞起来等待数据准备就绪返 回,read函数会返回一个错误EAGAIN,提示你的应用程序现在没有数据可读请稍后再试。
又例如,当一个系统调用(比如fork)因为没有足够的资源(比如虚拟内存)而执行失败,返回EAGAIN提示其再调用一次(也许下次就能成功)。
Linux - 非阻塞socket编程处理EAGAIN错误
在linux进行非阻塞的socket接收数据时经常出现Resource temporarily unavailable,errno代码为11(EAGAIN),这是什么意思?
这表明你在非阻塞模式下调用了阻塞操作,在该操作没有完成就返回这个错误,这个错误不会破坏socket的同步,不用管它,下次循环接着recv就可以。 对非阻塞socket而言,EAGAIN不是一种错误。在VxWorks和Windows上,EAGAIN的名字叫做EWOULDBLOCK。
另外,如果出现EINTR即errno为4,错误描述Interrupted system call,操作也应该继续。

最后,如果recv的返回值为0,那表明连接已经断开,我们的接收操作也应该结束。


另外附一下几个问题:

1、阻塞模式与非阻塞模式下recv的返回值各代表什么意思?有没有 区别?(就我目前了解阻塞与非阻塞recv返回值没有区分,都是 <0:出错,=0:连接关闭,>0接收到数据大小,特别:返回 值 <0时并且(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)的情况 下认为连接是正常的,继续接收。只是阻塞模式下recv会阻塞着接收数据,非阻塞模式下如果没有数据会返回,不会阻塞着读,因此需要 循环读取



2、阻塞模式与非阻塞模式下write的返回值各代表什么意思? 有没有区别?

阻塞与非阻塞write返回值没有区分,都是 <0:出错,=0:连接关闭,>0发送数据大小,特别:返回值 <0时并且 (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)的情况下认为连接是正常的, 继续发送。只是阻塞模式下write会阻塞着发送数据,非阻塞模式下如果暂时无法发送数据会返回,不会阻塞着 write,因此需要循环发送



3、阻塞模式下read返回 值 < 0 && errno != EINTR && errno != EWOULDBLOCK & amp;& errno != EAGAIN时,连接异常,需要关闭,read返回值 < 0 && amp; (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)时表示没有数据, 需要继续接收,如果返回值大于0表示接送到数据。 

非阻塞模式下read返回值 < 0表示没有数据,= 0表示 连接断开,> 0表示接收到数据。 

这2种模式下的返回值是不是这么理解,有没有跟详细的理解或跟准确的 说明? 


4、阻塞模式与非阻塞模式下是否send返回 值 < 0 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) 表示暂时发送失败,需要重试,如果send返回值 <= 0, && errno != EINTR && amp; errno != EWOULDBLOCK && errno != EAGAIN时,连接异常,需要关闭,如果send返回 值 > 0则表示发送了数据?send的返回值是否这么理解,阻塞模式与非阻塞模式下send返回值=0是否都是发送失败,还是那个模式下表示暂时 不可发送,需要 重发?

posted @ 2020-04-22 16:09 长戟十三千 阅读(913) | 评论 (0)编辑 收藏

http://name5566.com/4535.html

 

http://wizmann.tk/linux-lockless-llist.html

typeofsizeof类似,sizeof求的是变量/类型的大小,而typeof是求变量/类型的数据类型。

typeof在#define中的应用很多,例如:

#define max(a,b) \    ({ typeof (a) _a = (a); \        typeof (b) _b = (b); \      _a > _b ? _a : _b; }) 

typeof(a)获得了a的类型,声明了一个同类型的_a变量。

p.s. 上面是一个安全的用#define实现的max函数。

ACCESS_ONCE

#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x)); 

ACCESS_ONCE使用了一个类型转换,使用volatile修饰 x。避免编译器优化带来的潜在岐义。

 

参考文献列表:
http://en.wikipedia.org/wiki/Memory_barrier
http://en.wikipedia.org/wiki/Out-of-order_execution
https://www.kernel.org/doc/Documentation/memory-barriers.txt

本文例子均在 Linux(g++)下验证通过,CPU 为 X86-64 处理器架构。所有罗列的 Linux 内核代码也均在(或只在)X86-64 下有效。

本文首先通过范例(以及内核代码)来解释 Memory barrier,然后介绍一个利用 Memory barrier 实现的无锁环形缓冲区。

Memory barrier 简介

程序在运行时内存实际的访问顺序和程序代码编写的访问顺序不一定一致,这就是内存乱序访问。内存乱序访问行为出现的理由是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:

  1. 编译时,编译器优化导致内存乱序访问(指令重排)
  2. 运行时,多 CPU 间交互引起内存乱序访问

Memory barrier 能够让 CPU 或编译器在内存访问上有序。一个 Memory barrier 之前的内存访问操作必定先于其之后的完成。Memory barrier 包括两类:

  1. 编译器 barrier
  2. CPU Memory barrier

很多时候,编译器和 CPU 引起内存乱序访问不会带来什么问题,但一些特殊情况下,程序逻辑的正确性依赖于内存访问顺序,这时候内存乱序访问会带来逻辑上的错误,例如:

  1. // thread 1
  2. while (!ok);
  3. do(x);
  4.  
  5. // thread 2
  6. x = 42;
  7. ok = 1;

此段代码中,ok 初始化为 0,线程 1 等待 ok 被设置为 1 后执行 do 函数。假如说,线程 2 对内存的写操作乱序执行,也就是 x 赋值后于 ok 赋值完成,那么 do 函数接受的实参就很可能出乎程序员的意料,不为 42。

编译时内存乱序访问

在编译时,编译器对代码做出优化时可能改变实际执行指令的顺序(例如 gcc 下 O2 或 O3 都会改变实际执行指令的顺序):

  1. // test.cpp
  2. int x, y, r;
  3. void f()
  4. {
  5. x = r;
  6. y = 1;
  7. }

编译器优化的结果可能导致 y = 1 在 x = r 之前执行完成。首先直接编译此源文件:

  1. g++ -S test.cpp

得到相关的汇编代码如下:

  1. movl r(%rip), %eax
  2. movl %eax, x(%rip)
  3. movl $1, y(%rip)

这里我们看到,x = r 和 y = 1 并没有乱序。现使用优化选项 O2(或 O3)编译上面的代码(g++ -O2 -S test.cpp),生成汇编代码如下:

  1. movl r(%rip), %eax
  2. movl $1, y(%rip)
  3. movl %eax, x(%rip)

我们可以清楚的看到经过编译器优化之后 movl $1, y(%rip) 先于 movl %eax, x(%rip) 执行。避免编译时内存乱序访问的办法就是使用编译器 barrier(又叫优化 barrier)。Linux 内核提供函数 barrier() 用于让编译器保证其之前的内存访问先于其之后的完成。内核实现 barrier() 如下(X86-64 架构):

  1. #define barrier() __asm__ __volatile__("" ::: "memory")

现在把此编译器 barrier 加入代码中:

  1. int x, y, r;
  2. void f()
  3. {
  4. x = r;
  5. __asm__ __volatile__("" ::: "memory");
  6. y = 1;
  7. }

这样就避免了编译器优化带来的内存乱序访问的问题了(如果有兴趣可以再看看编译之后的汇编代码)。本例中,我们还可以使用 volatile 这个关键字来避免编译时内存乱序访问(而无法避免后面要说的运行时内存乱序访问)。volatile 关键字能够让相关的变量之间在内存访问上避免乱序,这里可以修改 x 和 y 的定义来解决问题:

  1. volatile int x, y;
  2. int r;
  3. void f()
  4. {
  5. x = r;
  6. y = 1;
  7. }

现加上了 volatile 关键字,这使得 x 相对于 y、y 相对于 x 在内存访问上有序。在 Linux 内核中,提供了一个宏 ACCESS_ONCE 来避免编译器对于连续的 ACCESS_ONCE 实例进行指令重排。其实 ACCESS_ONCE 实现源码如下:

  1. #define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))

此代码只是将变量 x 转换为 volatile 的而已。现在我们就有了第三个修改方案:

  1. int x, y, r;
  2. void f()
  3. {
  4. ACCESS_ONCE(x) = r;
  5. ACCESS_ONCE(y) = 1;
  6. }

到此基本上就阐述完了我们的编译时内存乱序访问的问题。下面开始介绍运行时内存乱序访问。

运行时内存乱序访问

在运行时,CPU 虽然会乱序执行指令,但是在单个 CPU 的上,硬件能够保证程序执行时所有的内存访问操作看起来像是按程序代码编写的顺序执行的,这时候 Memory barrier 没有必要使用(不考虑编译器优化的情况下)。这里我们了解一下 CPU 乱序执行的行为。在乱序执行时,一个处理器真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。
早期的处理器为有序处理器(In-order processors),有序处理器处理指令通常有以下几步:

  1. 指令获取
  2. 如果指令的输入操作对象(input operands)可用(例如已经在寄存器中了),则将此指令分发到适当的功能单元中。如果一个或者多个操作对象不可用(通常是由于需要从内存中获取),则处理器会等待直到它们可用
  3. 指令被适当的功能单元执行
  4. 功能单元将结果写回寄存器堆(Register file,一个 CPU 中的一组寄存器)

相比之下,乱序处理器(Out-of-order processors)处理指令通常有以下几步:

  1. 指令获取
  2. 指令被分发到指令队列
  3. 指令在指令队列中等待,直到输入操作对象可用(一旦输入操作对象可用,指令就可以离开队列,即便更早的指令未被执行
  4. 指令被分配到适当的功能单元并执行
  5. 执行结果被放入队列(而不立即写入寄存器堆)
  6. 只有所有更早请求执行的指令的执行结果被写入寄存器堆后,指令执行的结果才被写入寄存器堆(执行结果重排序,让执行看起来是有序的

从上面的执行过程可以看出,乱序执行相比有序执行能够避免等待不可用的操作对象(有序执行的第二步)从而提高了效率。现代的机器上,处理器运行的速度比内存快很多,有序处理器花在等待可用数据的时间里已经可以处理大量指令了。
现在思考一下乱序处理器处理指令的过程,我们能得到几个结论:

  1. 对于单个 CPU 指令获取是有序的(通过队列实现)
  2. 对于单个 CPU 指令执行结果也是有序返回寄存器堆的(通过队列实现)

由此可知,在单 CPU 上,不考虑编译器优化导致乱序的前提下,多线程执行不存在内存乱序访问的问题。我们从内核源码也可以得到类似的结论(代码不完全的摘录):

  1. #ifdef CONFIG_SMP
  2. #define smp_mb() mb()
  3. #else
  4. #define smp_mb() barrier()
  5. #endif

这里可以看到,如果是 SMP 则使用 mb,mb 被定义为 CPU Memory barrier(后面会讲到),而非 SMP 时,直接使用编译器 barrier。

在多 CPU 的机器上,问题又不一样了。每个 CPU 都存在 cache(cache 主要是为了弥补 CPU 和内存之间较慢的访问速度),当一个特定数据第一次被特定一个 CPU 获取时,此数据显然不在 CPU 的 cache 中(这就是 cache miss)。此 cache miss 意味着 CPU 需要从内存中获取数据(这个过程需要 CPU 等待数百个周期),此数据将被加载到 CPU 的 cache 中,这样后续就能直接从 cache 上快速访问。当某个 CPU 进行写操作时,它必须确保其他的 CPU 已经将此数据从它们的 cache 中移除(以便保证一致性),只有在移除操作完成后此 CPU 才能安全的修改数据。显然,存在多个 cache 时,我们必须通过一个 cache 一致性协议来避免数据不一致的问题,而这个通讯的过程就可能导致乱序访问的出现,也就是这里说的运行时内存乱序访问。这里不再深入讨论整个细节,这是一个比较复杂的问题,有兴趣可以研究 http://www.rdrop.com/users/paulmck/scalability/paper/whymb.2010.06.07c.pdf 一文,其详细的分析了整个过程。

现在通过一个例子来说明多 CPU 下内存乱序访问:

  1. // test2.cpp
  2. #include <pthread.h>
  3. #include <assert.h>
  4.  
  5. // -------------------
  6. int cpu_thread1 = 0;
  7. int cpu_thread2 = 1;
  8.  
  9. volatile int x, y, r1, r2;
  10.  
  11. void start()
  12. {
  13. x = y = r1 = r2 = 0;
  14. }
  15.  
  16. void end()
  17. {
  18. assert(!(r1 == 0 && r2 == 0));
  19. }
  20.  
  21. void run1()
  22. {
  23. x = 1;
  24. r1 = y;
  25. }
  26.  
  27. void run2()
  28. {
  29. y = 1;
  30. r2 = x;
  31. }
  32.  
  33. // -------------------
  34. static pthread_barrier_t barrier_start;
  35. static pthread_barrier_t barrier_end;
  36.  
  37. static void* thread1(void*)
  38. {
  39. while (1) {
  40. pthread_barrier_wait(&barrier_start);
  41. run1();
  42. pthread_barrier_wait(&barrier_end);
  43. }
  44.  
  45. return NULL;
  46. }
  47.  
  48. static void* thread2(void*)
  49. {
  50. while (1) {
  51. pthread_barrier_wait(&barrier_start);
  52. run2();
  53. pthread_barrier_wait(&barrier_end);
  54. }
  55.  
  56. return NULL;
  57. }
  58.  
  59. int main()
  60. {
  61. assert(pthread_barrier_init(&barrier_start, NULL, 3) == 0);
  62. assert(pthread_barrier_init(&barrier_end, NULL, 3) == 0);
  63.  
  64. pthread_t t1;
  65. pthread_t t2;
  66. assert(pthread_create(&t1, NULL, thread1, NULL) == 0);
  67. assert(pthread_create(&t2, NULL, thread2, NULL) == 0);
  68.  
  69. cpu_set_t cs;
  70. CPU_ZERO(&cs);
  71. CPU_SET(cpu_thread1, &cs);
  72. assert(pthread_setaffinity_np(t1, sizeof(cs), &cs) == 0);
  73. CPU_ZERO(&cs);
  74. CPU_SET(cpu_thread2, &cs);
  75. assert(pthread_setaffinity_np(t2, sizeof(cs), &cs) == 0);
  76.  
  77. while (1) {
  78. start();
  79. pthread_barrier_wait(&barrier_start);
  80. pthread_barrier_wait(&barrier_end);
  81. end();
  82. }
  83.  
  84. return 0;
  85. }

这里创建了两个线程来运行测试代码(需要测试的代码将放置在 run 函数中)。我使用了 pthread barrier(区别于本文讨论的 Memory barrier)主要为了让两个子线程能够同时运行它们的 run 函数。此段代码不停的尝试同时运行两个线程的 run 函数,以便得出我们期望的结果。在每次运行 run 函数前会调用一次 start 函数(进行数据初始化),run 运行后会调用一次 end 函数(进行结果检查)。run1 和 run2 两个函数运行在哪个 CPU 上则通过 cpu_thread1 和 cpu_thread2 两个变量控制。
先编译此程序:g++ -lpthread -o test2 test2.cpp(这里未优化,目的是为了避免编译器优化的干扰)。需要注意的是,两个线程运行在两个不同的 CPU 上(CPU 0 和 CPU 1)。只要内存不出现乱序访问,那么 r1 和 r2 不可能同时为 0,因此断言失败表示存在内存乱序访问。编译之后运行此程序,会发现存在一定概率导致断言失败。为了进一步说明问题,我们把 cpu_thread2 的值改为 0,换而言之就是让两个线程跑在同一个 CPU 下,再运行程序发现断言不再失败。

最后,我们使用 CPU Memory barrier 来解决内存乱序访问的问题(X86-64 架构下):

  1. int cpu_thread1 = 0;
  2. int cpu_thread2 = 1;
  3.  
  4. void run1()
  5. {
  6. x = 1;
  7. __asm__ __volatile__("mfence" ::: "memory");
  8. r1 = y;
  9. }
  10.  
  11. void run2()
  12. {
  13. y = 1;
  14. __asm__ __volatile__("mfence" ::: "memory");
  15. r2 = x;
  16. }

准备使用 Memory barrier

Memory barrier 常用场合包括:

  1. 实现同步原语(synchronization primitives)
  2. 实现无锁数据结构(lock-free data structures)
  3. 驱动程序

实际的应用程序开发中,开发者可能完全不知道 Memory barrier 就可以开发正确的多线程程序,这主要是因为各种同步机制中已经隐含了 Memory barrier(但和实际的 Memory barrier 有细微差别),这就使得不直接使用 Memory barrier 不会存在任何问题。但是如果你希望编写诸如无锁数据结构,那么 Memory barrier 还是很有用的。

通常来说,在单个 CPU 上,存在依赖的内存访问有序:

  1. Q = P;
  2. D = *Q;

这里内存操作有序。然而在 Alpha CPU 上,存在依赖的内存读取操作不一定有序,需要使用数据依赖 barrier(由于 Alpha 不常见,这里就不详细解释了)。

在 Linux 内核中,除了前面说到的编译器 barrier — barrier() 和 ACCESS_ONCE(),还有 CPU Memory barrier:

  1. 通用 barrier,保证读写操作有序的,mb() 和 smp_mb()
  2. 写操作 barrier,仅保证写操作有序的,wmb() 和 smp_wmb()
  3. 读操作 barrier,仅保证读操作有序的,rmb() 和 smp_rmb()

注意,所有的 CPU Memory barrier(除了数据依赖 barrier 之外)都隐含了编译器 barrier。这里的 smp 开头的 Memory barrier 会根据配置在单处理器上直接使用编译器 barrier,而在 SMP 上才使用 CPU Memory barrier(也就是 mb()、wmb()、rmb(),回忆上面相关内核代码)。

最后需要注意一点的是,CPU Memory barrier 中某些类型的 Memory barrier 需要成对使用,否则会出错,详细来说就是:一个写操作 barrier 需要和读操作(或数据依赖)barrier 一起使用(当然,通用 barrier 也是可以的),反之依然。

Memory barrier 的范例

读内核代码进一步学习 Memory barrier 的使用。
Linux 内核实现的无锁(只有一个读线程和一个写线程时)环形缓冲区 kfifo 就使用到了 Memory barrier,实现源码如下:

  1. /*
  2. * A simple kernel FIFO implementation.
  3. *
  4. * Copyright (C) 2004 Stelian Pop <stelian@popies.net>
  5. *
  6. * This program is free software; you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation; either version 2 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program; if not, write to the Free Software
  18. * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  19. *
  20. */
  21.  
  22. #include <linux/kernel.h>
  23. #include <linux/module.h>
  24. #include <linux/slab.h>
  25. #include <linux/err.h>
  26. #include <linux/kfifo.h>
  27. #include <linux/log2.h>
  28.  
  29. /**
  30. * kfifo_init - allocates a new FIFO using a preallocated buffer
  31. * @buffer: the preallocated buffer to be used.
  32. * @size: the size of the internal buffer, this have to be a power of 2.
  33. * @gfp_mask: get_free_pages mask, passed to kmalloc()
  34. * @lock: the lock to be used to protect the fifo buffer
  35. *
  36. * Do NOT pass the kfifo to kfifo_free() after use! Simply free the
  37. * &struct kfifo with kfree().
  38. */
  39. struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
  40. gfp_t gfp_mask, spinlock_t *lock)
  41. {
  42. struct kfifo *fifo;
  43.  
  44. /* size must be a power of 2 */
  45. BUG_ON(!is_power_of_2(size));
  46.  
  47. fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
  48. if (!fifo)
  49. return ERR_PTR(-ENOMEM);
  50.  
  51. fifo->buffer = buffer;
  52. fifo->size = size;
  53. fifo->in = fifo->out = 0;
  54. fifo->lock = lock;
  55.  
  56. return fifo;
  57. }
  58. EXPORT_SYMBOL(kfifo_init);
  59.  
  60. /**
  61. * kfifo_alloc - allocates a new FIFO and its internal buffer
  62. * @size: the size of the internal buffer to be allocated.
  63. * @gfp_mask: get_free_pages mask, passed to kmalloc()
  64. * @lock: the lock to be used to protect the fifo buffer
  65. *
  66. * The size will be rounded-up to a power of 2.
  67. */
  68. struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
  69. {
  70. unsigned char *buffer;
  71. struct kfifo *ret;
  72.  
  73. /*
  74. * round up to the next power of 2, since our 'let the indices
  75. * wrap' technique works only in this case.
  76. */
  77. if (!is_power_of_2(size)) {
  78. BUG_ON(size > 0x80000000);
  79. size = roundup_pow_of_two(size);
  80. }
  81.  
  82. buffer = kmalloc(size, gfp_mask);
  83. if (!buffer)
  84. return ERR_PTR(-ENOMEM);
  85.  
  86. ret = kfifo_init(buffer, size, gfp_mask, lock);
  87.  
  88. if (IS_ERR(ret))
  89. kfree(buffer);
  90.  
  91. return ret;
  92. }
  93. EXPORT_SYMBOL(kfifo_alloc);
  94.  
  95. /**
  96. * kfifo_free - frees the FIFO
  97. * @fifo: the fifo to be freed.
  98. */
  99. void kfifo_free(struct kfifo *fifo)
  100. {
  101. kfree(fifo->buffer);
  102. kfree(fifo);
  103. }
  104. EXPORT_SYMBOL(kfifo_free);
  105.  
  106. /**
  107. * __kfifo_put - puts some data into the FIFO, no locking version
  108. * @fifo: the fifo to be used.
  109. * @buffer: the data to be added.
  110. * @len: the length of the data to be added.
  111. *
  112. * This function copies at most @len bytes from the @buffer into
  113. * the FIFO depending on the free space, and returns the number of
  114. * bytes copied.
  115. *
  116. * Note that with only one concurrent reader and one concurrent
  117. * writer, you don't need extra locking to use these functions.
  118. */
  119. unsigned int __kfifo_put(struct kfifo *fifo,
  120. const unsigned char *buffer, unsigned int len)
  121. {
  122. unsigned int l;
  123.  
  124. len = min(len, fifo->size - fifo->in + fifo->out);
  125.  
  126. /*
  127. * Ensure that we sample the fifo->out index -before- we
  128. * start putting bytes into the kfifo.
  129. */
  130.  
  131. smp_mb();
  132.  
  133. /* first put the data starting from fifo->in to buffer end */
  134. l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
  135. memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
  136.  
  137. /* then put the rest (if any) at the beginning of the buffer */
  138. memcpy(fifo->buffer, buffer + l, len - l);
  139.  
  140. /*
  141. * Ensure that we add the bytes to the kfifo -before-
  142. * we update the fifo->in index.
  143. */
  144.  
  145. smp_wmb();
  146.  
  147. fifo->in += len;
  148.  
  149. return len;
  150. }
  151. EXPORT_SYMBOL(__kfifo_put);
  152.  
  153. /**
  154. * __kfifo_get - gets some data from the FIFO, no locking version
  155. * @fifo: the fifo to be used.
  156. * @buffer: where the data must be copied.
  157. * @len: the size of the destination buffer.
  158. *
  159. * This function copies at most @len bytes from the FIFO into the
  160. * @buffer and returns the number of copied bytes.
  161. *
  162. * Note that with only one concurrent reader and one concurrent
  163. * writer, you don't need extra locking to use these functions.
  164. */
  165. unsigned int __kfifo_get(struct kfifo *fifo,
  166. unsigned char *buffer, unsigned int len)
  167. {
  168. unsigned int l;
  169.  
  170. len = min(len, fifo->in - fifo->out);
  171.  
  172. /*
  173. * Ensure that we sample the fifo->in index -before- we
  174. * start removing bytes from the kfifo.
  175. */
  176.  
  177. smp_rmb();
  178.  
  179. /* first get the data from fifo->out until the end of the buffer */
  180. l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
  181. memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
  182.  
  183. /* then get the rest (if any) from the beginning of the buffer */
  184. memcpy(buffer + l, fifo->buffer, len - l);
  185.  
  186. /*
  187. * Ensure that we remove the bytes from the kfifo -before-
  188. * we update the fifo->out index.
  189. */
  190.  
  191. smp_mb();
  192.  
  193. fifo->out += len;
  194.  
  195. return len;
  196. }
  197. EXPORT_SYMBOL(__kfifo_get);

为了更好的理解上面的源码,这里顺带说一下此实现使用到的一些和本文主题无关的技巧:

  1. 使用与操作来求取环形缓冲区的下标,相比取余操作来求取下标的做法效率要高不少。使用与操作求取下标的前提是环形缓冲区的大小必须是 2 的 N 次方,换而言之就是说环形缓冲区的大小为一个仅有一个 1 的二进制数,那么 index & (size – 1) 则为求取的下标(这不难理解)
  2. 使用了 in 和 out 两个索引且 in 和 out 是一直递增的(此做法比较巧妙),这样能够避免一些复杂的条件判断(某些实现下,in == out 时还无法区分缓冲区是空还是满)

这里,索引 in 和 out 被两个线程访问。in 和 out 指明了缓冲区中实际数据的边界,也就是 in 和 out 同缓冲区数据存在访问上的顺序关系,由于未使用同步机制,那么保证顺序关系就需要使用到 Memory barrier 了。索引 in 和 out 都分别只被一个线程修改,而被两个线程读取。__kfifo_put 先通过 in 和 out 来确定可以向缓冲区中写入数据量的多少,这时,out 索引应该先被读取后才能真正的将用户 buffer 中的数据写入缓冲区,因此这里使用到了 smp_mb(),对应的,__kfifo_get 也使用 smp_mb() 来确保修改 out 索引之前缓冲区中数据已经被成功读取并写入用户 buffer 中了。对于 in 索引,在 __kfifo_put 中,通过 smp_wmb() 保证先向缓冲区写入数据后才修改 in 索引,由于这里只需要保证写入操作有序,故选用写操作 barrier,在 __kfifo_get 中,通过 smp_rmb() 保证先读取了 in 索引(这时候 in 索引用于确定缓冲区中实际存在多少可读数据)才开始读取缓冲区中数据(并写入用户 buffer 中),由于这里只需要保证读取操作有序,故选用读操作 barrier。

到这里,Memory barrier 就介绍完毕了。

posted @ 2020-04-21 16:49 长戟十三千 阅读(348) | 评论 (0)编辑 收藏
这里说的环形队列是一种内存通讯机制,本身这个机制和语言没有什么关系,不过上篇提到了volatile语法和acquire/release语义,就以这个机制做一个例子,C语言实现。这方面的内容涉及到一些现有的语言实现的东西 
环形队列的数据结构是一个数组,简单起见我们认为通讯内容就是一个个int,则定义一个int数组和头尾索引: 
int queue[SIZE]; 
int head; 
int tail; 
方便起见可以约定:head表示队列首元素的索引,tail表示队列尾元素后面的空位的索引,队列中最多可容纳SIZE-1个元素,因为这样可以head==tail表示队列空,head==(tail+1)%SIZE表示队列满,如果能容纳SIZE个元素,就不能区分这两种情况了 
然后是读写: 
ErrorType read(int *v) 
    if (head == tail) 
    { 
        return EMPTY; 
    } 
    *v = queue[head]; 
    head = (head + 1) % SIZE; 
    return SUCCESS; 
ErrorType write(const int *v) 
    if (head == (tail + 1) % SIZE) 
    { 
        return FULL; 
    } 
    queue[tail] = *v; 
    tail = (tail + 1) % SIZE; 
    return SUCCESS; 
假如有多个线程读写队列,则一般需要锁来实现同步,但是做一点点约束和修改,就能实现无锁: 
1 队列只能单读单写 
2 共享的数据用volatile修饰 
可以就上面的代码简单分析下,read操作只会读写head,读tail和queue,反之write操作读写tail,读head,写queue,而head和tail都是正向增长的。这里的关键在于,head和tail使用int,它们的读写用一条机器指令即可完成,是原子的,在这个前提下,上述两个函数分别在两个线程执行时,无论怎么调度穿插,都不会产生冲突。例如,在读的时候,另一个线程正在写,由于是内容写完再修改tail的值,因此不会读到写了一半的数据,最多就是返回一个EMPTY错误,下次轮询的时候还能读到,反之写的时候如果有人在读,因为是读完内容才修改head,因此不会冲乱正在读的数据(当然,由于上面举例中queue的元素是int,所以不会出现单个元素不一致的情况,不过如果是结构体或一段数据就可能)。但若不是单读单写,就可能出问题了 
然后分析下为什么数据要加volatile,估计很多人都会说,因为如果不加,编译器会优化变量到寄存器,比如write修改了tail的值,而read把tail优化到寄存器里,就一直以为队列还是空的。的确volatile能阻止编译器做这类优化,每次读取都会保证从内存重新读取到寄存器,但是就上面这个例子而言,不存在这个问题,read函数在被调用的时候,还是要从内存读一下tail,因为一般来说不可能在read退出后还给它把tail值保存在寄存器里,事实上只有当在一个函数的代码段中重复使用一个变量的时候,才做上面这种优化 
这个例子里面用volatile,是为了执行的顺序性,根据上面的分析可以看到,除int的读写是原子外,这个无锁机制依赖于执行顺序,即读的时候先读,再改head,写的时候先写,再改tail。不少人可能觉得,我代码既然这么写了,那应该是按这个顺序执行,可惜这只是天真的想法,举例: 
static int a; 
static int b = 1; 
int main() 
    a = b + 1; 
    b = 0; 
    return 0; 
这段代码,如果编译器优化级别很低,比如vs的debug或g++的O0和O1,编译出来的执行序列是和语句一样的,但是在优化编译下会指令乱序(gcc): 
movl b, %eax 
movl $0, b 
addl $1, %eax 
movl %eax, a 
可以看到,将b加载到eax后,立即执行了b=0,然后才对eax+1,再复制给a,相当于把b=0提前执行了,假如我们在另一个线程判断b的值是否为0,然后访问a的值,就可能和预期不符,因为可能还没执行到写a的内存就访问了,出现了不一致的情况(vs2008下也乱序了) 
P.S.这个乱序的原因,个人猜测是将对b的存取聚在一起,减少cpu cache miss 
这里,先给出acquire和release的语义: 
acquire:禁止read-acquire之后所有的内存读写操作,被提前到read-acquire操作之前进行 
release:禁止write-release之前所有的内存读写操作,被推迟到write-release操作之后进行 
具体到volatile变量,就是说,对于一个volatile变量的读操作,所有代码中在它后面的指令不得提前到它之前执行,反之对于写操作,所有在它之前的代码不得延迟到它之后执行 
很明显上面的例子中,我们需要release语义,因此可以把b修饰为volatile,根据release语义,b=3之前的所有语句不得乱序到它后面执行,在vs下测试时,的确起作用了: 
00401000 mov eax,dword ptr [___defaultmatherr+8 (40301Ch)] //load b 
00401005 inc eax //+1 
00401006 mov dword ptr [__fmode+4 (403378h)],eax //store a 
0040100B mov dword ptr [___defaultmatherr+8 (40301Ch)],0 //store b 
但是,如果在gcc下测试,给b加volatile是没有任何效果的,对a的赋值依然像上面一样被乱序到后面执行,这显然是有问题的。不过这并不是gcc的bug,而是因为C语言对于volatile并没有规定acquire和release语义,gcc就没有实现,那为啥vs可以呢,因为vs实现了这俩语义(windows程序员欢呼吧) 
如果要解决上面这个例子在gcc的问题,只需要把a也声明为volatile即可,也就是说,虽然gcc没有实现对单独volatile变量操作时release语义,但是多个volatile变量的顺序似乎是保证的。说似乎,因为我还没有找到权威资料证明,但从经验来看是没什么问题。对于实际问题,比如实现一个无锁环形队列,最好还是用-S参数输出下汇编,确认下没有乱序比较好 
如果说,我们已经注意到并避免了上述问题,甚至对可执行文件反汇编,并对汇编做了确认,那是不是就没问题了?可惜这个想法还是太天真了,即便你保证了汇编(机器代码)级别的有序性,它最终还是要到cpu里面执行的,而cpu为了执行速度,也会采用乱序优化,这个是volatile无法控制的领域 
回忆一下大学的计算机组成和体系结构,cpu是由一些硬件组件组成,而硬件组件的工作是可以高度并行的,于是有了最经典的五级流水线,而现在的cpu的流水线是非常复杂的,还有指令乱序和分支预测等技术 
cpu指令乱序是一种统筹规划,比如小时候都做过统筹相关的题目:小明要做若干菜,其中对每个菜,切菜xx分钟,煮xx分钟,腌xx分钟,其中若干步骤可以并行,问如何安排等 
举个简单的例子(寄存器表达式伪代码): 
eax /= ebx 
eax += eax 
ecx -= edx 
ecx *= ecx 
执行这个序列的时候,如果按最原始的办法,一个个指令串行执行,则可能很浪费,因为第一个除法可能要消耗十几个cpu周期,后面的加减乘法又有几个周期。如果不用乱序执行,只考虑流水线,则效率提升也不大,因为只有第二和第三条指令能在流水线同时执行,第二条指令依赖第一条的结果,第四条依赖第三条的结果,流水线会停顿 
如果有了乱序执行技术,则cpu在执行第一条指令时会对后面的若干指令进行分析,找到可以提前执行的指令,具体在上面的例子,由于第二条要等第一条的eax结果,因此加法就停顿住,但是第三条和第一条没有关系,就提前到cpu执行,由于减法很快,第三条执行完后还可以把第四条提前执行,甚至可能三四都执行完成后,第一条还在除法器里面循环,然后一直等到第一条执行完,这时候再执行第二条的加法,最后的结果和简单串行执行一样,但是总耗时只是一次除法和一次加法而已 
上面是用寄存器运算乱序做个例子,对于volatile变量来说,主要受内存访问指令乱序的影响,具体的就是load和store两条指令顺序的问题,例如: 
x=x*x 
y=1 
假设我们用y=1来表示x计算完毕,根据上面的讨论,x和y都应该是volatile,编译后可能是(寄存器表达式伪代码): 
eax = x //load 
eax *= eax 
x = eax //store 
y = 1 //store 
cpu在执行这段时,分析出前三条指令有依赖关系,第四条跟前三条无关,于是可能在算乘法的时候将y的store指令乱序执行,如果另一个线程在另一个核执行,检测到y的值已经被store,以为x算完了,可能出问题,因为这时候可能还在算乘法,没有store x,这个例子中是将两次无关(cpu认为无关,但实际上是有关)的store乱序执行,简称SS乱序,对应的还有LL乱序,LS乱序和SL乱序 
很显然SS乱序会对我们上面讨论的volatile变量或无锁队列产生影响,可以从acquire和release语义来看这个问题: 
acquire:一个volatile变量的读行为之后的所有动作不应该提前到读之前,因此LL和LS乱序是不可以的 
release:一个volatile变量的写行为之前的所有动作不应该推迟到写之后,因此LS和SS乱序是不可以的 
于是乎,也只剩下SL乱序在这种情况下是安全的,幸运的是,我们常用的x86和amd64架构的cpu都只支持SL乱序,所以只要正确使用volatile和实现代码,基本不会出什么问题,各种CPU的乱序支持参考下图(前四行): 
可以看到,只有四种cpu只支持SL乱序,而其他cpu为了提高运行效率,支持力度会高一些,大部分四种乱序都支持 
既然乱序执行是cpu本身的特性,那么在支持各种乱序的cpu上,单纯依靠软件岂不是无法实现并发访问了?这显然是不可能的,解铃还须系铃人,可以利用一些特殊的机器指令能实现acquire和release语义,这也是操作系统中各种互斥量实现的基础
————————————————
版权声明:本文为CSDN博主「xtlisk」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xtlisk/article/details/39098981
posted @ 2020-04-21 16:13 长戟十三千 阅读(530) | 评论 (0)编辑 收藏

代码段、数据段、bss段

(1)编译器在编译程序的时候,将程序中的所有的元素分成了一些组成部分,各部分构成一个段,所以说段是可执行程序的组成部分。

(2)代码段:代码段就是程序中的可执行部分,直观理解代码段就是函数堆叠组成的。

(3)数据段(也被称为数据区、静态数据区、静态区):数据段就是程序中的数据,直观理解就是C语言程序中的全局变量。(注意:全局变量才算是程序的数据,局部变量不算程序的数据,只能算是函数的数据)

(4)bss段(又叫ZI(zero initial)段):bss段的特点就是被初始化为0,bss段本质上也是属于数据段,bss段就是被初始化为0的数据段。 注意区分:数据段(.data)和bss段的区别和联系:二者本来没有本质区别,都是用来存放C程序中的全局变量的。区别在于把显示初始化为非零的全局变量存在.data段中,而把显式初始化为0或者并未显式初始化(C语言规定未显式初始化的全局变量值默认为0)的全局变量存在bss段。

有些特殊数据会被放到代码段

(1)C语言中使用char *p = "linux";定义字符串时,字符串"linux"实际被分配在代码段,也就是说这个"linux"字符串实际上是一个常量字符串而不是变量字符串。

(2)const型常量:C语言中const关键字用来定义常量,常量就是不能被改变的量。

const的实现方法至少有2种

第一种就是编译将const修饰的变量放在代码段去以实现不能修改(普遍见于各种单片机的编译器);

第二种就是由编译器来检查以确保const型的常量不会被修改,实际上const型的常量还是和普通变量一样放在数据段的(gcc中就是这样实现的)。

显式初始化为非零的全局变量和静态局部变量放在数据段

(1)放在.data段的变量有2种:第一种是显式初始化为非零的全局变量。第二种是静态局部变量,也就是static修饰的局部变量。(普通局部变量分配在栈上,静态局部变量分配在.data段)

未初始化或显式初始化为0的全局变量放在bss段 (1)bss段和.data段并没有本质区别,几乎可以不用明确去区分这两种。


C++中虚函数表位于只读数据段(.rodata),也就是C++内存模型中的常量区;而虚函数则位于代码段(.text),也就是C++内存模型中的代码区。

posted @ 2020-04-20 15:37 长戟十三千 阅读(1461) | 评论 (0)编辑 收藏
(1)C语言跟内存分配方式
<1>从静态存储区域分配.
       内存在程序编译的时候就已经分配好,这块内存在程序的整个运行期间都存在.例如全局变量、static变量.
<2>在栈上创建
       在执行函数时,函数内局部变量的存储单元都可以在栈上创建,函数执行结束时这些存储单元自动被释放.栈内存分配运算内置于处理器的指令集中,效率很高,但是分配的内存容量有限.
<3>从堆上分配,亦称动态内存分配.
       程序在运行的时候用malloc或new申请任意多少的内存,程序员自己负责在何时用free或delete释放内存.动态内存的生存期由用户决定,使用非常灵活,但问题也最多.
(2)C语言跟内存申请相关的函数主要有 alloca、calloc、malloc、free、realloc等.
    <1>alloca是向栈申请内存,因此无需释放.
    <2>malloc分配的内存是位于堆中的,并且没有初始化内存的内容,因此基本上malloc之后,调用函数memset来初始化这部分的内存空间.
    <3>calloc则将初始化这部分的内存,设置为0.
    <4>realloc则对malloc申请的内存进行大小的调整.
    <5>申请的内存最终需要通过函数free来释放.
    当程序运行过程中malloc了,但是没有free的话,会造成内存泄漏.一部分的内存没有被使用,但是由于没有free,因此系统认为这部分内存还在使用,造成不断的向系统申请内存,使得系统可用内存不断减少.但是内存泄漏仅仅指程序在运行时,程序退出时,OS将回收所有的资源.因此,适当的重起一下程序,有时候还是有点作用.
【attention】
    三个函数的申明分别是:
        void* malloc(unsigned size);
        void* realloc(void* ptr, unsigned newsize);  
        void* calloc(size_t numElements, size_t sizeOfElement); 
    都在stdlib.h函数库内,它们的返回值都是请求系统分配的地址,如果请求失败就返回NULL.
    (1)函数malloc()
        在内存的动态存储区中分配一块长度为size字节的连续区域,参数size为需要内存空间的长度,返回该区域的首地址.
    (2)函数calloc()
        与malloc相似,参数sizeOfElement为申请地址的单位元素长度,numElements为元素个数,即在内存中申请numElements*sizeOfElement字节大小的连续地址空间.
    (3)函数realloc()
        给一个已经分配了地址的指针重新分配空间,参数ptr为原有的空间地址,newsize是重新申请的地址长度.
    区别:
    (1)函数malloc不能初始化所分配的内存空间,而函数calloc能.如果由malloc()函数分配的内存空间原来没有被使用过,则其中的每一位可能都是0;反之, 如果这部分内存曾经被分配过,则其中可能遗留有各种各样的数据.也就是说,使用malloc()函数的程序开始时(内存空间还没有被重新分配)能正常进行,但经过一段时间(内存空间还已经被重新分配)可能会出现问题.
    (2)函数calloc() 会将所分配的内存空间中的每一位都初始化为零,也就是说,如果你是为字符类型或整数类型的元素分配内存,那么这些元素将保证会被初始化为0;如果你是为指针类型的元素分配内存,那么这些元素通常会被初始化为空指针;如果你为实型数据分配内存,则这些元素会被初始化为浮点型的零.
    (3)函数malloc向系统申请分配指定size个字节的内存空间.返回类型是 void*类型.void*表示未确定类型的指针.C,C++规定,void* 类型可以强制转换为任何其它类型的指针.
    (4)realloc可以对给定的指针所指的空间进行扩大或者缩小,无论是扩张或是缩小,原有内存的中内容将保持不变.当然,对于缩小,则被缩小的那一部分的内容会丢失.realloc并不保证调整后的内存空间和原来的内存空间保持同一内存地址.相反,realloc返回的指针很可能指向一个新的地址.
    (5)realloc是从堆上分配内存的.当扩大一块内存空间时,realloc()试图直接从堆上现存的数据后面的那些字节中获得附加的字节,如果能够满足,自然天下太平;如果数据后面的字节不够,问题就出来了,那么就使用堆上第一个有足够大小的自由块,现存的数据然后就被拷贝至新的位置,而老块则放回到堆上.这句话传递的一个重要的信息就是数据可能被移动. 
————————————————
版权声明:本文为CSDN博主「shuaishuai80」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/shuaishuai80/article/details/6140979
posted @ 2020-04-20 15:29 长戟十三千 阅读(2610) | 评论 (0)编辑 收藏
posted @ 2020-04-20 10:38 长戟十三千 阅读(126) | 评论 (0)编辑 收藏
仅列出标题
共14页: 1 2 3 4 5 6 7 8 9 Last