關於 Aggregation 範例

Aggregation 範例會示範如何使用 AggregateControl、AggregateRequest 及 AggregateReply 節點來進行簡單的四路聚集作業。其包含下列三個訊息流程來實作四路聚集作業:FanOut、RequestReplyApp 及 FanIn。

FanOut 訊息流程

此流程會接收送入的要求訊息,產生四種不同的要求訊息,透過要求/回覆將其送出,然後開始追蹤聚集作業:

聚集 FanOut 流程
AggregateControl 節點的 Control 端沒有接線,且 MQInput 節點的交易模式設為。您可以在延伸 Aggregation 範例中找到此設計的更多理由。

FanOut 流程包含 AggregateControl 及 AggregateRequest 節點,可用來啟動聚集處理程序。AggregateControl 節點會將要求訊息向下傳播至與其 Out 端連接的四條分支(未定義順序)。每一條分支都有 BuildRequest Compute 節點,以產生個別要求。下列 ESQL 程式碼會用在 BuildRequest1 Compute 節點中:

CREATE COMPUTE MODULE FanOut_CreateRequest1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputLocalEnvironment = InputLocalEnvironment;
CALL CopyQuarter(InputRoot, OutputRoot, 0);
RETURN TRUE;
END;
END MODULE;

CopyQuarter 程序會從輸入訊息複製訊息標頭,然後擷取四分之一的 <SaleList> 元素。在供您使用的範例中,提供了八個 <SaleList> 元素,每一個要求訊息都包含了兩個 <SaleList> 元素。 下列範例顯示此程序的 ESQL 程式碼:

CREATE PROCEDURE CopyQuarter(IN input REFERENCE,
							 IN output REFERENCE,
							 IN jumps INTEGER)
BEGIN
	CALL CopyMessageHeaders(input, output);
	CREATE LASTCHILD OF output DOMAIN 'XMLNSC';
	CREATE LASTCHILD OF output.XMLNSC NAME 'SaleEnvelope';
	DECLARE xmlIn REFERENCE TO input.XMLNSC.SaleEnvelope;
	DECLARE xmlOut REFERENCE TO output.XMLNSC.SaleEnvelope;
	IF LASTMOVE(xmlOut) <> TRUE THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('could not create output message');
	END IF;

	DECLARE invoices INTEGER CAST (xmlIn.Header.SaleListCount AS INTEGER);
	DECLARE quarter INTEGER invoices/4;
	IF invoices <> (quarter*4) THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('not divisible by 4', invoices);
	END IF;	
	
	IF jumps > 3 THEN
		THROW USER EXCEPTION CATALOG 'BIPv610' MESSAGE 2959 VALUES ('too many jumps', jumps);
	END IF;		
	
	DECLARE count INTEGER 1;
	DECLARE copyRef REFERENCE TO xmlIn.SaleList[(jumps*quarter)+count];
	WHILE count <= quarter DO
		SET xmlOut.SaleList[count] = copyRef;
		MOVE copyRef NEXTSIBLING;
		SET count = count + 1;
	END WHILE;
END;

會先進行與輸入項目的狀態相關的一些起始驗證(<SaleList> 元素的數目必須可被四整除,並依 0、1、2 或 3 來選取所需的四分之一),才能將適當數量的 <SaleList> 元素從輸入訊息複製到輸出訊息中。

CopyMessageHeaders 程序(如同在 CopyQuarter 程序中所呼叫的)是以針對新的 Compute 節點產生之 ESQL 中所提供的標準 CopyMessageHeaders 程序為基礎。為了充分重複使用,此 CopyMessageHeaders 程序被上移至 ESQL 檔案的範圍,讓所有 Compute 節點都能呼叫相同的程序。

這個重新設定範圍的動作有一個重要的連帶作用,使得該程序必須有所變更。在 Compute 節點中,OutputRoot 參照含有特殊的內容,可在將訊息樹狀結構元素從 InputRoot 複製到 OutputRoot 時,自動確保會保留網域資訊。然而在此情況下,會將 OutputRoot 當作參照傳遞至外部程序,所以必須明確地保留網域資訊。新增 CREATE LASTCHILD 指令即可達成這個保留的目標:

CREATE PROCEDURE CopyMessageHeaders(IN input REFERENCE,
IN output REFERENCE)
BEGIN
DECLARE I INTEGER 2;
DECLARE J INTEGER CARDINALITY(input.*[]);
WHILE I < J DO
CREATE LASTCHILD OF output DOMAIN FIELDNAME(input.*[I]); -- preserve domain information
SET output.*[I] = input.*[I];
SET I = I + 1;
END WHILE;
END;

在 BuildRequest Compute 節點將 Compute 節點設為傳送 LocalEnvironment 及 Message 以產生要求訊息之後,MQOutput 節點會將它輸出至 AGGR_SAMPLE_REQUEST 佇列。(為簡化此範例,四個要求都會放在相同的佇列中,但是對真正的應用程式而言,此實務可能並不符合實際需求。)每個 AggregateRequest 節點都會將一個資料夾名稱指定成配置參數,以供 AggregateReply 節點在將各種回覆附加在聚集的回覆訊息中時使用。AggregateRequest1 節點會使用輸入訊息的第一個四分之一,AggregateRequest2 節點會使用第二個四分之一,依此類推。

MQOutput 節點的設定是指定 AGGR_SAMPLE_REPLY 做為要求訊息上的 ReplyTo 佇列;RequestReplyApp 訊息流程會使用此設定。

這四個要求訊息都輸出之後,AggregateControl 節點會將聚集的狀態儲存在整合節點內部,如下列步驟所示:

若要瞭解達成此實務的其他方法,請參閱延伸範例

整個流程都必須在交易(MQInput 節點上的交易模式設為 YES)之下執行,因為在收到任何回覆之前完成最後一項作業(儲存聚集作業狀態)是最有效率的。

RequestReplyApp 訊息流程

請使用這個流程來模擬後端服務應用程式(通常用來處理聚集作業所提出的要求訊息)。在實際的系統中,這些後端服務應用程式可以是其他訊息流程或現有的應用程式,但是 Aggregation 範例不需要到這麼複雜的程度,所以此流程只包含正確要求/回覆處理程序所需的最少程序。此流程會讀取 FanOut 流程中之 MQOutput 節點所寫入的相同佇列,並輸出至 FanIn 流程中之輸入節點所讀取的佇列,它在這兩個流程之間提供傳訊的橋樑。訊息會放在其回覆目的地佇列中(同 FanOut 流程中之 MQOutput 節點的設定)。

回覆流程
RequestReplyApp 流程在分配管理系統保存檔 (BAR) 中多指定了三個實例,導致總計有四個執行緒,這可確保以盡可能快的速度處理全部四個要求。

FanIn 訊息流程

此流程會從 RequestReplyApp 流程接收所有的回覆,並將其聚集在單一輸出訊息中。MQOutput 節點不能 輸出 AggregateReply 節點的輸出訊息,所以新增了 Compute 節點來將資料轉換成可以寫出至佇列的格式。
聚集收合流程

FanIn 訊息流程也有三個額外的實例,原因與 RequestReplyApp 流程相同。「整合」節點會在內部儲存前三個送入的回覆,並且會更新所儲存的聚集狀態。處理完第四個回覆時,會擷取已儲存的那三個回覆,並將這四個回覆訊息都建置在輸出訊息中。此訊息並非處於可輸出至佇列的狀態,所以 BuildReply Compute 節點會呼叫下列 ESQL:

CREATE COMPUTE MODULE FanIn_BuildReply
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		SET OutputRoot.Properties = InputRoot.Properties;
		CREATE NEXTSIBLING OF OutputRoot.Properties DOMAIN 'MQMD';
		SET OutputRoot.MQMD.Version = MQMD_CURRENT_VERSION;
		CREATE LASTCHILD OF OutputRoot DOMAIN 'XMLNSC';
		CREATE LASTCHILD OF OutputRoot.XMLNSC NAME 'ComIbmAggregateReplyBody';
		DECLARE next INTEGER 1;
		DECLARE repliesIn REFERENCE TO InputRoot.ComIbmAggregateReplyBody.*[next];
		DECLARE repliesOut REFERENCE TO OutputRoot.XMLNSC.ComIbmAggregateReplyBody;
		WHILE next <= 4 DO -- 4-way aggregation
			CREATE LASTCHILD OF repliesOut NAME FIELDNAME(repliesIn);
			SET repliesOut.*[next].ReplyIdentifier = CAST(repliesIn.Properties.ReplyIdentifier AS CHAR);
			SET repliesOut.*[next].SaleEnvelope = repliesIn.XMLNSC.SaleEnvelope;
			MOVE repliesIn NEXTSIBLING;
			SET next = next + 1;
		END WHILE;
		RETURN TRUE;
	END;
END MODULE;

ESQL 會先新增初步的 MQMD,再從輸入訊息中的 ComIbmAggregateReplyBody 複製資料至輸出訊息中的 XML 樹狀結構,並同時維護聚集要求 ID 和資料夾。沒有指定回覆順序。

測試訊息

用來驅動聚集訊息流程的測試訊息是一個 XML 訊息,其中包含客戶的詳細發票資料。它在八個個別的 <SaleList> 元素中大約包含了 8 KB 的資料。

<SaleEnvelope>
<Header>
<SaleListCount>8</SaleListCount>
</Header>
<SaleList>
<Invoice>
<Initial>K</Initial><Initial>A</Initial>
<Surname>Braithwaite</Surname>
<Item><Code>00</Code><Code>01</Code><Code>02</Code>
<Description>Twister</Description>
<Category>Games</Category>
<Price>00.30</Price><Quantity>01</Quantity>
</Item>
<Item><Code>02</Code><Code>03</Code><Code>01</Code>
<Description>The Times Newspaper</Description>
<Category>Books and Media</Category>
<Price>00.20</Price><Quantity>01</Quantity>
</Item>
<Balance>00.50</Balance><Currency>Sterling</Currency>
</Invoice>
<Invoice>
<Initial>T</Initial><Initial>J</Initial>
<Surname>Dunnwin</Surname>
<Item><Code>04</Code><Code>05</Code><Code>01</Code>
<Description>The Origin of Species</Description>
<Category>Books and Media</Category>
<Price>22.34</Price><Quantity>02</Quantity>
</Item>
<Item><Code>06</Code><Code>07</Code><Code>01</Code>
<Description>Microscope</Description>
<Category>Miscellaneous</Category>
<Price>36.20</Price><Quantity>01</Quantity>
</Item>
<Balance>81.84</Balance><Currency>Euros</Currency>
</Invoice>
</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<SaleList>....</SaleList>
<Trailer>
<CompletionTime>12.00.00</CompletionTime>
</Trailer>
</SaleEnvelope>

回到範例首頁