September 28, 2007
By lymons
Linux中处理来自共享对象的同步事件
怎么利用设计模式来更有效的使用共享内存
级别:中等
Sachin Agrawal (sachin_agrawal@in.ibm.com), Senior Software Engineer, IBM Software Labs, India Swati P. Udas (swatudas@in.ibm.com), Software Engineer, IBM
10 Nov 2005
在高级语言例如C++中有效的使用共享内存并不是一件浅显易懂的事情,但是它也能克服这些内在的困难。这篇文章描述了在Linux上使用共享内存的两个C++设计模式并包含了样例代码,以及给读者打开了更有效的进程间通信的一扇门。
在面向对象系统中,当一个对象接收到一个消息时它能够发送一套事件。这些事件主要在同步模式下被处理。这个调用进程或者线程在发送消息调用完成之前,发送给对象一个消息和处理事件。然而,如果这个对象送出这些被更多的进程共享以及驻留在内存里的事件,情况就稍微的发生了一些变化。
这篇文章用两个C++的设计模式详细的描述了上述的情况,并且用一些例子程序阐明了解决方案。
- 我们首先描述了没有使用共享内存的例子程序。
- 其次作了一些改动去使用共享内存,这里使用的是第一种设计模式
- 最后,阐述了怎么完成进程间通信,使用的是第二中设计模式
你能应用这些设计模式中的全部概念悼任何的机器构架,操作系统和编译器上。我们使用的是32位Intel®构架的RedHat Linux 7.1发行版 ,使用GNU C++编译器的版本是3.2.3。
没有共享内存
让我们开始一个没有使用共享内存的例子程序:
|
1
Listing 1. common.h
2
3
4
5
#ifndef __COMMON_H__
6
#define __COMMON_H__
7
8
class IObjectWithEvents
9

{
10
public:
11
class IEventSink
12
{
13
public:
14
virtual void OnEvent(pid_t pid, const char * msg) = 0;
15
};
16
17
static IObjectWithEvents * getInstance();
18
19
virtual bool AddEventHandler(IEventSink * pEI) = 0;
20
virtual void SendMessage() = 0;
21
};
22
23
#endif //__COMMON_H__
24
在I
O
bjectWithEvents
类接口中包含了定义了
OnEvent()
方法的
I
E
ventSink
这个嵌入类,这是一个接受发送者
pid
和字符串消息的事件处理器。
getInstance()
方法返回一个共享内存中的对象的引用,
AddEventHandler()
是注册一个事件处理器,
SendMessage()
发送一个消息到对象上,没有任何共享内存的引用,下面的
Listing 2
中列出了
IobjectWithEvents
的程序代码:
1
Listing 2. shm-client1.cpp
2
3
4
5
#include <iostream>
6
#include <sys/types.h>
7
#include <unistd.h>
8
9
#include "common.h"
10
11
#define HERE __FILE__ << ":" << __LINE__ << " "
12
13
using namespace std;
14
15
class EventSink : public IObjectWithEvents::IEventSink
16

{
17
public:
18
void OnEvent(pid_t pid, const char * msg)
19
{
20
cout << HERE << "Message from pid(" << pid << ")\t : " << msg << endl;
21
}
22
};
23
24
int main()
25

{
26
IObjectWithEvents * powe = IObjectWithEvents::getInstance();
27
28
EventSink sink;
29
powe->AddEventHandler(&sink);
30
31
powe->SendMessage();
32
return 0;
33
}
EventSink
类提供了事件处理器的实现,在主函数中显示了发送消息和处理事件的标准顺序。
Listing3
中列出了
ObjectWithEvents
的典型实现代码:
1
Listing 3. ObjectWithEvents.h
2
3
4
5
#include "common.h"
6
7
class ObjectWithEvents : public IObjectWithEvents
8

{
9
public:
10
// We assume singleton design pattern for illustration
11
static ObjectWithEvents * ms_pObjectWithEvents;
12
13
ObjectWithEvents();
14
15
//the implementation for IObjectWithEvents
16
void FireEvent();
17
virtual bool AddEventHandler(IEventSink * pEI);
18
virtual void SendMessage();
19
20
//Collection for maintaining events
21
enum
{ MAX_EVENT_HANDLERS = 16, };
22
long m_npEI;
23
IEventSink * m_apEI[MAX_EVENT_HANDLERS];
24
pid_t m_alPID[MAX_EVENT_HANDLERS];
25
};
26
27
28
29
30
31
Listing 4. ObjectWithEvents.cpp
32
33
34
35
#include <iostream>
36
#include <sys/types.h>
37
#include <sys/shm.h>
38
#include <unistd.h>
39
#include <pthread.h>
40
41
#include "ObjectWithEvents.h"
42
43
using namespace std;
44
45
ObjectWithEvents * ObjectWithEvents::ms_pObjectWithEvents = NULL;
46
47
IObjectWithEvents * IObjectWithEvents::getInstance()
48

{
49
// the following commented code is for illustration only.
50
/**//*
51
if (NULL == ObjectWithEvents::ms_pObjectWithEvents)
52
{
53
ObjectWithEvents::ms_pObjectWithEvents = new ObjectWithEvents();
54
}
55
*/
56
57
return ObjectWithEvents::ms_pObjectWithEvents;
58
}
59
60
ObjectWithEvents::ObjectWithEvents() : m_npEI(0)
61

{
62
}
63
64
void ObjectWithEvents::FireEvent()
65

{
66
// iterate through the collection
67
for (long i = 0; i < m_npEI; i++)
68
{
69
//Recheck for NULL
70
if (0 != m_apEI[i])
71
{
72
// Fire the event
73
m_apEI[i]->OnEvent(m_alPID[i], "");
74
}
75
}
76
77
return;
78
}
79
80
bool ObjectWithEvents::AddEventHandler(IEventSink * pEI)
81

{
82
// NULL check
83
if (NULL == pEI)
84
{
85
return false;
86
}
87
88
// check if there is space for this event handler
89
if (MAX_EVENT_HANDLERS == m_npEI)
90
{
91
return false;
92
}
93
94
// Add this event handler to the collection
95
m_alPID[m_npEI] = getpid();
96
m_apEI[m_npEI++] = pEI;
97
98
return true;
99
}
100
101
void ObjectWithEvents::SendMessage()
102

{
103
//Some processing
104
//And then fire the event
105
106
FireEvent();
107
108
return;
109
}
110
你能使用下面的命令行来编译这些例子程序:
g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp
当你运行shm_client时,将得到下面的输出:
$ ./shm_client shm_client1.cpp:16 Message from pid(3920)
:
使用共享内存:没有事件缓存
现在,对于在共享内存中实例ObjectWithEvents的实现作了以下的修改。
1
Listing 5. Changes to ObjectWithEvents.cpp
2
3
4
5
// To add a declaration for the "new" operator:
6
7
class ObjectWithEvents : public IObjectWithEvents
8

{
9
public:
10
void * operator new(unsigned int);
11
};
12
13
14
// To include an additional header for the Initializer class:
15
16
#include "Initializer.h"
17
18
19
// To overload the operator "new":
20
21
void * ObjectWithEvents::operator new(unsigned int)
22

{
23
return ms_pObjectWithEvents;
24
}
25
26
27
// Then, FireEvent is completely changed:
28
29
void ObjectWithEvents::FireEvent()
30

{
31
// We need to serialize all access to the collection by more than one process
32
int iRetVal = Initializer::LockMutex();
33
34
if (0 != iRetVal)
35
{
36
return;
37
}
38
39
pid_t pid = getpid();
40
41
// iterate through the collection and fire only events belonging to the current process
42
for (long i = 0; i < m_npEI; i++)
43
{
44
// Check whether the handler belongs to the current process.
45
if (pid != m_alPID[i])
46
{
47
continue;
48
}
49
50
//Recheck for NULL
51
if (0 != m_apEI[i])
52
{
53
m_apEI[i]->OnEvent(pid, "");
54
}
55
}
56
57
// release the mutex
58
if ((0 == iRetVal) && (0 != Initializer::UnlockMutex()))
59
{
60
// Deal with error.
61
}
62
63
return;
64
}
65
66
67
// The following are changes to ObjectWithEvents::AddEventHandler():
68
69
// 1. Before accessing the collection, we lock the mutex:
70
71
int bRetVal = Initializer::LockMutex();
72
73
if (0 != bRetVal)
74

{
75
return false;
76
}
77
78
// 2. After accessing the collection, we release the mutex:
79
80
if ((0 == bRetVal) && (0 != Initializer::UnlockMutex()))
81

{
82
// Deal with error.
83
}
84
在共享内存中的示例化对象,定义了一个叫做Initializer
的额外的类
1
Listing 6. Initializer.h
2
3
4
5
#ifndef __Initializer_H__
6
#define __Initializer_H__
7
8
class Initializer
9

{
10
public :
11
int m_shmid;
12
static Initializer ms_Initializer;
13
Initializer();
14
15
static pthread_mutex_t ms_mutex;
16
static int LockMutex();
17
static int UnlockMutex();
18
};
19
20
#endif // __Initializer_H__
21
Initializer
定义了共享内存
id
的变量
m_shmid
和对于同步事件处理器的一个信号量变量
ms_mutex.
LockMutex()
锁定互斥体,
UnlockMutex()
则解锁互斥体。
Listing7
列出了
Initializer
的实现代码:
1
Listing 7. Initializer.cpp
2
3
4
5
#include <iostream>
6
#include <sys/types.h>
7
#include <sys/shm.h>
8
#include <unistd.h>
9
#include <pthread.h>
10
11
#include "Initializer.h"
12
#include "ObjectWithEvents.h"
13
14
using namespace std;
15
16
Initializer Initializer::ms_Initializer;
17
18
pthread_mutex_t Initializer::ms_mutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP;
19
20
Initializer::Initializer() : m_shmid(-1)
21

{
22
bool bCreated = false;
23
key_t key = 0x1234;
24
25
m_shmid = shmget(key,sizeof(ObjectWithEvents), 0666);
26
27
if (-1 == m_shmid)
28
{
29
if(ENOENT != errno)
30
{
31
cerr<<"Critical Error"<<endl;
32
return;
33
}
34
35
m_shmid = shmget(key, sizeof(ObjectWithEvents), IPC_CREAT|0666);
36
37
if (-1 == m_shmid )
38
{
39
cout << " Critical Error " << errno<< endl;
40
return;
41
}
42
43
bCreated = true;
44
}
45
46
47
ObjectWithEvents::ms_pObjectWithEvents = (ObjectWithEvents*)shmat(m_shmid,NULL,0);
48
49
if (NULL == ObjectWithEvents::ms_pObjectWithEvents)
50
{
51
cout << " Critical Error " << errno << endl;
52
return;
53
}
54
55
if (true == bCreated)
56
{
57
ObjectWithEvents * p = new ObjectWithEvents();
58
}
59
60
// Create a mutex with no initial owner.
61
62
63
pthread_mutex_init(&ms_mutex, NULL);
64
65
66
}
67
68
int Initializer::LockMutex()
69
{
70
// Request ownership of mutex.
71
72
pthread_mutex_lock(&ms_mutex);
73
74
if(EDEADLK == errno)
75
{
76
cout << "DeadLock" << endl;
77
return -1;
78
}
79
80
return 0;
81
}
82
83
int Initializer::UnlockMutex()
84
{
85
return pthread_mutex_unlock(&ms_mutex);
86
}
87
如果共享内存不存在的话则创建它,并在共享内存里做成共享对象。如果共享内存已经存在的话,则略过构造共享对象。Initializer::m_shmid纪录标识符ObjectWithEvents::ms_pObjectWithEvents
并记录共享对象的引用。
即使在所有的进程从这个共享内存分离(detach)也不释放共享内存。让你用ipcrm命令显示的销毁它并能用ipcs命令快速察看共享内存的信息。用下面的命令编译成可执行程序:
g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp
控制台上会有下面那样的输出信息:
Listing 8. The console dump
$ ./shm_client
shm_client1.cpp:16 Message from pid(4332) :
$ ipcs
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
0x00001234 327686 sachin 666 136 0
$ ./shm_client
shm_client1.cpp:16 Message from pid(4333) :
$ ipcrm -m 327686
ObjectWithEvents
实例中有一个能从很多进程中收集事件的收集器。它仅仅能发送当前进程注册的事件。设计模式中表明了以下的两点:
- 任何访问事件收集器都要被互斥体对象保护
- 在发送之前事件都要通过进程ID进行过滤。
IPC的共享内存和事件缓存
现在,让我们看看进程间通信的共享内存和事件缓存。如果事件被缓存到共享对象里,则他们在稍后会被过滤。接收的进程将根据事件查询共享对象。然后,通过一个同步模型,进程间的通信能被接受到,这是开发下面的设计模式的主要动机。
在
IobjectWithEvents
中,
像下面那样增加一对儿方法:
1
Listing 9. Adding methods to IobjectWithEvents
2
3
4
5
class IObjectWithEvents
6

{
7
public:
8
virtual bool EnqueueEvent(const char * msg) = 0;
9
virtual bool PollForEvents() = 0;
10
};
EnqueueEvent()
简单的增加到共享对象中的事件缓存中,并且PollForEvents()
将接收这个缓存。
shm_client1将像下面那样被使用EnqueueEvent
()
:
powe->EnqueueEvent("Message from shm_client1");
shm_client2(实质上是shm_client1的拷贝)将像下面那样使用PollForEvents()
:
powe->EnqueueEvent("Message from shm_client2"); powe->PollForEvents();
ObjectWithEvents
的实现代码是下面那样:
1
Listing 10. Additions to ObjectWithEvents
2
3
4
5
class ObjectWithEvents : public IObjectWithEvents
6

{
7
public:
8
virtual bool EnqueueEvent(const char * msg);
9
virtual bool PollForEvents();
10
11
//The event cache
12
enum
{ MAX_EVENTS = 16, MAX_EVENT_MSG = 256, };
13
long m_nEvents;
14
pid_t m_alPIDEvents[MAX_EVENTS];
15
char m_aaMsgs[MAX_EVENTS][MAX_EVENT_MSG];
16
};
17
18
19
新的构造函数变成:
ObjectWithEvents::ObjectWithEvents() : m_npEI(0), m_nEvents(0) { }
EnqueueEvent()
存储事件(例如,每一个被发送的事件的消息和进程号)到一个队列中,PollForEvents()
则迭代整个队列并为队列中的事件一个一个的调用OnEvent().
1
Listing 11. EnqueueEvent
2
3
4
5
bool ObjectWithEvents::EnqueueEvent(const char * msg)
6

{
7
if (NULL == msg)
8
{
9
return false;
10
}
11
12
if (MAX_EVENTS == m_nEvents)
13
{
14
//IEventSink collection full
15
return false;
16
}
17
18
int bRetVal = Initializer::LockMutex();
19
20
if (0 != bRetVal)
21
{
22
return false;
23
}
24
25
m_alPIDEvents[m_nEvents] = getpid();
26
strncpy(m_aaMsgs[m_nEvents++], msg, MAX_EVENT_MSG - 1);
27
28
if ((0 == bRetVal) && (0 != Initializer::UnlockMutex()))
29
{
30
// Deal with error.
31
}
32
33
return true;
34
}
35
36
bool ObjectWithEvents::PollForEvents()
37

{
38
if (0 == m_nEvents)
39
{
40
return true;
41
}
42
43
int bRetVal = Initializer::LockMutex();
44
45
if (0 != bRetVal)
46
{
47
return false;
48
}
49
50
pid_t pid = getpid();
51
52
for (long i = 0; i < m_npEI; i++)
53
{
54
// Does the handler belongs to current process ?
55
56
if (pid != m_alPID[i])
57
{
58
continue;
59
}
60
61
//Recheck for NULL
62
63
if (0 == m_apEI[i])
64
{
65
continue;
66
}
67
68
for (long j = 0; j < m_nEvents; j++)
69
{
70
m_apEI[i]->OnEvent(m_alPIDEvents[j], m_aaMsgs[j]);
71
}
72
}
73
74
if ((0 == bRetVal) && (0 != Initializer::UnlockMutex()))
75
{
76
// Deal with error.
77
}
78
79
return true;
80
}
81
82
83
现在使用下面的命令变成新命令:
g++ -g -o shm_client1 shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp
g++ -g -o shm_client2 shm_client2.cpp ObjectWithEvents.cpp Initializer.cpp
在你的控制台上将像下面那样输出消息:
Listing 12. Output from shm_client1 and shm_client2
$ ./shm_client1
$ ./ipcs
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
00001234 360454 sachin 666 4300 0
$ ./shm_client2
shm_client2.cpp:16 Message from pid(4454) : Message from shm_client1
shm_client2.cpp:16 Message from pid(4456) : Message from shm_client2
下载
Description
|
Name
|
Size
|
Download method
|
Shared memory sample code
|
sync_code.zip
|
4KB
|
FTP
|
- Posted on: Fri, Sep 28 2007 7:15 PM