内容


用 ACE Framework 实现进程间通信

Comments

对于大多数程序员而言,进程间通信 (IPC) 就等于使用 Socket API。Socket API 原来是为 UNIX® 平台开发的,用于在 TCP/IP 协议之上提供应用层接口。它支持许多函数,表 1 列出部分函数。

表 1. Socket API 包含的一部分 C 例程
C 函数名提供的功能
socket分配新的套接字句柄
bind把套接字句柄与本地或远程地址关联起来
listen被动地监听到达的客户机连接请求
connect客户机使用 connect 例程开始 TCP 握手;服务器使用 accept 例程接受连接请求
send传输数据
recv接收数据

使用基本的 Socket API 有几个问题。首先,尽管它们的使用非常普遍,但是这个 API 的遗留 C 函数是不可移植的。例如,Windows® 中的 socket 方法返回一个 SOCKET 类型的句柄,而在 UNIX 中相同的函数返回一个整数。一些方法(比如 closesocket)只在 Windows 中存在,更不用提那些不兼容的头文件。另外,基本 API 有许多只在运行时出现的错误;例如,地址或协议不匹配或未初始化的数据成员。

Adaptive Communication Environment (ACE) Framework 定义了一组包装器来解决这些问题。本文讨论 ACE 为相同或不同主机计算机之间的 IPC 提供的一些基于 C++ 的面向对象类。

用于网络编程的 ACE 类

表 2 给出 ACE 为 TCP/IP 连接定义的一些基本类。

表 2. ACE 中用于网络编程的类
ACE 类提供的功能
ACE_AddrACE 中的基类;用于网络寻址
ACE_INET_Addr派生自 ACE_Addr;用于 Internet 域寻址
ACE_SOCKACE 套接字包装器门面(facade)层次结构(ACE_SOCK_AcceptorACE_SOCK_Connector 等)的基类
ACE_SOCK_Acceptor用于被动地建立连接;概念上与 Berkeley Software Distribution (BSD) 的 accept()listen() 例程相似
ACE_SOCK_Connector在流对象和远程主机之间建立连接;概念上与 BSD 的 connect() 例程相似
ACE_SOCK_Stream用于处理面向 TCP 连接的数据传输的类

对于 User Datagram Protocol (UDP) 通信,使用 ACE_SOCK_Dgram 类及其变体。声明这些类的头文件位于安装目录中的 $ACE_ROOT/ace/include 目录。这些头文件分别名为 SOCK_Acceptor.h、SOCK_Listener.h、SOCK_Stream.h 等等。

创建客户端连接

我们先来看看 清单 1 中的代码。

清单 1. 用于从客户端建立连接的 ACE API
ACE_INET_Addr server(458, “tintin.cstg.in”);
ACE_SOCK_Stream client_stream; 
ACE_SOCK_Connector client_connector;
if (-1 == connector.connect(client_stream, server)) { 
  printf (“Failure to establish connection!\n”);
  return;
}
client_stream.send_n(“Hello World”, 12, 0); 
client_stream.close( ); // close when done

客户机需要两段信息:要连接的主机名和端口号。这些信息封装在 ACE_INET_Addr 类中。还有初始化 ACE_INET_Addr 类的其他方式:hostname: port nohostip: port no 是比较流行的做法。例如,可以像下面这样声明服务器:

ACE_INET_Addr server ("tintin.cstg.in:458")

或:

ACE_INET_Addr server ("132.132.2.61:458")

下一步是在客户机数据流和服务器之间建立连接,这由 ACE_SOCK_Connector 类负责。最后,ACE_SOCK_Stream 类代表客户机执行实际的消息传递。ACE_SOCK_Stream 类提供 send_nrecv_n 方法,分别用于向远程机器传输数据和从远程机器接收信息。注意,发送和接收方法不会传输部分消息:要么传输完整的消息,要么根本不传输(在这种情况下,例程返回 -1)。

用 ACE_SOCK_Connector 实现更好的错误处理

有许多原因会导致 connect 方法返回 -1。例如,服务器可能正忙于处理多个客户机,负载太重了;服务器可能正在重新启动,稍后才能开始接受请求;服务器也可能在比较慢的操作系统或硬件上运行。无论是哪种情况,在放弃之前再尝试一段时间是有意义的。connect 方法接受一个超时参数,如果在指定的时间段内未能成功地连接,它就会放弃。清单 2 说明这个过程。

清单 2. 包含超时的客户端 ACE API
…
ACE_SOCK_Connector client_connector;
ACE_Time_Value timeout(5); 
if (-1 == connector.connect(client_stream, server, &timeout)) { 
  printf (“Failure to establish connection!\n”);
  return;
}
…

建立服务器端连接

清单 3 说明 IPC 的服务器端的工作方式。

清单 3. 使用 ACE API 建立服务器端连接
ACE_INET_Addr server(458); 
ACE_SOCK_Acceptor client_responder(server);
ACE_SOCK_Stream client_stream;
ACE_Time_Value timeout(5);
ACE_INET_Addr client; 
if (-1 == client_responder.accept(client_stream, &client, &timeout)) { 
    printf(“Connection not established with client!\n”);
    return;
} else { 
    printf(“Client details: Host Name: %s Port Number: %d\n”, 
         client.get_host_name(), client.get_port_number());
}
// Reading 40 bytes of data from the client
char buffer[128];
if (-1 == client_stream.recv_n(buffer, 40, 0)) { 
   printf(“Error in reading client data!\n”);   
   return;
} else { 
   printf(“Client message: %s\n”, buffer);
}
client_stream.close();

在服务器端,需要应该运行服务的端口号。同样,用端口号创建 ACE_INET_Addr 对象(不需要主机名:在默认情况下,它是当前主机)。ACE_SOCK_Acceptor 类与遗留的 BSD listen 例程相似,用于接受客户机对服务器的连接请求。以相似的方式使用 ACE_SOCK_Stream:代表服务器把消息传递给客户机。看一下 accept 例程调用中超时的使用方法 — [client_responder.accept (client_stream, &client)]:服务器会一直阻塞这个调用。但是,使用超时意味着如果在指定的时间间隔内没有客户机连接服务器,服务器可以输出相应的消息并停止接受请求。

最后,accept 方法接受第四个参数。清单 4 给出 ace/include/SOCK_Acceptor.h 中 accept 方法的声明。

清单 4. accept 方法的声明
  /**
   * Accept a new ACE_SOCK_Stream connection using the QoS
   * information in @a qos_params.  A @a timeout of 0 means block
   * forever, a @a timeout of {0, 0} means poll.  @a restart == true means
   * "restart if interrupted," i.e., if errno == EINTR.  Note that
   * @a new_stream inherits the "blocking mode" of @c this
   * ACE_SOCK_Acceptor, i.e., if @c this acceptor factory is in
   * non-blocking mode, the @a new_stream will be in non-blocking mode
   * and vice versa.
   */
  int accept (ACE_SOCK_Stream &new_stream,
              ACE_Accept_QoS_Params qos_params,
              ACE_Addr *remote_addr = 0,
              ACE_Time_Value *timeout = 0,
              bool restart = true,
              bool reset_new_handle = false) const;

如果 restart 标志设置为 True(默认设置),那么在某些 UNIX 信号造成中断时,会重新启动例程。您必须判断自己的应用程序是否需要这个特性。

使用 ACE 实现基于 UDP 的 IPC

下面快速回顾一下 UDP。UDP 是一种面向数据包的协议,这意味着如果客户机向服务器发送 5 个 1KB 的信息块,服务器可能收到零到 5 个块。但是,服务器收到的任何块都是完整的 1KB 块 — 要么接收整个块,要么根本不接收。UDP 不保证数据到达或到达的次序,所以第四个块有可能在第二个块之前到达。另外,数据报可能在传输过程中丢失;但是,UDP 会尽可能传输数据。清单 5 定义基于 UDP 的基本客户机通信框架。

清单 5. 使用 ACE 实现 UDP 连接的客户机代码
ACE_INET_Addr server(458, “tintin.cstg.in”);
ACE_INET_Addr client(9000);
ACE_SOCK_Dgram client_data(client);

char* message = “Hello World!\n”;
size_t sent_data_length = client_data.send(message, 
                                              strlen(message) + 1, server);
if (sent_data_length == -1) 
  printf(“Error in data transmission\n”);

client_data.close();

同样,为服务器端口信息创建一个 ACE_INET_Addr 对象。与 TCP 的情况不同,对于客户机还需要一个 ACE_INET_Addr 对象,它指定将发送和接收数据报的端口。UDP 的代码不需要任何连接器或接受器类。必须创建一个 ACE_SOCK_Dgram 类型的类并在数据传输期间显式地指定服务器地址,这是 UDP 连接另一个有意思的特点:服务器地址不必是惟一的。同一个客户机实际上可以与多个远程对等计算机通信。基于 TCP 的连接是一对一的,而 UDP 有三种模式:单播(一对一连接)、广播(数据报发送给网络中的每个远程对等计算机)和多播(数据报只发送给网络中的一部分远程对等计算机)。

在讨论广播和多播之前,先看看简单的基于 UDP 的服务器框架(清单 6)。

清单 6. 使用 ACE 实现 UDP 连接的服务器代码
ACE_INET_Addr server(458);
ACE_SOCK_Dgram server_data(server);
ACE_INET_Addr client(“haddock.cstg.in”, 9000);

size_t sent_data_length = client_data.send(message, 
                                              strlen(message) + 1, server);
if (sent_data_length == -1) 
  printf(“Error in data transmission\n”);

server_data.close();

与 TCP 不同,用于 UDP 通信的客户机代码和服务器代码非常相似。最后,注意 ACE_SOCK_Dgram 的销毁方法并不自动地关闭底层 UDP 套接字。必须显式地调用 close 方法;否则,会出现对象泄漏。

面向连接的基于数据报的通信

对于两个对等计算机之间长期存在的通信,反复提供服务器地址是不合适的。ACE 提供了 ACE_SOCK_CODgram 类(这里的 COD 代表 Connection-oriented Datagram),它可以避免反复提供对等计算机地址。只在调用 open 方法时提供对等计算机地址一次。后续调用只需要两个参数:字符缓冲区及其长度。清单 7 给出代码。

清单 7. 使用 ACE 实现基于 COD 连接的客户机代码
ACE_INET_Addr server(458, “tintin.cstg.in”);
ACE_INET_Addr client(9000);
ACE_SOCK_CODgram client_data(client);

if (0 != client_data.open(server)) { 
   printf(“Unable to establish connection with remote host\n”);
   return;
}

char* message = “Hello World!\n”;
size_t sent_data_length = client_data.send(message, 
                                              strlen(message) + 1);
if (sent_data_length == -1) 
  printf(“Error in data transmission\n”);

message = “Initiating”;
client_data.send(message, strlen(message) + 1); 

client_data.close();

使用 ACE 实现广播

对于向多个进程广播的 send 操作,可以使用 ACE_SOCK_Dgram_Bcast 类(派生自 ACE_SOCK_Dgram)。不需要显式地指定 IP 广播地址 — ACE_SOCK_Dgram_Bcast 类负责提供正确的 IP 地址 — 您只需提供适当的远程主机端口号。清单 8 中的代码与 清单 5 相似,但是 send 例程现在只需要应该广播到的端口号。

清单 8. 使用 ACE 实现广播的客户机代码
ACE_INET_Addr local_host(4158, “tintin.cstg.in”);
ACE_SOCK_Dgram_Bcast local_broadcast_dgram(local_host);
int remote_port = 7671; 

char* message = “Hello World!\n”;

size_t sent_data_length = local_broadcast_dgram.send(
                                         message, strlen(message) + 1, remote_port);

if (sent_data_length == -1) 
  printf(“Error in data transmission\n”);

local_broadcast_dgram.close();

所有监听 7671 端口的远程机器都可以使用 ACE_SOCK_Dgram::recv 处理消息。

使用 ACE 实现多播

对于多播操作,使用 ACE_SOCK_Dgram_Mcast 类(也派生自 ACE_SOCK_Dgram)。ACE_SOCK_Dgram_Mcast 类可以对一组对等计算机收发消息。希望加入这个多播组的对等计算机必须显式地预订这个组。清单 9 演示如何预订多播消息或取消预订。

清单 9. 预订从对等计算机集接收多播消息的客户机代码
#define MULTICAST_ADDR “132.132.2.7”

ACE_INET_Addr multicast_addr(4158, MULTICAST_ADDR);
ACE_SOCK_Dgram_Mcast multicast_dgram;

// Subscribe
if (-1 == multicast_dgram.subscribe(multicast_addr)) { 
   printf(“Error subscribing to multicast address\n”);
   return;
} 

// Do Processing
…

// Unsubscribe
if (-1 == multicast_dgram.unsubscribe(multicast_addr)) { 
   printf(“Error unsubscribing to multicast address\n”);
   return;
}

已经预订某一多播组的对等计算机会收到任何成员发送给这个组的所有消息。如果网络中的任何主机希望向这个多播组发送消息,但是对接收响应不感兴趣,那么使用 ACE_SOCK_Dgram 发送消息也可以。清单 10 演示主机如何使用多播数据报接收和发送消息。在真实的应用程序中,接收/发送代码通常放在循环中,从而持续地发送/接收消息。注意 recv 方法中对 ACE_INET_Addr 的使用方法:这有助于捕捉正在传输数据的对等计算机。

清单 10. 预订对对等计算机集发送/接收多播消息的客户机代码
#define MULTICAST_ADDR “132.132.2.7”

ACE_INET_Addr multicast_addr(4158, MULTICAST_ADDR);
ACE_SOCK_Dgram_Mcast multicast_dgram;

//.. Subscribe
// Do Processing for a 256 byte message 
char buffer[1024];
if (-1 == multicast_dgram.send(buffer, 256, multicast_addr)) { 
  printf(“Failure in message transmission\n”);
} 

ACE_INET_Addr peer_address; 
if (-1 == multicast_dgram.recv(buffer, 256, peer_address)) { 
  printf(“Failure in message reception\n”);
} else {
  printf(“Message %s received from Host %s : Port %d\n”, 
    buffer, peer_address.get_host_name(),
    peer_address.get_port_number()); 
} 
// .. Unsubscribe

结束语

本文讨论了如何使用 ACE Framework 实现基于 TCP/IP 和 UDP 的通信。还有使用 ACE 实现 IPC 的其他方法,比如共享内存或 UNIX 风格的套接字寻址(ACE_LSOCK* 组中的类)。在 参考资料 中可以找到更高级信息的链接。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=AIX and UNIX
ArticleID=448030
ArticleTitle=用 ACE Framework 实现进程间通信
publish-date=11192009