随笔 - 83  文章 - 0  trackbacks - 0
<2019年12月>
24252627282930
1234567
891011121314
15161718192021
22232425262728
2930311234

常用链接

留言簿

随笔分类

随笔档案

收藏夹

调试技巧

搜索

  •  

最新评论

阅读排行榜

评论排行榜

60天内阅读排行

  1. 背景

前几天,发了一条如下的微博 (关于C/C++ Volatile关键词的使用建议):


 

此微博,引发了朋友们的大量讨论:赞同者有之;批评者有之;当然,更多的朋友,是希望我能更详细的解读C/C++ Volatile关键词,来佐证我的微博观点。而这,正是我写这篇博文的初衷:本文,将详细分析C/C++ Volatile关键词的功能 (有多种功能)、Volatile关键词在多线程编程中存在的问题、Volatile关键词与编译器/CPU的关系、C/C++ Volatile与Java Volatile的区别,以及Volatile关键词的起源,希望对大家更好的理解、使用C/C++ Volatile,有所帮助。

 

Volatile,词典上的解释为:易失的;易变的;易挥发的。那么用这个关键词修饰的C/C++变量,应该也能够体现出”易变”的特征。大部分人认识Volatile,也是从这个特征出发,而这也是本文揭秘的C/C++ Volatile的第一个特征。

 

 

  1. Volatile:易变的

在介绍C/C++ Volatile关键词的”易变”性前,先让我们看看以下的两个代码片段,以及他们对应的汇编指令 (以下用例的汇编代码,均为VS 2008编译出来的Release版本):

 

  • 测试用例一:非Volatile变量

    volatile1

    b = a + 1;这条语句,对应的汇编指令是:lea ecx, [eax + 1]。由于变量a,在前一条语句a = fn(c)执行时,被缓存在了寄存器eax中,因此b = a + 1;语句,可以直接使用仍旧在寄存器eax中的a,来进行计算,对应的也就是汇编:[eax + 1]。

  • 测试用例二:Volatile变量

volatile2

与测试用例一唯一的不同之处,是变量a被设置为volatile属性,一个小小的变化,带来的是汇编代码上很大的变化。a = fn(c)执行后,寄存器ecx中的a,被写回内存:mov dword ptr [esp+0Ch], ecx。然后,在执行b = a + 1;语句时,变量a有重新被从内存中读取出来:mov eax, dword ptr [esp + 0Ch],而不再直接使用寄存器ecx中的内容。

  1. 小结

从以上的两个用例,就可以看出C/C++ Volatile关键词的第一个特性:易变性。所谓的易变性,在汇编层面反映出来,就是两条语句,下一条语句不会直接使用上一条语句对应的volatile变量的寄存器内容,而是重新从内存中读取。volatile的这个特性,相信也是大部分朋友所了解的特性。

 

在了解了C/C++ Volatile关键词的”易变”特性之后,再让我们接着继续来剖析Volatile的下一个特性:”不可优化”特性。

 

  1. Volatile:不可优化的

与前面介绍的”易变”性类似,关于C/C++ Volatile关键词的第二个特性:”不可优化”性,也通过两个对比的代码片段来说明:

 

  • 测试用例三:非Volatile变量

    volatile4

    在这个用例中,非volatile变量a,b,c全部被编译器优化掉了 (optimize out),因为编译器通过分析,发觉a,b,c三个变量是无用的,可以进行常量替换。最后的汇编代码相当简介,高效率。

  • 测试用例四:Volatile变量

    volatile3

    测试用例四,与测试用例三类似,不同之处在于,a,b,c三个变量,都是volatile变量。这个区别,反映到汇编语言中,就是三个变量仍旧存在,需要将三个变量从内存读入到寄存器之中,然后再调用printf()函数。

 

  1. 小结

从测试用例三、四,可以总结出C/C++ Volatile关键词的第二个特性:“不可优化”特性。volatile告诉编译器,不要对我这个变量进行各种激进的优化,甚至将变量直接消除,保证程序员写在代码中的指令,一定会被执行。相对于前面提到的第一个特性:”易变”性,”不可优化”特性可能知晓的人会相对少一些。但是,相对于下面提到的C/C++ Volatile的第三个特性,无论是”易变”性,还是”不可优化”性,都是Volatile关键词非常流行的概念。

 

  1. Volatile:顺序性

 

C/C++ Volatile关键词前面提到的两个特性,让Volatile经常被解读为一个为多线程而生的关键词:一个全局变量,会被多线程同时访问/修改,那么线程内部,就不能假设此变量的不变性,并且基于此假设,来做一些程序设计。当然,这样的假设,本身并没有什么问题,多线程编程,并发访问/修改的全局变量,通常都会建议加上Volatile关键词修饰,来防止C/C++编译器进行不必要的优化。但是,很多时候,C/C++ Volatile关键词,在多线程环境下,会被赋予更多的功能,从而导致问题的出现。

 

回到本文背景部分我的那篇微博,我的这位朋友,正好犯了一个这样的问题。其对C/C++ Volatile关键词的使用,可以抽象为下面的伪代码:

v5

这段伪代码,声明另一个Volatile的flag变量。一个线程(Thread1)在完成一些操作后,会修改这个变量。而另外一个线程(Thread2),则不断读取这个flag变量,由于flag变量被声明了volatile属性,因此编译器在编译时,并不会每次都从寄存器中读取此变量,同时也不会通过各种激进的优化,直接将if (flag == true)改写为if (false == true)。只要flag变量在Thread1中被修改,Thread2中就会读取到这个变化,进入if条件判断,然后进入if内部进行处理。在if条件的内部,由于flag == true,那么假设Thread1中的something操作一定已经完成了,在基于这个假设的基础上,继续进行下面的other things操作。

 

通过将flag变量声明为volatile属性,很好的利用了本文前面提到的C/C++ Volatile的两个特性:”易变”性;”不可优化”性。按理说,这是一个对于volatile关键词的很好应用,而且看到这里的朋友,也可以去检查检查自己的代码,我相信肯定会有这样的使用存在。

 

但是,这个多线程下看似对于C/C++ Volatile关键词完美的应用,实际上却是有大问题的。问题的关键,就在于前面标红的文字:由于flag = true,那么假设Thread1中的something操作一定已经完成了。flag == true,为什么能够推断出Thread1中的something一定完成了?其实既然我把这作为一个错误的用例,答案是一目了然的:这个推断不能成立,你不能假设看到flag == true后,flag = true;这条语句前面的something一定已经执行完成了。这就引出了C/C++ Volatile关键词的第三个特性:顺序性。

 

同样,为了说明C/C++ Volatile关键词的”顺序性”特征,下面给出三个简单的用例 (注:与上面的测试用例不同,下面的三个用例,基于的是Linux系统,使用的是”GCC: (Debian 4.3.2-1.1) 4.3.2″):

 

  • 测试用例五:非Volatile变量

    v9

    一个简单的示例,全局变量A,B均为非volatile变量。通过gcc O2优化进行编译,你可以惊奇的发现,A,B两个变量的赋值顺序被调换了!!!在对应的汇编代码中,B = 0语句先被执行,然后才是A = B + 1语句被执行。

    在这里,我先简单的介绍一下C/C++编译器最基本优化原理:保证一段程序的输出,在优化前后无变化。将此原理应用到上面,可以发现,虽然gcc优化了A,B变量的赋值顺序,但是foo()函数的执行结果,优化前后没有发生任何变化,仍旧是A = 1;B = 0。因此这么做是可行的。

  • 测试用例六:一个Volatile变量

    v10

    此测试,相对于测试用例五,最大的区别在于,变量B被声明为volatile变量。通过查看对应的汇编代码,B仍旧被提前到A之前赋值,Volatile变量B,并未阻止编译器优化的发生,编译后仍旧发生了乱序现象。

    如此看来,C/C++ Volatile变量,与非Volatile变量之间的操作,是可能被编译器交换顺序的

    通过此用例,已经能够很好的说明,本章节前面,通过flag == true,来假设something一定完成是不成立的。在多线程下,如此使用volatile,会产生很严重的问题。但是,这不是终点,请继续看下面的测试用例七。

  • 测试用例七:两个Volatile变量

    v11

    同时将A,B两个变量都声明为volatile变量,再来看看对应的汇编。奇迹发生了,A,B赋值乱序的现象消失。此时的汇编代码,与用户代码顺序高度一直,先赋值变量A,然后赋值变量B。

    如此看来,C/C++ Volatile变量间的操作,是不会被编译器交换顺序的


  1. happens-before

 

通过测试用例六,可以总结出:C/C++ Volatile变量与非Volatile变量间的操作顺序,有可能被编译器交换。因此,上面多线程操作的伪代码,在实际运行的过程中,就有可能变成下面的顺序:

v6

由于Thread1中的代码执行顺序发生变化,flag = true被提前到something之前进行,那么整个Thread2的假设全部失效。由于something未执行,但是Thread2进入了if代码段,整个多线程代码逻辑出现问题,导致多线程完全错误。

 

细心的读者看到这里,可能要提问,根据测试用例七,C/C++ Volatile变量间,编译器是能够保证不交换顺序的,那么能不能将something中所有的变量全部设置为volatile呢?这样就阻止了编译器的乱序优化,从而也就保证了这个多线程程序的正确性。

 

针对此问题,很不幸,仍旧不行。将所有的变量都设置为volatile,首先能够阻止编译器的乱序优化,这一点是可以肯定的。但是,别忘了,编译器编译出来的代码,最终是要通过CPU来执行的。目前,市场上有各种不同体系架构的CPU产品,CPU本身为了提高代码运行的效率,也会对代码的执行顺序进行调整,这就是所谓的CPU Memory Model (CPU内存模型)。关于CPU的内存模型,可以参考这些资料:Memory Ordering From WikiMemory Barriers Are Like Source Control Operations From Jeff PreshingCPU Cache and Memory Ordering From 何登成。下面,是截取自Wiki上的一幅图,列举了不同CPU架构,可能存在的指令乱序。

 

mo

 

从图中可以看到,X86体系(X86,AMD64),也就是我们目前使用最广的CPU,也会存在指令乱序执行的行为:StoreLoad乱序,读操作可以提前到写操作之前进行。

 

因此,回到上面的例子,哪怕将所有的变量全部都声明为volatile,哪怕杜绝了编译器的乱序优化,但是针对生成的汇编代码,CPU有可能仍旧会乱序执行指令,导致程序依赖的逻辑出错,volatile对此无能为力。

 

其实,针对这个多线程的应用,真正正确的做法,是构建一个happens-before语义。关于happens-before语义的定义,可参考文章:The Happens-Before Relation。下面,用图的形式,来展示happens-before语义:

 

v7

 

如图所示,所谓的happens-before语义,就是保证Thread1代码块中的所有代码,一定在Thread2代码块的第一条代码之前完成。当然,构建这样的语义有很多方法,我们常用的Mutex、Spinlock、RWLock,都能保证这个语义 (关于happens-before语义的构建,以及为什么锁能保证happens-before语义,以后专门写一篇文章进行讨论)。但是,C/C++ Volatile关键词不能保证这个语义,也就意味着C/C++ Volatile关键词,在多线程环境下,如果使用的不够细心,就会产生如同我这里提到的错误。

 

  1. 小结

 

C/C++ Volatile关键词的第三个特性:”顺序性”,能够保证Volatile变量间的顺序性,编译器不会进行乱序优化。Volatile变量与非Volatile变量的顺序,编译器不保证顺序,可能会进行乱序优化。同时,C/C++ Volatile关键词,并不能用于构建happens-before语义,因此在进行多线程程序设计时,要小心使用volatile,不要掉入volatile变量的使用陷阱之中。

posted @ 2019-12-02 18:25 长戟十三千 阅读(2) | 评论 (0)编辑 收藏
volatile与内存屏障总结

一. 内存屏障 Memory Barrior

1.1 重排序

同步的目的是保证不同执行流对共享数据并发操作的一致性。在单核时代,使用原子变量就很容易达成这一目的。甚至因为CPU的一些访存特性,对某些内存对齐数据的读或写也具有原子的特性。但在多核架构下即使操作是原子的,仍然会因为其他原因导致同步失效。

首先是现代编译器的代码优化和编译器指令重排可能会影响到代码的执行顺序。

其次还有指令执行级别的乱序优化,流水线、乱序执行、分支预测都可能导致处理器次序(Process Ordering,机器指令在CPU实际执行时的顺序)和程序次序(Program Ordering,程序代码的逻辑执行顺序)不一致。可惜不影响语义依旧只能是保证单核指令序列间,单核时代CPU的Self-Consistent特性在多核时代已不存在(Self-Consistent即重排原则:有数据依赖不会进行重排,单核最终结果肯定一致)。

除此还有硬件级别Cache一致性(Cache Coherence)带来的问题:CPU架构中传统的MESI协议中有两个行为的执行成本比较大。一个是将某个Cache Line标记为Invalid状态,另一个是当某Cache Line当前状态为Invalid时写入新的数据。所以CPU通过Store Buffer和Invalidate Queue组件来降低这类操作的延时。如图:

当一个核心在Invalid状态进行写入时,首先会给其它CPU核发送Invalid消息,然后把当前写入的数据写入到Store Buffer中。然后异步在某个时刻真正的写入到Cache Line中。当前CPU核如果要读Cache Line中的数据,需要先扫描Store Buffer之后再读取Cache Line(Store-Buffer Forwarding)。但是此时其它CPU核是看不到当前核的Store Buffer中的数据的,要等到Store Buffer中的数据被刷到了Cache Line之后才会触发失效操作。

而当一个CPU核收到Invalid消息时,会把消息写入自身的Invalidate Queue中,随后异步将其设为Invalid状态。和Store Buffer不同的是,当前CPU核心使用Cache时并不扫描Invalidate Queue部分,所以可能会有极短时间的脏读问题。这里的Store Buffer和Invalidate Queue的说法是针对一般的SMP架构来说的,不涉及具体架构。

内存对于缓存更新策略,要区分Write-Through和Write-Back两种策略。前者更新内容直接写内存并不同时更新Cache,但要置Cache失效,后者先更新Cache,随后异步更新内存。通常X86 CPU更新内存都使用Write-Back策略。

1.2 编译器屏障 Compiler Barrior

/* The "volatile" is due to gcc bugs */ #define barrier() __asm__ __volatile__("": : :"memory")  

阻止编译器重排,保证编译程序时在优化屏障之前的指令不会在优化屏障之后执行。

1.3 CPU屏障 CPU Barrior

CPU级别内存屏障其作用有两个:

  1. 防止指令之间的重排序
  2. 保证数据的可见性

指令重排中Load和Store两种操作会有Load-Store、Store-Load、Load-Load、Store-Store这四种可能的乱序结果。

Intel为此提供三种内存屏障指令:

  • sfence ,实现Store Barrior 会将store buffer中缓存的修改刷入L1 cache中,使得其他cpu核可以观察到这些修改,而且之后的写操作不会被调度到之前,即sfence之前的写操作一定在sfence完成且全局可见;
  • lfence ,实现Load Barrior 会将invalidate queue失效,强制读取入L1 cache中,而且lfence之后的读操作不会被调度到之前,即lfence之前的读操作一定在lfence完成(并未规定全局可见性);
  • mfence ,实现Full Barrior 同时刷新store buffer和invalidate queue,保证了mfence前后的读写操作的顺序,同时要求mfence之后写操作结果全局可见之前,mfence之前写操作结果全局可见;
  • lock 用来修饰当前指令操作的内存只能由当前CPU使用,若指令不操作内存仍然由用,因为这个修饰会让指令操作本身原子化,而且自带Full Barrior效果;还有指令比如IO操作的指令、exch等原子交换的指令,任何带有lock前缀的指令以及CPUID等指令都有内存屏障的作用。

X86-64下仅支持一种指令重排:Store-Load ,即读操作可能会重排到写操作前面,同时不同线程的写操作并没有保证全局可见,例子见《Intel® 64 and IA-32 Architectures Software Developer’s Manual》手册8.6.1、8.2.3.7节。要注意的是这个问题只能用mfence解决,不能靠组合sfence和lfence解决。(用sfence+lfence组合仅可以解决重排问题,但不能解决全局可见性问题,简单理解不如视为sfence和lfence本身也能乱序重拍)

X86-64一般情况根本不会需要使用lfence与sfence这两个指令,除非操作Write-Through内存或使用 non-temporal 指令(NT指令,属于SSE指令集),比如movntdq, movnti, maskmovq,这些指令也使用Write-Through内存策略,通常使用在图形学或视频处理,Linux编程里就需要使用GNC提供的专门的函数(例子见参考资料13:Memory part 5: What programmers can do)。

下面是GNU中的三种内存屏障定义方法,结合了编译器屏障和三种CPU屏障指令

#define lfence() __asm__ __volatile__("lfence": : :"memory")  #define sfence() __asm__ __volatile__("sfence": : :"memory")  #define mfence() __asm__ __volatile__("mfence": : :"memory")  

代码中仍然使用lfence()与sfence()这两个内存屏障应该也是一种长远的考虑。按照Interface写代码是最保险的,万一Intel以后出一个采用弱一致模型的CPU,遗留代码出问题就不好了。目前在X86下面视为编译器屏障即可。

GCC 4以后的版本也提供了Built-in的屏障函数__sync_synchronize(),这个屏障函数既是编译屏障又是内存屏障,代码插入这个函数的地方会被安插一条mfence指令。

C++11为内存屏障提供了专门的函数std::atomic_thread_fence,方便移植统一行为而且可以配合内存模型进行设置,比如实现Acquire-release语义:

#include <atomic> std::atomic_thread_fence(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_release);

二、内存模型

2.1 Acquire与Release语义

  • 对于Acquire来说,保证Acquire后的读写操作不会发生在Acquire动作之前
  • 对于Release来说,保证Release前的读写操作不会发生在Release动作之后

Acquire & Release 语义保证内存操作仅在acquire和release屏障之间发生

X86-64中Load读操作本身满足Acquire语义,Store写操作本身也是满足Release语义。但Store-Load操作间等于没有保护,因此仍需要靠mfence或lock等指令才可以满足到Synchronizes-with规则。

2.2 happens-before规则

相对于Synchronizes-with规则更宽松,happens-before规则定义指令执行顺序与变量的可见性,类似偏序关系,具有可传递性,因此可以运用于并行逻辑分析。

2.3 内存一致性模型 Memory Model

内存一致性模型从程序员视角,由内存序Memory Ordering和写操作原子性Store Atomicity来定义,针对不同线程中原子操作的全局顺序:

  • Strong Consistency / Sequential consistency 顺序一致性
  • Release Consistency / release-acquire / release-consume
  • Relaxed Consistency

C++11相应定义了6种内存模型:

  • std::memory_order_seq_cst 所有读写操作不能跨过,写顺序全线程可见
  • std::memory_order_acq_rel 所有读写操作不能跨过,写顺序仅同步线程间可见、std::memory_order_release 所有读写操作不能往后乱序、std::memory_order_acquire 所有读写操作不能向前乱序、std::memory_order_consume 依赖该读操作的后续读写操作不能向前乱序
  • std::memory_order_relaxed 无特殊要求

三. volatile 关键字

voldatile关键字首先具有“易变性”,声明为volatile变量编译器会强制要求读内存,相关语句不会直接使用上一条语句对应的的寄存器内容,而是重新从内存中读取。

其次具有”不可优化”性,volatile告诉编译器,不要对这个变量进行各种激进的优化,甚至将变量直接消除,保证代码中的指令一定会被执行。

最后具有“顺序性”,能够保证Volatile变量间的顺序性,编译器不会进行乱序优化。不过要注意与非volatile变量之间的操作,还是可能被编译器重排序的。

需要注意的是其含义跟原子操作无关,比如:volatile int a; a++; 其中a++操作实际对应三条汇编指令实现”读-改-写“操作(RMW),并非原子的。

思考:bool类型是不是适合使用,不会出问题。

不同编程语言中voldatile含义与实现并不完全相同,Java语言中voldatile变量可以被看作是一种轻量级的同步,因其还附带了acuire和release语义。实际上也是从JDK5以后才通过这个措施进行完善,其volatile 变量具有 synchronized 的可见性特性,但是不具备原子特性。Java语言中有volatile修饰的变量,赋值后多执行了一个“load addl $0x0, (%esp)”操作,这个操作相当于一个lock指令,就是增加一个完全的内存屏障指令,这点与C++实现并不一样。volatile 的读性能消耗与普通变量几乎相同,但是写操作稍慢,因为它需要在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行。

Java实践中仅满足下面这些条件才应该使用volatile关键字:

  • 变量写入操作不依赖变量当前值,或确保只有一个线程更新变量的值(Java可以,C++仍然不能)
  • 该变量不会与其他变量一起纳入
  • 变量并未被锁保护

C++中voldatile等于插入编译器级别屏障,因此并不能阻止CPU硬件级别导致的重排。C++11 中volatile语义没有任何变化,不过提供了std::atomic工具可以真正实现原子操作,而且默认加入了内存屏障(可以通过在store与load操作时设置内存模型参数进行调整,默认为std::memory_order_seq_cst)。

C++实践中推荐涉及并发问题都使用std::atomic,只有涉及特殊内存操作的时候才使用volatile关键字。这些情况通常IO相关,防止相关操作被编译器优化,也是volatile关键字发明的本意。

四、使用经验

内存屏障相关并行逻辑使用的分析顺序:线程安全、操作原子性、存储器操作顺序

C++ 使用对齐变量与mfence

C++11 使用std::atomic与std::atomic_thread_fence,先使用默认std::memory_order_seq_cst模型再进行相关优化

Java 使用volatile或atomic

一个经典的例子就是DCL双重检查加锁实现单例模式,本意是想要实现延迟初始化

@NotThreadSafe public class DoubleCheckedLock {     private static Resource resoure;          public static Resource getInstance() {         if (resource == null) {             synchronized (DoubleCheckedLock.class) {                  if (resource == null)                      resource = new Resource();             }         }         return resource;     } }

问题在于未同步状态下读共享变量,可能获取的是中间值,比如这里获取的resource对象可能还未完全构造好。尽管JDK5以后声明为volatile可以避免这个问题,但DCL已经没有必要,因为Java可以利用JVM内部静态类装载的特点实现“延迟初始化占位类模式”来达到同样的效果。C++下面可以使用pthread_once实现。

后续:同步原语使用(锁、信号量等)、Lockfree与非阻塞操作

posted @ 2019-12-02 16:49 长戟十三千 阅读(4) | 评论 (0)编辑 收藏

为什么会有内存屏障

  • 每个CPU都会有自己的缓存(有的甚至L1,L2,L3),缓存的目的就是为了提高性能,避免每次都要向内存取。但是这样的弊端也很明显:不能实时的和内存发生信息交换,分在不同CPU执行的不同线程对同一个变量的缓存值不同。
  • 用volatile关键字修饰变量可以解决上述问题,那么volatile是如何做到这一点的呢?那就是内存屏障,内存屏障是硬件层的概念,不同的硬件平台实现内存屏障的手段并不是一样,java通过屏蔽这些差异,统一由jvm来生成内存屏障的指令。

内存屏障是什么

  • 硬件层的内存屏障分为两种:Load BarrierStore Barrier即读屏障和写屏障。
  • 内存屏障有两个作用:
  1. 阻止屏障两侧的指令重排序;
  2. 强制把写缓冲区/高速缓存中的脏数据等写回主内存,让缓存中相应的数据失效。
  • 对于Load Barrier来说,在指令前插入Load Barrier,可以让高速缓存中的数据失效,强制从新从主内存加载数据;
  • 对于Store Barrier来说,在指令后插入Store Barrier,能让写入缓存中的最新数据更新写入主内存,让其他线程可见。

java内存屏障

  • java的内存屏障通常所谓的四种即LoadLoad,StoreStore,LoadStore,StoreLoad实际上也是上述两种的组合,完成一系列的屏障和数据同步功能。
  • LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
  • LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
  • StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能

volatile语义中的内存屏障

  • volatile的内存屏障策略非常严格保守,非常悲观且毫无安全感的心态:

在每个volatile写操作前插入StoreStore屏障,在写操作后插入StoreLoad屏障;
在每个volatile读操作前插入LoadLoad屏障,在读操作后插入LoadStore屏障;

  • 由于内存屏障的作用,避免了volatile变量和其它指令重排序、线程之间实现了通信,使得volatile表现出了锁的特性。

final语义中的内存屏障

  • 对于final域,编译器和CPU会遵循两个排序规则:
  1. 新建对象过程中,构造体中对final域的初始化写入和这个对象赋值给其他引用变量,这两个操作不能重排序;(废话嘛)
  2. 初次读包含final域的对象引用和读取这个final域,这两个操作不能重排序;(晦涩,意思就是先赋值引用,再调用final值)
  • 总之上面规则的意思可以这样理解,必需保证一个对象的所有final域被写入完毕后才能引用和读取。这也是内存屏障的起的作用:
  • 写final域:在编译器写final域完毕,构造体结束之前,会插入一个StoreStore屏障,保证前面的对final写入对其他线程/CPU可见,并阻止重排序。
  • 读final域:在上述规则2中,两步操作不能重排序的机理就是在读final域前插入了LoadLoad屏障。
  • X86处理器中,由于CPU不会对写-写操作进行重排序,所以StoreStore屏障会被省略;而X86也不会对逻辑上有先后依赖关系的操作进行重排序,所以LoadLoad也会变省略

参考文章:

  1. C/C++ Volatile关键词深度剖析
  2. java的并发关键字volatile
  3. 指令重排序
  4. 多处理器编程:从缓存一致性到内存模型
  5. 聊聊原子变量、锁、内存屏障那点事
  6. Why Memory Barriers?中文翻译(上)
  7. LINUX内核内存屏障
  8. Memory Model: 从多处理器到高级语言
  9. 高并发编程--多处理器编程中的一致性问题(上)
  10. 高并发编程--多处理器编程中的一致性问题(下)
  11. 如何理解C++11中的六种内存模型
  12. C/C++11 mappings to processors
  13. When should I use _mm_sfence _mm_lfence and _mm_mfence
  14. Why is (or isn't?) SFENCE + LFENCE equivalent to MFENCE?
  15. Memory part 5: What programmers can do
  16. Memory Reordering Caught in the Act
  17. C++ and the Perils of Double-Checked Locking

作者:Rinoux
链接:https://www.jianshu.com/p/2ab5e3d7e510
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
posted @ 2019-12-02 16:13 长戟十三千 阅读(3) | 评论 (0)编辑 收藏

以War3为例,启动魔兽后,首先是如何看见主机的问题:
魔兽是通过TCP/UDP协议进行数据发送的,那如何实现看到对方?我们这样:每个机器监听一个固定的UDP端口(比如6112),一旦任何机器建立主机,它就向整个局域网所有的机器的6112端口广播“我建立了主机”的信息,这样,其他机器接收到这个信息,就知道有主机建立了(广播只存在于UDP协议,使用UDP.sendto向地址255.255.255.255实现)。

来看看HF和VS平台怎么实现的:
1.挂钩UDP.Sendto,将所有广播信息(即发向地址255.255.255.255)的消息截获,然后把消息重新打包(比如{本机虚拟IP+消息数据}的形式),然后使用真正的UDP.sendto把消息转到平台服务器,服务器查看有哪些玩家是跟此玩家在同一房间,把消息传给那些玩家;平台再挂钩住接收消息用的UDP.recv From,把UDP.recvFrom的发送方地址修改为消息中的对方虚拟IP,再把数据传给真正的UDP.recvFr om。

问:万一广播信息不是建立主机而是其他的,被误截了怎么办?
答:大部分游戏包括War3的广播信息唯一的作用就是传播“建立主机”这一类需要传给所有局域网的机器的信息,就是说只有“建立主机”这一类信息会通过地址255.255.255,因此一般不会有误截发生。

实际上通过广播的信息还有主机是否人满,地图,主机是否取消建立,等信息。


其次,如何加入游戏:
魔兽在加入游戏后使用TCP协议,每个玩家对应一个连接。
在真正的局域网中,一个玩家看到和选择一个主机后点击加入,他的机器会使用TCP请求和对方连接,(地址从UDP中获得,端口是固定的6112),如果没有人满,被主机关闭等意外发生,主机就会答应此连接(使用TCP.accept),发送些数据(地图信息,其他玩家信息等),此玩家就加入了游戏,此后两机器就使用这个TCP连接通讯。

回到平台,在平台中,魔兽从UDP中获得的地址是服务器的地址啊(因为UDP信息是服务器转过来的),这样发起的TCP只能链接到服务器,怎么可能连接得上真正的玩家呢?别忘了,上面说过平台挂钩了UDP.sendTo(会把本机虚拟IP加入);挂钩了UDP.recvFrom(会把服务器这个发送方的IP改为对方虚拟IP);

接着魔兽向对方虚拟IP发起TCP.connect,可能成功吗?当然不可能,因为实际的局域网中根本没这个IP,那怎么请求连接和接受连接呢?

平台采用了这样的办法:TCP连接是靠TCP.connect发起的,平台挂钩住这个函数,把连接向服务器的地址修改为自己(即127.0.0.1或实际IP,一般用前者),然后再挂钩TCP.accept函数(此函数用来接受TCP连接),然后发送同样的连接请求由服务器转到另一台机器(即主机),根据那台机器的做法决定是否答应127.0.0.1的那个TCP.connect,(注意这个TCP.accept返回的新连接是挂钩代码创建的,挂钩的代码拥有它收到的所有数据),如果答应连接的话,是不是魔兽所有的数据就会发送到这个挂钩代码创建的连接这里了?
接着,挂钩代码把这些数据重新打包(例如{接收方机器的虚拟IP+发送数据+发送方的虚拟IP}的形式),使用UDP.sendto发到服务器,服务器从信息中获得接收方机器的虚拟IP,查找其真正的IP,并把数据发送过去,跨网的TCP发送就完成了。
(另外一台机器也按以上方法同样处理)

 

 

魔兽是游戏数据传输时基于TCP连接,此时主机作为TCP的服务端,非主机是客户端。但是没办法在Internet上实现两个非服务器主机之间的直接TCP连接(这个可以看P2P的实现原理)。那对战平台是怎么实现在Internet上通过War3的局域网模式连接对战的呢?

     简单来说是如果A建了一个主机,B要进A的主机,A通过平台转发过来的消息知道B要连接主机,就在自己本地创建一个TCP的客户端,让这个客户端与war进程的服务端连接,在魔兽中就相当于有玩家B连接进A主机建立的游戏,同时B主机在自己本地创建一个TCP服务端,让War进程的TCP客户端连接到自己本地的服务端。既然都是在本地建立的TCP连接,那么怎么实现主机和非主机的游戏数据交换的呢??

 

 

  从上面的图可以看出,War3主机和非主机的数据交换,其实是在两个本地模拟创建的TCPseverTCPClient之间进行的,当主机有数据要发给非主机,先会将数据发给主机本地的TCPClient,然后对战平台会从TCPClient数据缓冲池中取出数据,通过UDP的方式,发给非主机,非主机会将UDP数据放入TCPServer的发送数据缓冲池,由它发给魔兽进程中的TCPClient,反过来一样,这样就实现了魔兽数据的完成传输,Internet上的联网对战也就实现了。

 

   另外说一下T人挂的原理,为什么主机可以T人呢?为什么主机只是关闭本机的TCP连接就可以把远程的非主机玩家T出游戏呢?从上图我们应该可以获得答案。如果主机关闭了本地用来接受远程非主机传输的UDP信息的那个TCPClient,那么很显然,主机将不能获得这个非主机信息,远程的那个非主机也不能收到主机转发的游戏数据包了,这个时候这个非主机War3进程理所当然的认为自己与主机失去了连接,T人挂的目的也达到了。

posted @ 2019-11-19 17:16 长戟十三千 阅读(9) | 评论 (0)编辑 收藏
 转载请注明出处: http://blog.csdn.net/luotuo44/article/details/38403241
        上一篇博客说到了TAILQ_QUEUE队列,它可以把多个event结构体连在一起。是一种归类方式。本文也将讲解一种将event归类、连在一起的结构:哈希结构。
哈希结构体:
        哈希结构由下面几个结构体一起配合工作:
struct event_list
{
    struct event *tqh_first;
    struct event **tqh_last;
};
 
struct evmap_io {
//TAILQ_HEAD (event_list, event);
struct event_list events;
ev_uint16_t nread;
ev_uint16_t nwrite;
};
 
struct event_map_entry {
HT_ENTRY(event_map_entry) map_node; //next指针
evutil_socket_t fd;
union { /* This is a union in case we need to make more things that can
   be in the hashtable. */
struct evmap_io evmap_io;
} ent;
};
 
struct event_io_map
{
    //哈希表
    struct event_map_entry **hth_table;
    //哈希表的长度
    unsigned hth_table_length;
    //哈希的元素个数
    unsigned hth_n_entries;
    //resize 之前可以存多少个元素
    //在event_io_map_HT_GROW函数中可以看到其值为hth_table_length的
    //一半。但hth_n_entries>=hth_load_limit时,就会发生增长哈希表的长度
    unsigned hth_load_limit;
    //后面素数表中的下标值。主要是指明用到了哪个素数
    int hth_prime_idx;
};
        结构体event_io_map指明了哈希表的存储位置、哈希表的长度、元素个数等数据。该哈希表是使用链地址法解决冲突问题的,这一点可以从hth_talbe成员变量看到。它是一个二级指针,因为哈希表的元素是event_map_entry指针。
        除了怎么解决哈希冲突外,哈希表还有一个问题要解决,那就是哈希函数。这里的哈希函数就是模(%)。用event_map_entry结构体中的fd成员值模 event_io_map结构体中的hth_table_length。
 
        由上面那些结构体配合得到的哈希表结构如下图所示:
        
        从上图可以看到,两个发生了冲突的哈希表元素event_map_entry用一个next指向连在一起了(链地址解决冲突)。
        另外,从图或者从前面关于event_map_entry结构体的定义可以看到,它有一个evmap_io结构体。而这个evmap_io结构体又有一个struct event_list 类型的成员,而struct event_list类型就是一个TAILQ_HEAD。这正是前一篇博客说到的TAILQ_QUEUE队列的队列头。从这一点可以看到,这个哈希结构还是比较复杂的。
        为什么在哈希表的元素里面,还会有一个TAILQ_QUEUE队列呢?这得由Libevent的一个特征说起。Libevent允许用同一个文件描述符fd或者信号值,调用event_new、event_add多次。所以,同一个fd或者信号值就可以对应多个event结构体了。所以这个TAILQ_QUEUE队列就是将这些具有相同fd或者信号值的多个event结构体连一起。
 
什么情况会使用哈希表:
        有一点需要说明,那就是Libevent中的哈希表只会用于Windows系统,像遵循POSIX标准的OS是不会用到哈希表的。从下面的定义可以看到这一点。
//event-internal.h文件
#ifdef WIN32
/* If we're on win32, then file descriptors are not nice low densely packed
   integers.  Instead, they are pointer-like windows handles, and we want to
   use a hashtable instead of an array to map fds to events.
*/
#define EVMAP_USE_HT
#endif
 
 
#ifdef EVMAP_USE_HT
#include "ht-internal.h"
struct event_map_entry;
HT_HEAD(event_io_map, event_map_entry);
#else
#define event_io_map event_signal_map
#endif
        可以看到,如果是非Windows系统,那么event_io_map就会被定义成event_signal_map(这是一个很简单的结构)。而在Windows系统,那么就由HT_HEAD这个宏定义event_io_map。最后得到的event_io_map就是本文最前面所示的那样。
        为什么只有在Windows系统才会使用这个哈希表呢?代码里面的注释给出了一些解释。因为在Windows系统里面,文件描述符是一个比较大的值,不适合放到event_signal_map结构中。而通过哈希(模上一个小的值),就可以变得比较小,这样就可以放到哈希表的数组中了。而遵循POSIX标准的文件描述符是从0开始递增的,一般都不会太大,适用于event_signal_map。
哈希函数:
        前面说到哈希函数是 用文件描述符fd 模 哈希表的长度。实际上,并不是直接用fd,而是用一个叫hashsocket的函数将这个fd进行一些处理后,才去模 哈希表的长度。下面是hashsocket函数的实现:
//evmap.c文件
/* Helper used by the event_io_map hashtable code; tries to return a good hash
 * of the fd in e->fd. */
static inline unsigned
hashsocket(struct event_map_entry *e)
{
/* On win32, in practice, the low 2-3 bits of a SOCKET seem not to
* matter.  Our hashtable implementation really likes low-order bits,
* though, so let's do the rotate-and-add trick. */
unsigned h = (unsigned) e->fd;
h += (h >> 2) | (h << 30);
return h;
}
        前面的event_map_entry结构体中,还有一个HT_ENTRY的宏。从名字来看,它是一个哈希表的表项。它是一个条件宏,定义如下:
//ht-internal.h文件
#ifdef HT_CACHE_HASH_VALUES
#define HT_ENTRY(type)                          \
  struct {                                      \
    struct type *hte_next;                      \
    unsigned hte_hash;                          \
  }
#else
#define HT_ENTRY(type)                          \
  struct {                                      \
    struct type *hte_next;                      \
  }
#endif
        可以看到,如果定义了HT_CACHE_HASH_VALUES宏,那么就会多一个hte_hash变量。从宏的名字来看,这是一个cache。不错,变量hte_hash就是用来存储前面的hashsocket的返回值。当第一次计算得到后,就存放到hte_hash变量中。以后需要用到(会经常用到),就直接向这个变量要即可,无需再次计算hashsocket函数。如果没有这个变量,那么需要用到这个值,都要调用hashsocket函数计算一次。这一点从后面的代码可以看到。
哈希表操作函数:
        ht-internal.h文件里面定义了一系列的哈希函数的操作函数。下来就列出这些函数。如果打开ht-internal.h文件,就发现,它是宏的天下。该文件的函数都是由宏定义生成的。下面就贴出宏定义展开后的函数。同前一篇博文那样,是用gcc的-E选项展开的。下面的代码比较长,要有心理准备。
struct event_list
{
    struct event *tqh_first;
    struct event **tqh_last;
};
 
struct evmap_io
{
    struct event_list events;
    ev_uint16_t nread;
    ev_uint16_t nwrite;
};
 
 
struct event_map_entry
{
    struct
    {
        struct event_map_entry *hte_next;
#ifdef HT_CACHE_HASH_VALUES
        unsigned hte_hash;
#endif
    }map_node;
 
    evutil_socket_t fd;
    union
    {
        struct evmap_io evmap_io;
    }ent;
};
 
struct event_io_map
{
    //哈希表
    struct event_map_entry **hth_table;
    //哈希表的长度
    unsigned hth_table_length;
    //哈希的元素个数
    unsigned hth_n_entries;
    //resize 之前可以存多少个元素
    //在event_io_map_HT_GROW函数中可以看到其值为hth_table_length的
    //一半。但hth_n_entries>=hth_load_limit时,就会发生增长哈希表的长度
    unsigned hth_load_limit;
    //后面素数表中的下标值。主要是指明用到了哪个素数
    int hth_prime_idx;
};
 
 
 
int event_io_map_HT_GROW(struct event_io_map *ht, unsigned min_capacity);
void event_io_map_HT_CLEAR(struct event_io_map *ht);
 
int _event_io_map_HT_REP_IS_BAD(const struct event_io_map *ht);
 
//初始化event_io_map
static inline void event_io_map_HT_INIT(struct event_io_map *head)
{
    head->hth_table_length = 0;
    head->hth_table = NULL;
    head->hth_n_entries = 0;
    head->hth_load_limit = 0;
    head->hth_prime_idx = -1;
}
 
 
//在event_io_map这个哈希表中,找个表项elm
//在下面还有一个相似的函数,函数名少了最后的_P。那个函数的返回值
//是event_map_entry *。从查找来说,后面那个函数更适合。之所以
//有这个函数,是因为哈希表还有replace、remove这些操作。对于
//A->B->C这样的链表。此时,要replace或者remove节点B。
//如果只有后面那个查找函数,那么只能查找并返回一个指向B的指针。
//此时将无法修改A的next指针了。所以本函数有存在的必要。
//在本文件中,很多函数都使用了event_map_entry **。
//因为event_map_entry **类型变量,既可以修改本元素的hte_next变量
//也能指向下一个元素。
 
//该函数返回的是查找节点的前驱节点的hte_next成员变量的地址。
//所以返回值肯定不会为NULL,但是对返回值取*就可能为NULL
static inline struct event_map_entry **
_event_io_map_HT_FIND_P(struct event_io_map *head,
                        struct event_map_entry *elm)
{
    struct event_map_entry **p;
    if (!head->hth_table)
        return NULL;
 
#ifdef HT_CACHE_HASH_VALUES
    p = &((head)->hth_table[((elm)->map_node.hte_hash)
            % head->hth_table_length]);
#else
    p = &((head)->hth_table[(hashsocket(*elm))%head->hth_table_length]);
#endif
 
    //这里的哈希表是用链地址法解决哈希冲突的。
    //上面的 % 只是找到了冲突链的头。现在是在冲突链中查找。
    while (*p)
    {
        //判断是否相等。在实现上,只是简单地根据fd来判断是否相等
        if (eqsocket(*p, elm))
            return p;
 
        //p存放的是hte_next成员变量的地址
        p = &(*p)->map_node.hte_next;
    }
 
    return p;
}
 
/* Return a pointer to the element in the table 'head' matching 'elm',
 * or NULL if no such element exists */
//在event_io_map这个哈希表中,找个表项elm
static inline struct event_map_entry *
event_io_map_HT_FIND(const struct event_io_map *head,
                     struct event_map_entry *elm)
{
    struct event_map_entry **p;
    struct event_io_map *h = (struct event_io_map *) head;
 
#ifdef HT_CACHE_HASH_VALUES
    do
    {   //计算哈希值
        (elm)->map_node.hte_hash = hashsocket(elm);
    } while(0);
#endif
 
    p = _event_io_map_HT_FIND_P(h, elm);
    return p ? *p : NULL;
}
 
 
/* Insert the element 'elm' into the table 'head'.  Do not call this
 * function if the table might already contain a matching element. */
static inline void
event_io_map_HT_INSERT(struct event_io_map *head,
                       struct event_map_entry *elm)
{
    struct event_map_entry **p;
    if (!head->hth_table || head->hth_n_entries >= head->hth_load_limit)
        event_io_map_HT_GROW(head, head->hth_n_entries+1);
 
    ++head->hth_n_entries;
 
#ifdef HT_CACHE_HASH_VALUES
    do
    {   //计算哈希值.此哈希不同于用%计算的简单哈希。
        //存放到hte_hash变量中,作为cache
        (elm)->map_node.hte_hash = hashsocket(elm);
    } while (0);
 
    p = &((head)->hth_table[((elm)->map_node.hte_hash)
            % head->hth_table_length]);
#else
    p = &((head)->hth_table[(hashsocket(*elm))%head->hth_table_length]);
#endif
 
 
    //使用头插法,即后面才插入的链表,反而会在链表头。
    elm->map_node.hte_next = *p;
    *p = elm;
}
 
 
/* Insert the element 'elm' into the table 'head'. If there already
 * a matching element in the table, replace that element and return
 * it. */
static inline struct event_map_entry *
event_io_map_HT_REPLACE(struct event_io_map *head,
                        struct event_map_entry *elm)
{
    struct event_map_entry **p, *r;
 
    if (!head->hth_table || head->hth_n_entries >= head->hth_load_limit)
        event_io_map_HT_GROW(head, head->hth_n_entries+1);
 
#ifdef HT_CACHE_HASH_VALUES
    do
    {
        (elm)->map_node.hte_hash = hashsocket(elm);
    } while(0);
#endif
 
    p = _event_io_map_HT_FIND_P(head, elm);
 
    //由前面的英文注释可知,这个函数是替换插入一起进行的。如果哈希表
    //中有和elm相同的元素(指的是event_map_entry的fd成员变量值相等)
    //那么就发生替代(其他成员变量值不同,所以不是完全相同,有替换意义)
    //如果哈希表中没有和elm相同的元素,那么就进行插入操作
 
    //指针p存放的是hte_next成员变量的地址
    //这里的p存放的是被替换元素的前驱元素的hte_next变量地址
    r = *p; //r指向了要替换的元素。有可能为NULL
    *p = elm; //hte_next变量被赋予新值elm
 
    //找到了要被替换的元素r(不为NULL)
    //而且要插入的元素地址不等于要被替换的元素地址
    if (r && (r!=elm))
    {
        elm->map_node.hte_next = r->map_node.hte_next;
 
        r->map_node.hte_next = NULL;
        return r; //返回被替换掉的元素
    }
    else //进行插入操作
    {
        //这里貌似有一个bug。如果前一个判断中,r 不为 NULL,而且r == elm
        //对于同一个元素,多次调用本函数。就会出现这样情况。
        //此时,将会来到这个else里面
        //实际上没有增加元素,但元素的个数却被++了。因为r 的地址等于 elm
        //所以 r = *p; *p = elm; 等于什么也没有做。(r == elm)
//当然,因为这些函数都是Libevent内部使用的。如果它保证不会同于同
//一个元素调用本函数,那么就不会出现bug
        ++head->hth_n_entries;
        return NULL; //插入操作返回NULL,表示没有替换到元素
    }
}
 
 
/* Remove any element matching 'elm' from the table 'head'.  If such
 * an element is found, return it; otherwise return NULL. */
static inline struct event_map_entry *
event_io_map_HT_REMOVE(struct event_io_map *head,
                       struct event_map_entry *elm)
{
    struct event_map_entry **p, *r;
 
#ifdef HT_CACHE_HASH_VALUES
    do
    {
        (elm)->map_node.hte_hash = hashsocket(elm);
    } while (0);
#endif
 
    p = _event_io_map_HT_FIND_P(head,elm);
 
    if (!p || !*p)//没有找到
        return NULL;
 
    //指针p存放的是hte_next成员变量的地址
    //这里的p存放的是被替换元素的前驱元素的hte_next变量地址
    r = *p; //r现在指向要被删除的元素
    *p = r->map_node.hte_next;
    r->map_node.hte_next = NULL;
 
    --head->hth_n_entries;
 
    return r;
}
 
 
/* Invoke the function 'fn' on every element of the table 'head',
 using 'data' as its second argument.  If the function returns
 nonzero, remove the most recently examined element before invoking
 the function again. */
static inline void
event_io_map_HT_FOREACH_FN(struct event_io_map *head,
                           int (*fn)(struct event_map_entry *, void *),
                           void *data)
{
    unsigned idx;
    struct event_map_entry **p, **nextp, *next;
 
    if (!head->hth_table)
        return;
 
    for (idx=0; idx < head->hth_table_length; ++idx)
    {
        p = &head->hth_table[idx];
 
        while (*p)
        {
            //像A->B->C链表。p存放了A元素中hte_next变量的地址
            //*p则指向B元素。nextp存放B的hte_next变量的地址
            //next指向C元素。
            nextp = &(*p)->map_node.hte_next;
            next = *nextp;
 
            //对B元素进行检查
            if (fn(*p, data))
            {
                --head->hth_n_entries;
                //p存放了A元素中hte_next变量的地址
                //所以*p = next使得A元素的hte_next变量值被赋值为
                //next。此时链表变成A->C。即使抛弃了B元素。不知道
                //调用方是否能释放B元素的内存。
                *p = next;
            }
            else
            {
                p = nextp;
            }
        }
    }
}
 
 
/* Return a pointer to the first element in the table 'head', under
 * an arbitrary order.  This order is stable under remove operations,
 * but not under others. If the table is empty, return NULL. */
//获取第一条冲突链的第一个元素
static inline struct event_map_entry **
event_io_map_HT_START(struct event_io_map *head)
{
    unsigned b = 0;
 
    while (b < head->hth_table_length)
    {
        //返回哈希表中,第一个不为NULL的节点
        //即有event_map_entry元素的节点。
        //找到链。因为是哈希。所以不一定哈希表中的每一个节点都存有元素
        if (head->hth_table[b])
            return &head->hth_table[b];
 
        ++b;
    }
 
    return NULL;
}
 
 
 
/* Return the next element in 'head' after 'elm', under the arbitrary
 * order used by HT_START.  If there are no more elements, return
 * NULL.  If 'elm' is to be removed from the table, you must call
 * this function for the next value before you remove it.
 */
static inline struct event_map_entry **
event_io_map_HT_NEXT(struct event_io_map *head,
                     struct event_map_entry **elm)
{
    //本哈希解决冲突的方式是链地址
    //如果参数elm所在的链地址中,elm还有下一个节点,就直接返回下一个节点
    if ((*elm)->map_node.hte_next)
    {
        return &(*elm)->map_node.hte_next;
    }
    else //否则取哈希表中的下一条链中第一个元素
    {
#ifdef HT_CACHE_HASH_VALUES
        unsigned b = (((*elm)->map_node.hte_hash)
                      % head->hth_table_length) + 1;
#else
        unsigned b = ( (hashsocket(*elm)) % head->hth_table_length) + 1;
#endif
 
        while (b < head->hth_table_length)
        {
            //找到链。因为是哈希。所以不一定哈希表中的每一个节点都存有元素
            if (head->hth_table[b])
                return &head->hth_table[b];
            ++b;
        }
 
        return NULL;
    }
}
 
 
 
//功能同上一个函数。只是参数不同,另外本函数还会使得--hth_n_entries
//该函数主要是返回elm的下一个元素。并且哈希表的总元素个数减一。
//主调函数会负责释放*elm指向的元素。无需在这里动手
//在evmap_io_clear函数中,会调用本函数。
static inline struct event_map_entry **
event_io_map_HT_NEXT_RMV(struct event_io_map *head,
                         struct event_map_entry **elm)
{
#ifdef HT_CACHE_HASH_VALUES
    unsigned h = ((*elm)->map_node.hte_hash);
#else
    unsigned h = (hashsocket(*elm));
#endif
 
    //elm变量变成存放下一个元素的hte_next的地址
    *elm = (*elm)->map_node.hte_next;
 
    --head->hth_n_entries;
 
    if (*elm)
    {
        return elm;
    }
    else
    {
        unsigned b = (h % head->hth_table_length)+1;
 
        while (b < head->hth_table_length)
        {
            if (head->hth_table[b])
                return &head->hth_table[b];
 
            ++b;
        }
 
        return NULL;
    }
}
 
 
 
//素数表。之所以去素数,是因为在取模的时候,素数比合数更有优势。
//听说是更散,更均匀
static unsigned event_io_map_PRIMES[] =
{
    //素数表的元素具有差不多2倍的关系。
    //这使得扩容操作不会经常发生。每次扩容都预留比较大的空间
    53, 97, 193, 389, 769, 1543, 3079,
    6151, 12289, 24593, 49157, 98317,
    196613, 393241, 786433, 1572869, 3145739,
    6291469, 12582917, 25165843, 50331653, 100663319,
    201326611, 402653189, 805306457, 1610612741
};
 
 
//素数表中,元素的个数。
static unsigned event_io_map_N_PRIMES =
        (unsigned)(sizeof(event_io_map_PRIMES)
                   /sizeof(event_io_map_PRIMES[0]));
 
 
/* Expand the internal table of 'head' until it is large enough to
 * hold 'size' elements.  Return 0 on success, -1 on allocation
 * failure. */
int event_io_map_HT_GROW(struct event_io_map *head, unsigned size)
{
    unsigned new_len, new_load_limit;
    int prime_idx;
 
    struct event_map_entry **new_table;
    //已经用到了素数表中最后一个素数,不能再扩容了。
    if (head->hth_prime_idx == (int)event_io_map_N_PRIMES - 1)
        return 0;
 
    //哈希表中还够容量,无需扩容
    if (head->hth_load_limit > size)
        return 0;
 
    prime_idx = head->hth_prime_idx;
 
    do {
        new_len = event_io_map_PRIMES[++prime_idx];
 
        //从素数表中的数值可以看到,后一个差不多是前一个的2倍。
        //从0.5和后的new_load_limit <= size,可以得知此次扩容
        //至少得是所需大小(size)的2倍以上。免得经常要进行扩容
        new_load_limit = (unsigned)(0.5*new_len);
    } while (new_load_limit <= size
             && prime_idx < (int)event_io_map_N_PRIMES);
 
    if ((new_table = mm_malloc(new_len*sizeof(struct event_map_entry*))))
    {
        unsigned b;
        memset(new_table, 0, new_len*sizeof(struct event_map_entry*));
 
        for (b = 0; b < head->hth_table_length; ++b)
        {
            struct event_map_entry *elm, *next;
            unsigned b2;
 
            elm = head->hth_table[b];
            while (elm) //该节点有冲突链。遍历冲突链中所有的元素
            {
                next = elm->map_node.hte_next;
 
                //冲突链中的元素,相对于前一个素数同余(即模素数后,结果相当)
                //但对于现在的新素数就不一定同余了,即它们不一定还会冲突
                //所以要对冲突链中的所有元素都再次哈希,并放到它们应该在的地方
                //b2存放了再次哈希后,元素应该存放的节点下标。
#ifdef HT_CACHE_HASH_VALUES
                b2 = (elm)->map_node.hte_hash % new_len;
#else
                b2 = (hashsocket(*elm)) % new_len;
#endif
                //用头插法插入数据
                elm->map_node.hte_next = new_table[b2];
                new_table[b2] = elm;
 
                elm = next;
            }
        }
 
        if (head->hth_table)
            mm_free(head->hth_table);
 
        head->hth_table = new_table;
    }
    else
    {
        unsigned b, b2;
 
        //刚才mm_malloc失败,可能是内存不够。现在用更省内存的
        //mm_realloc方式。当然其代价就是更耗时(下面的代码可以看到)。
        //前面的mm_malloc会同时有hth_table和new_table两个数组。
        //而mm_realloc则只有一个数组,所以省内存,省了一个hth_table数组
        //的内存。此时,new_table数组的前head->hth_table_length个
        //元素存放了原来的冲突链的头部。也正是这个原因导致后面代码更耗时。
        //其实,只有在很特殊的情况下,这个函数才会比mm_malloc省内存.
        //就是堆内存head->hth_table区域的后面刚好有一段可以用的内存。
        //具体的,可以搜一下realloc这个函数。
        new_table = mm_realloc(head->hth_table,
                               new_len*sizeof(struct event_map_entry*));
 
        if (!new_table)
            return -1;
 
        memset(new_table + head->hth_table_length, 0,
               (new_len - head->hth_table_length)*sizeof(struct event_map_entry*)
               );
 
        for (b=0; b < head->hth_table_length; ++b)
        {
            struct event_map_entry *e, **pE;
 
            for (pE = &new_table[b], e = *pE; e != NULL; e = *pE)
            {
 
#ifdef HT_CACHE_HASH_VALUES
                b2 = (e)->map_node.hte_hash % new_len;
#else
                b2 = (hashsocket(*elm)) % new_len;
#endif
                //对于冲突链A->B->C.
                //pE是二级指针,存放的是A元素的hte_next指针的地址值
                //e指向B元素。
 
                //对新的素数进行哈希,刚好又在原来的位置
                if (b2 == b)
                {
                    //此时,无需修改。接着处理冲突链中的下一个元素即可
                    //pE向前移动,存放B元素的hte_next指针的地址值
                    pE = &e->map_node.hte_next;
                }
                else//这个元素会去到其他位置上。
                {
                    //此时冲突链修改成A->C。
                    //所以pE无需修改,还是存放A元素的hte_next指针的地址值
                    //但A元素的hte_next指针要指向C元素。用*pE去修改即可
                    *pE = e->map_node.hte_next;
 
                    //将这个元素放到正确的位置上。
                    e->map_node.hte_next = new_table[b2];
                    new_table[b2] = e;
                }
 
                //这种再次哈希的方式,很有可能会对某些元素操作两次。
                //当某个元素第一次在else中处理,那么它就会被哈希到正确的节点
                //的冲突链上。随着外循环的进行,处理到正确的节点时。在遍历该节点
                //的冲突链时,又会再次处理该元素。此时,就会在if中处理。而不会
                //进入到else中。
            }
        }
 
        head->hth_table = new_table;
    }
 
 
    //一般是当hth_n_entries >= hth_load_limit时,就会调用
    //本函数。hth_n_entries表示的是哈希表中节点的个数。而hth_load_limit
    //是hth_table_length的一半。hth_table_length则是哈希表中
    //哈希函数被模的数字。所以,当哈希表中的节点个数到达哈希表长度的一半时
    //就会发生增长,本函数被调用。这样的结果是:平均来说,哈希表应该比较少发生
    //冲突。即使有,冲突链也不会太长。这样就能有比较快的查找速度。
    head->hth_table_length = new_len;
    head->hth_prime_idx = prime_idx;
    head->hth_load_limit = new_load_limit;
 
    return 0;
}
 
 
/* Free all storage held by 'head'.  Does not free 'head' itself,
 * or individual elements. 并不需要释放独立的元素*/
//在evmap_io_clear函数会调用该函数。其是在删除所有哈希表中的元素后
//才调用该函数的。
void event_io_map_HT_CLEAR(struct event_io_map *head)
{
    if (head->hth_table)
        mm_free(head->hth_table);
 
    head->hth_table_length = 0;
 
    event_io_map_HT_INIT(head);
}
 
 
/* Debugging helper: return false iff the representation of 'head' is
 * internally consistent. */
int _event_io_map_HT_REP_IS_BAD(const struct event_io_map *head)
{
    unsigned n, i;
    struct event_map_entry *elm;
 
    if (!head->hth_table_length)
    {
        //刚被初始化,还没申请任何空间
        if (!head->hth_table && !head->hth_n_entries
                && !head->hth_load_limit && head->hth_prime_idx == -1
                )
            return 0;
        else
            return 1;
    }
 
    if (!head->hth_table || head->hth_prime_idx < 0
            || !head->hth_load_limit
            )
        return 2;
 
    if (head->hth_n_entries > head->hth_load_limit)
        return 3;
 
    if (head->hth_table_length != event_io_map_PRIMES[head->hth_prime_idx])
        return 4;
 
    if (head->hth_load_limit != (unsigned)(0.5*head->hth_table_length))
        return 5;
 
    for (n = i = 0; i < head->hth_table_length; ++i)
    {
        for (elm = head->hth_table[i]; elm; elm = elm->map_node.hte_next)
        {
 
#ifdef HT_CACHE_HASH_VALUES
 
            if (elm->map_node.hte_hash != hashsocket(elm))
                return 1000 + i;
 
            if( (elm->map_node.hte_hash % head->hth_table_length) != i)
                return 10000 + i;
 
#else
            if ( (hashsocket(*elm)) != hashsocket(elm))
                return 1000 + i;
 
            if( ( (hashsocket(*elm)) % head->hth_table_length) != i)
                return 10000 + i;
#endif
            ++n;
        }
    }
 
    if (n != head->hth_n_entries)
        return 6;
 
    return 0;
}
        代码中的注释已经对这个哈希表的一些特征进行了描述,这里就不多说了。
 
哈希表在Libevent的使用:
        现在来讲一下event_io_map的应用。
        在event_base这个结构体中有一个event_io_map类型的成员变量io。它就是一个哈希表。当一个监听读或者写操作的event,调用event_add函数插入到event_base中时,就会调用evmap_io_add函数。evmap_io_add函数应用到这个event_io_map结构体。该函数的定义如下,其中使用到了一个宏定义,我已经展开了。
int
evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
    const struct eventop *evsel = base->evsel;
    struct event_io_map *io = &base->io;
    struct evmap_io *ctx = NULL;
    int nread, nwrite, retval = 0;
    short res = 0, old = 0;
    struct event *old_ev;
 
    EVUTIL_ASSERT(fd == ev->ev_fd);
 
    if (fd < 0)
        return 0;
 
    //GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
    // evsel->fdinfo_len);SLOT指的是fd
    //GET_IO_SLOT_AND_CTOR宏将展开成下面这个do{}while(0);
    do
    {
        struct event_map_entry _key, *_ent;
        _key.fd = fd;
 
        struct event_io_map *_ptr_head = io;
        struct event_map_entry **ptr;
 
        //哈希表扩容,减少冲突的可能性
        if (!_ptr_head->hth_table
                || _ptr_head->hth_n_entries >= _ptr_head->hth_load_limit)
        {
            event_io_map_HT_GROW(_ptr_head,
                                 _ptr_head->hth_n_entries + 1);
        }
 
#ifdef HT_CACHE_HASH_VALUES
        do{
            (&_key)->map_node.hte_hash = hashsocket((&_key));
        } while(0);
#endif
 
        //返回值ptr,是要查找节点的前驱节点的hte_next成员变量的地址.
        //所以返回值肯定不会为NULL,而*ptr就可能为NULL。说明hte_next
        //不指向任何节点。也正由于这个原因,所以即使*ptr 为NULL,但是可以
        //给*ptr赋值。此时,是修改前驱节点的hte_next成员变量的值,使之
        //指向另外一个节点。
        //这里调用_event_io_map_HT_FIND_P原因有二:1.查看该fd是否已经
        //插入过这个哈希表中。2.得到这个fd计算哈希位置。
        ptr = _event_io_map_HT_FIND_P(_ptr_head, (&_key));
 
        //在event_io_map这个哈希表中查找是否已经存在该fd的event_map_entry了
        //因为同一个fd可以调用event_new多次,然后event_add多次的。
        if (*ptr)
        {
            _ent = *ptr;
        }
        else
        {
            _ent = mm_calloc(1, sizeof(struct event_map_entry) + evsel->fdinfo_len);
            if (EVUTIL_UNLIKELY(_ent == NULL))
                return (-1);
 
            _ent->fd = fd;
  //调用初始化函数初始化这个evmap_io
            (evmap_io_init)(&_ent->ent.evmap_io);
 
#ifdef HT_CACHE_HASH_VALUES
            do
            {
                ent->map_node.hte_hash = (&_key)->map_node.hte_hash;
            }while(0);
#endif
            _ent->map_node.hte_next = NULL;
 
            //把这个新建的节点插入到哈希表中。ptr已经包含了哈希位置
            *ptr = _ent;
            ++(io->hth_n_entries);
        }
 
 
        //这里是获取该event_map_entry的next和prev指针。因为
        //evmap_io含有next、prev变量。这样在之后就可以把这个
        //event_map_entry连起来。这个外do{}while(0)的功能是
        //为这个fd分配一个event_map_entry,并且插入到现有的哈希
        //表中。同时,这个fd还是结构体event的一部分。而event必须
        //插入到event队列中。
        (ctx) = &_ent->ent.evmap_io;
 
    } while (0);
 
 
  ....
 
    //ctx->events是一个TAILQ_HEAD。结合之前讲到的TAILQ_QUEUE队列,
    //就可以知道:同一个fd,可能有多个event结构体。这里就把这些结构体连
    //起来。依靠的链表是,event结构体中的ev_io_next。ev_io_next是
    //一个TAILQ_ENTRY,具有前驱和后驱指针。队列头部为event_map_entry
    //结构体中的evmap_io成员的events成员。
    TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);
 
    return (retval);
}
        GET_IO_SLOT_AND_CTOR宏的作用就是让ctx指向struct event_map_entry结构体中的TAILQ_HEAD。这样就可以使用TAILQ_INSERT_TAIL宏,把ev变量插入到队列中。如果有现成的event_map_entry就直接使用,没有的话就新建一个。
————————————————
版权声明:本文为CSDN博主「luotuo44」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/luotuo44/article/details/38403241
posted @ 2019-11-12 17:26 长戟十三千 阅读(7) | 评论 (0)编辑 收藏

  在研究磁盘性能之前我们必须先了解磁盘的结构,以及工作原理。不过在这里就不再重复说明了,关系硬盘结构和工作原理的信息可以参考维基百科上面的相关词条——Hard disk drive(英文)和硬盘驱动器(中文)。

  读写IO(Read/Write IO)操作

  磁盘是用来给我们存取数据用的,因此当说到IO操作的时候,就会存在两种相对应的操作,存数据时候对应的是写IO操作,取数据的时候对应的是是读IO操作。

  单个IO操作

  当控制磁盘的控制器接到操作系统的读IO操作指令的时候,控制器就会给磁盘发出一个读数据的指令,并同时将要读取的数据块的地址传递给磁盘,然后磁盘会将读取到的数据传给控制器,并由控制器返回给操作系统,完成一个写IO的操作;同样的,一个写IO的操作也类似,控制器接到写的IO操作的指令和要写入的数据,并将其传递给磁盘,磁盘在数据写入完成之后将操作结果传递回控制器,再由控制器返回给操作系统,完成一个写IO的操作。单个IO操作指的就是完成一个写IO或者是读IO的操作。

  随机访问(Random Access)与连续访问(Sequential Access)

  随机访问指的是本次IO所给出的扇区地址和上次IO给出扇区地址相差比较大,这样的话磁头在两次IO操作之间需要作比较大的移动动作才能重新开始读/写数据。相反的,如果当次IO给出的扇区地址与上次IO结束的扇区地址一致或者是接近的话,那磁头就能很快的开始这次IO操作,这样的多个IO操作称为连续访问。因此尽管相邻的两次IO操作在同一时刻发出,但如果它们的请求的扇区地址相差很大的话也只能称为随机访问,而非连续访问。

  顺序IO模式(Queue Mode)/并发IO模式(Burst Mode)

  磁盘控制器可能会一次对磁盘组发出一连串的IO命令,如果磁盘组一次只能执行一个IO命令时称为顺序IO;当磁盘组能同时执行多个IO命令时,称为并发IO。并发IO只能发生在由多个磁盘组成的磁盘组上,单块磁盘只能一次处理一个IO命令。

2单个IO的大小(IO Chunk Size)

  单个IO的大小(IO Chunk Size)

  熟悉数据库的人都会有这么一个概念,那就是数据库存储有个基本的块大小(Block Size),不管是SQL Server还是Oracle,默认的块大小都是8KB,就是数据库每次读写都是以8k为单位的。那么对于数据库应用发出的固定8k大小的单次读写到了写磁盘这个层面会是怎么样的呢,就是对于读写磁盘来说单个IO操作操作数据的大小是多少呢,是不是也是一个固定的值?

  答案是不确定。首先操作系统为了提高 IO的性能而引入了文件系统缓存(File System Cache),系统会根据请求数据的情况将多个来自IO的请求先放在缓存里面,然后再一次性的提交给磁盘,也就是说对于数据库发出的多个8K数据块的读操作有可能放在一个磁盘读IO里就处理了。

  还有对于有些存储系统也是提供了缓存(Cache)的,接收到操作系统的IO请求之后也是会将多个操作系统的 IO请求合并成一个来处理。不管是操作系统层面的缓存还是磁盘控制器层面的缓存,目的都只有一个,提高数据读写的效率。因此每次单独的IO操作大小都是不一样的,它主要取决于系统对于数据读写效率的判断。

  当一次IO操作大小比较小的时候我们成为小的IO操作,比如说1K,4K,8K这样的;当一次IO操作的数据量比较的的时候称为大IO操作,比如说32K,64K甚至更大。

  在我们说到块大小(Block Size)的时候通常我们会接触到多个类似的概念,像我们上面提到的那个在数据库里面的数据最小的管理单位,Oralce称之为块(Block),大小一般为8K,SQL Server称之为页(Page),一般大小也为8k。

  在文件系统里面我们也能碰到一个文件系统的块,在现在很多的Linux系统中都是4K(通过 /usr/bin/time -v可以看到),它的作用其实跟数据库里面的块/页是一样的,都是为了方便数据的管理。但是说到单次IO的大小,跟这些块的大小都是没有直接关系的,在英文里单次IO大小通常被称为是IO Chunk Size,不会说成是IO Block Size的。

3IOPS(IO per Second)

  IOPS(IO per Second)

  IOPS,IO系统每秒所执行IO操作的次数,是一个重要的用来衡量系统IO能力的一个参数。对于单个磁盘组成的IO系统来说,计算它的IOPS不是一件很难的事情,只要我们知道了系统完成一次IO所需要的时间的话我们就能推算出系统IOPS来。

  现在我们就来推算一下磁盘的IOPS,假设磁盘的转速(Rotational Speed)为15K RPM,平均寻道时间为5ms,最大传输速率为40MB/s(这里将读写速度视为一样,实际会差别比较大)。

  对于磁盘来说一个完整的IO操作是这样进行的:当控制器对磁盘发出一个IO操作命令的时候,磁盘的驱动臂(Actuator Arm)带读写磁头(Head)离开着陆区(Landing Zone,位于内圈没有数据的区域),移动到要操作的初始数据块所在的磁道(Track)的正上方,这个过程被称为寻址(Seeking),对应消耗的时间被称为寻址时间(Seek Time);但是找到对应磁道还不能马上读取数据,这时候磁头要等到磁盘盘片(Platter)旋转到初始数据块所在的扇区(Sector)落在读写磁头正上方的之后才能开始读取数据,在这个等待盘片旋转到可操作扇区的过程中消耗的时间称为旋转延时(Rotational Delay);接下来就随着盘片的旋转,磁头不断的读/写相应的数据块,直到完成这次IO所需要操作的全部数据,这个过程称为数据传送(Data Transfer),对应的时间称为传送时间(Transfer Time)。完成这三个步骤之后一次IO操作也就完成了。

  在我们看硬盘厂商的宣传单的时候我们经常能看到3个参数,分别是平均寻址时间、盘片旋转速度以及最大传送速度,这三个参数就可以提供给我们计算上述三个步骤的时间。

  第一个寻址时间,考虑到被读写的数据可能在磁盘的任意一个磁道,既有可能在磁盘的最内圈(寻址时间最短),也可能在磁盘的最外圈(寻址时间最长),所以在计算中我们只考虑平均寻址时间,也就是磁盘参数中标明的那个平均寻址时间,这里就采用当前最多的10krmp硬盘的5ms。

  第二个旋转延时,和寻址一样,当磁头定位到磁道之后有可能正好在要读写扇区之上,这时候是不需要额外额延时就可以立刻读写到数据,但是最坏的情况确实要磁盘旋转整整一圈之后磁头才能读取到数据,所以这里我们也考虑的是平均旋转延时,对于10krpm的磁盘就是(60s/15k)*(1/2) = 2ms。

  第三个传送时间,磁盘参数提供我们的最大的传输速度,当然要达到这种速度是很有难度的,但是这个速度却是磁盘纯读写磁盘的速度,因此只要给定了单次 IO的大小,我们就知道磁盘需要花费多少时间在数据传送上,这个时间就是IO Chunk Size / Max Transfer Rate。

4IOPS计算公式

  IOPS计算公式

  现在我们就可以得出这样的计算单次IO时间的公式:

  IO Time = Seek Time + 60 sec/Rotational Speed/2 + IO Chunk Size/Transfer Rate

  于是我们可以这样计算出IOPS

  IOPS = 1/IO Time = 1/(Seek Time + 60 sec/Rotational Speed/2 + IO Chunk Size/Transfer Rate)

  对于给定不同的IO大小我们可以得出下面的一系列的数据

  4K (1/7.1 ms = 140 IOPS)

  5ms + (60sec/15000RPM/2) + 4K/40MB = 5 + 2 + 0.1 = 7.1

  8k (1/7.2 ms = 139 IOPS)

  5ms + (60sec/15000RPM/2) + 8K/40MB = 5 + 2 + 0.2 = 7.2

  16K (1/7.4 ms = 135 IOPS)

  5ms + (60sec/15000RPM/2) + 16K/40MB = 5 + 2 + 0.4 = 7.4

  32K (1/7.8 ms = 128 IOPS)

  5ms + (60sec/15000RPM/2) + 32K/40MB = 5 + 2 + 0.8 = 7.8

  64K (1/8.6 ms = 116 IOPS)

  5ms + (60sec/15000RPM/2) + 64K/40MB = 5 + 2 + 1.6 = 8.6

  从上面的数据可以看出,当单次IO越小的时候,单次IO所耗费的时间也越少,相应的IOPS也就越大。

  上面我们的数据都是在一个比较理想的假设下得出来的,这里的理想的情况就是磁盘要花费平均大小的寻址时间和平均的旋转延时,这个假设其实是比较符合我们实际情况中的随机读写,在随机读写中,每次IO操作的寻址时间和旋转延时都不能忽略不计,有了这两个时间的存在也就限制了IOPS的大小。现在我们考虑一种相对极端的顺序读写操作,比如说在读取一个很大的存储连续分布在磁盘的的文件,因为文件的存储的分布是连续的,磁头在完成一个读IO操作之后,不需要从新的寻址,也不需要旋转延时,在这种情况下我们能到一个很大的IOPS值,如下

  4K (1/0.1 ms = 10000 IOPS)

  0ms + 0ms + 4K/40MB = 0.1

  8k (1/0.2 ms = 5000 IOPS)

  0ms + 0ms + 8K/40MB = 0.2

  16K (1/0.4 ms = 2500 IOPS)

  0ms + 0ms + 16K/40MB = 0.4

  32K (1/0.8 ms = 1250 IOPS)

  0ms + 0ms + 32K/40MB = 0.8

  64K (1/1.6 ms = 625 IOPS)

  0ms + 0ms + 64K/40MB = 1.6

  相比第一组数据来说差距是非常的大的,因此当我们要用IOPS来衡量一个IO系统的系能的时候我们一定要说清楚是在什么情况的IOPS,也就是要说明读写的方式以及单次IO的大小,当然在实际当中,特别是在OLTP的系统的,随机的小IO的读写是最有说服力的。

5传输速度/吞吐率

  传输速度(Transfer Rate)/吞吐率(Throughput)

  现在我们要说的传输速度(另一个常见的说法是吞吐率)不是磁盘上所表明的最大传输速度或者说理想传输速度,而是磁盘在实际使用的时候从磁盘系统总线上流过的数据量。有了IOPS数据之后我们是很容易就能计算出对应的传输速度来的

  Transfer Rate = IOPS * IO Chunk Size

  还是那上面的第一组IOPS的数据我们可以得出相应的传输速度如下

  4K: 140 * 4K = 560K / 40M = 1.36%

  8K: 139 * 8K = 1112K / 40M = 2.71%

  16K: 135 * 16K = 2160K / 40M = 5.27%

  32K: 116 * 32K = 3712K / 40M = 9.06%

  可以看出实际上的传输速度是很小的,对总线的利用率也是非常的小。

  这里一定要明确一个概念,那就是尽管上面我们使用IOPS来计算传输速度,但是实际上传输速度和IOPS是没有直接关系,在没有缓存的情况下它们共同的决定因素都是对磁盘系统的访问方式以及单个IO的大小。对磁盘进行随机访问时候我们可以利用IOPS来衡量一个磁盘系统的性能,此时的传输速度不会太大;但是当对磁盘进行连续访问时,此时的IOPS已经没有了参考的价值,这个时候限制实际传输速度却是磁盘的最大传输速度。因此在实际的应用当中,只会用IOPS来衡量小IO的随机读写的性能,而当要衡量大IO连续读写的性能的时候就要采用传输速度而不能是IOPS了。

6IO响应时间

  IO响应时间(IO Response Time)

  最后来关注一下能直接描述IO性能的IO响应时间。IO响应时间也被称为IO延时(IO Latency),IO响应时间就是从操作系统内核发出的一个读或者写的IO命令到操作系统内核接收到IO回应的时间,注意不要和单个IO时间混淆了,单个IO时间仅仅指的是IO操作在磁盘内部处理的时间,而IO响应时间还要包括IO操作在IO等待队列中所花费的等待时间。

  计算IO操作在等待队列里面消耗的时间有一个衍生于利托氏定理(Little’s Law)的排队模型M/M/1模型可以遵循,由于排队模型算法比较复杂,到现在还没有搞太明白(如果有谁对M/M/1模型比较精通的话欢迎给予指导),这里就罗列一下最后的结果,还是那上面计算的IOPS数据来说:

  8K IO Chunk Size (135 IOPS, 7.2 ms)

  135 => 240.0 ms

  105 => 29.5 ms

  75 => 15.7 ms

  45 => 10.6 ms

  64K IO Chunk Size(116 IOPS, 8.6 ms)

  135 => 没响应了……

  105 => 88.6 ms

  75 => 24.6 ms

  45 => 14.6 ms

  从上面的数据可以看出,随着系统实际IOPS越接近理论的最大值,IO的响应时间会成非线性的增长,越是接近最大值,响应时间就变得越大,而且会比预期超出很多。一般来说在实际的应用中有一个70%的指导值,也就是说在IO读写的队列中,当队列大小小于最大IOPS的70%的时候,IO的响应时间增加会很小,相对来说让人比较能接受的,一旦超过70%,响应时间就会戏剧性的暴增,所以当一个系统的IO压力超出最大可承受压力的70%的时候就是必须要考虑调整或升级了。

  另外补充说一下这个70%的指导值也适用于CPU响应时间,这也是在实践中证明过的,一旦CPU超过70%,系统将会变得受不了的慢。很有意思的东西。

posted @ 2019-10-23 16:29 长戟十三千 阅读(12) | 评论 (0)编辑 收藏
     摘要: 一、术语解释脏页:linux内核中的概念,因为硬盘的读写速度远赶不上内存的速度,系统就把读写比较频繁的数据事先放到内存中,以提高读写速度,这就叫高速缓存,linux是以页作为高速缓存的单位,当进程修改了高速缓存里的数据时,该页就被内核标记为脏页,内核将会在合适的时间把脏页的数据写到磁盘中去,以保持高速缓存中的数据和磁盘中的数据是一致的。内存映射:内存映射文件,是由一个文件到一块内存的映射。Win3...  阅读全文
posted @ 2019-10-23 15:52 长戟十三千 阅读(17) | 评论 (0)编辑 收藏

既前篇nodejs深入学习系列之libuv基础篇(一)学习的基本概念之后,我们在第二篇将带大家去学习为什么libuv的并发能力这么优秀?这并发后面的实现机制是什么?

3、libuv的事件循环机制

好了,了解了上述的基本概念之后,我们来扯一扯Libuv的事件循环机制,也就是event-loop。还是以[译文]libuv设计思想概述一文展示的两张图片,再结合代码来学习整个Libuv的事件循环机制。

3.1、解密第一张图片

首先是第一张图片:

细心的童鞋会发现这张图片被我用红框分割成了两部分,为什么呢?因为Libuv处理fs I/O和网络I/O用了两套机制去实现,或者说更全面的讲应该是fs I/O和 DNS等实现的方式和网络 I/O是不一样的。为什么这么说呢?请看下图,你就会明白了:

上图左侧是libuv的两大基石:event-loop线程和thread pool。而从图的右侧有两条轨迹分别连接到这两个基石,我特别用红色加粗标记,可以看到:

  • Network I/O最后的调用都会归结到uv__io_start这个函数,而该函数会将需要执行的I/O事件和回调塞到watcher队列中,之后uv_run函数执行的Poll for I/O阶段做的便是从watcher队列中取出事件调用系统的接口,这是其中一条主线
  • Fs I/O和DNS的所有操作都会归结到调用uv__work_sumit这个函数,而该函数就是执行线程池初始化并调度的终极函数。这是另外一条主线。

3.2、解密第二张图片

接着我们来看第二张图片,我们依然将该图片进行改造如下:

整个事件循环的执行主体是在uv_run中,每一次的循环经历的阶段对应的函数在上图中已经标注出来,有几个重点要说一下:

  1. 循环是否退出(也就是进程是否结束)取决于以下几个条件中的一个

    1.1、loop->stop_flag变为1并且uv__loop_alive返回不为0,也就是调用uv_stop函数并且loop不存在活跃的和被引用的句柄、活跃的请求或正在关闭的句柄。

    1.2、事件循环运行模式等于UV_RUN_ONCE或者是UV_RUN_NOWAIT

  2. I/O循环的超时时间的确定:

    2.1、如果时间循环运行模式是UV_RUN_NOWAIT,超时为0。

    2.2、如果循环将要停止(代码调用了uv_stop()),超时为0。

    2.3、如果没有活跃句柄或请求,超时为0。

    2.4、如果有任何Idle句柄处于活跃状态,超时为0。

    2.5、如果有等待关闭的句柄,超时为0。

    2.6、如果以上情况都不匹配,则采用最近的计时器的超时时间-当前时间(handle->timeout-loop->time),或者如果没有活动计时器,则为无穷大(即返回-1)。

  3. I/O循环的实现主体uv__io_poll根据系统不同,使用方式不一样,如果对linux系统熟悉的话,epoll方式应该也会了解。更多epoll的只是可以参考该文章:Linux IO模式及 select、poll、epoll详解

4、libuv的线程池

说完时间循环的主线程,接下去我们继续揭秘libuv的线程池。

libuv提供了一个threadpool,可用来运行用户代码并在事件循环线程(event-loop)中得到通知。这个线程池在内部用于运行所有文件系统操作,以及getaddrinfo和getnameinfo请求。当然如果你想要将自己的代码放在线程池中运行也是可以的,libuv提供除了uv_queue_work的方法供开发者自己选择。

它的默认大小为4,但是可以在启动时通过将UV_THREADPOOL_SIZE环境变量设置为任意值(最大值为1024)来更改它。

threadpool是全局的,并在所有事件循环中共享。当一个特定的函数使用threadpool(即当使用uv_queue_work())时,libuv预先分配并初始化UV_THREADPOOL_SIZE所允许的最大线程数。这导致了相对较小的内存开销(128个线程大约1MB),但在运行时提高了线程的性能。

关于线程的操作,demo中的文件是:传送门

在实例中,我们用了三种方式来实现和线程相关的一些操作:

  1. 从线程池中调度一个线程运行回调: uv_queue_work
  2. 使用uv_async_send来“唤醒” event loop主线程并执行uv_async_init当初设置好的回调
  3. 使用uv_thread_create手动创建一个线程来执行

我们在上一节中知道,想要创建线程池并让他们工作,唯一绕不开的函数是uv__work_submit,大家可以在libuv源码中搜寻这个,可以发现能够找到的也就这几个文件:(以unix系统为例)

threadpool.c   1. uv__work_submit实现地方   2. uv_queue_work调用 fs.c   1. 宏定义POST调用,所有的fs操作都会调用POST这个宏 getaddrinfo.c   1. uv_getaddrinfo调用 getnameinfo.c   1. uv_getnameinfo调用 

细心的童鞋发现,每一处调用的地方都会传一个叫做enum uv__work_kind kind的操作,根据上面的调用,可以看出分为了3种任务类型:

  • UV__WORK_CPU:CPU 密集型,UV_WORK 类型的请求被定义为这种类型。因此根据这个分类,不推荐在 uv_queue_work 中做 I/O 密集的操作。
  • UV__WORK_FAST_IO:快 IO 型,UV_FS 类型的请求被定义为这种类型。
  • UV__WORK_SLOW_IO:慢 IO 型,UV_GETADDRINFO 和 UV_GETNAMEINFO 类型的请求被定义为这种类型

4.2、线程池的初始化

学习线程池初始化之前,我们先得普及一下线程间的同步原语。这样后面看的代码才不会糊里糊涂

libuv提供了mutex锁读写锁信号量(Semaphores)条件量(Conditions)屏障(Barriers)五种手段来实现线程间资源竞争互斥同步等操作。接下去会简单地介绍,以便待会的初始化流程可以读懂。

4.2.1、Mutex锁

互斥锁用于对资源的互斥访问,当你访问的内存资源可能被别的线程访问到,这个时候你就可以考虑使用互斥锁,在访问的时候锁住。对应的使用流程可能是这样的:

  • 初始化互斥锁:uv_mutex_init(uv_mutex_t* handle)
  • 锁住互斥资源:uv_mutex_lock(uv_mutex_t* handle)
  • 解锁互斥资源:uv_mutex_unlock(uv_mutex_t* handle)

在线程初始化的过程中,我们会初始化一个全局的互斥锁:

static void init_threads(void) {   ...   if (uv_mutex_init(&mutex))     abort()   ... } 

而后在每个线程的执行实体worker函数中,就使用互斥锁对下面几个公共资源进行锁住与解锁:

  • 请求队列 wq:线程池收到 UVWORK_CPU 和 UVWORK_FAST_IO 类型的请求后将其插到此队列的尾部,并通过 uv_cond_signal 唤醒 worker 线程去处理,这是线程池请求的主队列。
  • 慢 I/O 队列 slow_io_pending_wq:线程池收到 UV__WORK_SLOW_IO 类型的请求后将其插到此队列的尾部。
  • 慢 I/O 标志位节点 run_slow_work_message:当存在慢 I/O 请求时,用来作为一个标志位放在请求队列 wq 中,表示当前有慢 I/O 请求,worker 线程处理请求时需要关注慢 I/O 队列的请求;当慢 I/O 队列的请求都处理完毕后这个标志位将从请求队列 wq 中移除。
static void worker(void* arg) {   ...   uv_mutex_lock(&mutex);    ...   uv_mutex_unlock(&mutex); } 

4.2.2、读写锁

读写锁没有用在线程的启动过程中,我们在demo中用来实践对某个全局变量的访问。具体使用步骤参考代码,这里就不再赘述。

4.2.3、信号量

信号量是一种专门用于提供不同进程间或线程间同步手段的原语。信号量本质上是一个非负整数计数器,代表共享资源的数目,通常是用来控制对共享资源的访问。一般使用步骤是这样的:

  • 初始化信号量:int uv_sem_init(uv_sem_t* sem, unsigned int value)
  • 信号量加1:void uv_sem_wait(uv_sem_t* sem)
  • 信号量减1:void uv_sem_post(uv_sem_t* sem)
  • 信号量销毁:void uv_sem_wait(uv_sem_t* sem)

在线程池初始化过程中,我们利用信号量来等待所有的线程初始化结束,如下代码:

static void init_threads(void) {   ...   for (i = 0; i < nthreads; i++)     uv_sem_wait(&sem);    uv_sem_destroy(&sem); }  // 而每个线程的执行实体都会去将信号量-1: static void worker(void* arg) {   struct uv__work* w;   QUEUE* q;   int is_slow_work;    uv_sem_post((uv_sem_t*) arg);   ... } 

这样只要所有的线程没有初始化完成,uv_sem_destroy这个函数是不会执行到的,整个初始化函数也不会返回,此时的主线程也就阻塞在这里了。

4.2.4、条件变量

而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足。条件变量的内部实质上是一个等待队列,放置等待(阻塞)的线程,线程在条件变量上等待和通知,互斥锁用来保护等待队列(因为所有的线程都可以放入等待队列,所以等待队列成为了一个共享的资源,需要被上锁保护),因此条件变量通常和互斥锁一起使用。一般使用步骤是这样的:

  • 初始化条件变量:int uv_cond_init(uv_cond_t* cond)
  • 线程阻塞等待被唤醒:void uv_cond_wait(uv_cond_t cond, uv_mutex_t mutex)
  • 别的线程唤醒阻塞的线程:void uv_cond_signal(uv_cond_t* cond)

libuv使用条件变量来阻塞线程池和唤醒线程池,使用代码如下:

static void init_threads(void) {   if (uv_cond_init(&cond))     abort(); }  static void worker(void* arg) {   ...   for (;;) {     /* `mutex` should always be locked at this point. */      /* Keep waiting while either no work is present or only slow I/O        and we're at the threshold for that. */     while (QUEUE_EMPTY(&wq) ||            (QUEUE_HEAD(&wq) == &run_slow_work_message &&             QUEUE_NEXT(&run_slow_work_message) == &wq &&             slow_io_work_running >= slow_work_thread_threshold())) {       idle_threads += 1;       uv_cond_wait(&cond, &mutex);       idle_threads -= 1;     }     ...   } } static void post(QUEUE* q, enum uv__work_kind kind) {   ...   if (idle_threads > 0)     uv_cond_signal(&cond)   ... } 

从上面三处代码可以看到线程启动之后就进入阻塞状态,直到有I/O请求调用uv_cond_signal来唤醒,按照uv_cond_wait调用的顺序形成一个等待队列,循环调用。

4.2.5、屏障

在多线程的时候,我们总会碰到一个需求,就是需要等待一组进程全部执行完毕后再执行某些事,由于多线程是乱序的,无法预估线程都执行到哪里了,这就要求我们有一个屏障作为同步点,在所有有屏障的地方都会阻塞等待,直到所有的线程都的代码都执行到同步点,再继续执行后续代码。使用步骤一般是:

  • 初始化屏障需要达到的个数:int uv_barrier_init(uv_barrier_t* barrier, unsigned int count)
  • 每当达到条件便将计数+1:int uv_barrier_wait(uv_barrier_t* barrier)
  • 销毁屏障:void uv_barrier_destroy(uv_barrier_t* barrier)

只有当初始化计数的值为0,主线程才会继续执行,具体使用方法可以参考demo。

至此借助于线程间同步原语,我们就哗啦啦地把线程的初始化以及大概的工作机制讲完了,总结出了下面一张图:

4.1、线程池工作调度

线程池的工作利用的是主线程post函数和各个线程的worker函数,post函数的工作内容如下:

  • 判断请求的请求类型是否是 UV__WORK_SLOW_IO:
    • 如果是,将这个请求插到慢 I/O 请求队列 slow_io_pending_wq 的尾部,同时在请求队列 wq 的尾部插入一个 run_slow_work_message 节点作为标志位,告知请求队列 wq 当前存在慢 I/O 请求。
    • 如果不是,将请求插到请求队列 wq 尾部。
  • 如果有空闲的线程,唤醒某一个去执行请求。

并发的慢 I/O 的请求数量不会超过线程池大小的一半,这样做的好处是避免多个慢 I/O 的请求在某段时间内把所有线程都占满,导致其它能够快速执行的请求需要排队。

static unsigned int slow_work_thread_threshold(void) {   return (nthreads + 1) / 2; } 

而各个线程的工作内容如下:

  • 等待唤醒。
  • 取出请求队列 wq 或者慢 I/O 请求队列的头部请求去执行。 => w->work(w);
  • 通知 uv loop 线程完成了一个请求的处理。=> uv_async_send
  • 回到最开始循环的位置。

4.2、线程间的通信

上一小节清晰地描述了libuv的主线程是如何将请求分给各个线程以及线程是如何处理请求的,那么上述过程中还有一个步骤:线程池里面的线程完成工作之后是如何通知主线程的?主线程收到通知之后又继续做了些什么?

这个过程我们称之为线程间的通信。上一小节中或者我们的demo中已经知道,完成这个事情的主要函数是uv_async_send,那么这个函数是如何实现的呢?请看下图:

从图中我们可以看到,借助于io poll与管道,线程池的线程写入数据,被主线程轮询出来,知道有消息过来,就开始执行对应的回调函数。整个流程就是这么easy~

posted @ 2019-10-17 15:30 长戟十三千 阅读(11) | 评论 (0)编辑 收藏
     摘要: 学习完nodejs基石之一的v8基础篇(还没看过的童鞋请跳转到这里:nodejs深入学习系列之v8基础篇),我们这次将要继续学习另外一块基石:libuv。关于libuv的设计思想,我已经翻译成中文,还没看过的童鞋还是请跳转到这里: [译文]libuv设计思想概述,如果还没看完这篇文章的童鞋,下面的内容也不建议细看了,因为会有”代沟“的问题~本文的所有示例代码都可以...  阅读全文
posted @ 2019-10-17 15:30 长戟十三千 阅读(25) | 评论 (0)编辑 收藏
     摘要: libuv实现了一个线程池,该线程池在用户提交了第一个任务的时候初始化,而不是系统启动的时候就初始化。入口代码如下。static void init_once(void) { #ifndef _WIN32 /* Re-initialize the threadpool after fork. * Note that this discards the global mutex and c...  阅读全文
posted @ 2019-10-14 15:46 长戟十三千 阅读(23) | 评论 (0)编辑 收藏
仅列出标题  下一页