チュートリアル

intdash SDK for Python を使用するための基本的な使用手順を解説します。 このチュートリアルでは、特定のエッジの計測を取得し、その時系列データを取得します。

エッジを取得する

クライアント生成後、参照したいエッジを取得します。エッジリソースへのアクセスには、 Edges - エッジへのアクセスオブジェクト を用います。

サーバーに登録されているエッジの一覧を参照する場合、 list() メソッドを使用します。 name パラメーターに文字列を指定すると、その文字列で始まる名前を持つエッジの一覧を取得します。

import intdash

lst = edges.list(name="admin", limit=10)
for edge in lst:
   print(edge, "\n")

# uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# name: admin
# nickname: administrator
# description: this is admin user
# type: user
# disabled: False
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
# last_login_at: 2020-01-01 00:00:00.000000+00:00

エッジが多数存在する場合には、 iterator オプションを指定しイテレータを生成します。 イテレーションごとに、エッジの一覧を取得します。

it = edges.list(limit=10, iterator=True)
for lst in it:
    for edge in lst:
        print(edge, "\n")

エッジに紐づく計測を取得する

先程取得したエッジに紐づいている計測(Mesureament)を取得します。 計測リソースへのアクセスは、 Measurements - 計測へのアクセスオブジェクト を使用します。

計測へのアクセスオブジェクトを取得します。

measurements = client.measurements

計測の一覧の取得には list() を使用します。 先程取得したエッジのUUIDを edge_uuid に指定することで、計測の一覧を取得します。

from intdash import timeutils

lst = measurements.list(
    edge_uuid=edge.uuid,
    start=timeutils.str2timestamp('2020-01-01 00:00:00.000000+00:00'),
    end=timeutils.str2timestamp('2020-01-02 00:00:00.000000+00:00')
    limit=10
  )

for m in lst:
    print(m, "\n")

# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00

取得したい計測のUUIDが事前に分かっている場合は、 uuid を指定して get() を実行することにより計測を取得することもできます。

measurement = measurements.get(
    uuid="1cef9b31-a356-4415-8b71-84273bbeb1fd",
)
print(measurement)

# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00

時系列データを取得する

先程取得した計測に紐づいている時系列データを取得します。時系列データリソースへのアクセスには、以下2つのうちいずれかを使用します。 レスポンス形式が異なるため、詳細な形式は各ドキュメントを参照してください。

このチュートリアルでは、 client.data_points を使用した例を解説します。

data_points = client.data_points

計測に紐づく時系列データを取得するには、list() を使用します。 measurement_uuid を指定すると、その計測の時系列データの一覧を取得できます。

import intdash
from intdash import timeutils

it = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = True
)

for lst in it:
    for dp in lst:
        print(dp, "\n")

# time: 2020-01-01T05:08:25.676942000Z
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'

警告

データポイント形式を使用する場合、時系列データのペイロードは、 data_payload にバイナリ形式で格納されます。バイナリのフォーマットはデータ型によって異なります。各データ型を表すクラス(データ型クラス)は時系列データオブジェクトに定義してあります。詳細は、時系列データオブジェクト を参照してください。ペイロードに格納されたバイナリを対応するクラスに変換するには、各クラスの静的メソッド from_payload() を使用してください。

import intdash

dps = datapoints.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = False
)

intdash.data.String.from_payload(dps[0].data_payload)
#  data_type: str
#  id: str_test
#  value: test

1つのエッジに紐づく時系列データを、複数の計測を横断して取得したい場合、 edge_name もしくは edge_uuid を指定します。

from intdash import timeutils

it = datapoints.list(
    edge_name = "admin",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = True
)

for dpslst in it:
    for dps in dpslst:
        print(dps, "\n")

# time: 2020-01-01T05:08:25.676942000Z
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'

# time: 2020-01-01T06:10:25.879654000Z
# measurement_uuid: 10bf1795-0ae4-4e5e-a88b-b2fa0822a393
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test2'

時系列データをサーバーに保存する

時系列データを保存する際は、時系列データリソースへのアクセスを行う client.data_points あるいは client.units のいずれかを使用します。 使用するアクセスオブジェクトにより、登録の手順が異なります。さらに詳細な情報が必要な場合、各アクセスオブジェクトのドキュメントを参照してください。

  • client.data_points

    • intdash.DataPoint の配列としてデータを作成します。

    • data_payload はバイナリデータを指定する必要があります。バイナリデータのフォーマットはデータ型ごとに異なります。バイナリフォーマットが不明な場合は 時系列データオブジェクト を参照してください。データ型クラスのオブジェクトからバイナリ形式への変換には、to_payload()メソッドを使用してください。

import pandas as pd
import intdash

dps = [
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=0),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test1').to_payload(),
  ),
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=1),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test2').to_payload(),
  ),
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=2),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test3').to_payload(),
  ),
  ...
]

作成したデータのリストを保存するには、 datapoints.store() を実行します。

datapoints = client.data_points

datapoints.store(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    data_points = dps,
)

リアルタイムデータを送受信する

クライアントを使用して、 WebSocket エンドポイントへ接続します。

wsconn = client.connect_websocket()

リアルタイムデータの送信には、 open_upstreams() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.UpstreamSpec オブジェクトと、 送信データの生成に使用するイテレータを入力引数として渡します。

import pandas as pd
from time import sleep

import intdash
from intdash import data

specs = [
    intdash.UpstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        dst_edge_uuids = None,
        resend = False,
        store = True,
        measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ),
]

def gen():
    basetime = pd.Timestamp.utcnow()

    while True:
        sleep(0.1)

        yield intdash.Unit(
          elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
          channel = 1,
          data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )
        yield intdash.Unit(
          elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
          channel = 2,
          data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )

iterators = [gen()]

wsconn.open_upstreams(
    specs = specs,
    iterators = iterators,
)

リアルタイムデータの受信には、 open_downstreams() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.DownstreamSpec オブジェクトと、 データ受信時に実行するコールバックメソッドを入力引数として渡します。

import intdash

specs = [
    intdash.DownstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        filters = [
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
        ],
    ),
]

def callback(unit):
    print(unit)

callbacks = [callback]

wsconn.open_downstreams(
    specs = specs,
    callbacks = callbacks,
)

処理が終了したら、接続を閉じ、計測を終了します。

wsconn.close()

measurements.update(
    uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ended=True,
)