チュートリアル¶
intdash SDK for Python を使用するための基本的な使用手順を解説します。 このチュートリアルでは、特定のエッジの計測を取得し、その時系列データを取得します。
エッジを取得する¶
クライアント生成後、参照したいエッジを取得します。エッジリソースへのアクセスには、 Edges - エッジへのアクセスオブジェクト を用います。
サーバーに登録されているエッジの一覧を参照する場合、 list() メソッドを使用します。
name
パラメーターに文字列を指定すると、その文字列で始まる名前を持つエッジの一覧を取得します。
import intdash
lst = edges.list(name="admin", limit=10)
for edge in lst:
print(edge, "\n")
# uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# name: admin
# nickname: administrator
# description: this is admin user
# type: user
# disabled: False
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
# last_login_at: 2020-01-01 00:00:00.000000+00:00
エッジが多数存在する場合には、 iterator
オプションを指定しイテレータを生成します。
イテレーションごとに、エッジの一覧を取得します。
it = edges.list(limit=10, iterator=True)
for lst in it:
for edge in lst:
print(edge, "\n")
エッジに紐づく計測を取得する¶
先程取得したエッジに紐づいている計測(Mesureament)を取得します。 計測リソースへのアクセスは、 Measurements - 計測へのアクセスオブジェクト を使用します。
計測へのアクセスオブジェクトを取得します。
measurements = client.measurements
計測の一覧の取得には list() を使用します。
先程取得したエッジのUUIDを edge_uuid
に指定することで、計測の一覧を取得します。
from intdash import timeutils
lst = measurements.list(
edge_uuid=edge.uuid,
start=timeutils.str2timestamp('2020-01-01 00:00:00.000000+00:00'),
end=timeutils.str2timestamp('2020-01-02 00:00:00.000000+00:00')
limit=10
)
for m in lst:
print(m, "\n")
# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
取得したい計測のUUIDが事前に分かっている場合は、 uuid
を指定して get() を実行することにより計測を取得することもできます。
measurement = measurements.get(
uuid="1cef9b31-a356-4415-8b71-84273bbeb1fd",
)
print(measurement)
# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
時系列データを取得する¶
先程取得した計測に紐づいている時系列データを取得します。時系列データリソースへのアクセスには、以下2つのうちいずれかを使用します。 レスポンス形式が異なるため、詳細な形式は各ドキュメントを参照してください。
このチュートリアルでは、 client.data_points を使用した例を解説します。
data_points = client.data_points
計測に紐づく時系列データを取得するには、list() を使用します。
measurement_uuid
を指定すると、その計測の時系列データの一覧を取得できます。
import intdash
from intdash import timeutils
it = data_points.list(
measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
id_queries = [
intdash.IdQuery(
data_type=intdash.DataType.string
)
]
limit = 1000,
iterator = True
)
for lst in it:
for dp in lst:
print(dp, "\n")
# time: 2020-01-01T05:08:25.676942000Z
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'
警告
データポイント形式を使用する場合、時系列データのペイロードは、 data_payload
にバイナリ形式で格納されます。バイナリのフォーマットはデータ型によって異なります。各データ型を表すクラス(データ型クラス)は時系列データオブジェクトに定義してあります。詳細は、時系列データオブジェクト を参照してください。ペイロードに格納されたバイナリを対応するクラスに変換するには、各クラスの静的メソッド from_payload() を使用してください。
import intdash
dps = datapoints.list(
measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
end = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
id_queries = [
intdash.IdQuery(
data_type=intdash.DataType.string
)
]
limit = 1000,
iterator = False
)
intdash.data.String.from_payload(dps[0].data_payload)
# data_type: str
# id: str_test
# value: test
1つのエッジに紐づく時系列データを、複数の計測を横断して取得したい場合、 edge_name
もしくは edge_uuid
を指定します。
from intdash import timeutils
it = datapoints.list(
edge_name = "admin",
start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
id_queries = [
intdash.IdQuery(
data_type=intdash.DataType.string
)
]
limit = 1000,
iterator = True
)
for dpslst in it:
for dps in dpslst:
print(dps, "\n")
# time: 2020-01-01T05:08:25.676942000Z
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'
# time: 2020-01-01T06:10:25.879654000Z
# measurement_uuid: 10bf1795-0ae4-4e5e-a88b-b2fa0822a393
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test2'
時系列データをサーバーに保存する¶
時系列データを保存する際は、時系列データリソースへのアクセスを行う client.data_points あるいは client.units のいずれかを使用します。 使用するアクセスオブジェクトにより、登録の手順が異なります。さらに詳細な情報が必要な場合、各アクセスオブジェクトのドキュメントを参照してください。
client.data_points
intdash.DataPoint の配列としてデータを作成します。
data_payload
はバイナリデータを指定する必要があります。バイナリデータのフォーマットはデータ型ごとに異なります。バイナリフォーマットが不明な場合は 時系列データオブジェクト を参照してください。データ型クラスのオブジェクトからバイナリ形式への変換には、to_payload()メソッドを使用してください。
import pandas as pd
import intdash
dps = [
intdash.DataPoint(
elapsed_time = pd.Timedelta(seconds=0),
data_type=intdash.DataType.string,
channel = 1,
data_payload = intdash.data.String(data_id='str_test', value='test1').to_payload(),
),
intdash.DataPoint(
elapsed_time = pd.Timedelta(seconds=1),
data_type=intdash.DataType.string,
channel = 1,
data_payload = intdash.data.String(data_id='str_test', value='test2').to_payload(),
),
intdash.DataPoint(
elapsed_time = pd.Timedelta(seconds=2),
data_type=intdash.DataType.string,
channel = 1,
data_payload = intdash.data.String(data_id='str_test', value='test3').to_payload(),
),
...
]
作成したデータのリストを保存するには、 datapoints.store() を実行します。
datapoints = client.data_points
datapoints.store(
measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
data_points = dps,
)
リアルタイムデータを送受信する¶
クライアントを使用して、 WebSocket エンドポイントへ接続します。
wsconn = client.connect_websocket()
リアルタイムデータの送信には、 open_upstreams() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.UpstreamSpec オブジェクトと、 送信データの生成に使用するイテレータを入力引数として渡します。
import pandas as pd
from time import sleep
import intdash
from intdash import data
specs = [
intdash.UpstreamSpec(
src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
dst_edge_uuids = None,
resend = False,
store = True,
measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
),
]
def gen():
basetime = pd.Timestamp.utcnow()
while True:
sleep(0.1)
yield intdash.Unit(
elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
channel = 1,
data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
)
yield intdash.Unit(
elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
channel = 2,
data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
)
iterators = [gen()]
wsconn.open_upstreams(
specs = specs,
iterators = iterators,
)
リアルタイムデータの受信には、 open_downstreams() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.DownstreamSpec オブジェクトと、 データ受信時に実行するコールバックメソッドを入力引数として渡します。
import intdash
specs = [
intdash.DownstreamSpec(
src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
filters = [
intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
],
),
]
def callback(unit):
print(unit)
callbacks = [callback]
wsconn.open_downstreams(
specs = specs,
callbacks = callbacks,
)
処理が終了したら、接続を閉じ、計測を終了します。
wsconn.close()
measurements.update(
uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
ended=True,
)