loop_in_codes

低调做技术__欢迎移步我的独立博客 codemaro.com 微博 kevinlynx

2015年7月4日 #

写了一个分布式名字服务JCM

之前在公司里维护了一个名字服务,这个名字服务日常管理了近4000台机器,有4000个左右的客户端连接上来获取机器信息,由于其基本是一个单点服务,所以某些模块接近瓶颈。后来倒是有重构计划,详细设计做了,代码都写了一部分,结果由于某些原因重构就被终止了。

JCM是我业余时间用Java重写的一个版本,功能上目前只实现了基础功能。由于它是个完全分布式的架构,所以理论上可以横向扩展,大大增强系统的服务能力。

名字服务

在分布式系统中,某个服务为了提升整体服务能力,通常部署了很多实例。这里我把这些提供相同服务的实例统称为集群(cluster),每个实例称为一个节点(Node)。一个应用可能会使用很多cluster,每次访问一个cluster时,就通过名字服务获取该cluster下一个可用的node。那么,名字服务至少需要包含的功能:

  • 根据cluster名字获取可用的node
  • 对管理的所有cluster下的所有node进行健康度的检测,以保证始终返回可用的node

有些名字服务仅对node管理,不参与应用与node间的通信,而有些则可能作为应用与node间的通信转发器。虽然名字服务功能简单,但是要做一个分布式的名字服务还是比较复杂的,因为数据一旦分布式了,就会存在同步、一致性问题的考虑等。

What’s JCM

JCM围绕前面说的名字服务基础功能实现。包含的功能:

  • 管理cluster到node的映射
  • 分布式架构,可水平扩展以实现管理10,000个node的能力,足以管理一般公司的后台服务集群
  • 对每个node进行健康检查,健康检查可基于HTTP协议层的检测或TCP连接检测
  • 持久化cluster/node数据,通过zookeeper保证数据一致性
  • 提供JSON HTTP API管理cluster/node数据,后续可提供Web管理系统
  • 以库的形式提供与server的交互,库本身提供各种负载均衡策略,保证对一个cluster下node的访问达到负载均衡

项目地址git jcm

JCM主要包含两部分:

  • jcm.server,JCM名字服务,需要连接zookeeper以持久化数据
  • jcm.subscriber,客户端库,负责与jcm.server交互,提供包装了负载均衡的API给应用使用

架构

基于JCM的系统整体架构如下:

simple-arch.jpg

cluster本身是不需要依赖JCM的,要通过JCM使用这些cluster,只需要通过JCM HTTP API注册这些cluster到jcm.server上。要通过jcm.server使用这些cluster,则是通过jcm.subscriber来完成。

使用

可参考git READMe.md

需要jre1.7+

  1. 启动zookeeper
  2. 下载jcm.server git jcm.server-0.1.0.jar
  3. jcm.server-0.1.0.jar目录下建立config/application.properties文件进行配置,参考config/application.properties
  4. 启动jcm.server

     java -jar jcm.server-0.1.0.jar
    
  5. 注册需要管理的集群,参考cluster描述:doc/cluster_sample.json,通过HTTP API注册:

     curl -i -X POST http://10.181.97.106:8080/c -H "Content-Type:application/json" --data-binary @./doc/cluster_sample.json
    

部署好了jcm.server,并注册了cluster后,就可以通过jcm.subscriber使用:

// 传入需要使用的集群名hello9/hello,以及传入jcm.server地址,可以多个:127.0.0.1:8080
Subscriber subscriber = new Subscriber( Arrays.asList("127.0.0.1:8080"), Arrays.asList("hello9", "hello"));
// 使用轮询负载均衡策略
RRAllocator rr = new RRAllocator();
subscriber.addListener(rr);
subscriber.startup();
for (int i = 0; i < 2; ++i) {
  // rr.alloc 根据cluster名字获取可用的node
  System.out.println(rr.alloc("hello9", ProtoType.HTTP));
}
subscriber.shutdown();

JCM实现

JCM目前的实现比较简单,参考模块图:

impl-module

  • model,即cluster/node这些数据结构的描述,同时被jcm.server和jcm.subscriber依赖
  • storage,持久化数据到zookeeper,同时包含jcm.server实例之间的数据同步
  • health check,健康检查模块,对各个node进行健康检查

以上模块都不依赖Spring,基于以上模块又有:

  • http api,使用spring-mvc,包装了一些JSON HTTP API
  • Application,基于spring-boot,将各个基础模块组装起来,提供standalone的模式启动,不用部署到tomcat之类的servlet容器中

jcm.subscriber的实现更简单,主要是负责与jcm.server进行通信,以更新自己当前的model层数据,同时提供各种负载均衡策略接口:

  • subscriber,与jcm.server通信,定期增量拉取数据
  • node allocator,通过listener方式从subscriber中获取数据,同时实现各种负载均衡策略,对外统一提供alloc node的接口

接下来看看关键功能的实现

数据同步

既然jcm.server是分布式的,每一个jcm.server instance(实例)都是支持数据读和写的,那么当jcm.server管理着一堆cluster上万个node时,每一个instance是如何进行数据同步的?jcm.server中的数据主要有两类:

  • cluster本身的数据,包括cluster/node的描述,例如cluster name、node IP、及其他附属数据
  • node健康检查的数据

对于cluster数据,因为cluster对node的管理是一个两层的树状结构,而对cluster有增删node的操作,所以并不能在每一个instance上都提供真正的数据写入,这样会导致数据丢失。假设同一时刻在instance A和instance B上同时对cluster c1添加节点N1和N2,那么instance A写入c1(N1),而instance B还没等到数据同步就写入c1(N2),那么c1(N1)就被覆盖为c1(N2),从而导致添加的节点N1丢失。

所以,jcm.server instance是分为leaderfollower的,真正的写入操作只有leader进行,follower收到写操作请求时转发给leader。leader写数据优先更新内存中的数据再写入zookeeper,内存中的数据更新当然是需要加锁互斥的,从而保证数据的正确性。

write-watch.jpg

leader和follower是如何确定角色的?这个很简单,标准的利用zookeeper来进行主从选举的实现

jcm.server instance数据间的同步是基于zookeeper watch机制的。这个可以算做是一个JCM的一个瓶颈,每一个instance都会作为一个watch,使得实际上jcm.server并不能无限水平扩展,扩展到一定程度后,watch的效率就可能不足以满足性能了,参考zookeeper节点数与watch的性能测试 (那个时候我就在考虑对我们系统的重构了) 。

jcm.server中对node健康检查的数据采用同样的同步机制,但node健康检查数据是每一个instance都会写入的,下面看看jcm.server是如何通过分布式架构来分担压力的。

健康检查

jcm.server的另一个主要功能的是对node的健康检查,jcm.server集群可以管理几万的node,既然已经是分布式了,那么显然是要把node均分到多个instance的。这里我是以cluster来分配的,方法就是简单的使用一致性哈希。通过一致性哈希,决定一个cluster是否属于某个instance负责。每个instance都有一个server spec,也就是该instance对外提供服务的地址(IP+port),这样在任何一个instance上,它看到的所有instance server spec都是相同的,从而保证在每一个instance上计算cluster的分配得到的结果都是一致的。

健康检查按cluster划分,可以简化数据的写冲突问题,在正常情况下,每个instance写入的健康检查结果都是不同的。

health-check.jpg

健康检查一般以1秒的频率进行,jcm.server做了优化,当检查结果和上一次一样时,并不写入zookeeper。写入的数据包含了node的完整key (IP+Port+Spec),这样可以简化很多地方的数据同步问题,但会增加写入数据的大小,写入数据的大小是会影响zookeeper的性能的,所以这里简单地对数据进行了压缩。

健康检查是可以支持多种检查实现的,目前只实现了HTTP协议层的检查。健康检查自身是单个线程,在该线程中基于异步HTTP库,发起异步请求,实际的请求在其他线程中发出。

jcm.subscriber通信

jcm.subscriber与jcm.server通信,主要是为了获取最新的cluster数据。subscriber初始化时会拿到一个jcm.server instance的地址列表,访问时使用轮询策略以平衡jcm.server在处理请求时的负载。subscriber每秒都会请求一次数据,请求中描述了本次请求想获取哪些cluster数据,同时携带一个cluster的version。每次cluster在server变更时,version就变更(时间戳)。server回复请求时,如果version已最新,只需要回复node的状态。

subscriber可以拿到所有状态的node,后面可以考虑只拿正常状态的node,进一步减少数据大小。

压力测试

目前只对健康检查部分做了压测,详细参考test/benchmark.md。在A7服务器上测试,发现写zookeeper及zookeeper的watch足以满足要求,jcm.server发起的HTTP请求是主要的性能热点,单jcm.server instance大概可以承载20000个node的健康监测。

网络带宽:

1
2
3
4
5
Time              ---------------------traffic--------------------
Time               bytin  bytout   pktin  pktout  pkterr  pktdrp
01/07/15-21:30:48   3.2M    4.1M   33.5K   34.4K    0.00    0.00
01/07/15-21:30:50   3.3M    4.2M   33.7K   35.9K    0.00    0.00
01/07/15-21:30:52   2.8M    4.1M   32.6K   41.6K    0.00    0.00

CPU,通过jstack查看主要的CPU消耗在HTTP库实现层,以及健康检查线程:

1
2
3
4
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
13301 admin     20   0 13.1g 1.1g  12m R 76.6  2.3   2:40.74 java         httpchecker
13300 admin     20   0 13.1g 1.1g  12m S 72.9  2.3   0:48.31 java
13275 admin     20   0 13.1g 1.1g  12m S 20.1  2.3   0:18.49 java

代码中增加了些状态监控:

1
checker HttpChecker stat count 20 avg check cost(ms) 542.05, avg flush cost(ms) 41.35

表示平均每次检查耗时542毫秒,写数据因为开启了cache没有参考价值。

虽然还可以从我自己的代码中做不少优化,但既然单机可以承载20000个节点的检测,一般的应用远远足够了。

总结

名字服务在分布式系统中虽然是基础服务,但往往承担了非常重要的角色,数据同步出现错误、节点状态出现瞬时的错误,都可能对整套系统造成较大影响,业务上出现较大故障。所以名字服务的健壮性、可用性非常重要。实现中需要考虑很多异常情况,包括网络不稳定、应用层的错误等。为了提高足够的可用性,一般还会加多层的数据cache,例如subscriber端的本地cache,server端的本地cache,以保证在任何情况下都不会影响应用层的服务。

posted @ 2015-07-04 17:50 Kevin Lynx 阅读(1314) | 评论 (0)编辑 收藏

2015年5月5日 #

无锁有序链表的实现

无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map。普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置。

本文主要基于论文High Performance Dynamic Lock-Free Hash Tables实现。

主要问题

链表的主要操作包含insertremove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例:

struct node_t {
        key_t key;
        value_t val;
        node_t *next;
    };

    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        while (item) {
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        } 
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 如果pred本身被移除了
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
    }

    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return TRUE;
            }
            // B. 如果pred被移除;如果item也被移除
            if (CAS(&pred->next, item, item->next)) {
                haz_free(item);
                return TRUE;
            }
        }
    }

l_find函数返回查找到的前序元素和元素本身,代码A和B虽然拿到了preditem,但在CAS的时候,其可能被其他线程移除。甚至,在l_find过程中,其每一个元素都可能被移除。问题在于,任何时候拿到一个元素时,都不确定其是否还有效。元素的有效性包括其是否还在链表中,其指向的内存是否还有效。

解决方案

通过为元素指针增加一个有效性标志位,配合CAS操作的互斥性,就可以解决元素有效性判定问题。

因为node_t放在内存中是会对齐的,所以指向node_t的指针值低几位是不会用到的,从而可以在低几位里设置标志,这样在做CAS的时候,就实现了DCAS的效果,相当于将两个逻辑上的操作变成了一个原子操作。想象下引用计数对象的线程安全性,其内包装的指针是线程安全的,但对象本身不是。

CAS的互斥性,在若干个线程CAS相同的对象时,只有一个线程会成功,失败的线程就可以以此判定目标对象发生了变更。改进后的代码(代码仅做示例用,不保证正确):

typedef size_t markable_t;
    // 最低位置1,表示元素被删除
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define MARK(p) ((markable_t)p | 0x01)
    #define STRIP_MARK(p) ((markable_t)p & ~0x01)

    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) { 
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 虽然find拿到了合法的pred,但是在以下代码之前pred可能被删除,此时pred->next被标记
            //    pred->next != item,该CAS会失败,失败后重试
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
        return FALSE;
    }

    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            node_t *inext = item->next;
            // B. 删除item前先标记item->next,如果CAS失败,那么情况同insert一样,有其他线程在find之后
            //    删除了item,失败后重试
            if (!CAS(&item->next, inext, MARK(inext))) {
                continue;
            }
            // C. 对同一个元素item删除时,只会有一个线程成功走到这里
            if (CAS(&pred->next, item, STRIP_MARK(item->next))) {
                haz_defer_free(item);
                return TRUE;
            }
        }
        return FALSE;
    }

    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        hazard_t *hp1 = haz_get(0);
        hazard_t *hp2 = haz_get(1);
        while (item) {
            haz_set_ptr(hp1, pred);
            haz_set_ptr(hp2, item);
            /* 
             如果已被标记,那么紧接着item可能被移除链表甚至释放,所以需要重头查找
            */
            if (HAS_MARK(item->next)) { 
                return l_find(pred_ptr, item_ptr, head, key);
            }
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        } 
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

haz_gethaz_set_ptr之类的函数是一个hazard pointer实现,用于支持多线程下内存的GC。上面的代码中,要删除一个元素item时,会标记item->next,从而使得insert时中那个CAS不需要做任何调整。总结下这里的线程竞争情况:

  • insertfind到正常的preditempred->next == item,然后在CAS前有线程删除了pred,此时pred->next == MARK(item)CAS失败,重试;删除分为2种情况:a) 从链表移除,得到标记,pred可继续访问;b) pred可能被释放内存,此时再使用pred会错误。为了处理情况b,所以引入了类似hazard pointer的机制,可以有效保障任意一个指针p只要还有线程在使用它,它的内存就不会被真正释放
  • insert中有多个线程在pred后插入元素,此时同样由insert中的CAS保证,这个不多说
  • remove中情况同insertfind拿到了有效的prednext,但在CAS的时候pred被其他线程删除,此时情况同insertCAS失败,重试
  • 任何时候改变链表结构时,无论是remove还是insert,都需要重试该操作
  • find中遍历时,可能会遇到被标记删除的item,此时item根据remove的实现很可能被删除,所以需要重头开始遍历

ABA问题

ABA问题还是存在的,insert中:

if (CAS(&pred->next, item, new_item)) {
        return TRUE;
    }

如果CAS之前,pred后的item被移除,又以相同的地址值加进来,但其value变了,此时CAS会成功,但链表可能就不是有序的了。pred->val < new_item->val > item->val

为了解决这个问题,可以利用指针值地址对齐的其他位来存储一个计数,用于表示pred->next的改变次数。当insert拿到pred时,pred->next中存储的计数假设是0,CAS之前其他线程移除了pred->next又新增回了item,此时pred->next中的计数增加,从而导致insertCAS失败。

// 最低位留作删除标志
    #define MASK ((sizeof(node_t) - 1) & ~0x01)

    #define GET_TAG(p) ((markable_t)p & MASK)
    #define TAG(p, tag) ((markable_t)p | (tag))
    #define MARK(p) ((markable_t)p | 0x01)
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define STRIP_MARK(p) ((node_t*)((markable_t)p & ~(MASK | 0x01)))

remove的实现:

/* 先标记再删除 */
    if (!CAS(&sitem->next, inext, MARK(inext))) {
        continue;
    }
    int tag = GET_TAG(pred->next) + 1;
    if (CAS(&pred->next, item, TAG(STRIP_MARK(sitem->next), tag))) {
        haz_defer_free(sitem);
        return TRUE;
    }

insert中也可以更新pred->next的计数。

总结

无锁的实现,本质上都会依赖于CAS的互斥性。从头实现一个lock free的数据结构,可以深刻感受到lock free实现的tricky。最终代码可以从这里github获取。代码中为了简单,实现了一个不是很强大的hazard pointer,可以参考之前的博文

posted @ 2015-05-05 19:47 Kevin Lynx 阅读(9661) | 评论 (0)编辑 收藏

2015年5月3日 #

并行编程中的内存回收Hazard Pointer

接上篇使用RCU技术实现读写线程无锁,在没有GC机制的语言中,要实现Lock free的算法,就免不了要自己处理内存回收的问题。

Hazard Pointer是另一种处理这个问题的算法,而且相比起来不但简单,功能也很强大。锁无关的数据结构与Hazard指针中讲得很好,Wikipedia Hazard pointer也描述得比较清楚,所以我这里就不讲那么细了。

一个简单的实现可以参考我的github haz_ptr.c

原理

基本原理无非也是读线程对指针进行标识,指针(指向的内存)要释放时都会缓存起来延迟到确认没有读线程了才对其真正释放。

<Lock-Free Data Structures with Hazard Pointers>中的描述:

Each reader thread owns a single-writer/multi-reader shared pointer called “hazard pointer.” When a reader thread assigns the address of a map to its hazard pointer, it is basically announcing to other threads (writers), “I am reading this map. You can replace it if you want, but don’t change its contents and certainly keep your deleteing hands off it.”

关键的结构包括:Hazard pointerThread Free list

Hazard pointer:一个读线程要使用一个指针时,就会创建一个Hazard pointer包装这个指针。一个Hazard pointer会被一个线程写,多个线程读。

struct HazardPointer {
        void *real_ptr; // 包装的指针
        ... // 不同的实现有不同的成员
    };

    void func() {
        HazardPointer *hp = accquire(_real_ptr);
        ... // use _real_ptr
        release(hp);
    }

Thread Free List:每个线程都有一个这样的列表,保存着将要释放的指针列表,这个列表仅对应的线程读写

void defer_free(void *ptr) {
        _free_list.push_back(ptr);
    }

当某个线程要尝试释放Free List中的指针时,例如指针ptr,就检查所有其他线程使用的Hazard pointer,检查是否存在包装了ptr的Hazard pointer,如果没有则说明没有读线程正在使用ptr,可以安全释放ptr

void gc() {
        for(ptr in _free_list) {
            conflict = false
            for (hp in _all_hazard_pointers) {
                if (hp->_real_ptr == ptr) {
                    confilict = true
                    break
                }
            }
            if (!conflict)
                delete ptr
        }
    }

以上,其实就是Hazard Pointer的主要内容。

Hazard Pointer的管理

上面的代码中没有提到_all_hazard_pointersaccquire的具体实现,这就是Hazard Pointer的管理问题。

《锁无关的数据结构与Hazard指针》文中创建了一个Lock free的链表来表示这个全局的Hazard Pointer List。每个Hazard Pointer有一个成员标识其是否可用。这个List中也就保存了已经被使用的Hazard Pointer集合和未被使用的Hazard Pointer集合,当所有Hazard Pointer都被使用时,就会新分配一个加进这个List。当读线程不使用指针时,需要归还Hazard Pointer,直接设置可用成员标识即可。要gc()时,就直接遍历这个List。

要实现一个Lock free的链表,并且仅需要实现头插入,还是非常简单的。本身Hazard Pointer标识某个指针时,都是用了后立即标识,所以这个实现直接支持了动态线程,支持线程的挂起等。

nbds项目中也有一个Hazard Pointer的实现,相对要弱一点。它为每个线程都设置了自己的Hazard Pointer池,写线程要释放指针时,就访问所有其他线程的Hazard Pointer池。

typedef struct haz_local {
        // Free List
        pending_t *pending; // to be freed
        int pending_size;
        int pending_count;

        // Hazard Pointer 池,动态和静态两种
        haz_t static_haz[STATIC_HAZ_PER_THREAD];

        haz_t **dynamic;
        int dynamic_size;
        int dynamic_count;

    } __attribute__ ((aligned(CACHE_LINE_SIZE))) haz_local_t;

    static haz_local_t haz_local_[MAX_NUM_THREADS] = {};

每个线程当然就涉及到haz_local_索引(ID)的分配,就像使用RCU技术实现读写线程无锁中的一样。这个实现为了支持线程动态创建,就需要一套线程ID的重用机制,相对复杂多了。

附录

最后,附上一些并行编程中的一些概念。

Lock Free & Wait Free

常常看到Lock FreeWait Free的概念,这些概念用于衡量一个系统或者说一段代码的并行级别,并行级别可参考并行编程——并发级别。总之Wait Free是一个比Lock Free更牛逼的级别。

我自己的理解,例如《锁无关的数据结构与Hazard指针》中实现的Hazard Pointer链表就可以说是Lock Free的,注意它在插入新元素到链表头时,因为使用CAS,总免不了一个busy loop,有这个特征的情况下就算是Lock Free,虽然没锁,但某个线程的执行情况也受其他线程的影响。

相对而言,Wait Free则是每个线程的执行都是独立的,例如《锁无关的数据结构与Hazard指针》中的Scan函数。“每个线程的执行时间都不依赖于其它任何线程的行为”

锁无关(Lock-Free)意味着系统中总存在某个线程能够得以继续执行;而等待无关(Wait-Free)则是一个更强的条件,它意味着所有线程都能往下进行。

ABA问题

在实现Lock Free算法的过程中,总是要使用CAS原语的,而CAS就会带来ABA问题。

在进行CAS操作的时候,因为在更改V之前,CAS主要询问“V的值是否仍然为A”,所以在第一次读取V之后以及对V执行CAS操作之前,如果将值从A改为B,然后再改回A,会使基于CAS的算法混乱。在这种情况下,CAS操作会成功。这类问题称为ABA问题。

Wiki Hazard Pointer提到了一个ABA问题的好例子:在一个Lock free的栈实现中,现在要出栈,栈里的元素是[A, B, C]head指向栈顶,那么就有compare_and_swap(target=&head, newvalue=B, expected=A)。但是在这个操作中,其他线程把A B都出栈,且删除了B,又把A压入栈中,即[A, C]。那么前一个线程的compare_and_swap能够成功,此时head指向了一个已经被删除的B。stackoverflow上也有个例子 Real-world examples for ABA in multithreading

对于CAS产生的这个ABA问题,通常的解决方案是采用CAS的一个变种DCAS。DCAS,是对于每一个V增加一个引用的表示修改次数的标记符。对于每个V,如果引用修改了一次,这个计数器就加1。然后再这个变量需要update的时候,就同时检查变量的值和计数器的值。

但也早有人提出DCAS也不是ABA problem 的银弹

posted @ 2015-05-03 20:46 Kevin Lynx 阅读(7676) | 评论 (0)编辑 收藏

2015年4月19日 #

使用RCU技术实现读写线程无锁

在一个系统中有一个写线程和若干个读线程,读写线程通过一个指针共用了一个数据结构,写线程改写这个结构,读线程读取该结构。在写线程改写这个数据结构的过程中,加锁情况下读线程由于等待锁耗时会增加。

可以利用RCU (Read Copy Update What is rcu)的思想来去除这个锁。本文提到的主要实现代码:gist

RCU

RCU可以说是一种替代读写锁的方法。其基于一个事实:当写线程在改变一个指针时,读线程获取这个指针,要么获取到老的值,要么获取到新的值。RCU的基本思想其实很简单,参考What is RCU中Toy implementation可以很容易理解。一种简单的RCU流程可以描述为:

写线程:

old_ptr = _ptr
tmp_ptr = copy(_ptr)     // copy
change(tmp_ptr)          // change 
_ptr = tmp_ptr           // update
synchroize(tmp_ptr)

写线程要更新_ptr指向的内容时,先复制一份新的,基于新的进行改变,更新_ptr指针,最后同步释放老的内存。

读线程:

tmp_ptr = _ptr
use(tmp_ptr)
dereference(tmp_ptr)

读线程直接使用_ptr,使用完后需要告诉写线程自己不再使用_ptr。读线程获取_ptr时,可能会获取到老的也可能获取到新的,无论哪种RCU都需要保证这块内存是有效的。重点在synchroizedereferencesynchroize会等待所有使用老的_ptr的线程dereference,对于新的_ptr使用者其不需要等待。这个问题说白了就是写线程如何知道old_ptr没有任何读线程在使用,可以安全地释放。

这个问题实际上在wait-free的各种实现中有好些解法,how-when-to-release-memory-in-wait-free-algorithms这里有人总结了几种方法,例如Hazard pointersQuiescence period based reclamation

简单地使用引用计数智能指针是无法解决这个问题的,因为智能指针自己不是线程安全的,例如:

tmp_ptr = _ptr      // 1
tmp_ptr->addRef()   // 2
use
tmp_ptr->release()

代码1/2行不是原子的,所以当取得tmp_ptr准备addRef时,tmp_ptr可能刚好被释放了。

Quiescence period based reclamation方法指的是读线程需要声明自己处于Quiescence period,也就是不使用_ptr的时候,当其使用_ptr的时候实际是进入了一个逻辑上的临界区,当所有读线程都不再使用_ptr的时候,写线程就可以对内存进行安全地释放。

本文正是描述了一种Quiescence period based reclamation实现。这个实现可以用于有一个写线程和多个读线程共用若干个数据的场景。

实现

该方法本质上把数据同步分解为基本的内存单元读写。使用方式上可描述为:

读线程:

tmp_ptr = _ptr
use
update() // 标识自己不再使用任何共享数据

写线程:

old_ptr = _ptr
tmp_ptr = copy(_ptr)
change(tmp_ptr)
_ptr = tmp_ptr
gc()
defer_free(old_ptr)

以下具体描述读写线程的实现。

写线程

写线程负责标识内存需要被释放,以及检查何时可以真正释放内存。其维护了一个释放内存队列:

void *_pending[8]
    uint64_t _head, _tail

    void defer_free(void *p) {
        _head ++
        _pending[PENDING_POS(_head)] = p
    }

    gc() {
        for (_tail -> find_free_pos())
            free(_pending[_tail])
    }

find_free_pos找到一个可释放内存位置,在[_tail, find_free_pos())这个区间内所有内存是可以安全被释放的。

队列位置_head/_tail一直增大,PENDING_POS就是对这个位置取模,限定在队列大小范围内也是可行的,无论哪种方式,_head从逻辑上说一直>=_tail,但在实际中可能小于_tail,所以实现时不使用大小判定,而是:

gc() {
        pos = find_free_pos()
        while (_tail != pos) {
            free(_pending[PENDING_POS(_tail)])
            _tail ++
        }
    }

读线程

读线程不再使用共享内存时,就标识自己:

update() {
        static __thread int tid
        _tmark[tid] = _head
    }

读线程的状态会影响写线程的回收逻辑,其状态分为:

  • 初始
  • 活跃,会调用到update
  • 暂停,其他地方同步,或被挂起
  • 退出

读线程处于活跃状态时,它会不断地更新自己可释放内存位置(_tmark[tid])。写线程检查所有读线程的_tmark[tid][_tail, min(_tmark[]))是所有读线程都不再使用的内存区间,可以被安全释放。

find_free_pos() {
        min = MAX_INTEGER
        pos = 0
        for (tid = 0; tid < max_threads; ++tid) {
            tpos = _tmark[tid]
            offset = tpos - tail
            if (offset < min) {
                min = offset
                pos = tpos
            }
        }
        return pos
    }

当读线程暂停时,其_tmark[tid]可能会在很长一段时间里得不到更新,此时会阻碍写线程释放内存。所以需要方法来标识读线程是否进入暂停状态。通过设置一个上次释放内存位置_tfreeds[tid],标识每个线程当前内存释放到的位置。如果一个线程处于暂停状态了,那么在一定时间后,_tfreeds[tid] == _tmark[tid]。在查找可释放位置时,就需要忽略暂停状态的读线程:

find_free_pos() {
        min = MAX_INTEGER
        pos = _head
        for (tid = 0; tid < max_threads; ++tid) {
            tpos = _tmark[tid]
            if (tpos == _tfreeds[tid]) continue
            offset = tpos - tail
            if (offset < min) {
                min = offset
                pos = tpos
            }
        }
        for (tid = 0; tid < max_threads; ++tid) {
            if (_tfreeds[tid] != _tmark[tid]) 
                _tfreeds[tid] = pos
        }
        return pos
    }

但是当所有线程都处于暂停状态时,写线程可能还在工作,上面的实现就会返回_head,此时写线程依然可以正常释放内存。

小结,该方法原理可用下图表示:

线程动态增加/减少

如果读线程可能中途退出,中途动态增加,那么_tmark[]就需要被复用,此时线程tid的分配调整为动态的即可:

class ThreadIdPool {
    public:
        // 动态获取一个线程tid,某线程每次调用该接口返回相同的值
        int get()
        // 线程退出时回收该tid
        void put(int id)
    }

ThreadIdPool的实现无非就是利用TLS,以及在线程退出时得到通知以回收tid。那么对于读线程的update实现变为:

update() {
        tid = _idPool->get()
        _tmark[tid] = _head
    }

当某个线程退出时,_tmark[tid]_tfreeds[tid]不需要做任何处理,当新创建的线程复用了该tid时,可以立即复用_tmark[tid]_tfreeds[tid],此时这2个值必然是相等的。

以上,就是整个方法的实现。

线程可读可写

以上方法适用场景还是不够通用。在nbds项目(实现了一些无锁数据结构的toy project)中有一份虽然简单但也有启发的实现(rcu.c)。该实现支持任意线程defer_free,所有线程updateupdate除了声明不再使用任何共享内存外,还可能回收内存。任意线程都可能维护一些待释放的内存,任意一块内存可能被任意其他线程使用。那么它是如何内存回收的?

本文描述的方法是所有读线程自己声明自己,然后由写线程主动来检查。不同于此方法, nbds的实现,基于一种通知扩散的方式。该方式以这样一种方式工作:

当某个线程尝试内存回收时,它需要知道所有其他线程的空闲位置(相当于_tmark[tid]),它通知下一个线程我需要释放的范围。当下一个线程update时(离开临界区),它会将上个线程的通知继续告诉下一个线程,直到最后这个通知回到发起线程。那么对于发起线程而言,这个释放请求在所有线程中走了一遍,得到了大家的认可,可以安全释放。每个线程都以这样的方式工作。

void rcu_defer_free (void *x) {
        ...
        rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
        ...
    }

    void rcu_update (void) {
        ...
        for (i = 0; i < num_threads_; ++i) {
            ...     
            uint64_t x = rcu_[tid_][i]; // 其它线程发给自己的通知
            rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; // 扩散出去
            ...
        }
        ...
        while (q->tail != rcu_[tid_][tid_]) {
            free
        }     
        ...
    }

这个实现相对简单,不支持线程暂停,以及线程动态增加和减少。

posted @ 2015-04-19 19:10 Kevin Lynx 阅读(4076) | 评论 (3)编辑 收藏

2015年4月6日 #

记一次tcmalloc分配内存引起的coredump

现象

线上的服务出现coredump,堆栈为:

#0  0x000000000045d145 in GetStackTrace(void**, int, int) ()
#1  0x000000000045ec22 in tcmalloc::PageHeap::GrowHeap(unsigned long) ()
#2  0x000000000045eeb3 in tcmalloc::PageHeap::New(unsigned long) ()
#3  0x0000000000459ee8 in tcmalloc::CentralFreeList::Populate() ()
#4  0x000000000045a088 in tcmalloc::CentralFreeList::FetchFromSpansSafe() ()
#5  0x000000000045a10a in tcmalloc::CentralFreeList::RemoveRange(void**, void**, int) ()
#6  0x000000000045c282 in tcmalloc::ThreadCache::FetchFromCentralCache(unsigned long, unsigned long) ()
#7  0x0000000000470766 in tc_malloc ()
#8  0x00007f75532cd4c2 in __conhash_get_rbnode (node=0x22c86870, hash=30)
        at build/release64/cm_sub/conhash/conhash_inter.c:88
#9  0x00007f75532cd76e in __conhash_add_replicas (conhash=0x24fbc7e0, iden=<value optimized out>)
        at build/release64/cm_sub/conhash/conhash_inter.c:45
#10 0x00007f75532cd1fa in conhash_add_node (conhash=0x24fbc7e0, iden=0) at build/release64/cm_sub/conhash/conhash.c:72
#11 0x00007f75532c651b in cm_sub::TopoCluster::initLBPolicyInfo (this=0x2593a400)
        at build/release64/cm_sub/topo_cluster.cpp:114
#12 0x00007f75532cad73 in cm_sub::TopoClusterManager::processClusterMapTable (this=0xa219e0, ref=0x267ea8c0)
        at build/release64/cm_sub/topo_cluster_manager.cpp:396
#13 0x00007f75532c5a93 in cm_sub::SubRespMsgProcess::reinitCluster (this=0x9c2f00, msg=0x4e738ed0)
        at build/release64/cm_sub/sub_resp_msg_process.cpp:157
...

查看了应用层相关数据结构,基本数据都是没有问题的。所以最初怀疑是tcmalloc内部维护了错误的内存,在分配内存时出错,这个堆栈只是问题的表象。几天后,线上的另一个服务,基于同样的库,也core了,堆栈还是一样的。

最初定位问题都是从最近更新的东西入手,包括依赖的server环境,但都没有明显的问题,所以最后只能从core的直接原因入手。

分析GetStackTrace

确认core的详细位置:

# core在该指令
0x000000000045d145 <_Z13GetStackTracePPvii+21>: mov    0x8(%rax),%r9

(gdb) p/x $rip              # core 的指令位置
$9 = 0x45d145
(gdb) p/x $rax              
$10 = 0x4e73aa58
(gdb) x/1a $rax+0x8         # rax + 8 = 0x4e73aa60
0x4e73aa60:     0x0

该指令尝试从[0x4e73aa60]处读取内容,然后出错,这个内存单元不可读。但是具体这个指令在代码中是什么意思,需要将这个指令对应到代码中。获取tcmalloc的源码,发现GetStackTrace根据编译选项有很多实现,所以这里选择最可能的实现,然后对比汇编以确认代码是否匹配。最初选择的是stacktrace_x86-64-inl.h,后来发现完全不匹配,又选择了stacktrace_x86-inl.h。这个实现版本里也有对64位平台的支持。

stacktrace_x86-inl.h里使用了一些宏来生成函数名和参数,精简后代码大概为:

int GET_STACK_TRACE_OR_FRAMES {
      void **sp;
      unsigned long rbp;
      __asm__ volatile ("mov %%rbp, %0" : "=r" (rbp));
      sp = (void **) rbp;

      int n = 0;
      while (sp && n < max_depth) {
        if (*(sp+1) == reinterpret_cast<void *>(0)) {
          break;
        }
        void **next_sp = NextStackFrame<!IS_STACK_FRAMES, IS_WITH_CONTEXT>(sp, ucp);
        if (skip_count > 0) {
          skip_count--;
        } else {
          result[n] = *(sp+1);
          n++;
        }
        sp = next_sp;
      }
      return n;
    }

NextStackFrame是一个模板函数,包含一大堆代码,精简后非常简单:

template<bool STRICT_UNWINDING, bool WITH_CONTEXT>
    static void **NextStackFrame(void **old_sp, const void *uc) {
      void **new_sp = (void **) *old_sp;
      if (STRICT_UNWINDING) {
        if (new_sp <= old_sp) return NULL;
        if ((uintptr_t)new_sp - (uintptr_t)old_sp > 100000) return NULL;
      } else {
        if (new_sp == old_sp) return NULL;
        if ((new_sp > old_sp)
            && ((uintptr_t)new_sp - (uintptr_t)old_sp > 1000000)) return NULL;
      }
      if ((uintptr_t)new_sp & (sizeof(void *) - 1)) return NULL;

      return new_sp;
    }

上面这个代码到汇编的对比过程还是花了些时间,其中汇编中出现的一些常量可以大大缩短对比时间,例如上面出现了100000,汇编中就有:

0x000000000045d176 <_Z13GetStackTracePPvii+70>: cmp    $0x186a0,%rbx  # 100000=0x186a0

注意NextStackFrame中的 if (STRICT_UNWINDING)使用的是模板参数,这导致生成的代码中根本没有else部分,也就没有1000000这个常量

在对比代码的过程中,可以知道关键的几个寄存器、内存位置对应到代码中的变量,从而可以还原core时的现场环境。分析过程中不一定要从第一行汇编读,可以从较明显的位置读,从而还原整个代码,函数返回指令、跳转指令、比较指令、读内存指令、参数寄存器等都是比较明显对应的地方。

另外注意GetStackTraceRecordGrowth中调用,传入了3个参数:

GetStackTrace(t->stack, kMaxStackDepth-1, 3); // kMaxStackDepth = 31

以下是我分析的简单注解:

(gdb) disassemble
Dump of assembler code for function _Z13GetStackTracePPvii:
0x000000000045d130 <_Z13GetStackTracePPvii+0>:  push   %rbp
0x000000000045d131 <_Z13GetStackTracePPvii+1>:  mov    %rsp,%rbp
0x000000000045d134 <_Z13GetStackTracePPvii+4>:  push   %rbx
0x000000000045d135 <_Z13GetStackTracePPvii+5>:  mov    %rbp,%rax
0x000000000045d138 <_Z13GetStackTracePPvii+8>:  xor    %r8d,%r8d
0x000000000045d13b <_Z13GetStackTracePPvii+11>: test   %rax,%rax
0x000000000045d13e <_Z13GetStackTracePPvii+14>: je     0x45d167 <_Z13GetStackTracePPvii+55>
0x000000000045d140 <_Z13GetStackTracePPvii+16>: cmp    %esi,%r8d        # while ( .. max_depth > n ?
0x000000000045d143 <_Z13GetStackTracePPvii+19>: jge    0x45d167 <_Z13GetStackTracePPvii+55>
0x000000000045d145 <_Z13GetStackTracePPvii+21>: mov    0x8(%rax),%r9    # 关键位置:*(sp+1) -> r9, rax 对应 sp变量
0x000000000045d149 <_Z13GetStackTracePPvii+25>: test   %r9,%r9          # *(sp+1) == 0 ?
0x000000000045d14c <_Z13GetStackTracePPvii+28>: je     0x45d167 <_Z13GetStackTracePPvii+55>
0x000000000045d14e <_Z13GetStackTracePPvii+30>: mov    (%rax),%rcx      # new_sp = *old_sp,这里已经是NextStackFrame的代码
0x000000000045d151 <_Z13GetStackTracePPvii+33>: cmp    %rcx,%rax        # new_sp <= old_sp ? 
0x000000000045d154 <_Z13GetStackTracePPvii+36>: jb     0x45d170 <_Z13GetStackTracePPvii+64>  # new_sp > old_sp 跳转
0x000000000045d156 <_Z13GetStackTracePPvii+38>: xor    %ecx,%ecx
0x000000000045d158 <_Z13GetStackTracePPvii+40>: test   %edx,%edx        # skip_count > 0 ?
0x000000000045d15a <_Z13GetStackTracePPvii+42>: jle    0x45d186 <_Z13GetStackTracePPvii+86>
0x000000000045d15c <_Z13GetStackTracePPvii+44>: sub    $0x1,%edx        # skip_count--
0x000000000045d15f <_Z13GetStackTracePPvii+47>: mov    %rcx,%rax        
0x000000000045d162 <_Z13GetStackTracePPvii+50>: test   %rax,%rax        # while (sp ?
0x000000000045d165 <_Z13GetStackTracePPvii+53>: jne    0x45d140 <_Z13GetStackTracePPvii+16>
0x000000000045d167 <_Z13GetStackTracePPvii+55>: pop    %rbx
0x000000000045d168 <_Z13GetStackTracePPvii+56>: leaveq 
0x000000000045d169 <_Z13GetStackTracePPvii+57>: mov    %r8d,%eax        # r8 存储了返回值,r8=n
0x000000000045d16c <_Z13GetStackTracePPvii+60>: retq                    # return n
0x000000000045d16d <_Z13GetStackTracePPvii+61>: nopl   (%rax)
0x000000000045d170 <_Z13GetStackTracePPvii+64>: mov    %rcx,%rbx        
0x000000000045d173 <_Z13GetStackTracePPvii+67>: sub    %rax,%rbx        # offset = new_sp - old_sp
0x000000000045d176 <_Z13GetStackTracePPvii+70>: cmp    $0x186a0,%rbx    # offset > 100000 ?
0x000000000045d17d <_Z13GetStackTracePPvii+77>: ja     0x45d156 <_Z13GetStackTracePPvii+38> # return NULL
0x000000000045d17f <_Z13GetStackTracePPvii+79>: test   $0x7,%cl         # new_sp & (sizeof(void*) - 1)
0x000000000045d182 <_Z13GetStackTracePPvii+82>: je     0x45d158 <_Z13GetStackTracePPvii+40>
0x000000000045d184 <_Z13GetStackTracePPvii+84>: jmp    0x45d156 <_Z13GetStackTracePPvii+38>
0x000000000045d186 <_Z13GetStackTracePPvii+86>: movslq %r8d,%rax        # rax = n
0x000000000045d189 <_Z13GetStackTracePPvii+89>: add    $0x1,%r8d        # n++
0x000000000045d18d <_Z13GetStackTracePPvii+93>: mov    %r9,(%rdi,%rax,8)# 关键位置:result[n] = *(sp+1)
0x000000000045d191 <_Z13GetStackTracePPvii+97>: jmp    0x45d15f <_Z13GetStackTracePPvii+47>

分析过程比较耗时,同时还可以分析下GetStackTrace函数的实现原理,其实就是利用RBP寄存器不断回溯,从而得到整个调用堆栈各个函数的地址(严格来说是返回地址)。简单示意下函数调用中RBP的情况:

   ...
saved registers          # i.e push rbx
local variabes           # i.e sub 0x10, rsp
return address           # call xxx
last func RBP            # push rbp; mov rsp, rbp
saved registers
local variables 
return address
last func RBP
...                      # rsp

总之,一般情况下,任何一个函数中,RBP寄存器指向了当前函数的栈基址,该栈基址中又存储了调用者的栈基址,同时该栈基址前面还存储了调用者的返回地址。所以,GetStackTrace的实现,简单来说大概就是:

sp = rbp  // 取得当前函数GetStackTrace的栈基址
    while (n < max_depth) {
        new_sp = *sp
        result[n] = *(new_sp+1)
        n++
    }

以上,最终就知道了以下关键信息:

  • r8 对应变量 n,表示当前取到第几个栈帧了
  • rax 对应变量 sp,代码core在 *(sp+1)
  • rdi 对应变量 result,用于存储取得的各个地址

然后可以看看现场是怎样的:

(gdb) x/10a $rdi
0x1ffc9b98:     0x45a088 <_ZN8tcmalloc15CentralFreeList18FetchFromSpansSafeEv+40>       0x45a10a <_ZN8tcmalloc15CentralFreeList11RemoveRangeEPPvS2_i+106>
0x1ffc9ba8:     0x45c282 <_ZN8tcmalloc11ThreadCache21FetchFromCentralCacheEmm+114>      0x470766 <tc_malloc+790>
0x1ffc9bb8:     0x7f75532cd4c2 <__conhash_get_rbnode+34>        0x0
0x1ffc9bc8:     0x0     0x0
0x1ffc9bd8:     0x0     0x0

(gdb) p/x $r8
$3 = 0x5

(gdb) p/x $rax
$4 = 0x4e73aa58

小结:

GetStackTrace在取调用__conhash_get_rbnode的函数时出错,取得了5个函数地址。当前使用的RBP为0x4e73aa58

错误的RBP

RBP也是从堆栈中取出来的,既然这个地址有问题,首先想到的就是有代码局部变量/数组写越界。例如sprintf的使用。而且,一般写越界破坏堆栈,都可能是把调用者的堆栈破坏了,例如:

char s[32];
memcpy(s, p, 1024);

因为写入都是从低地址往高地址写,而调用者的堆栈在高地址。当然,也会遇到写坏调用者的调用者的堆栈,也就是跨栈帧越界写,例如以前遇到的:

len = vsnprintf(buf, sizeof(buf), fmt, wtf-long-string);
buf[len] = 0;

__conhash_get_rbnode的RBP是在tcmalloc的堆栈中取的:

(gdb) f 7
#7  0x0000000000470766 in tc_malloc ()
(gdb) x/10a $rsp
0x4e738b80:     0x4e73aa58      0x22c86870
0x4e738b90:     0x4e738bd0      0x85
0x4e738ba0:     0x4e73aa58      0x7f75532cd4c2 <__conhash_get_rbnode+34>   # 0x4e73aa58

所以这里就会怀疑是tcmalloc这个函数里有把堆栈破坏,这个时候就是读代码,看看有没有疑似危险的地方,未果。这里就陷入了僵局,怀疑又遇到了跨栈帧破坏的情况,这个时候就只能__conhash_get_rbnode调用栈中周围的函数翻翻,例如调用__conhash_get_rbnode的函数__conhash_add_replicas中恰好有字符串操作:

void __conhash_add_replicas(conhash_t *conhash, int32_t iden)
    {
        node_t* node = __conhash_create_node(iden, conhash->replica);
        ...
        char buf[buf_len]; // buf_len = 64
        ...
        snprintf(buf, buf_len, VIRT_NODE_HASH_FMT, node->iden, i);
        uint32_t hash = conhash->cb_hashfunc(buf);
        if(util_rbtree_search(&(conhash->vnode_tree), hash) == NULL)
        {
            util_rbtree_node_t* rbnode = __conhash_get_rbnode(node, hash);
            ...

这段代码最终发现是没有问题的,这里又耗费了不少时间。后来发现若干个函数里的RBP都有点奇怪,这个调用栈比较正常的范围是:0x4e738c90

(gdb) f 8
#8  0x00007f75532cd4c2 in __conhash_get_rbnode (node=0x22c86870, hash=30)
(gdb) p/x $rbp
$6 = 0x4e73aa58     # 这个还不算特别可疑
(gdb) f 9
#9  0x00007f75532cd76e in __conhash_add_replicas (conhash=0x24fbc7e0, iden=<value optimized out>)
(gdb) p/x $rbp
$7 = 0x4e738c60     # 这个也不算特别可疑
(gdb) f 10
#10 0x00007f75532cd1fa in conhash_add_node (conhash=0x24fbc7e0, iden=0) at build/release64/cm_sub/conhash/conhash.c:72
(gdb) p/x $rbp      # 可疑
$8 = 0x0
(gdb) f 11
#11 0x00007f75532c651b in cm_sub::TopoCluster::initLBPolicyInfo (this=0x2593a400)
(gdb) p/x $rbp      # 可疑
$9 = 0x2598fef0

为什么很多函数中RBP都看起来不正常? 想了想真要是代码里把堆栈破坏了,这错误得发生得多巧妙?

错误RBP的来源

然后转机来了,脑海中突然闪出-fomit-frame-pointer。编译器生成的代码中是可以不需要栈基址指针的,也就是RBP寄存器不作为栈基址寄存器。大部分函数或者说开启了frame-pointer的函数,其函数头都会有以下指令:

push   %rbp
mov    %rsp,%rbp
...

表示保存调用者的栈基址到栈中,以及设置自己的栈基址。看下__conhash系列函数;

Dump of assembler code for function __conhash_get_rbnode:
0x00007f75532cd4a0 <__conhash_get_rbnode+0>:    mov    %rbx,-0x18(%rsp)
0x00007f75532cd4a5 <__conhash_get_rbnode+5>:    mov    %rbp,-0x10(%rsp)
...

这个库是单独编译的,没有显示指定-fno-omit-frame-pointer,查阅gcc手册,o2优化是开启了omit-frame-pinter 的。

在没有RBP的情况下,tcmalloc的GetStackTrace尝试读RBP取获取调用返回地址,自然是有问题的。但是,如果整个调用栈中的函数,要么有RBP,要么没有RBP,那么GetStackTrace取出的结果最多就是跳过一些栈帧,不会出错。 除非,这中间的某个函数把RBP寄存器另作他用(编译器省出这个寄存器肯定是要另作他用的)。所以这里继续追查这个错误地址0x4e73aa58的来源。

来源已经比较明显,肯定是__conhash_get_rbnode中设置的,因为这个函数的RBP是在被调用者tcmalloc中保存的。

Dump of assembler code for function __conhash_get_rbnode:
0x00007f75532cd4a0 <__conhash_get_rbnode+0>:    mov    %rbx,-0x18(%rsp)
0x00007f75532cd4a5 <__conhash_get_rbnode+5>:    mov    %rbp,-0x10(%rsp)
0x00007f75532cd4aa <__conhash_get_rbnode+10>:   mov    %esi,%ebp                    # 改写了RBP
0x00007f75532cd4ac <__conhash_get_rbnode+12>:   mov    %r12,-0x8(%rsp)
0x00007f75532cd4b1 <__conhash_get_rbnode+17>:   sub    $0x18,%rsp
0x00007f75532cd4b5 <__conhash_get_rbnode+21>:   mov    %rdi,%r12
0x00007f75532cd4b8 <__conhash_get_rbnode+24>:   mov    $0x30,%edi
0x00007f75532cd4bd <__conhash_get_rbnode+29>:   callq  0x7f75532b98c8 <malloc@plt>  # 调用tcmalloc,汇编到这里即可

这里打印RSI寄存器的值可能会被误导,因为任何时候打印寄存器的值可能都是错的,除非它有被显示保存。不过这里可以看出RSI的值来源于参数(RSI对应第二个参数):

void __conhash_add_replicas(conhash_t *conhash, int32_t iden)
    {
        node_t* node = __conhash_create_node(iden, conhash->replica);
        ...
        char buf[buf_len]; // buf_len = 64
        ...
        snprintf(buf, buf_len, VIRT_NODE_HASH_FMT, node->iden, i);
        uint32_t hash = conhash->cb_hashfunc(buf); // hash值由一个字符串哈希函数计算
        if(util_rbtree_search(&(conhash->vnode_tree), hash) == NULL)
        {
            util_rbtree_node_t* rbnode = __conhash_get_rbnode(node, hash);  // hash值
            ...

追到__conhash_add_replicas

0x00007f75532cd764 <__conhash_add_replicas+164>:        mov    %ebx,%esi    # 来源于rbx
0x00007f75532cd766 <__conhash_add_replicas+166>:        mov    %r15,%rdi
0x00007f75532cd769 <__conhash_add_replicas+169>:        callq  0x7f75532b9e48 <__conhash_get_rbnode@plt>

(gdb) p/x $rbx
$11 = 0x4e73aa58
(gdb) p/x hash
$12 = 0x4e73aa58      # 0x4e73aa58

找到了0x4e73aa58的来源。这个地址值竟然是一个字符串哈希算法算出来的!这里还可以看看这个字符串的内容:

(gdb) x/1s $rsp
0x4e738bd0:      "conhash-00000-00133"

这个碉堡的哈希函数是conhash_hash_def

coredump的条件

以上,既然只要某个库omit-frame-pointer,那tcmalloc就可能出错,为什么发生的频率并不高呢?这个可以回到GetStackTrace尤其是NextStackFrame的实现,其中包含了几个合法RBP的判定:

if (new_sp <= old_sp) return NULL;  // 上一个栈帧的RBP肯定比当前的大
        if ((uintptr_t)new_sp - (uintptr_t)old_sp > 100000) return NULL; // 指针值范围还必须在100000内
        ...
    if ((uintptr_t)new_sp & (sizeof(void *) - 1)) return NULL; // 由于本身保存的是指针,所以还必须是sizeof(void*)的整数倍,对齐

有了以上条件,才使得这个core几率变得很低。

总结

最后,如果你很熟悉tcmalloc,整个问题估计就被秒解了:tcmalloc INSTALL

另外附上另一个有意思的东西。

在分析__conhash_add_replicas时,其内定义了一个64字节的字符数组,查看其堆栈:

(gdb) x/20a $rsp
0x4e738bd0:     0x2d687361686e6f63      0x30302d3030303030          # 这些是字符串conhash-00000-00133
0x4e738be0:     0x333331        0x0
0x4e738bf0:     0x0     0x7f75532cd69e <__conhash_create_node+78>
0x4e738c00:     0x24fbc7e0      0x4e738c60
0x4e738c10:     0x24fbc7e0      0x7f75532cd6e3 <__conhash_add_replicas+35>
0x4e738c20:     0x0     0x24fbc7e8
0x4e738c30:     0x4e738c20      0x24fbc7e0
0x4e738c40:     0x22324360      0x246632c0
0x4e738c50:     0x0     0x0
0x4e738c60:     0x0     0x7f75532cd1fa <conhash_add_node+74>

最开始我觉得buf占64字节,也就是整个[0x4e738bd0, 0x4e738c10)内存,但是这块内存里居然有函数地址,这一度使我怀疑这里有问题。后来醒悟这些地址是定义buf前调用__conhash_create_node产生的,调用过程中写到堆栈里,调用完后栈指针改变,但并不需要清空栈中的内容。

posted @ 2015-04-06 18:33 Kevin Lynx 阅读(5146) | 评论 (2)编辑 收藏

2014年12月3日 #

基于内存查看STL常用容器内容

有时候在线上使用gdb调试程序core问题时,可能没有符号文件,拿到的仅是一个内存地址,如果这个指向的是一个STL对象,那么如何查看这个对象的内容呢?

只需要知道STL各个容器的数据结构实现,就可以查看其内容。本文描述了SGI STL实现中常用容器的数据结构,以及如何在gdb中查看其内容。

string

string,即basic_string bits/basic_string.h

mutable _Alloc_hider  _M_dataplus;
    ... 
      const _CharT*
      c_str() const
      { return _M_data(); }
    ...    
      _CharT*
      _M_data() const 
      { return  _M_dataplus._M_p; }

    ...
      struct _Alloc_hider : _Alloc
      {
    _Alloc_hider(_CharT* __dat, const _Alloc& __a)
    : _Alloc(__a), _M_p(__dat) { }

    _CharT* _M_p; // The actual data.
      };
   
      size_type
      length() const
      { return _M_rep()->_M_length; }

      _Rep*
      _M_rep() const
      { return &((reinterpret_cast<_Rep*> (_M_data()))[-1]); }

      ...
       struct _Rep_base
      {
    size_type       _M_length;
    size_type       _M_capacity;
    _Atomic_word        _M_refcount;
      };

      struct _Rep : _Rep_base

即,string内有一个指针,指向实际的字符串位置,这个位置前面有一个_Rep结构,其内保存了字符串的长度、可用内存以及引用计数。当我们拿到一个string对象的地址时,可以通过以下代码获取相关值:

void ds_str_i(void *p) {
        char **raw = (char**)p;
        char *s = *raw;
        size_t len = *(size_t*)(s - sizeof(size_t) * 3);
        printf("str: %s (%zd)\n", s, len);
    }

    size_t ds_str() {
        std::string s = "hello";
        ds_str_i(&s);
        return s.size();
    }

在gdb中拿到一个string的地址时,可以以下打印出该字符串及长度:

(gdb) x/1a p
0x7fffffffe3a0: 0x606028
(gdb) p (char*)0x606028
$2 = 0x606028 "hello"
(gdb) x/1dg 0x606028-24
0x606010:       5

vector

众所周知vector实现就是一块连续的内存,bits/stl_vector.h

template<typename _Tp, typename _Alloc = std::allocator<_Tp> >
    class vector : protected _Vector_base<_Tp, _Alloc>

    ...
    template<typename _Tp, typename _Alloc>
    struct _Vector_base
    {
      typedef typename _Alloc::template rebind<_Tp>::other _Tp_alloc_type;

      struct _Vector_impl
      : public _Tp_alloc_type
      {
    _Tp*           _M_start;
    _Tp*           _M_finish;
    _Tp*           _M_end_of_storage;
    _Vector_impl(_Tp_alloc_type const& __a)
    : _Tp_alloc_type(__a), _M_start(0), _M_finish(0), _M_end_of_storage(0)
    { }
      };


      _Vector_impl _M_impl;

可以看出sizeof(vector<xxx>)=24,其内也就是3个指针,_M_start指向首元素地址,_M_finish指向最后一个节点+1,_M_end_of_storage是可用空间最后的位置。

iterator
      end()
      { return iterator (this->_M_impl._M_finish); }
      const_iterator
      ...
      begin() const
      { return const_iterator (this->_M_impl._M_start); }
      ...
      size_type
      capacity() const
      { return size_type(const_iterator(this->_M_impl._M_end_of_storage)
             - begin()); }

可以通过代码从一个vector对象地址输出其信息:

template <typename T>
    void ds_vec_i(void *p) {
        T *start = *(T**)p;
        T *finish = *(T**)((char*)p + sizeof(void*));
        T *end_storage = *(T**)((char*)p + 2 * sizeof(void*));
        printf("vec size: %ld, avaiable size: %ld\n", finish - start, end_storage - start); 
    }

    size_t ds_vec() {
        std::vector<int> vec;
        vec.push_back(0x11);
        vec.push_back(0x22);
        vec.push_back(0x33);
        ds_vec_i<int>(&vec);
        return vec.size();
    }

使用gdb输出一个vector中的内容:

(gdb) p p
$3 = (void *) 0x7fffffffe380
(gdb) x/1a p
0x7fffffffe380: 0x606080
(gdb) x/3xw 0x606080
0x606080:       0x00000011      0x00000022      0x00000033

list

众所周知list被实现为一个链表。准确来说是一个双向链表。list本身是一个特殊节点,其代表end,其指向的下一个元素才是list真正的第一个节点:

bits/stl_list.h

bool
      empty() const
      { return this->_M_impl._M_node._M_next == &this->_M_impl._M_node; }

      const_iterator
      begin() const
      { return const_iterator(this->_M_impl._M_node._M_next); }

      iterator
      end()
      { return iterator(&this->_M_impl._M_node); }

      ...

    struct _List_node_base
    {
        _List_node_base* _M_next;   ///< Self-explanatory
        _List_node_base* _M_prev;   ///< Self-explanatory
        ...
    };
         
    template<typename _Tp>
    struct _List_node : public _List_node_base
    {
      _Tp _M_data;                ///< User's data.
    };
      
    template<typename _Tp, typename _Alloc>
    class _List_base
    {
        ...
      struct _List_impl
      : public _Node_alloc_type
      {
    _List_node_base _M_node;
        ...
      };

      _List_impl _M_impl;

          
    template<typename _Tp, typename _Alloc = std::allocator<_Tp> >
    class list : protected _List_base<_Tp, _Alloc>

所以sizeof(list<xx>)=16,两个指针。每一个真正的节点首先是包含两个指针,然后是元素内容(_List_node)。

通过代码输出list的内容:

#define NEXT(ptr, T) do { \
        void *n = *(char**)ptr; \
        T val = *(T*)((char**)ptr + 2); \
        printf("list item %p val: 0x%x\n", ptr, val); \
        ptr = n; \
    } while (0)

    template <typename T>
    void ds_list_i(void *p) {
        void *ptr = *(char**)p;

        NEXT(ptr, T);
        NEXT(ptr, T);
        NEXT(ptr, T);
    }

    size_t ds_list() {
        std::list<int> lst;
        lst.push_back(0x11);
        lst.push_back(0x22);
        lst.push_back(0x33);
        ds_list_i<int>(&lst);
        return lst.size();
    }

在gdb中可以以下方式遍历该list:

(gdb) p p
$4 = (void *) 0x7fffffffe390
(gdb) x/1a p
0x7fffffffe390: 0x606080
(gdb) x/1xw 0x606080+16         # 元素1 
0x606090:       0x00000011
(gdb) x/1a 0x606080
0x606080:       0x6060a0
(gdb) x/1xw 0x6060a0+16         # 元素2
0x6060b0:       0x00000022

map

map使用的是红黑树实现,实际使用的是stl_tree.h实现:

bits/stl_map.h

typedef _Rb_tree<key_type, value_type, _Select1st<value_type>,
               key_compare, _Pair_alloc_type> _Rep_type;
    ...
     _Rep_type _M_t;
    ... 

      iterator
      begin()
      { return _M_t.begin(); }

bits/stl_tree.h

struct _Rb_tree_node_base
      {
        typedef _Rb_tree_node_base* _Base_ptr;
        typedef const _Rb_tree_node_base* _Const_Base_ptr;

        _Rb_tree_color  _M_color;
        _Base_ptr       _M_parent;
        _Base_ptr       _M_left;
        _Base_ptr       _M_right;
        
        ...
      };

    template<typename _Val>
    struct _Rb_tree_node : public _Rb_tree_node_base
    {
      typedef _Rb_tree_node<_Val>* _Link_type;
      _Val _M_value_field;
    };


    template<typename _Key_compare,
           bool _Is_pod_comparator = std::__is_pod<_Key_compare>::__value>
        struct _Rb_tree_impl : public _Node_allocator
        {
      _Key_compare      _M_key_compare;
      _Rb_tree_node_base    _M_header;
      size_type         _M_node_count; // Keeps track of size of tree.
      ...
        }
    
    _Rb_tree_impl<_Compare> _M_impl;
    ...

      iterator
      begin()
      {
    return iterator(static_cast<_Link_type>
            (this->_M_impl._M_header._M_left));
      }

所以可以看出,大部分时候(取决于_M_key_compare) sizeof(map<xx>)=48,主要的元素是:

_Rb_tree_color  _M_color; // 节点颜色
        _Base_ptr       _M_parent; // 父节点
        _Base_ptr       _M_left; // 左节点
        _Base_ptr       _M_right; // 右节点
        _Val            _M_value_field // 同list中节点技巧一致,后面是实际的元素

同list中的实现一致,map本身作为一个节点,其不是一个存储数据的节点,

_Rb_tree::end

iterator
      end()
      { return iterator(static_cast<_Link_type>(&this->_M_impl._M_header)); }

由于节点值在_Rb_tree_node_base后,所以任意时候拿到节点就可以偏移这个结构体拿到节点值,节点的值是一个pair,包含了key和value。

在gdb中打印以下map的内容:

size_t ds_map() {
        std::map<std::string, int> imap;
        imap["abc"] = 0xbbb;
        return imap.size();
    }
(gdb) p/x &imap
$7 = 0x7fffffffe370
(gdb) x/1a (char*)&imap+24       # _M_left 真正的节点
0x7fffffffe388: 0x606040          
(gdb) x/1xw 0x606040+32+8        # 偏移32字节是节点值的地址,再偏移8则是value的地址
0x606068:       0x00000bbb
(gdb) p *(char**)(0x606040+32)   # 偏移32字节是string的地址
$8 = 0x606028 "abc"

或者很多时候没有必要这么装逼+蛋疼:

(gdb) p *(char**)(imap._M_t._M_impl._M_header._M_left+1)
$9 = 0x606028 "abc"
(gdb) x/1xw (char*)(imap._M_t._M_impl._M_header._M_left+1)+8
0x606068:       0x00000bbb

posted @ 2014-12-03 22:08 Kevin Lynx 阅读(3219) | 评论 (2)编辑 收藏

2014年11月4日