posts - 200, comments - 8, trackbacks - 0, articles - 0

在Linux中,系统调用是用户空间访问内核的唯一手段,它们是内核唯一的合法入口。实际上,其他的像设备文件和/proc之类的方式,最终也还是要通过系统调用进行的。

       一般情况下,应用程序通过应用编程接口(API)而不是直接通过系统调用来编程,而且这种编程接口实际上并不需要和内核提供的系统调用对应。一个API定义了一组应用程序使用的编程接口。它们可以实现成一个系统调用,也可以通过调用多个系统调用来实现,即使不使用任何系统调用也不存在问题。实际上,API可以在各种不同的操作系统上实现,给应用程序提供完全相同的接口,而它们本身在这些系统上的实现却可能迥异。

       在Unix世界中,最流行的应用编程接口是基于POSIX标准的,Linux是与POSIX兼容的。

       从程序员的角度看,他们只需要给API打交道就可以了,而内核只跟系统调用打交道;库函数及应用程序是怎么使用系统调用不是内核关心的。

       系统调用(在linux中常称作syscalls)通常通过函数进行调用。它们通常都需要定义一个或几个参数(输入)而且可能产生一些副作用。这些副作用通过一个long类型的返回值来表示成功(0值)或者错误(负值)。在系统调用出现错误的时候会把错误码写入errno全局变量。通过调用perror()函数,可以把该变量翻译成用户可以理解的错误字符串。

       系统调用的实现有两个特别之处:1)函数声明中都有asmlinkage限定词,用于通知编译器仅从栈中提取该函数的参数。2)系统调用getXXX()在内核中被定义为sys_getXXX()。这是Linux中所有系统调用都应该遵守的命名规则。

       系统调用号:在linux中,每个系统调用都赋予一个系统调用号,通过这个独一无二的号就可以关联系统调用。当用户空间的进程执行一个系统调用的时候,这个系统调用号就被用来指明到底要执行哪个系统调用;进程不会提及系统调用的名称。系统调用号一旦分配就不能再有任何变更(否则编译好的应用程序就会崩溃),如果一个系统调用被删除,它所占用的系统调用号也不允许被回收利用。Linux有一个"未使用"系统调用sys_ni_syscall(),它除了返回-ENOSYS外不做任何其他工作,这个错误号就是专门针对无效的系统调用而设的。虽然很罕见,但如果有一个系统调用被删除,这个函数就要负责“填补空位”。

       内核记录了系统调用表中所有已注册过的系统调用的列表,存储在sys_call_table中。它与体系结构有关,一般在entry.s中定义。这个表中为每一个有效的系统调用指定了唯一的系统调用号。

       用户空间的程序无法直接执行内核代码。它们不能直接调用内核空间的函数,因为内核驻留在受保护的地址空间上,应用程序应该以某种方式通知系统,告诉内核自己需要执行一个系统调用,系统系统切换到内核态,这样内核就可以代表应用程序来执行该系统调用了。这种通知内核的机制是通过软中断实现的。x86系统上的软中断由int$0x80指令产生。这条指令会触发一个异常导致系统切换到内核态并执行第128号异常处理程序,而该程序正是系统调用处理程序,名字叫system_call().它与硬件体系结构紧密相关,通常在entry.s文件中通过汇编语言编写。

       所有的系统调用陷入内核的方式都是一样的,所以仅仅是陷入内核空间是不够的。因此必须把系统调用号一并传给内核。在x86上,这个传递动作是通过在触发软中断前把调用号装入eax寄存器实现的。这样系统调用处理程序一旦运行,就可以从eax中得到数据。上述所说的system_call()通过将给定的系统调用号与NR_syscalls做比较来检查其有效性。如果它大于或者等于NR_syscalls,该函数就返回-ENOSYS.否则,就执行相应的系统调用:call *sys_call_table(, %eax, 4);

       由于系统调用表中的表项是以32位(4字节)类型存放的,所以内核需要将给定的系统调用号乘以4,然后用所得到的结果在该表中查询器位置。如图图一所示:

                                      结构 

     上面已经提到,除了系统调用号以外,还需要一些外部的参数输入。最简单的办法就是像传递系统调用号一样把这些参数也存放在寄存器里。在x86系统上ebx,ecx,edx,esi和edi按照顺序存放前5个参数。需要六个或六个以上参数的情况不多见,此时,应该用一个单独的寄存器存放指向所有这些参数在用户空间地址的指针。给用户空间的返回值也通过寄存器传递。在x86系统上,它存放在eax寄存器中。

       系统调用必须仔细检查它们所有的参数是否合法有效。系统调用在内核空间执行。如果任由用户将不合法的输入传递给内核,那么系统的安全和稳定将面临极大的考验。最重要的一种检查就是检查用户提供的指针是否有效,内核在接收一个用户空间的指针之前,内核必须要保证:

1)指针指向的内存区域属于用户空间
2)指针指向的内存区域在进程的地址空间里
3)如果是读,读内存应该标记为可读。如果是写,该内存应该标记为可写。

       内核提供了两种方法来完成必须的检查和内核空间与用户空间之间数据的来回拷贝。这两个方法必须有一个被调用。

copy_to_user():向用户空间写入数据,需要3个参数。第一个参数是进程空间中的目的内存地址。第二个是内核空间内的源地址
                     .第三个是需要拷贝的数据长度(字节数)。
copy_from_user():向用户空间读取数据,需要3个参数。第一个参数是进程空间中的目的内存地址。第二个是内核空间内的源地
                     址.第三个是需要拷贝的数据长度(字节数)。
注意:这两个都有可能引起阻塞。当包含用户数据的页被换出到硬盘上而不是在物理内存上的时候,这种情况就会发生。此时,进程就会休眠,直到缺页处理程序将该页从硬盘重新换回到物理内存。

       内核在执行系统调用的时候处于进程上下文,current指针指向当前任务,即引发系统调用的那个进程。在进程上下文中,内核可以休眠(比如在系统调用阻塞或显式调用schedule()的时候)并且可以被抢占。当系统调用返回的时候,控制权仍然在system_call()中,它最终会负责切换到用户空间并让用户进程继续执行下去。

       给linux添加一个系统调用时间很简单的事情,怎么设计和实现一个系统调用是难题所在。实现系统调用的第一步是决定它的用途,这个用途是明确且唯一的,不要尝试编写多用途的系统调用。ioctl则是一个反面教材。新系统调用的参数,返回值和错误码该是什么,这些都很关键。一旦一个系统调用编写完成后,把它注册成为一个正式的系统调用是件琐碎的工作,一般下面几步:

1)在系统调用表(一般位于entry.s)的最后加入一个表项。从0开始算起,系统表项在该表中的位置就是它的系统调用号。如第
   10个系统调用分配到系统调用号为9
2)任何体系结构,系统调用号都必须定义于include/asm/unistd.h中
3)系统调用必须被编译进内核映像(不能编译成模块)。这只要把它放进kernel/下的一个相关文件就可以。

       通常,系统调用靠C库支持,用户程序通过包含标准头文件并和C库链接,就可以使用系统调用(或者使用库函数,再由库函数实际调用)。庆幸的是linux本身提供了一组宏用于直接对系统调用进行访问。它会设置好寄存器并调用int $0x80指令。这些宏是_syscalln(),其中n的范围是从0到6.代表需要传递给系统调用的参数个数。这是由于该宏必须了解到底有多少参数按照什么次序压入寄存器。以open系统调用为例:

open()系统调用定义如下是:
long open(const char *filename, int flags, int mode)
直接调用此系统调用的宏的形式为:
#define NR_open 5
_syscall3(long, open, const char *, filename, int , flags, int, mode)

    这样,应用程序就可以直接使用open().调用open()系统调用直接把上面的宏放置在应用程序中就可以了。对于每个宏来说,都有2+2*n个参数。每个参数的意义简单明了,这里就不详细说明了。

posted @ 2012-10-22 20:03 鑫龙 阅读(280) | 评论 (0)编辑 收藏

list_for_each()的定义:


  1. /**  
  2.  * list_for_each    -   iterate over a list  
  3.  * @pos:    the &struct list_head to use as a loop counter.  
  4.  * @head:   the head for your list.  
  5.  */  
  6. #define list_for_each(pos, head) \  
  7.     for (pos = (head)->next, prefetch(pos->next); pos != (head); \  
  8.         pos = pos->next, prefetch(pos->next))  

list_for_each_safe()的定义:

  1. /**  
  2.  * list_for_each_safe   -   iterate over a list safe against removal of list entry  
  3.  * @pos:    the &struct list_head to use as a loop counter.  
  4.  * @n:      another &struct list_head to use as temporary storage  
  5.  * @head:   the head for your list.  
  6.  */  
  7. #define list_for_each_safe(pos, n, head) \  
  8.     for (pos = (head)->next, n = pos->next; pos != (head); \  
  9.         pos = n, n = pos->next)  

由上面两个对比来看,list_for_each_safe()函数比list_for_each()多了一个中间变量n

 

当在遍历的过程中需要删除结点时,来看一下会出现什么情况:

list_for_each():list_del(pos)将pos的前后指针指向undefined state,导致kernel panic,另如果list_del_init(pos)将pos前后指针指向自身,导致死循环。

list_for_each_safe():首先将pos的后指针缓存到n,处理一个流程后再赋回pos,避免了这种情况发生。


因此之遍历链表不删除结点时,可以使用list_for_each(),而当由删除结点操作时,则要使用list_for_each_safe()。

其他带safe的处理也是基于这个原因。

 

posted @ 2012-10-22 10:45 鑫龙 阅读(960) | 评论 (0)编辑 收藏

一、 链表数据结构简介 

链表是一种常用的组织有序数据的数据结构,它通过指针将一系列数据节点连接成一条数据链,是线性表的一种重要实现方式。相对于数组,链表具有更好的动态性,建立链表时无需预先知道数据总量,可以随机分配空间,可以高效地在链表中的任意位置实时插入或删除数据。链表的开销主要是访问的顺序性和组织链的空间损失。

通常链表数据结构至少应包含两个域:数据域和指针域,数据域用于存储数据,指针域用于建立与下一个节点的联系。按照指针域的组织以及各个节点之间的联系形式,链表又可以分为单链表、双链表、循环链表等多种类型,下面分别给出这几类常见链表类型的示意图:

1. 单链表


图1 单链表
图1 单链表

单链表是最简单的一类链表,它的特点是仅有一个指针域指向后继节点(next),因此,对单链表的遍历只能从头至尾(通常是NULL空指针)顺序进行。

2. 双链表


图2 双链表
图2 双链表

通过设计前驱和后继两个指针域,双链表可以从两个方向遍历,这是它区别于单链表的地方。如果打乱前驱、后继的依赖关系,就可以构成"二叉树";如果再让首节点的前驱指向链表尾节点、尾节点的后继指向首节点(如图2中虚线部分),就构成了循环链表;如果设计更多的指针域,就可以构成各种复杂的树状数据结构。

3. 循环链表

循环链表的特点是尾节点的后继指向首节点。前面已经给出了双循环链表的示意图,它的特点是从任意一个节点出发,沿两个方向的任何一个,都能找到链表中的任意一个数据。如果去掉前驱指针,就是单循环链表。

在Linux内核中使用了大量的链表结构来组织数据,包括设备列表以及各种功能模块中的数据组织。这些链表大多采用在[include/linux/list.h]实现的一个相当精彩的链表数据结构。本文的后继部分就将通过示例详细介绍这一数据结构的组织和使用。


二、 Linux 2.6内核链表数据结构的实现

尽管这里使用2.6内核作为讲解的基础,但实际上2.4内核中的链表结构和2.6并没有什么区别。不同之处在于2.6扩充了两种链表数据结构:链表的读拷贝更新(rcu)和HASH链表(hlist)。这两种扩展都是基于最基本的list结构,因此,本文主要介绍基本链表结构,然后再简要介绍一下rcu和hlist。

链表数据结构的定义很简单(节选自[include/linux/list.h],以下所有代码,除非加以说明,其余均取自该文件):

struct list_head {
	struct list_head *next, *prev;
};

list_head结构包含两个指向list_head结构的指针prev和next,由此可见,内核的链表具备双链表功能,实际上,通常它都组织成双循环链表。

和第一节介绍的双链表结构模型不同,这里的list_head没有数据域。在Linux内核链表中,不是在链表结构中包含数据,而是在数据结构中包含链表节点。

在数据结构课本中,链表的经典定义方式通常是这样的(以单链表为例):

struct list_node {
	struct list_node *next;
	ElemType	data;
};

因为ElemType的缘故,对每一种数据项类型都需要定义各自的链表结构。有经验的C++程序员应该知道,标准模板库中的<list>采用的是C++ Template,利用模板抽象出和数据项类型无关的链表操作接口。

在Linux内核链表中,需要用链表组织起来的数据通常会包含一个struct list_head成员,例如在[include/linux/netfilter.h]中定义了一个nf_sockopt_ops结构来描述Netfilter为某一协议族准备的getsockopt/setsockopt接口,其中就有一个(struct list_head list)成员,各个协议族的nf_sockopt_ops结构都通过这个list成员组织在一个链表中,表头是定义在[net/core/netfilter.c]中的nf_sockopts(struct list_head)。从下图中我们可以看到,这种通用的链表结构避免了为每个数据项类型定义自己的链表的麻烦。Linux的简捷实用、不求完美和标准的风格,在这里体现得相当充分。


图3 nf_sockopts链表示意图
图3 nf_sockopts链表示意图

三、 链表操作接口

1. 声明和初始化

实际上Linux只定义了链表节点,并没有专门定义链表头,那么一个链表结构是如何建立起来的呢?让我们来看看LIST_HEAD()这个宏:

#define LIST_HEAD_INIT(name) { &(name), &(name) }
#define LIST_HEAD(name) struct list_head name = LIST_HEAD_INIT(name)

当我们用LIST_HEAD(nf_sockopts)声明一个名为nf_sockopts的链表头时,它的next、prev指针都初始化为指向自己,这样,我们就有了一个空链表,因为Linux用头指针的next是否指向自己来判断链表是否为空:

static inline int list_empty(const struct list_head *head)
{
		return head->next == head;
}

除了用LIST_HEAD()宏在声明的时候初始化一个链表以外,Linux还提供了一个INIT_LIST_HEAD宏用于运行时初始化链表:

#define INIT_LIST_HEAD(ptr) do { \
	(ptr)->next = (ptr); (ptr)->prev = (ptr); \
} while (0)

我们用INIT_LIST_HEAD(&nf_sockopts)来使用它。

2. 插入/删除/合并

a) 插入

对链表的插入操作有两种:在表头插入和在表尾插入。Linux为此提供了两个接口:

static inline void list_add(struct list_head *new, struct list_head *head);
static inline void list_add_tail(struct list_head *new, struct list_head *head);

因为Linux链表是循环表,且表头的next、prev分别指向链表中的第一个和最末一个节点,所以,list_add和list_add_tail的区别并不大,实际上,Linux分别用

__list_add(new, head, head->next);

__list_add(new, head->prev, head);

来实现两个接口,可见,在表头插入是插入在head之后,而在表尾插入是插入在head->prev之后。

假设有一个新nf_sockopt_ops结构变量new_sockopt需要添加到nf_sockopts链表头,我们应当这样操作:

list_add(&new_sockopt.list, &nf_sockopts);

从这里我们看出,nf_sockopts链表中记录的并不是new_sockopt的地址,而是其中的list元素的地址。如何通过链表访问到new_sockopt呢?下面会有详细介绍。

b) 删除

static inline void list_del(struct list_head *entry);

当我们需要删除nf_sockopts链表中添加的new_sockopt项时,我们这么操作:

list_del(&new_sockopt.list);

被剔除下来的new_sockopt.list,prev、next指针分别被设为LIST_POSITION2和LIST_POSITION1两个特殊值,这样设置是为了保证不在链表中的节点项不可访问--对LIST_POSITION1和LIST_POSITION2的访问都将引起页故障。与之相对应,list_del_init()函数将节点从链表中解下来之后,调用LIST_INIT_HEAD()将节点置为空链状态。

c) 搬移

Linux提供了将原本属于一个链表的节点移动到另一个链表的操作,并根据插入到新链表的位置分为两类:

static inline void list_move(struct list_head *list, struct list_head *head);
static inline void list_move_tail(struct list_head *list, struct list_head *head);

例如list_move(&new_sockopt.list,&nf_sockopts)会把new_sockopt从它所在的链表上删除,并将其再链入nf_sockopts的表头。

d) 合并

除了针对节点的插入、删除操作,Linux链表还提供了整个链表的插入功能:

static inline void list_splice(struct list_head *list, struct list_head *head);

假设当前有两个链表,表头分别是list1和list2(都是struct list_head变量),当调用list_splice(&list1,&list2)时,只要list1非空,list1链表的内容将被挂接在list2链表上,位于list2和list2.next(原list2表的第一个节点)之间。新list2链表将以原list1表的第一个节点为首节点,而尾节点不变。如图(虚箭头为next指针):


图4 链表合并list_splice(&list1,&list2)
图4 链表合并list_splice(&list1,&list2)

当list1被挂接到list2之后,作为原表头指针的list1的next、prev仍然指向原来的节点,为了避免引起混乱,Linux提供了一个list_splice_init()函数:

static inline void list_splice_init(struct list_head *list, struct list_head *head);
	

该函数在将list合并到head链表的基础上,调用INIT_LIST_HEAD(list)将list设置为空链。

3. 遍历

遍历是链表最经常的操作之一,为了方便核心应用遍历链表,Linux链表将遍历操作抽象成几个宏。在介绍遍历宏之前,我们先看看如何从链表中访问到我们真正需要的数据项。

a) 由链表节点到数据项变量

我们知道,Linux链表中仅保存了数据项结构中list_head成员变量的地址,那么我们如何通过这个list_head成员访问到作为它的所有者的节点数据呢?Linux为此提供了一个list_entry(ptr,type,member)宏,其中ptr是指向该数据中list_head成员的指针,也就是存储在链表中的地址值,type是数据项的类型,member则是数据项类型定义中list_head成员的变量名,例如,我们要访问nf_sockopts链表中首个nf_sockopt_ops变量,则如此调用:

list_entry(nf_sockopts->next, struct nf_sockopt_ops, list);

这里"list"正是nf_sockopt_ops结构中定义的用于链表操作的节点成员变量名。

list_entry的使用相当简单,相比之下,它的实现则有一些难懂:

#define list_entry(ptr, type, member) container_of(ptr, type, member)
container_of宏定义在[include/linux/kernel.h]中:
#define container_of(ptr, type, member) ({			\
        const typeof( ((type *)0)->member ) *__mptr = (ptr);	\
        (type *)( (char *)__mptr - offsetof(type,member) );})
offsetof宏定义在[include/linux/stddef.h]中:
#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)

size_t最终定义为unsigned int(i386)。

这里使用的是一个利用编译器技术的小技巧,即先求得结构成员在与结构中的偏移量,然后根据成员变量的地址反过来得出属主结构变量的地址。

container_of()和offsetof()并不仅用于链表操作,这里最有趣的地方是((type *)0)->member,它将0地址强制"转换"为type结构的指针,再访问到type结构中的member成员。在container_of宏中,它用来给typeof()提供参数(typeof()是gcc的扩展,和sizeof()类似),以获得member成员的数据类型;在offsetof()中,这个member成员的地址实际上就是type数据结构中member成员相对于结构变量的偏移量。

如果这么说还不好理解的话,不妨看看下面这张图:


图5 offsetof()宏的原理
图5 offsetof()宏的原理

对于给定一个结构,offsetof(type,member)是一个常量,list_entry()正是利用这个不变的偏移量来求得链表数据项的变量地址。

b) 遍历宏

在[net/core/netfilter.c]的nf_register_sockopt()函数中有这么一段话:

		……
struct list_head *i;
……
	list_for_each(i, &nf_sockopts) {
		struct nf_sockopt_ops *ops = (struct nf_sockopt_ops *)i;
		……
	}
	……
	

函数首先定义一个(struct list_head *)指针变量i,然后调用list_for_each(i,&nf_sockopts)进行遍历。在[include/linux/list.h]中,list_for_each()宏是这么定义的:

        	#define list_for_each(pos, head) \
	for (pos = (head)->next, prefetch(pos->next); pos != (head); \
        	pos = pos->next, prefetch(pos->next))
        	

它实际上是一个for循环,利用传入的pos作为循环变量,从表头head开始,逐项向后(next方向)移动pos,直至又回到head(prefetch()可以不考虑,用于预取以提高遍历速度)。

那么在nf_register_sockopt()中实际上就是遍历nf_sockopts链表。为什么能直接将获得的list_head成员变量地址当成struct nf_sockopt_ops数据项变量的地址呢?我们注意到在struct nf_sockopt_ops结构中,list是其中的第一项成员,因此,它的地址也就是结构变量的地址。更规范的获得数据变量地址的用法应该是:

struct nf_sockopt_ops *ops = list_entry(i, struct nf_sockopt_ops, list);

大多数情况下,遍历链表的时候都需要获得链表节点数据项,也就是说list_for_each()和list_entry()总是同时使用。对此Linux给出了一个list_for_each_entry()宏:

#define list_for_each_entry(pos, head, member)		……

与list_for_each()不同,这里的pos是数据项结构指针类型,而不是(struct list_head *)。nf_register_sockopt()函数可以利用这个宏而设计得更简单:

……
struct nf_sockopt_ops *ops;
list_for_each_entry(ops,&nf_sockopts,list){
	……
}
……

某些应用需要反向遍历链表,Linux提供了list_for_each_prev()和list_for_each_entry_reverse()来完成这一操作,使用方法和上面介绍的list_for_each()、list_for_each_entry()完全相同。

如果遍历不是从链表头开始,而是从已知的某个节点pos开始,则可以使用list_for_each_entry_continue(pos,head,member)。有时还会出现这种需求,即经过一系列计算后,如果pos有值,则从pos开始遍历,如果没有,则从链表头开始,为此,Linux专门提供了一个list_prepare_entry(pos,head,member)宏,将它的返回值作为list_for_each_entry_continue()的pos参数,就可以满足这一要求。

4. 安全性考虑

在并发执行的环境下,链表操作通常都应该考虑同步安全性问题,为了方便,Linux将这一操作留给应用自己处理。Linux链表自己考虑的安全性主要有两个方面:

a) list_empty()判断

基本的list_empty()仅以头指针的next是否指向自己来判断链表是否为空,Linux链表另行提供了一个list_empty_careful()宏,它同时判断头指针的next和prev,仅当两者都指向自己时才返回真。这主要是为了应付另一个cpu正在处理同一个链表而造成next、prev不一致的情况。但代码注释也承认,这一安全保障能力有限:除非其他cpu的链表操作只有list_del_init(),否则仍然不能保证安全,也就是说,还是需要加锁保护。

b) 遍历时节点删除

前面介绍了用于链表遍历的几个宏,它们都是通过移动pos指针来达到遍历的目的。但如果遍历的操作中包含删除pos指针所指向的节点,pos指针的移动就会被中断,因为list_del(pos)将把pos的next、prev置成LIST_POSITION2和LIST_POSITION1的特殊值。

当然,调用者完全可以自己缓存next指针使遍历操作能够连贯起来,但为了编程的一致性,Linux链表仍然提供了两个对应于基本遍历操作的"_safe"接口:list_for_each_safe(pos, n, head)、list_for_each_entry_safe(pos, n, head, member),它们要求调用者另外提供一个与pos同类型的指针n,在for循环中暂存pos下一个节点的地址,避免因pos节点被释放而造成的断链。


四、 扩展

1. hlist


图6 list和hlist
图6 list和hlist

精益求精的Linux链表设计者(因为list.h没有署名,所以很可能就是Linus Torvalds)认为双头(next、prev)的双链表对于HASH表来说"过于浪费",因而另行设计了一套用于HASH表应用的hlist数据结构--单指针表头双循环链表,从上图可以看出,hlist的表头仅有一个指向首节点的指针,而没有指向尾节点的指针,这样在可能是海量的HASH表中存储的表头就能减少一半的空间消耗。

因为表头和节点的数据结构不同,插入操作如果发生在表头和首节点之间,以往的方法就行不通了:表头的first指针必须修改指向新插入的节点,却不能使用类似list_add()这样统一的描述。为此,hlist节点的prev不再是指向前一个节点的指针,而是指向前一个节点(可能是表头)中的next(对于表头则是first)指针(struct list_head **pprev),从而在表头插入的操作可以通过一致的"*(node->pprev)"访问和修改前驱节点的next(或first)指针。

2. read-copy update

在Linux链表功能接口中还有一系列以"_rcu"结尾的宏,与以上介绍的很多函数一一对应。RCU(Read-Copy Update)是2.5/2.6内核中引入的新技术,它通过延迟写操作来提高同步性能。

我们知道,系统中数据读取操作远多于写操作,而rwlock机制在smp环境下随着处理机增多性能会迅速下降(见参考资料4)。针对这一应用背景,IBM Linux技术中心的Paul E. McKenney提出了"读拷贝更新"的技术,并将其应用于Linux内核中。RCU技术的核心是写操作分为写-更新两步,允许读操作在任何时候无阻访问,当系统有写操作时,更新动作一直延迟到对该数据的所有读操作完成为止。Linux链表中的RCU功能只是Linux RCU的很小一部分,对于RCU的实现分析已超出了本文所及,有兴趣的读者可以自行参阅本文的参考资料;而对RCU链表的使用和基本链表的使用方法基本相同。



五、 示例

附件中的程序除了能正向、反向输出文件以外,并无实际作用,仅用于演示Linux链表的使用。

为了简便,例子采用的是用户态程序模板,如果需要运行,可采用如下命令编译:

gcc -D__KERNEL__ -I/usr/src/linux-2.6.7/include pfile.c -o pfile

因为内核链表限制在内核态使用,但实际上对于数据结构本身而言并非只能在核态运行,因此,在笔者的编译中使用"-D__KERNEL__"开关"欺骗"编译器。


参考资料

  1. 维基百科 http://zh.wikipedia.org,一个在GNU Documentation License下发布的网络辞典,自由软件理念的延伸,本文的"链表"概念即使用它的版本。
  2. 《Linux内核情景分析》,毛德操先生的这本关于Linux内核的巨著几乎可以回答绝大部分关于内核的问题,其中也包括内核链表的几个关键数据结构。
  3. Linux内核2.6.7源代码,所有不明白的问题,只要潜心看代码,总能清楚。
  4. Kernel Korner: Using RCU in the Linux 2.5 Kernel,RCU主要开发者Paul McKenney 2003年10月发表于Linux Journal上的一篇介绍RCU的文章。在 http://www.rdrop.com/users/paulmck/rclock/上可以获得更多关于RCU的帮助。

关于作者

杨沙洲,目前在国防科技大学计算机学院攻读软件方向博士学位。对文中存在的技术问题,欢迎向 pubb@163.net质疑。

posted @ 2012-10-22 10:31 鑫龙 阅读(335) | 评论 (0)编辑 收藏

1.概述
      CFS(completely fair schedule)是最终被内核采纳的调度器。它从RSDL/SD中吸取了完全公平的思想,不再跟踪进程的睡眠时间,也不再企图区分交互式进程。它将所有的进程都统一对待,这就是公平的含义。CFS的算法和实现都相当简单,众多的测试表明其性能也非常优越。

      CFS 背后的主要想法是维护为任务提供处理器时间方面的平衡(公平性)。这意味着应给进程分配相当数量的处理器。分给某个任务的时间失去平衡时(意味着一个或多个任务相对于其他任务而言未被给予相当数量的时间),应给失去平衡的任务分配时间,让其执行。 

      CFS抛弃了时间片,抛弃了复杂的算法,从一个新的起点开始了调度器的新时代,最开始的2.6.23版本,CFS提供一个虚拟的时钟,所有进程复用这个虚拟时钟的时间,CFS将时钟的概念从底层体系相关的硬件中抽象出来,进程调度模块直接和这个虚拟的时钟接口 而不必再为硬件时钟操作而操心,如此一来,整个进程调度模块就完整了,从时钟到调度算法,到不同进程的不同策略,全部都由虚拟系统提供,也正是在这个新的内核,引入了调度类。因此新的调度器就是不同特性的进程在统一的虚拟时钟下按照不同的策略被调度。

      按照作者Ingo Molnar的说法:"CFS百分之八十的工作可以用一句话概括:CFS在真实的硬件上模拟了完全理想的多任务处理器"。在“完全理想的多任务处理器 “下,每个进程都能同时获得CPU的执行时间。当系统中有两个进程时,CPU的计算时间被分成两份,每个进程获得50%。然而在实际的硬件上,当一个进程占用CPU时,其它进程就必须等待。这就产生了不公平。

2.相关概念
调度实体(sched entiy):就是调度的对象,可以理解为进程。
虚拟运行时间(vruntime):即每个调度实体的运行时间。任务的虚拟运行时间越小, 意味着任务被允许访问服务器的时间越短 — 其对处理器的需求越高。
公平调度队列(cfs_rq):采取公平调度的调度实体的运行队列。

3.CFS的核心思想
      全公平调度器(CFS)的设计思想是:在一个真实的硬件上模型化一个理想的、精确的多任务CPU。该理想CPU模型运行在100%的负荷、在精确 平等速度下并行运行每个任务,每个任务运行在1/n速度下,即理想CPU有n个任务运行,每个任务的速度为CPU整个负荷的1/n。
      由于真实硬件上,每次只能运行一个任务,这就得引入"虚拟运行时间"(virtual runtime)的概念,虚拟运行时间为一个任务在理想CPU模型上执行的下一个时间片(timeslice)。实际上,一个任务的虚拟运行时间为考虑到运行任务总数的实际运行时间。 

      CFS 背后的主要想法是维护为任务提供处理器时间方面的平衡(公平性)。CFS为了体现的公平表现在2个方面
(1)进程的运行时间相等
      CFS 在叫做虚拟运行时 的地方维持提供给某个任务的时间量。任务的虚拟运行时越小, 意味着任务被允许访问服务器的时间越短 — 其对处理器的需求越高。
            假设runqueue中有n个进程,当前进程运行了 10ms。在“完全理想的多任务处理器”中,10ms应该平分给n个进程(不考虑各个进程的nice值),因此当前进程应得的时间是(10/n)ms,但 是它却运行了10ms。所以CFS将惩罚当前进程,使其它进程能够在下次调度时尽可能取代当前进程。最终实现所有进程的公平调度。

(2)睡眠的进程进行补偿
      CFS 还包含睡眠公平概念以便确保那些目前没有运行的任务(例如,等待 I/O)在其最终需要时获得相当份额的处理器。 

      CFS调度器的运行时间是O(logN),而以前的调度器的运行时间是O(1),这是不是就是说CFS的效率比O(1)的更差呢?
      答案并不是那样,我们知道 CFS调度器下的运行队列是基于红黑树组织的,找出下一个进程就是截下左下角的节点,固定时间完成,所谓的O(logN)指的是插入时间,可是红黑树的统 计性能是不错的,没有多大概率真的用得了那么多时间,因为红节点和黑节点的特殊排列方式既保证了树的一定程度的平衡,又不至于花太多的时间来维持这种平 衡,插入操作大多数情况下都可以很快的完成,特别是对于组织得相当好的数据。

4.CFS的实现
4.1 2.6.23 VS 2.6.25
      在2.6.23内核中,刚刚实现的CFS调度器显得很淳朴,每次的时钟滴答中都会将当前进程先出队,推进其虚拟时钟和系统虚拟时钟后再入队,然后判断红黑 树的左下角的进程是否还是当前进程而抉择是否要调度,这种调度器的key的计算是用当前的虚拟时钟减去待计算进程的等待时间,如果该计算进程在运行,那么其等待时间就是负值,这样,等待越长的进程key越小,从而越容易被选中投入运行;
      在2.6.25内核以后实现了一种更为简单的方式,就是设置一个运行队列的虚拟时钟,它单调增长并且跟踪该队列的最小虚拟时钟的进程,key值由进程的vruntime和队列的虚拟时钟的差值计算,这种方式就是真正的追赶, 比2.6.23实现的简单,但是很巧妙,不必在每次时钟滴答中都将当前进程出队,入队,而是根据当前进程实际运行的时间和理想应该运行的时间判断是否应该调度。

4.2红黑树
      与之前的 Linux 调度器不同,它没有将任务维护在运行队列中,CFS 维护了一个以时间为顺序的红黑树(参见下图)。 红黑树 是一个树,具有很多有趣、有用的属性。首先,它是自平衡的,这意味着树上没有路径比任何其他路径长两倍以上。 第二,树上的运行按 O(log n) 时间发生(其中 n 是树中节点的数量)。这意味着您可以快速高效地插入或删除任务。 


      任务存储在以时间为顺序的红黑树中(由 sched_entity 对象表示),对处理器需求最多的任务 (最低虚拟运行时)存储在树的左侧,处理器需求最少的任务(最高虚拟运行时)存储在树的右侧。 为了公平,调度器先选取红黑树最左端的节点调度为下一个以便保持公平性。任务通过将其运行时间添加到虚拟运行时, 说明其占用 CPU 的时间,然后如果可运行,再插回到树中。这样,树左侧的任务就被给予时间运行了,树的内容从右侧迁移到左侧以保持公平。 因此,每个可运行的任务都会追赶其他任务以维持整个可运行任务集合的执行平衡。 

4.3 CFS内部原理
      Linux 内的所有任务都由称为 task_struct 的任务结构表示。该结构完整地描述了任务并包括了任务的当前状态、其堆栈、进程标识、优先级(静态和动态)等等。您可以在 ./linux/include/linux/sched.h 中找到这些内容以及相关结构。 但是因为不是所有任务都是可运行的,您在 task_struct 中不会发现任何与 CFS 相关的字段。 相反,会创建一个名为 sched_entity 的新结构来跟踪调度信息(参见下图)。



      树的根通过 rb_root 元素通过 cfs_rq 结构(在 ./kernel/sched.c 中)引用。红黑树的叶子不包含信息,但是内部节点代表一个或多个可运行的任务。红黑树的每个节点都由 rb_node 表示,它只包含子引用和父对象的颜色。 rb_node 包含在 sched_entity 结构中,该结构包含 rb_node 引用、负载权重以及各种统计数据。最重要的是, sched_entity 包含 vruntime(64 位字段),它表示任务运行的时间量,并作为红黑树的索引。 最后,task_struct 位于顶端,它完整地描述任务并包含 sched_entity 结构。 

      CFS 调度函数非常简单。 在 ./kernel/sched.c 中的 schedule() 函数中,它会先抢占当前运行任务(除非它通过 yield() 代码先抢占自己)。注意 CFS 没有真正的时间切片概念用于抢占,因为抢占时间是可变的。 当前运行任务(现在被抢占的任务)通过对 put_prev_task 调用(通过调度类)返回到红黑树。 当 schedule 函数开始确定下一个要调度的任务时,它会调用 pick_next_task 函数。此函数也是通用的(在 ./kernel/sched.c 中),但它会通过调度器类调用 CFS 调度器。 CFS 中的 pick_next_task 函数可以在 ./kernel/sched_fair.c(称为 pick_next_task_fair())中找到。 此函数只是从红黑树中获取最左端的任务并返回相关 sched_entity。通过此引用,一个简单的 task_of() 调用确定返回的 task_struct 引用。通用调度器最后为此任务提供处理器。

4.4 CFS的优先级
      CFS 不直接使用优先级而是将其用作允许任务执行的时间的衰减系数。 低优先级任务具有更高的衰减系数,而高优先级任务具有较低的衰减系数。 这意味着与高优先级任务相比,低优先级任务允许任务执行的时间消耗得更快。 这是一个绝妙的解决方案,可以避免维护按优先级调度的运行队列。

posted @ 2012-10-16 16:30 鑫龙 阅读(649) | 评论 (0)编辑 收藏

进程在退出时,必须释放它所拥有的资源,并通过某种方式告诉父进程。进程的退出一般是显示或隐式地调用了eixt(),或者接受了某种信号。不过什么原因退出,最终都调用了do_exit。




用于进程退出的系统调用有两个exit和exit_group,exit只是终止某个进程,而exit_group整个线程中的进程。它们在内核中的服务函数分别为sys_exit和sys_exit_group,它们又分别调用了do_exit和do_group_exit。而do_group最终又调用了do_exit。

 

do_exit定义在kernel/exit.c中:

僵死进程:僵死进程是一个进程已经退出,它的内存和资源已经释放掉了,但是位了时系统在它退出后能够获得它的退出状态等信息,它的进程描述符仍然保留。

一个进程退出时,它的父进程会接收到一个SIGCHLD信号,一般情况下这个信号的处理函数会执行wait系列函数等待子进程的结束。从子进程退出到父进程调用wait(子进程结束)的这段时间,子进程称为僵死进程。执行ps –ef命令以“z”结尾的为僵死进程。

 

僵死进程很特殊,它没有任何可执行代码,不会被调度,只有一个进程描述符用来记录退出等状态,除此之外不再占用其他任何资源。

 

如果僵死进程的父进程没有调用wait,则该进程会一直处于僵死状态。如果父进程结束,内核会在当前线程组里为其找一个父进程,如果没找到则把init作为其父进程,此时新的父进程将负责清楚其进程。如果父进程一直不结束,该进程会一直僵死。在root下用kill -9 也不能将其杀死。


下面只对do_exit重点地方解析下: 


  1. struct task_struct *tsk = current;//获取当前要释放进程的进程描述符   


    exit_signals(tsk);  /* sets PF_EXITING 以免内和其他部分访问该进程*/   

    exit_mm(tsk);  
  2.   
  3.     if (group_dead)  
  4.         acct_process();  
  5.     trace_sched_process_exit(tsk);  
  6.   
  7.     exit_sem(tsk);  
  8.     exit_files(tsk);  
  9.     exit_fs(tsk);  
  10.     check_stack_usage();  

  11. /*更新父子关系,并告诉父进程正在退出*/  
  12.     exit_notify(tsk, group_dead);
  13. /*切换到其他进程*/  
  14.     schedule(); 
  15.     exit_thread();  


posted @ 2012-10-15 11:52 鑫龙 阅读(449) | 评论 (0)编辑 收藏

今天看到写时拷贝这个概念,当时一下没有理解,后来查看一些网上的资料,找到了这篇文章,里面的那份个小程序能够很好的说明进程创建写时拷贝的概念。怕以后找不到就转载了。嘿嘿。
下面是那篇文章的原文:

Linux进程创建,子进程对 父进程资源“写时拷贝”的证明     传统的fork()系统调用直接把所有的资源复制给新创建的进程。这种实现过于简单并且效率低下,因为它拷贝的数据或许可以共享(This approach is significantly na?ve and inefficient in that it copies much data that might otherwise be shared.)。更糟糕的是,如果新进程打算立即执行一个新的映像,那么所有的拷贝都将前功尽弃。
    Linux的fork()使用写时拷贝 (copy- on-write)页实现。写时拷贝是一种可以推迟甚至避免拷贝数据的技术。内核此 时并不复制整个进程的地址空间,而是让父子进程共享同一个地址空间。只用在需要写入的时候才会复制地址空间,从而使各个进行拥有各自的地址空间。也就是 说,资源的复制是在需要写入的时候才会进行,在此之前,只有以只读方式共享。这种技术使地址空间上的页的拷贝被推迟到实际发生写入的时候。在页根本不会被 写入的情况下---例如,fork()后立即执行exec(),地址空间就无需被复制了。fork()的实际开销就是复制父进程的页表以及给子进程创建一 个进程描述符。下列程序可证明写时拷贝:

#include <stdio.h>

#include <sched.h>

int data = 10;

int child_process()
{
    printf("Child process %d, data %dn",getpid(),data);
    data = 20;
    printf("Child process %d, data %dn",getpid(),data);
    while(1);
}

int main(int argc, char* argv[])
{
    if(fork()==0) {
      child_process();    
    }else{
        sleep(1);
        printf("Parent process %d, data %dn",getpid(), data);
        while(1);
    }
}
运行结果
Child process 6427, data 10
Child process 6427, data 20
Parent process 6426, data 10 

    第1个Child process 6427, data 10是因为子进程创建时task_struct的mm直接拷贝自parent的mm;第2个Child process 6427, data 20是因为子进程进行了“写时拷贝”,有了自己的dataa;第3个Parent process 6426, data 10输出10是因为子进程的data和父进程的data不是同一份。
    如果把上述程序改为:

#include <stdio.h>
#include <sched.h>
#include <stdlib.h>

int data = 10;

int child_process()
{
    printf("Child process %d, data %dn",getpid(),data);
    data = 20;
    printf("Child process %d, data %dn",getpid(),data);
    while(1);
}

int main(int argc, char* argv[])
{
    void **child_stack;
    child_stack = (void **) malloc(16384);
    clone(child_process, child_stack, CLONE_VM|CLONE_FILES|CLONE_SIGHAND, NULL);

    sleep(1);
    printf("Parent process %d, data %dn",getpid(), data);
    while(1);
}

运行结果将是
Child process 6443, data 10
Child process 6443, data 20
Parent process 6442, data 20

    由于使用了CLONE_VM创建进程,子进程的mm实际直接指向父进程的mm,所以data是同一份。改变父子进程的data都会互相看到。 

posted @ 2012-10-15 11:18 鑫龙 阅读(168) | 评论 (0)编辑 收藏

list_for_each遍历子进程方法,顺便分析下container_of宏的实现过程

Linux系统中的每个进程都有一个父进程(init进程除外);每个进程还有0个或多个子进程。在进程描述符中parent指针指向其父进程,还有一个名为children的子进程链表(父进程task_struct中的children相当于链表的表头)。
而我们可以使用list_for_each(/include/linux/list.h)来依次遍历访问子进程:
  1. struct task_struct *task;
  2. struct list_head *list;
  3. list_for_each(list, &current->children) {
  4.       task = list_entry(list, struct task_struct, sibling);
  5. }
其中task即为某个子进程的地址
首先需要说明一点task_struct中的children指针指向其某个子进程的进程描述符task_struct中children的地址,而非直接指向某个子进程的地址,也就是说子进程链表中存放的仅仅是各个task_struct成员children的地址。
我们查看源文件找到list_for_each的定义:
  1. #define list_for_each(pos, head) \
  2.         for (pos = (head)->next; prefetch(pos->next), pos != (head); \
  3.                 pos = pos->next)
从上可以看出list_for_each其实就是一个for循环,在网上看到prefetch()是一个预抓取的函数,我并不理解它(哪位大牛知道的讲下哦),不过这个对for()并没有多大的影响。for()实现的就是一个children链表的遍历,而由children的地址如何取到task_struct的地址呢,它是由list_entry宏来实现的。
我们先给出所需函数或宏的源代码
  1. list_entry(/include/linux/list.h)
  2. #define list_entry(ptr, type, member) \
  3.         container_of(ptr, type, member)
  4. ---------------------------------------------------
  5. container_of(include/linux/kernel.h)
  6. #define container_of(ptr, type, member) ({                        \
  7.         const typeof( ((type *)0)->member ) *__mptr = (ptr);        \
  8.         (type *)( (char *)__mptr - offsetof(type,member) );})
  9. -------------------------------------------
  10. offsetof(/include/linux/stddef.h)
  11. #define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
对于list_entry宏来说ptr在这里为指向children链表的指针,type为task_struct结构体的类型,member为链表成员的变量名,即children。
container_of()思路为先求出结构体成员member(即children)在结构体(即task_struct)中的偏移量,然后再根据member的地址(即ptr)来求出结构体(即task_struct)的地址。

哇哈哈  下面是我觉得最经典的地方((type *)0)->member,他将地址0强制转换为type类型的指针,然后再指向成员member,此时((type *)0)->member的地址即为member成员相对于结构体的位移。
其中typeof()相当于C的sizeof(),(char *)__mptr这个强制转换用来计算偏移字节量,size_t被定义为unsigned int 类型。



       这样这个过程就不难理解了吧


PS:网上找到的list_entry宏定义的另一个版本(有人说是老版本kernel里面的),其实是一样的,大家自己理解吧。^_^
#define list_entry(ptr, type, member) \ 
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) 

posted @ 2012-10-15 10:57 鑫龙 阅读(319) | 评论 (0)编辑 收藏

Linux通过slab分配器动态分配task_struct结构,该结构定义在了<include/linux/sched.h>文件中,进程描述符中包含一个具体进程的所有信息,各个进程的task_struct存放在它们内核栈的尾端。在栈底(对于向下增长的栈)或栈顶(对于向上增长的栈)创建一个新的结构struct thread_info。利用这个新的机构来迅速的找到task_struct的位置。
  下面是kernel2.6.32.10里task_struct的定义(对于x86类型的CPU来说文件位于:arch/x86/include/asm /include/asm/thread_info.h):
  1. struct thread_info {
  2.         struct task_struct        *task;                /* main task structure */
  3.         struct exec_domain        *exec_domain;        /* execution domain */
  4.         __u32                        flags;                /* low level flags */
  5.         __u32                        status;                /* thread synchronous flags */
  6.         __u32                        cpu;                /* current CPU */
  7.         int                        preempt_count;        /* 0 => preemptable, <0 => BUG */
  8.         mm_segment_t                addr_limit;
  9.         struct restart_block    restart_block;
  10.         void __user                *sysenter_return;
  11. #ifdef CONFIG_X86_32
  12.         unsigned long           previous_esp;   /* ESP of the previous stack in   case of nested (IRQ) stacks*/
  13.         __u8                        supervisor_stack[0];
  14. #endif
  15.         int                        uaccess_err;
  16. };
其中的task的值就为task_struct的位置。
kernel利用current宏寻找task_struct的位置,假设栈的大小为8k(13个二进制位),我们可以将进程栈的地址的后13位屏蔽掉,这样得到的刚好就是进程栈的起始地址,而thread_info刚好就是位于进程栈的底部,所以进程栈的起始地址就是struct thread_info的地址,得到了thread_info的地址,我们就很容易找到task_struct的地址了。
汇编实现过程为
movl  %-8192 ,%eax
andl   %esp ,%eax
寄存器esp存放进程栈的当前地址,eax最后存放的就是进程栈的起始地址。current使用current_thread_info来实现这个过程。
kernel源码(对于x86类型的CPU来说文件位于arch/x86/include/asm //include/asm/thread_info.h)
  1. /* how to get the current stack pointer from C */
  2. register unsigned long current_stack_pointer asm("esp") __used;

  3. /* how to get the thread information struct from C */
  4. static inline struct thread_info *current_thread_info(void)
  5. {
  6.         return (struct thread_info *)
  7.                 (current_stack_pointer & ~(THREAD_SIZE - 1));
  8. };
其中current_stack_pointer为进程栈的当前地址,THREAD_SIZE为进程栈的大小。
所以current_thread_info()->task即为task_struct()的地址。 

posted @ 2012-10-15 10:37 鑫龙 阅读(534) | 评论 (0)编辑 收藏

一、Observer模式的意图:

                在对象的内部状态发生变化时,自动通知外部对象进行响应。

二、Observer模式的构成:

               ·被观察者:内部状态有可能被改变,而且又需要通知外部的对象

               ·观察者:需要对内部状态的改变做出响应的对象

三、Observer模式的Java实现:

                Java的API中已经为我们提供了Observer模式的实现。具体由java.util.Observable类和java.util.Observer接口完成。

                前者有两个重要的方法:

                       ·setChanged:设置内部状态为已改变

                       ·notifyObservers(Object obj):通知观察者所发生的改变,参数obj是一些改变的信息

                后者有一个核心方法:

                       ·update(Object obj):相应被观察者的改变,其中obj就是被观察者传递过来的信息,该方法会在notifyObservers被调用时自动调用。

              下面是Observer模式的实现过程:

                     ·创建一个被观察者,继承java.util.Observable

                     ·创建一个观察者,实现java.util.Observer接口

                    · 注册观察着,调用addObserver(Observer observer)

                    ·在被观察者改变对象内部状态的地方,调用setChanged()方法,然后调用notifyObservers(Object)方法,通知被观察者

                   ·在观察者的update(Object)方法中,对改变做出响应。

四、Observer模式的好处:

                 1.Observer模式的优点:

                       ·被观察者只需要知道谁在观察它,无需知道具体的观察细节

                       ·被观察者一旦发生变化,只需要通过广播的方式告知观察者,至于消息如何到达则不需知道。这样的话无疑消除了被观察者和观察者之间通信的硬编码

                       ·当一个被观察者同时被多个观察着观察时,观察者可以只选择自己感兴趣的事件,而忽略其它的事件
   
                      ·多个观察者组合起来可以形成一个观察链,如果一旦需要回滚多个操作,此时观察链可以发挥作用

                      ·观察者可以实时对被观察对象的变化做出响应,例如自动告警、中断运行等


                2.运用Observer模式可以

                     ·屏蔽线程间的通信机制:例如两个线程之间,主线程可以作为观察者,执行线程是被观察者。彼此之间只知道对方存在,但不知道之间通信的细节

                    ·消除硬编码:如果没有Observer模式,则只能采用回调的模式,或者在代码中显示地调用观察者

                    ·优化异常机制:特别适合在异常发生时向顶层监控,减少try-catch代码量

代码:

 

  1. public class Observable {      
  2.     private boolean changed = false;      
  3.     private Vector obs;      
  4.          
  5.     //创建被观察者时就创建一个它持有的观察者列表,注意,这个列表是需要同步的。      
  6.     public Observable() {      
  7.     obs = new Vector();      
  8.     }      
  9.      
  10.     /**    
  11.      * 添加观察者到观察者列表中去    
  12.      */     
  13.     public synchronized void addObserver(Observer o) {      
  14.         if (o == null)      
  15.             throw new NullPointerException();      
  16.     if (!obs.contains(o)) {      
  17.         obs.addElement(o);      
  18.     }      
  19.     }      
  20.      
  21.     /**    
  22.      * 删除一个观察者    
  23.      */     
  24.     public synchronized void deleteObserver(Observer o) {      
  25.         obs.removeElement(o);      
  26.     }      
  27.      
  28.     /**    
  29.      * 通知操作,即被观察者发生变化,通知对应的观察者进行事先设定的操作,不传参数的通知方法    
  30.      */     
  31.     public void notifyObservers() {      
  32.     notifyObservers(null);      
  33.     }      
  34.      
  35.     /**    
  36.      * 与上面的那个通知方法不同的是,这个方法接受一个参数,这个参数一直传到观察者里,以供观察者使用    
  37.      */     
  38.     public void notifyObservers(Object arg) {      
  39.           
  40.         Object[] arrLocal;      
  41.      
  42.     synchronized (this) {      
  43.         if (!changed)      
  44.                 return;      
  45.             arrLocal = obs.toArray();      
  46.             clearChanged();      
  47.         }      
  48.      
  49.         for (int i = arrLocal.length-1; i>=0; i--)      
  50.             ((Observer)arrLocal[i]).update(this, arg);      
  51.     }      
  52. }     
  53. public interface Observer {      
  54.     /**    
  55.      * This method is called whenever the observed object is changed. An    
  56.      * application calls an <tt>Observable</tt> object's    
  57.      * <code>notifyObservers</code> method to have all the object's    
  58.      * observers notified of the change.    
  59.      *    
  60.      * @param   o     the observable object.    
  61.      * @param   arg   an argument passed to the <code>notifyObservers</code>    
  62.      *                 method.    
  63.      */     
  64.     void update(Observable o, Object arg);      
  65. }      
  66. }     
  67. public class MailObserver implements Observer{      
  68.      
  69.     /**    
  70.      * 这个类取名为MailObserver,顾名思义,她是一个用来发送邮件的观察者    
  71.      */     
  72.     public void update(Observable o, Object arg) {      
  73.         System.out.println("发送邮件的观察者已经被执行");      
  74.     }      
  75. }     
  76. public class JMSObserver implements Observer{      
  77.      
  78.     public void update(Observable o, Object arg) {      
  79.         System.out.println("发送消息给jms服务器的观察者已经被执行");      
  80.     }      
  81. }     
  82. public class Subject extends Observable{      
  83.           
  84.     /**    
  85.      * 业务方法,一旦执行某个操作,则通知观察者    
  86.      */     
  87.     public void doBusiness(){      
  88.         if (true) {      
  89.             super.setChanged();      
  90.         }      
  91.         notifyObservers("现在还没有的参数");      
  92.     }      
  93.      
  94.           
  95.     public static void main(String [] args) {      
  96.         //创建一个被观察者      
  97.         Subject subject = new Subject();      
  98.               
  99.         //创建两个观察者      
  100.         Observer mailObserver = new MailObserver();      
  101.         Observer jmsObserver = new JMSObserver();      
  102.               
  103.         //把两个观察者加到被观察者列表中      
  104.         subject.addObserver(mailObserver);      
  105.         subject.addObserver(jmsObserver);      
  106.               
  107.         //执行业务操作      
  108.         subject.doBusiness();      
  109.     }      
  110. }    

 

 

posted @ 2012-09-26 14:30 鑫龙 阅读(287) | 评论 (0)编辑 收藏

Java RMI 指的是远程方法调用 (Remote Method Invocation)。它是一种机制,能够让在某个 Java 虚拟机上的对象调用另一个 Java 虚拟机中的对象上的方法。可以用此方法调用的任何对象必须实现该远程接口。
 
Java RMI不是什么新技术(在Java1.1的时代都有了),但却是是非常重要的底层技术。
大名鼎鼎的EJB都是建立在rmi基础之上的,现在还有一些开源的远程调用组件,其底层技术也是rmi。
 
在大力鼓吹Web Service、SOA的时代,是不是每个应用都应该选用笨拙的Web Service组件来实现,通过对比测试后,RMI是最简单的,在一些小的应用中是最合适的。
 
下面通过一个简单的例子来说明RMI的原理和应用,下面这个例子是一个简单HelloWorld,但已涵盖RMI的核心应用与开发模式。
 
/** 
* Created by IntelliJ IDEA. 
* User: leizhimin 
* Date: 2008-8-7 21:50:02 
* 定义一个远程接口,必须继承Remote接口,其中需要远程调用的方法必须抛出RemoteException异常 
*/ 
public interface IHello extends Remote { 

    /** 
     * 简单的返回“Hello World!"字样 
     * @return 返回“Hello World!"字样 
     * @throws java.rmi.RemoteException 
     */ 
    public String helloWorld() throws RemoteException; 

    /** 
     * 一个简单的业务方法,根据传入的人名返回相应的问候语 
     * @param someBodyName  人名 
     * @return 返回相应的问候语 
     * @throws java.rmi.RemoteException 
     */ 
    public String sayHelloToSomeBody(String someBodyName) throws RemoteException; 
}
 
/** 
* Created by IntelliJ IDEA. 
* User: leizhimin 
* Date: 2008-8-7 21:56:47 
* 远程的接口的实现 
*/ 
public class HelloImpl extends UnicastRemoteObject implements IHello { 
    /** 
     * 因为UnicastRemoteObject的构造方法抛出了RemoteException异常,因此这里默认的构造方法必须写,必须声明抛出RemoteException异常 
     * 
     * @throws RemoteException 
     */ 
    public HelloImpl() throws RemoteException { 
    } 

    /** 
     * 简单的返回“Hello World!"字样 
     * 
     * @return 返回“Hello World!"字样 
     * @throws java.rmi.RemoteException 
     */ 
    public String helloWorld() throws RemoteException { 
        return "Hello World!"; 
    } 

    /** 
     * 一个简单的业务方法,根据传入的人名返回相应的问候语 
     * 
     * @param someBodyName 人名 
     * @return 返回相应的问候语 
     * @throws java.rmi.RemoteException 
     */ 
    public String sayHelloToSomeBody(String someBodyName) throws RemoteException { 
        return "你好," + someBodyName + "!"; 
    } 
}
 
/** 
* Created by IntelliJ IDEA. 
* User: leizhimin 
* Date: 2008-8-7 22:03:35 
* 创建RMI注册表,启动RMI服务,并将远程对象注册到RMI注册表中。 
*/ 
public class HelloServer { 
    public static void main(String args[]) { 

        try { 
            //创建一个远程对象 
            IHello rhello = new HelloImpl(); 
            //本地主机上的远程对象注册表Registry的实例,并指定端口为8888,这一步必不可少(Java默认端口是1099),必不可缺的一步,缺少注册表创建,则无法绑定对象到远程注册表上 
            LocateRegistry.createRegistry(8888); 

            //把远程对象注册到RMI注册服务器上,并命名为RHello 
            //绑定的URL标准格式为:rmi://host:port/name(其中协议名可以省略,下面两种写法都是正确的) 
            Naming.bind("rmi://localhost:8888/RHello",rhello); 
//            Naming.bind("//localhost:8888/RHello",rhello); 

            System.out.println(">>>>>INFO:远程IHello对象绑定成功!"); 
        } catch (RemoteException e) { 
            System.out.println("创建远程对象发生异常!"); 
            e.printStackTrace(); 
        } catch (AlreadyBoundException e) { 
            System.out.println("发生重复绑定对象异常!"); 
            e.printStackTrace(); 
        } catch (MalformedURLException e) { 
            System.out.println("发生URL畸形异常!"); 
            e.printStackTrace(); 
        } 
    } 
}
 
/** 
* Created by IntelliJ IDEA. 
* User: leizhimin 
* Date: 2008-8-7 22:21:07 
* 客户端测试,在客户端调用远程对象上的远程方法,并返回结果。 
*/ 
public class HelloClient { 
    public static void main(String args[]){ 
        try { 
            //在RMI服务注册表中查找名称为RHello的对象,并调用其上的方法 
            IHello rhello =(IHello) Naming.lookup("rmi://localhost:8888/RHello"); 
            System.out.println(rhello.helloWorld()); 
            System.out.println(rhello.sayHelloToSomeBody("熔岩")); 
        } catch (NotBoundException e) { 
            e.printStackTrace(); 
        } catch (MalformedURLException e) { 
            e.printStackTrace(); 
        } catch (RemoteException e) { 
            e.printStackTrace();   
        } 
    } 
}
 
运行RMI服务端程序:
 
运行RMI客户端程序:
 
总结:
从上面的过程来看,RMI对服务器的IP地址和端口依赖很紧密,但是在开发的时候不知道将来的服务器IP和端口如何,但是客户端程序依赖这个IP和端口。
这也是RMI的局限性之一。这个问题有两种解决途径:一是通过DNS来解决,二是通过封装将IP暴露到程序代码之外。
RMI的局限性之二是RMI是Java语言的远程调用,两端的程序语言必须是Java实现,对于不同语言间的通讯可以考虑用Web Service或者公用对象请求代理体系(CORBA)来实现。

posted @ 2012-09-26 14:07 鑫龙 阅读(225) | 评论 (0)编辑 收藏

仅列出标题
共20页: First 12 13 14 15 16 17 18 19 20