RX-RPC的客户端与服务器端的通信解析
Rx是扩展远程过程调用(Extended Remote Procedure Call)的缩写,顾名思义,它就是RPC协议的一个扩展。与之不同的是,它允许任意大小的数据请求以及给端到端(end-to-end)的认证和安全提供了通用支持。另外,这个协议能使允许客户端和服务器端自适应它们关联性能之间的不同,连接丢失以及网络延迟。并能发送或接收在单个请求和响应里的整个数据对象来完成大数据量的传输。
注:Rx是基于R package模型(LWP和XDR)而开发出来的。
LWP:lightweight process
XDR:external data representation
Rx通信例子
rxdemo 代码是由两类文件组成, 也就是一些是由程序员写的和一些用Rxgen工具根据人写的代码生成的文件. 前一组的文件中有:
² rxdemo.xg 这是RPC接口定义文件,提供了rpc调用的高级别定义.
² rxdemo_client.c: 这是rxdemo 客户端程序, 里面执行上面rxdemo.xg文件里的调用.
² rxdemo_server.c: 这是rxdemo 服务器端程序, 实现rxdemo.xg 文件里定义的操作.
自动生成的文件有:
² rxdemo.h: 包含rxdemo.xg 里的常量定义和为Rx服务定义的RPC操作码的信息.
² rxdemo.cs.c: 客户端的桩(stub)文件,实现在rxdemo.xg里为RPC例程定义的所有参数的编组和提取.
² rxdemo.ss.c: 也是类似的桩文件,是服务器侧的RPC调用的参数的编组和提取,也提供了一个分派(dispatcher)函数.
² rxdemo.xdr.c: 这个模块定义了转换复杂的用户自定义的数据结构成网络字节顺序的例程(这些数据结构作为rxdemo.xg 里面定义的Rx RPC调用的参数传递给服务器), 以便保证不同内存的客户端和服务器端之间能够正确的通信.
接口文件: rxdemo.xg
这个文件作为本例子的RPC接口定义文. 里面定义了大量的常量, 包括Rx服务的端口和空安全对象(security object:没有经过加密)的索引. 里面有RXDEMO_MAX和RXDEMO_MIN 两个常量, 被用来定义Rx监听线程运行的最大及最小个数. 里面还定义了一些错误号.最后, 它提供了RPC函数的声明, 也就是 Add() and GetFile(). 注意构建实际的函数定义的时候, Rxgen 将把文件中的package行的值, 也就是"RXDEMO ", 附加给函数声明. 因而, 在桩文件里生成的函数就分别变成RXDEMO_Add() 和RXDEMO_GetFile()了.
1/**//*
2
3* Interface for an example Rx server/client application, using both *
4
5* standard and streamed calls. *
6
7*/
8
9package RXDEMO_
10
11%#include <rx/rx.h>
12
13%#include <rx/rx_null.h>
14
15%#define RXDEMO_SERVER_PORT 8000 /*Service port to advertise*/
16
17%#define RXDEMO_SERVICE_PORT 0 /*User server's port*/
18
19%#define RXDEMO_SERVICE_ID 4 /*Service ID*/
20
21Example Server and Client 130 August 28, 1991 10:38
22
23Rx Specification
24
25%#define RXDEMO_NULL_SECOBJ_IDX 0 /*Index of null security object*/
26
27/**//*
28
29* Maximum number of requests that will be handled by this service
30
31* simultaneously. This number will be guaranteed to execute in
32
33* parallel if other service's results are being processed.
34
35*/
36
37%#define RXDEMO_MAX 3
38
39/**//*
40
41* Minimum number of requests that are guaranteed to be handled
42
43* simultaneously.
44
45*/
46
47%#define RXDEMO_MIN 2
48
49/**//*
50
51* Index of the "null" security class in the sample service.
52
53*/
54
55%#define RXDEMO_NULL 0
56
57/**//*
58
59* Maximum number of characters in a file name (for demo purposes).
60
61*/
62
63%#define RXDEMO_NAME_MAX_CHARS 64
64
65/**//*
66
67* Define the max number of bytes to transfer at one shot.
68
69*/
70
71%#define RXDEMO_BUFF_BYTES 512
72
73/**//*
74
75* Values returned by the RXDEMO_GetFile() call.
76
77* RXDEMO_CODE_SUCCESS : Everything went fine.
78
79* RXDEMO_CODE_CANT_OPEN : Can't open named file.
80
81* RXDEMO_CODE_CANT_STAT : Can't stat open file.
82
83* RXDEMO_CODE_CANT_READ : Error reading the open file.
84
85* RXDEMO_CODE_WRITE_ERROR : Error writing the open file.
86
87*/
88
89%#define RXDEMO_CODE_SUCCESS 0
90
91%#define RXDEMO_CODE_CANT_OPEN 1
92
93%#define RXDEMO_CODE_CANT_STAT 2
94
95%#define RXDEMO_CODE_CANT_READ 3
96
97%#define RXDEMO_CODE_WRITE_ERROR 4
98
99/**//*
100
101* ------------ Interface calls defined for this service ------------
102
103*/
104
105Add(IN int a,
106
107int b,
108
109OUT int *result) = 1;
110
111
112
113GetFile(IN string a_nameToRead<RXDEMO_NAME_MAX_CHARS>,
114
115OUT int *a_result) split = 2;
116
117
客户端程序: rxdemo_client.c
rxdemo 客户端程序调用rxdemo.xg文件里定义的操作. 并定义了GetIPAddress() 函数, 根据指定的主机名字符串返回该主机的IP地址.
#include <sys/types.h>
#include <netdb.h>
#include <stdio.h>
#include "rxdemo.h"
static char pn[] = "rxdemo"; /**//*Program name*/
static u_long GetIpAddress(a_hostName)
char *a_hostName;
{ /**//*GetIPAddress*/
static char rn[] = "GetIPAddress"; /**//*Routine name*/
struct hostent *hostEntP; /**//*Ptr to host descriptor*/
u_long hostIPAddr; /**//*Host IP address*/
hostEntP = gethostbyname(a_hostName);
if (hostEntP == (struct hostent *)0) {
printf("[%s:%s] Host '%s' not found\n",
pn, rn, a_hostName);
exit(1);
}
if (hostEntP->h_length != sizeof(u_long)) {
printf("[%s:%s] Wrong host address length (%d bytes instead of %d)",
pn, rn, hostEntP->h_length, sizeof(u_long));
exit(1);
}
bcopy(hostEntP->h_addr, (char *)&hostIPAddr, sizeof(hostIPAddr));
return(hostIPAddr);
} /**//*GetIpAddress*/
客户端的main函数代码是在解析完命令行参数后,开始初始化Rx.
main(argc, argv)
int argc;
char **argv;
{ /**//*Main*/
struct rx_connection *rxConnP; /**//*Ptr to server connection*/
struct rx_call *rxCallP; /**//*Ptr to Rx call descriptor*/
u_long hostIPAddr; /**//*IP address of chosen host*/
int demoUDPPort; /**//*UDP port of Rx service*/
struct rx_securityClass *nullSecObjP; /**//*Ptr to null security object*/
int operand1, operand2; /**//*Numbers to add*/
int sum; /**//*Their sum*/
int code; /**//*Return code*/
char fileName[64]; /**//*Buffer for desired file's name*/
long fileDataBytes; /**//*Num bytes in file to get*/
char buff[RXDEMO_BUFF_BYTES+1]; /**//*Read buffer*/
int currBytesToRead; /**//*Num bytes to read in one iteration*/
int maxBytesToRead; /**//*Max bytes to read in one iteration*/
int bytesReallyRead; /**//*Num bytes read off Rx stream*/
int getResults; /**//*Results of the file fetch*/
printf("\n%s: Example Rx client process\n\n", pn);
if ((argc < 2) || (argc > 3)) {
printf("Usage: rxdemo <HostName> [PortToUse]");
exit(1);
}
hostIPAddr = GetIpAddress(argv[1]);
if (argc > 2)
demoUDPPort = atoi(argv[2]);
else
demoUDPPort = RXDEMO_SERVER_PORT;
/**//*
* Initialize the Rx facility.
*/
code = rx_Init(htons(demoUDPPort));
if (code) {
printf("** Error calling rx_Init(); code is %d\n", code);
exit(1);
}
/**//*
* Create a client-side null security object.
*/
nullSecObjP = rxnull_NewClientSecurityObject();
if (nullSecObjP == (struct rx_securityClass *)0) {
printf("%s: Can't create a null client-side security object!\n", pn);
exit(1);
}
/**//*
* Set up a connection to the desired Rx service, telling it to use
* the null security object we just created.
*/
printf("Connecting to Rx server on `%s', IP address 0x%x, UDP port %d\n",
argv[1], hostIPAddr, demoUDPPort);
rxConnP = rx_NewConnection(hostIPAddr,
RXDEMO_SERVER_PORT,
RXDEMO_SERVICE_ID,
nullSecObjP,
RXDEMO_NULL_SECOBJ_IDX);
if (rxConnP == (struct rx_connection *)0) {
printf("rxdemo: Can't create connection to server!\n");
exit(1);
}
else
printf(" ---> Connected.\n");
rx_Init()函数调用初始化Rx库并且定义了PRC服务的UDP端口 (网络字节顺序). rxnull_NewClientSecurityObject() 调用创建一个客户端侧的的Rx安全对象,它对于Rx调用不执行任何认证. 一旦一个客户端认证对象被创建后, 该程序调用rx_NewConnection()函数, 该函数要指定一个主机, UDP 端口, Rx 服务ID, 以及需要和提供RPC服务的rxdemo服务器实体建立联系的安全信息. 随着Rx连接准备好后,本程序就能执行RPC操作了。首先要调用的是RXDEMO_Add()函数:
/**//*
* Perform our first, simple remote procedure call.
*/
operand1 = 1; operand2 = 2;
printf("Asking server to add %d and %d: ",
operand1, operand2);
code = RXDEMO_Add(rxConnP, operand1, operand2, &sum);
if (code) {
printf("\n** Error in the RXDEMO_Add RPC: code is %d\n", code);
exit(1);
}
printf("Reported sum is %d\n", sum);
RXDEMO_Add()的第一个参数是一个在前面已经建立好的Rx连接的指针. 客户端侧的RXDEMO_Add() 函数的内容已经根据rxdemo.xg 接口文件被生成并驻留在文件rxdemo.cs.c 里,看样子它就是一个正常的C程序函数.
第二个RPC调用RXDEMO_GetFile()是流操作函数则更加复杂一些,Rx的更多的内部工作被暴露在这个调用中. 但首先要考虑的细节是对于这个流操作函数要被自动地拆分成两个新的Rx调用(在rxdemo.xg里定义该函数接口时使用了 split 关键字). 那么,创建Rx调用的处理就必须受到添加到main函数里。
/**//*
* Set up for our second, streamed procedure call.
*/
printf("Name of file to read from server: ");
scanf("%s", fileName);
maxBytesToRead = RXDEMO_BUFF_BYTES;
printf("Setting up an Rx call for RXDEMO_GetFile");
rxCallP = rx_NewCall(rxConnP);
if (rxCallP == (struct rx_call *)0) {
printf("** Can't create call\n");
exit(1);
}
printf("done\n");
一旦Rx调用结构体已经被创建, 我们就可以使用它来进行RPC通信了. 在接口文件rxdemo.cs.c中,声明了已经拆分的两个函数, 它们的函数体的内容基本都是Rxgen创建的. 第一个, StartRXDEMO_GetFile(), 负责组装RPC调用的参数和发行这个RPC. 第二个, EndRXDEMO_GetFile(), 则是从流缓冲中提取RPC调用的返回结果. 下面的代码片段是StartRXDEMO_GetFile()把调用参数传递给服务器.
/**//*
* Sending IN parameters for the streamed call.
*/
code = StartRXDEMO_GetFile(rxCallP, fileName);
if (code) {
printf("** Error calling StartRXDEMO_GetFile(); code is %d\n", code);
exit(1);
}
一旦所有的参数都被传送出去了, 服务器将通过接受到的Rx调用结构体开始着手传递流数据返回给客户端. 首先服务器返回给客户端一个流数据的大小存储在一个longword 中. 所以,下面的代码就是先从流缓冲里取出这个值。
/**//*
* Begin reading the data being shipped from the server in response to
* our setup call. The first longword coming back on the Rx call is
* the number of bytes to follow. It appears in network byte order,
* so we have to fix it up before referring to it.
*/
xdrrx_create(&rxCallP, call, XDR_DECODE);
if (!xdr_long(&rxCallP, & fileDataBytes))
return BULK_ERROR;
客户端如果知道了服务器返回的字节数后, 它将运行一个循环从Rx调用的流缓冲中读出这些数据, 使用的函数是rx_Read(). 在这个例子中, 所做的就是把每次读到的缓冲信息给打印出来.
/**//*
* Read the file bytes via the Rx call, a buffer at a time.
*/
printf("[File contents (%d bytes) fetched over the Rx call appear below]\n\n",
fileDataBytes);
while (fileDataBytes > 0) {
currBytesToRead = (fileDataBytes > maxBytesToRead ?
maxBytesToRead : fileDataBytes);
bytesReallyRead = rx_Read(rxCallP, buff, currBytesToRead);
if (bytesReallyRead != currBytesToRead) {
printf("\nExpecting %d bytes on this read, got %d instead\n",
currBytesToRead, bytesReallyRead);
exit(1);
}
/**//*
* Null-terminate the chunk before printing it.
*/
buff[currBytesToRead] = 0;
printf("%s", buff);
/**//*
* Adjust the number of bytes left to read.
*/
fileDataBytes -= currBytesToRead;
} /**//*Read one bufferful of the file*/
在这个循环结束后, Rx 流已经用完了所有的缓冲区. 就要使用第二个函数EndRXDEMO_GetFile()来结束Rx调用, 并接受来自服务器的输出参数.
/**//*
* Finish off the Rx call, getting the OUT parameters.
*/
printf("\n\n[End of file data]\n");
code = EndRXDEMO_GetFile(rxCallP, &getResults);
if (code) {
printf("** Error getting file transfer results; code is %d\n", code);
exit(1);
}
随着两个Rx调用的完成, 就可以使用下面的代码来关闭Rx调用,退出客户端.
/**//*
* Finish off the Rx call.
*/
code = rx_EndCall(rxCallP, code);
if (code)
printf("Error in calling rx_EndCall(); code is %d\n", code);
printf("\n\nrxdemo complete.\n");
RX-RPC的客户端与服务器端的通信解析(流程图) 点击这里看完整图片