IBM Support

使用IBM Java Toolbox for i 访问IBM i 数据队列

Technical Blog Post


Abstract

使用IBM Java Toolbox for i 访问IBM i 数据队列

Body

进程通信是操作系统内核中非常重要的部分。对于IBM i而言,数据队列是一种非常重要的进程通信方式。一方面,数据队列功能强大,不受编程语言的限制,并能同时支持进程的同步通信与异步通信;另一方面,数据队列使用灵活,其数据消息不受任何格式束缚,换句话说,用户可以根据业务需求自定义数据格式。作为IBM i的Java 应用程序编程接口,IBM Java Toolbox for i支持IBM i数据队列的相关操作,这样,Java应用程序与其它IBM i程序(诸如C,RPG,COBOL程序)之间的通信就变得简单、容易。本文的目标是:从面向程序设计的角度,指导读者如何使用IBM Java Toolbox for i,实现基于IBM i数据队列的进程通信。有关IBM Java Toolbox for i的基础知识,请参见另一篇技术文档“Toolbox for Java 和 JTOpen”。

从结构上,本文主要分为3部分。第1部分简要介绍数据队列的基础知识,包括数据队列的基本概念,工作方式,使用场景,以及与其它通信方式的区别。第2部分则集中介绍IBM Java Toolbox for i针对数据队列的编程支持。在这一部分中,我们将结合生产者与消费者问题相关的样例,指导读者如何使用IBM Java Toolbox for i完成进程间的通信。最后一部分是总结。

本文的样例主要包含两部分。第1部分是IBM i数据队列的相关基础操作,包括队列创建,入队,出队,清空队列,队列删除共计5种。第2部分是进阶篇,实现的是生产者与消费者的原型。其中,生产者进程的任务是向产品队列中不断添加产品,而消费者一直处于监听状态,只要发现队列不为空,则从队列中取走产品。


IBM i数据队列的工作原理
与Unix,Window不同,IBM i是一种面向对象的操作系统。从这个角度而言,数据队列是一种系统对象,对应的类型为*DTAQ。IBM i数据队列的设计目标是提供一种高效的进程通信方式。

狭义上讲,队列是一种先进先出(或FIFO)的数据结构,如图1。它只允许在对头进行删除操作(称为出队),而在队尾进行插入操作(入队)。
图 1. 队列的工作方式

图像 而IBM i数据队列则是一种广义上的队列,或者说它是队列以及队列的变种。按数据队列的出队策略划分,IBM i支持3种类型的数据队列:
  • 先进先出(或FIFO)—— 第一条(最旧)队列消息,位于队头最先出队。这与狭义的FIFO队列等价。
  • 先进后出(或LIFO)—— 最后一条(最新)队列消息,位于队头最先出队。它实际上是一种栈结构。
  • 按索引出队 —— 每条队列消息对应一个索引。与索引规则匹配的队列消息出队。其中,按索引检索的规则包括操作符 =,>,>=,<,<= 等。

除数据队列之外,IBM i还支持其它各种数据存储对象,包括数据库文件,消息队列,数据区(Data Area),用户空间(User Space)等等。但是他们的使用方式与应用场景不同。数据队列的主要目标是进程通信,队列消息的格式不受限制,用户可根据业务需求自由定义;数据库文件的服务对象是数据库相关的增删改查等操作,数据记录有严格的数据格式;消息队列存储的对象是IBM i消息,消息队列本身并不具备队列的数据结构;数据区对象的存储空间有限,通常用于存放IBM i作业之间的共享数据;用户空间对象通常用来存储用户的自定义信息,方便在进程间共享数据。

IBM i支持CL与API两种方式创建与维护数据队列,常用操作如下:

从CL命令的角度,常用的2种操作如下:

  • Create Data Queue (CRTDTAQ) —— 创建数据队列。
  • Delete Data Queue (DLTDTAQ) —— 删除数据队列。

从IBM i API的角度,常用的3种操作如下:

  • Send Data Queue (QSNDDTAQ) —— 发送数据至数据队列。
  • Receive Data Queue (QRCVDTAQ) —— 读取数据队列。
  • Clear Data Queue (QCLRDTAQ) —— 清空数据队列。

为了让读者更好地理解上述数据队列相关CL命令与API的使用方法,接下来,我们给出一些样例。

1. 使用CL命令Create Data Queue(CRTDTAQ)创建一个容量为1KB的数据队列MYDATAQ:

CRTDTAQ DTAQ(MYLIB/MYDATAQ) MAXLEN(1024)

2. 使用Send Data Queue (QSNDDTAQ) API发送数据至数据队列:

call         'QSNDDTAQ'             
parm      'MYDATAQ'     dtaq_name
parm      '*LIBL'               dtaq_lib 
parm      1024                 dtaq_len 
parm                                dtaq_data

3. 使用Receive Data Queue (QRCVDTAQ) API读取数据队列:

call      'QRCVDTAQ'             
parm   'MYDATAQ'     @DQ_Name 
parm   '*LIBL'               @DQ_Lib
parm                             @DQ_Length
parm                             @DQ_Data 
Parm   -1                      @Wait_Sec

4. 使用Clear Data Queue (QCLRDTAQ) API清空数据队列:

call         'QCLRDTAQ'             
parm      'MYDATAQ'     @DQ_Name
parm      '*LIBL'               @DQ_Lib 

5. 使用CL命令Delete Data Queue(DLTDTAQ) 删除数据队列:

DLTDTAQ DTAQ(MYLIB/@MYDATAQ)

这里,需要说明的是,数据队列支持多进程的入队与出队操作。当多个进程同时向一个数据队列发送数据时,入队顺序依照IBM i作业的优先级而定。同样,当多个进程同时从一个数据队列读取数据时,只有优先级最高的才能取到第一个队列消息,紧接着是优先级次高的进程读取第二个队列消息,以此类推。

IBM Java Toolbox for i对IBM i数据队列的编程支持
从程序设计的角度,IBM Java Toolbox for i使用DataQueue及其相关类来支持IBM i数据队列的相关操作。

如前所述,按出队策略划分,IBM i支持3种类型的数据队列,分别为先进先出FIFO,后进先出LIFO,按索引。其中,前两种又可归结为按顺序出队。这样,IBM i数据队列又可划分为两种:按顺序与索引,对应的Java类为DataQueue与KeyedDataQueue,两者的基类为BaseDataQueue,如图2所示。其中,作为DataQueue的帮助类,QueueEntry表示顺序数据队列的读缓存冲区,DataQueueAttributes描述队列属性。KeyedDataQueueEntry则作为KeyedDataQueue的帮助类,表示索引数据队列的读缓存冲区。
图 2. 数据队列相关的类图


图像 前面,我们从CL命令与IBM i API的角度描述了数据队列的创建,入队,出队,清空,删除5种操作,并给出对应的C语言实现。使用IBM Java Toolbox for i与数据队列相关的API,也能实现同样的效果,如清单2所示:
清单 2. 数据队列相关操作的Java语言实现
// 构造AS400对象,建立Java应用程序与IBM i服务器的连接。
AS400 sys = new AS400(system, usr, pwd);
// 构造DataQueue
DataQueue dq = new DataQueue(sys, "/QSYS.LIB/MYLIB.LIB/MYDTAQ.DTAQ");
// 创建数据队列
dq.create(1024);
// 入队
dq.write(inputData); //data是一个字节数组,表示入队数据。

// 出队
DataQueueEntry dqData = dq.read();
byte[] outputData = dqData.getData();
// 清空数据队列
dq.clear();
// 删除数据队列
dq.delete();

// 作为好的编程习惯,释放连接
sys.disconnectService(AS400.DATAQUEUE);

接下来,为接近实际应用,我们演示如何使用IBM Java Toolbox for i与数据队列相关的API,创建一个产品队列,并模拟生产者与消费者问题。

前面提到,编程人员需要自己定义IBM i数据队列的消息格式。这里,我们定义的产品结构(即队列消息的数据格式)如下:
表 1. 产品队列的数据格式
数据格式       数据类型 
产品编号       4字节数字 
产品名称       长度为20的字符串 
产品描述       长度为100的字符串                                                                                                                                                                                            

 生产者与消费者进程对应的代码分别如清单3与清单4所示:
清单 3. 生产者线程
import com.ibm.as400.access.AS400;
import com.ibm.as400.access.AS400Bin4;
import com.ibm.as400.access.AS400Text;
import com.ibm.as400.access.BinaryFieldDescription;
import com.ibm.as400.access.CharacterFieldDescription;
import com.ibm.as400.access.CommandCall;
import com.ibm.as400.access.DataQueue;
import com.ibm.as400.access.Record;
import com.ibm.as400.access.RecordFormat;

public class Producer {
   AS400 as400_;
   DataQueue dq;
   RecordFormat dataFormat;

   public Producer(AS400 as400) {
      as400_ = as400;
      // 创建产品队列。
      dq = new DataQueue(as400_, "/QSYS.LIB/JAVADEMO.LIB/MYDTAQ.DTAQ");
      try {
         dq.create(1024);
      } catch (Exception e) {
      }
      // 设置产品对应的数据格式
      BinaryFieldDescription productNumber = new BinaryFieldDescription(
            new AS400Bin4(), "PRODUCT_NUMBER");
      CharacterFieldDescription productName = new CharacterFieldDescription(
            new AS400Text(20), "PRODUCT_NAME");
      CharacterFieldDescription productDescription = new CharacterFieldDescription(
            new AS400Text(100), "PRODUCT_DESCRIPTION");

      dataFormat = new RecordFormat();

      dataFormat.addFieldDescription(productNumber);
      dataFormat.addFieldDescription(productName);
      dataFormat.addFieldDescription(productDescription);
   }

   public void produce(Integer number, String name, String description) {
      CommandCall crtlib = new CommandCall(as400_);
      try {
         crtlib.run("CRTLIB JAVADEMO");
      } catch (Exception e) {
         e.printStackTrace();
      }

      Record data = new Record(dataFormat);
      data.setField("PRODUCT_NUMBER", number);
      data.setField("PRODUCT_NAME", name);
      data.setField("PRODUCT_DESCRIPTION", description);
      try {
    // 产品入队
         dq.write(data.getContents());
         System.out.println("Producer Thread - product, [number: " + number
               + ", name: " + name + ", description: " + description);
      } catch (Exception e) {
         e.printStackTrace();
      }

   }

   public static void main(String[] args) throws Exception {
      // 构造AS400对象,建立Java应用程序与IBM i服务器的连接。
      AS400 as400 = new AS400(hostname, user, name);
      Producer producer = new Producer(as400);

      for (int i = 1; i < 100; i++) {
         producer.produce(i, "Product" + i, "ProductDescription" + i);
         Thread.sleep(2000);
      }
   }
}

在生产者进程中,我们使用指定数据格式,构造产品记录,然后写入产品队列,相关运行结果见图3所示。关于程序中使用到的RecordFormat,FieldDescription,Record等相关类说明,请参见:IBM i信息中心
图 3. 生产者线程运行结果

图像  

清单 4. 消费者线程
import com.ibm.as400.access.AS400;
import com.ibm.as400.access.AS400Bin4;
import com.ibm.as400.access.AS400Text;
import com.ibm.as400.access.BinaryFieldDescription;
import com.ibm.as400.access.CharacterFieldDescription;
import com.ibm.as400.access.DataQueue;
import com.ibm.as400.access.DataQueueEntry;
import com.ibm.as400.access.Record;
import com.ibm.as400.access.RecordFormat;

public class Consumer {
   AS400 as400_;
   DataQueue dq;
   RecordFormat dataFormat;

   public Consumer(AS400 as400) {
      as400_ = as400;
      // 产品队列。
      dq = new DataQueue(as400_, "/QSYS.LIB/JAVADEMO.LIB/MYDTAQ.DTAQ");
      // 设置产品对应的数据格式
      BinaryFieldDescription productNumber = new BinaryFieldDescription(
            new AS400Bin4(), "PRODUCT_NUMBER");
      CharacterFieldDescription productName = new CharacterFieldDescription(
            new AS400Text(20), "PRODUCT_NAME");
      CharacterFieldDescription productDescription = new CharacterFieldDescription(
            new AS400Text(100), "PRODUCT_DESCRIPTION");

      dataFormat = new RecordFormat();

      dataFormat.addFieldDescription(productNumber);
      dataFormat.addFieldDescription(productName);
      dataFormat.addFieldDescription(productDescription);
   }

   public void consume() {
      try {
    // 一旦队列中有数据,则立即出队,否则等待。
         while (true) {
            DataQueueEntry DQData = dq.read(0);
            if (DQData != null) {
               Record data = dataFormat.getNewRecord(DQData.getData());

               Integer number = (Integer) data.getField("PRODUCT_NUMBER");
               String name = (String) data.getField("PRODUCT_NAME");
               String description = (String) data
                     .getField("PRODUCT_DESCRIPTION");
               System.out.println("Consumer Thread - Product, [number: "
                     + number + ", name: " + name + ", description: "
                     + description);
            } else {
               System.out
                     .println("Nothing to process, will check again in 3 seconds");
               Thread.sleep(3000);
            }
         }
      } catch (Exception e) {
         e.printStackTrace();
      }

   }

   public static void main(String[] args) {
      // 构造AS400对象,建立Java应用程序与IBM i服务器的连接。
      AS400 as400 = new AS400(hostname, user, name);
      Consumer consumer = new Consumer(as400);
      consumer.consume();
   }

}

消费者进程一直处于监听状态,只要产品队列不为空,则立即使用指定的数据格式,从产品队列读取产品信息。
图 4. 消费者线程运行结果


图像

至此,我们已通过两个样例演示,利用IBM Java Toolbox for i实现IBM i数据队列的相关操作。

总结
IBM i数据队列的设计目标是提供一种快捷、高效的IBM i进程通信方式。本文介绍了IBM i数据队列的基本概念,工作原理,使用场景,以及如何使用IBM Java Toolbox for i实现相关操作。

参考资源

[{"Business Unit":{"code":"BU009","label":"Systems - Cognitive"},"Product":{"code":"SWG60","label":"IBM i"},"Component":"","Platform":[{"code":"PF025","label":"Platform Independent"}],"Version":"","Edition":""}]

UID

ibm11145068