カスタム関数からのディメンション・データの追加
ディメンションとディメンション値を既存のデバイス・タイプに追加できます。 Pythonに精通している場合は、コードを使用して既存のデバイス・タイプにディメンション・データを追加できます。
サンプル・カスタム関数 に基づいてバッチ・データ・メトリック用のカスタム関数を開発し、ディメンションを追加することができます。 このサンプル関数は、 IoT 関数 の Preload 基本クラスを使用してディメンションを追加します。 関数を実行するたびに、ディメンションとその値を削除して再作成します。 複数のデバイス・タイプがディメンションを共有する場合は、各デバイス・タイプにカスタム関数を適用してディメンションを設定できます。
始める前に
チュートリアル: カスタム関数の追加 のステップを実行して、環境とパッケージのセットアップ、保管、インストール、テスト、登録、および関数の適用の方法を学習します。
デバイス・タイプのディメンションをシミュレートする場合は、サンプル・スクリプトを使用して、デバイス・タイプにディメンションを追加することができます。 このスクリプトでは、デバイス・タイプに make_dimenion および generate_dimension_data メソッドを使用してディメンション・データをシミュレートします。
カスタム関数を使用したディメンションの追加
ディメンション表をデータベースに追加し、ディメンションに値を割り当てるカスタム関数を作成します。 次のサンプル・コードを関数のテンプレートとして使用します。 カスタム関数を拡張して、ディメンションの値を割り当てることができます。 例えば、関数を拡張して、 .csv ファイルからディメンション値をロードすることができます。
サンプル関数には 2 つのクラスがあります。 両方のクラスはディメンション・テーブルを追加し、デバイス ID ごとにディメンションに値を割り当てます。
SampleDimensionPreload_randomクラスは、ランダム値を割り当てます。SampleDimensionPreload_presetクラスは、事前設定値を割り当てます。
サンプル・カスタム関数
以下のコードは、サンプルのカスタム関数です。 この関数は、事前設定値を持つ事前インストールされたディメンション・データを使用してディメンション・データを作成およびロードし、テスト・データをランダムに生成して割り当てます。 この関数は、ディメンション dimension_1を作成し、そのディメンションにランダム値を追加します。 この関数は古い値を削除し、ディメンション表を再ロードします。 ディメンション・データ値はハードコーディングされていますが、 .csv ファイルからデータをロードするか、 HTTP 要求を使用して拡張することができます。 以下のテンプレートでは、値は preload_valueにハードコーディングされています。
PACKAGE_URL 変数には、パッケージへの URL を指定します。 この URL は、pip インストールを介してアクセス可能でなければなりません。 git+https://<personal_access_token>@github.com/<user_id><path_to_repository>@<packageという形式を使用します。 コードが GitLab, でホストされている場合は、次のフォーマットを使用してください。 git+https://<deploy_token_username>:<deploy_token_password>@gitlab.com/<user_id><path_to_repository>@prod
import logging
import pandas as pd
import numpy as np
from iotfunctions.base import BasePreload
from iotfunctions import ui
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean, func
logger = logging.getLogger(__name__)
PACKAGE_URL = 'PACKAGE_URL'
class SampleDimensionPreload_random(BasePreload):
dim_table_name = None
def __init__(self, output_item ='dimension_preload_done'):
super().__init__(dummy_items=[],output_item = output_item)
def execute(self, df, start_ts = None,end_ts=None,entities=None):
entity_type = self.get_entity_type()
self.db = entity_type.db
schema = entity_type._db_schema
try:
self.dim_table_name = (entity_type.get_attributes_dict()['_dimension_table_name']).upper()
except:
self.dim_table_name = entity_type.logical_name + '_DIMENSION'
msg = 'Dimension table name: ' + str(self.dim_table_name)
logging.debug(msg)
ef = self.db.read_table(entity_type.logical_name, schema=schema, columns=[entity_type._entity_id])
ids = set(ef[entity_type._entity_id].unique())
msg = 'entities with device_id present:' + str(ids)
logger.debug(msg)
self.db.drop_table(self.dim_table_name, schema=schema)
entity_type.make_dimension(self.dim_table_name,
Column('dimension_1', String(50)), # add dimension_1
**{'schema': schema})
entity_type.register()
entity_type.generate_dimension_data(entities=ids)
return True
@classmethod
def build_ui(cls):
inputs = []
outputs=[]
outputs.append(ui.UIStatusFlag(name='output_item'))
return (inputs, outputs)
class SampleDimensionPreload_preset(BasePreload):
dim_table_name = None
def __init__(self, output_item='dimension_preload_done'):
super().__init__(dummy_items=[], output_item=output_item)
def execute(self, df, start_ts=None, end_ts=None, entities=None):
entity_type = self.get_entity_type()
self.db = entity_type.db
schema = entity_type._db_schema
try:
self.dim_table_name = (entity_type.get_attributes_dict()['_dimension_table_name']).upper()
except:
self.dim_table_name = entity_type.logical_name + '_DIMENSION'
msg = 'Dimension table name: ' + str(self.dim_table_name)
logging.debug(msg)
ef = self.db.read_table(entity_type.logical_name, schema=schema, columns=[entity_type._entity_id])
ids = set(ef[entity_type._entity_id].unique())
msg = 'entities with device_id present:' + str(ids)
logger.debug(msg)
self.db.drop_table(self.dim_table_name, schema=schema)
entity_type.make_dimension(self.dim_table_name,
Column('dimension_1', String(50)), # add dimension_1
**{'schema': schema})
entity_type.register()
preload_data = {}
preload_values = np.repeat('preload_value', len(ids))
preload_data['dimension_1'] = preload_values
preload_data[entity_type._entity_id] = list(ids)
df = pd.DataFrame(preload_data)
msg = 'Setting columns for dimensional table\n'
required_cols = self.db.get_column_names(table=self.dim_table_name, schema=schema)
missing_cols = list(set(required_cols) - set(df.columns))
msg = msg + 'required_cols ' + str(required_cols) + '\n'
msg = msg + 'missing_cols ' + str(missing_cols) + '\n'
logger.debug(msg)
self.write_frame(df=df, table_name=self.dim_table_name, if_exists='append')
kwargs = {
'dimension_table': self.dim_table_name,
'schema': schema,
}
entity_type.trace_append(created_by=self,
msg='Wrote dimension to table',
log_method=logger.debug,
**kwargs)
return True
@classmethod
def build_ui(cls):
inputs = []
outputs = []
outputs.append(ui.UIStatusFlag(name='output_item'))
return (inputs, outputs)
ローカル環境でカスタム関数をテストする場合、以下のサンプルのようなスクリプトを使用して SampleDimensionPreload_random クラスをテストすることができます。 プリセット値を使用する予定の場合は、クラス名 SampleDimensionPreload_random を SampleDimensionPreload_preset クラスに置き換えます。
スクリプトを使用する前に、データベース資格情報を取得します。
-
credentials_as.jsonファイルをダウンロードします。 - 変数をデータに置き換え、ファイルをローカル・ワークステーションに保存します。
- データベースにアクセスするためのデータベース・オブジェクトを作成する。 資格情報ファイルを外部リポジトリーにプッシュしないでください。 デフォルトを使用しない場合は、
schema。 - カスタム関数を登録します。 メソッド・シグニチャーまたは必要な入力を変更する場合は、
unregister_functionsを使用する必要があります。 - デバイス・タイプを追加します。 この例では、ディメンションを追加するデバイスが存在することを想定しています。 スクリプトは、このデバイス・タイプにカスタム関数を追加して、ローカルでテストします。 デバイス・タイプ名をデバイス・タイプの名前に更新します。
- ディメンション・テーブル名を取得し、テーブルにディメンション値を追加します。
- デバイス・タイプに対してローカルに定義されているメトリック計算の実行をテストします。 ローカル・テストでは、サーバー・ジョブ・ログは更新されず、メトリック・データはデータレイクに書き込まれません。 代わりに、メトリック・データは .csv 形式でローカル・ファイル・システムに書き込まれます。
- デバイス・データを表示します。
import json
import logging
from ai.function_dimension import (SampleDimensionPreload_random,
SampleDimensionPreload_preset)
from iotfunctions.db import Database
from iotfunctions.enginelog import EngineLogging
EngineLogging.configure_console_logging(logging.DEBUG)
schema = 'bluadmin'
with open('./scripts/credentials_as.json', encoding='utf-8') as F:
credentials = json.loads(F.read())
db = Database(credentials = credentials)
db.unregister_functions(['SampleDimensionPreload_random'])
db.register_functions([SampleDimensionPreload_random])
entity_name = 'entity_dimension_test_random'
entity_type = db.get_entity_type(name=entity_name)
try:
dim_table_name = (entity_type.get_attributes_dict()['_dimension_table_name']).lower()
except:
dim_table_name = entity_name + '_dimension'
entity_type._functions.extend([SampleDimensionPreload_random()])
entity_type.exec_local_pipeline(**{'_production_mode': False})
print ("Reading new dimension table")
print(dim_table_name)
df = db.read_dimension(dimension=dim_table_name, schema=schema)
print(df.head())
print("Finished reading the device dimension table")
以下のスクリプトは、デバイス・タイプにディメンションを追加し、ランダム値を追加して値をシミュレートします。 このスクリプトは、ディメンションを列として追加して作成します。 次元の種類は、整数、INTEGER、浮動小数点数、FLOAT、文字列、VARCHAR、 DateTime,、またはタイムスタンプです。
スクリプトを使用する前に、データベース資格情報を取得します。
-
credentials_as.jsonファイルをダウンロードします。 - 変数をデータに置き換え、ファイルをローカル・ワークステーションに保存します。
デフォルトを使用しない場合は、スクリプトで schema 値を設定します。 これらのディメンションのいずれかを使用すると、事前定義されたセットから値が選択されます。 その他のディメンション名は、ランダム値を生成します。
メトリック計算の実行をローカルでテストするには、 #
entity_type.exec_local_pipeline(**{'_production_mode': False}) 関数を使用します。 ローカル・テストでは、サーバー・ジョブ・ログは更新されず、メトリック・データはデータレイクに書き込まれません。 代わりに、メトリック・データは .csv 形式でローカル・ファイル・システムに書き込まれます。
import json
import logging
from iotfunctions.db import Database
from iotfunctions.enginelog import EngineLogging
from sqlalchemy import Column, String, Integer, Float
EngineLogging.configure_console_logging(logging.DEBUG)
schema = 'bluadmin'
with open('./scripts/credentials_as.json', encoding='utf-8') as F:
credentials = json.loads(F.read())
db = Database(credentials = credentials)
entity_name = 'entity_dimension_simulate'
entity_type = db.get_entity_type(name=entity_name)
try:
dim_table_name = (entity_type.get_attributes_dict()['_dimension_table_name']).lower()
except:
dim_table_name = entity_name + '_dimension'
# db.drop_table(dim_table_name, schema=schema)
Dimension name:Values
['company', 'company_id', 'company_code']: ['ABC', 'COMPANY', 'JDI']
['plant', 'plant_id', 'plant_code']: ['Zhongshun', 'Holtsburg', 'Midway']
['plant', 'plant_id', 'plant_code']: ['US', 'CA', 'UK', 'DE']
['firmware', 'firmware_version']: ['1.0', '1.12', '1.13', '2.1']
['manufacturer']: ['Rentech', 'GHI Industries']
['zone']: ['27A', '27B', '27C']
['status', 'status_code']: ['inactive', 'active']
['operator', 'operator_id', 'person', 'employee']: ['Fred', 'Joe', 'Mary', 'Steve', 'Henry', 'Jane', 'Hillary', 'Justin', 'Rod']
db.drop_table(dim_table_name, schema=schema)
entity_type.make_dimension(dim_table_name,
Column('company', String(50)),
Column('status', String(50)),
Column('operator', String(50)),
**{'schema': schema})
entity_type.register()
# entity_type.exec_local_pipeline(**{'_production_mode': False})
ef = db.read_table(entity_type.logical_name, schema=schema, columns=[entity_type._entity_id])
ids = set(ef[entity_type._entity_id].unique())
entity_type.generate_dimension_data(entities=ids)
ディメンション・データのシミュレート
デバイス・タイプのディメンションをシミュレートできます。 次のスクリプトと同様の Python スクリプトを使用して、ディメンションとその値を追加できます。 ランダム値が、値の配列から設定されます。 スクリプトには、以下のステップが含まれます。
- に接続する。
- データベースにデータベース・オブジェクトを作成する。
- ディメンション用の新しいデータベース列を追加し、ランダム値を追加します。