IBM Event Streams for IBM Cloud 的資料儲存庫連接器

使用 IBM® Event Streams for IBM Cloud® 作為歷程資料儲存的訊息匯流排。 Event Streams 以 Apache Kafka為建置基礎,它是一個開放程式碼、可調式、高傳輸量傳訊系統,提供低延遲平台來處理近即時資料資訊來源。

裝置事件

IBM Event Streams for IBM Cloud 連接器支援從 IoT 工具遞送原始裝置事件。 每一個儲存訊息有效負載皆由分割區索引鍵和訊息索引鍵識別。

訊息有效負載是提交至 IoT 工具之事件有效負載的確切副本。

轉遞事件的分割區索引鍵是透過將 IoT 工具 6 個字元組織 ID 與來源裝置的 typeIddeviceId 連結而形成。 有效負載欄位 (例如併入 timestampeventId) 不會用來形成分割區索引鍵。 此配置可確保來自特定裝置的所有事件會傳送至相同的分割區,以便事件按照它們傳送的順序處理。

訊息索引鍵是下列格式的 JSON 物件:

{ 
  "orgId": "myOrgId",
  "deviceType": "myTypeId",
  "deviceId": "myDeviceId",
  "eventType": "myEventId",
  "format": "myMessageFormat",
  "timestamp": "whenEventWasProcessed"
}

狀態通知

IBM Event Streams for IBM Cloud 連接器支援從 IoT 工具遞送狀態通知。 每一個儲存訊息有效負載皆由分割區索引鍵和訊息索引鍵識別。

訊息有效負載是您透過 MQTT 接收的相同 JSON 編碼通知,具有近乎即時的狀態變更通知訂閱。

轉遞狀態通知的分割區索引鍵是透過將 IoT 工具 6 個字元組織 ID 與來源裝置的 typeIddeviceId 連結而形成。 有效負載欄位 (例如 timestampeventId) 不會用來形成分割區索引鍵。 此配置可確保來自特定裝置的所有事件會傳送至相同的分割區,以便事件按照它們傳送的順序處理。

訊息索引鍵是下列格式的 JSON 物件:

{
  "objectType": "device",
  "typeId": "myTypeId",
  "instanceId": "myDeviceId",
  "logicalInterfaceId": "myLogicalInterfaceId"
}
附註: 針對 Maximo® Monitor 8.8 以及更新版本所建立的裝置不使用邏輯或實體介面。

配置

透過使用 Python SDK ,您可以在幾行程式碼中設定 Event Streams 連結。 如需如何將 IoT 資料轉遞至 Event Streams的相關資訊,請參閱 適用於 DSC 的Python SDK 文件

您還可以使用下列程式碼範例來配置兩個目的地和轉遞規則以確保所有事件和所有狀態都會轉遞。

import wiotp.sdk.application

options = wiotp.sdk.application.parseEnvVars()
appClient = wiotp.sdk.application.ApplicationClient(options)

# Configure the binding
serviceBinding = {
  "name": "myeventstreams", 
  "type": "eventstreams", 
  "credentials": {
    "api_key": "myapikey",
    "user": "myusername",
    "password": "mypassword",
    "kafka_admin_url": "myadminurl",
    "kafka_brokers_sasl": ["broker1", "broker2", "broker3", "broker4"]
  }
}
service = appClient.serviceBindings.create(serviceBinding)
# Set up the connector
connector = appClient.dsc.create(name="connector1", serviceId=service.id)
# Set up destinations
connector.destinations.create(name="events", partitions=3)
connector.destinations.create(name="state", partitions=3)
# Set up rules
rule1 = createdConnector.rules.createEventRule(name="allevents", destinationName="events", typeId="*", eventId="*")
rule2 = createdConnector.rules.createStateRule(name="allstate", destinationName="state", logicalInterfaceId="*")

下列各節更詳細地說明如何使用 IoT 工具 API 來配置 Event Streams 歷程特性。

步驟 1:設定服務連結

若要配置服務連結至 Event Streams,您需要下列資訊:

  • username
  • password
  • api key
  • admin URL
  • 分配管理系統的清單

使用此資訊,您可以呼叫 API 來建立新的 Event Streams 服務連結。 如需相關資訊,請參閱本端 API 文件。 如需存取本端說明文件的相關資訊,請參閱 API

服務連結完成時,會傳回 serviceId。 設定連接器實例時,您需要此 36 字元 UUID。

步驟 2:設定連接器

如果要配置連接器,則需要下列資訊:

  • 建立服務連結時產生的 serviceId
  • 連接器的 nameoptional 說明
  • 選用的 timezone ,用於置換所有時間戳記到世界標準時間 (UTC) 的預設轉換。

提示: 您可以啟用連接器,或建立它並設為停用狀態以便在稍後日期啟用。

使用此資訊,您可以呼叫 API ,以在本端 API 文件中建立新的 Event Streams 連接器。 如需存取本端說明文件的相關資訊,請參閱 API

步驟 3:設定目的地

您必須在連接器上至少登錄一個目的地,然後才能將裝置事件或狀態通知遞送至 Event Streams。 該目的地用來給將使用的主題定義分割區名稱和數量。

如果您建立目的地,則提供下列參數作為:

  • name -主題名稱。
  • partitions -與主題搭配使用的分割區數目。

您可以使用 API 來設定目的地,以建立新的目的地。 如需相關資訊,請參閱本端 API 文件。 如需存取本端說明文件的相關資訊,請參閱 API

步驟 4:設定轉遞規則

最終步驟是使用 API 來建立新的轉遞規則,以配置哪些資料將轉遞至 Event Streams 。 如需相關資訊,請參閱本端 API 文件。 如需存取本端說明文件的相關資訊,請參閱 API

配置規則以將事件或狀態轉遞至目的地:

  • 轉遞事件。
    1. type 設為 event
    2. 使用 typeIddeviceId 來提供選取器。 透過選取器識別的裝置會轉遞其事件。
  • 轉遞狀態。
    1. type 設為 state
    2. 使用 logicalInterfaceId 來提供選取器。 實作邏輯介面的裝置 (即針對 Maximo Monitor 早於 8.8版本所建立的裝置) 會轉遞其狀態。