チュートリアル

intdash SDK for Python を使用するための基本的な使用手順を解説します。 このチュートリアルでは、特定のエッジの計測を取得し、その時系列データを取得します。

エッジを取得する

クライアント生成後、参照したいエッジを取得します。エッジリソースへのアクセスには、 Edges - エッジへのアクセスオブジェクト を用います。

サーバーに登録されているエッジの一覧を参照する場合、 list() メソッドを使用します。 name パラメーターに文字列を指定すると、その文字列で始まる名前を持つエッジの一覧を取得します。

import intdash

lst = edges.list(name="admin", limit=10)
for edge in lst:
   print(edge, "\n")

# uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# name: admin
# nickname: administrator
# description: this is admin user
# type: user
# disabled: False
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
# last_login_at: 2020-01-01 00:00:00.000000+00:00

エッジが多数存在する場合には、 iterator オプションを指定しイテレータを生成します。 イテレーションごとに、エッジの一覧を取得します。

it = edges.list(limit=10, iterator=True)
for lst in it:
    for edge in lst:
        print(edge, "\n")

エッジに紐づく計測を取得する

先程取得したエッジに紐づいている計測(Mesureament)を取得します。 計測リソースへのアクセスは、 Measurements - 計測へのアクセスオブジェクト を使用します。

計測へのアクセスオブジェクトを取得します。

measurements = client.measurements

計測の一覧の取得には list() を使用します。 先程取得したエッジのUUIDを edge_uuid に指定することで、計測の一覧を取得します。

from intdash import timeutils

lst = measurements.list(
    edge_uuid=edge.uuid,
    start=timeutils.str2timestamp('2020-01-01 00:00:00.000000+00:00'),
    end=timeutils.str2timestamp('2020-01-02 00:00:00.000000+00:00')
    limit=10
  )

for m in lst:
    print(m, "\n")

# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00

取得したい計測のUUIDが事前に分かっている場合は、 uuid を指定して get() を実行することにより計測を取得することもできます。

measurement = measurements.get(
    uuid="1cef9b31-a356-4415-8b71-84273bbeb1fd",
)
print(measurement)

# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00

時系列データを取得する

先程取得した計測に紐づいている時系列データを取得します。時系列データリソースへのアクセスには、以下2つのうちいずれかを使用します。 レスポンス形式が異なるため、詳細な形式は各ドキュメントを参照してください。

このチュートリアルでは、 client.data_points を使用した例を解説します。

data_points = client.data_points

計測に紐づく時系列データを取得するには、list() を使用します。 measurement_uuid を指定すると、その計測の時系列データの一覧を取得できます。

import intdash
from intdash import timeutils

it = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = True
)

for lst in it:
    for dp in lst:
        print(dp, "\n")

# time: 2020-01-01T05:08:25.676942+00:00
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'

警告

データポイント形式を使用する場合、時系列データのペイロードは、 data_payload にバイナリ形式で格納されます。バイナリのフォーマットはデータ型によって異なります。各データ型を表すクラス(データ型クラス)は時系列データオブジェクトに定義してあります。詳細は、時系列データオブジェクト を参照してください。ペイロードに格納されたバイナリを対応するクラスに変換するには、各クラスの静的メソッド from_payload() を使用してください。

import intdash

dps = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = False
)

intdash.data.String.from_payload(dps[0].data_payload)
#  data_type: str
#  id: str_test
#  value: test

1つのエッジに紐づく時系列データを、複数の計測を横断して取得したい場合、 edge_name もしくは edge_uuid を指定します。

from intdash import timeutils

it = data_points.list(
    edge_name = "admin",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = True
)

for dpslst in it:
    for dps in dpslst:
        print(dps, "\n")

# time: 2020-01-01T05:08:25.676942+00:00
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'

# time: 2020-01-01T06:10:25.879654+00:00
# measurement_uuid: 10bf1795-0ae4-4e5e-a88b-b2fa0822a393
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test2'

時系列データをサーバーに保存する

時系列データを保存する際は、時系列データリソースへのアクセスを行う client.data_points あるいは client.units のいずれかを使用します。 使用するアクセスオブジェクトにより、登録の手順が異なります。さらに詳細な情報が必要な場合、各アクセスオブジェクトのドキュメントを参照してください。

  • client.data_points

    • intdash.DataPoint の配列としてデータを作成します。

    • data_payload はバイナリデータを指定する必要があります。バイナリデータのフォーマットはデータ型ごとに異なります。バイナリフォーマットが不明な場合は 時系列データオブジェクト を参照してください。データ型クラスのオブジェクトからバイナリ形式への変換には、to_payload()メソッドを使用してください。

import pandas as pd
import intdash

dps = [
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=0),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test1').to_payload(),
  ),
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=1),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test2').to_payload(),
  ),
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=2),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test3').to_payload(),
  ),
  ...
]

作成したデータのリストを保存するには、 datapoints.store() を実行します。

data_points = client.data_points

data_points.store(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    data_points = dps,
)

リアルタイムデータを送受信する

クライアントを使用して、 WebSocket エンドポイントへ接続します。

wsconn = client.connect_websocket()

リアルタイムデータの送信には、 open_upstreams() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.UpstreamSpec オブジェクトと、 送信データの生成に使用するイテレータを入力引数として渡します。

import pandas as pd
from time import sleep

import intdash
from intdash import data

specs = [
    intdash.UpstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        dst_edge_uuids = None,
        resend = False,
        store = True,
        measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ),
]

def gen():
    basetime = pd.Timestamp.utcnow()

    yield intdash.Unit(
      elapsed_time = pd.Timestamp.utcnow() - basetime,
      channel = 1,
      data = data.Basetime(type = intdash.BasetimeType.ntp, basetime = basetime)
    )

    while True:
        sleep(0.1)

        yield intdash.Unit(
          elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
          channel = 1,
          data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )
        yield intdash.Unit(
          elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
          channel = 2,
          data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )

iterators = [gen()]

wsconn.open_upstreams(
    specs = specs,
    iterators = iterators,
)

リアルタイムデータの受信には、 open_downstreams() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.DownstreamSpec オブジェクトと、 データ受信時に実行するコールバックメソッドを入力引数として渡します。

import intdash

specs = [
    intdash.DownstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        filters = [
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
        ],
    ),
]

def callback(unit):
    print(unit)

callbacks = [callback]

wsconn.open_downstreams(
    specs = specs,
    callbacks = callbacks,
)

処理が終了したら、接続を閉じ、計測を終了します。

wsconn.close()

measurements.update(
    uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ended=True,
)

リアルタイムデータを送受信する その2

新たに追加されたメソッド connect_iscp() を使用して、intdashのリアルタイムAPIへ接続する方法を解説します。 これは、 async/await 構文での記述に対応した、リアルタイム接続用の新しいAPIです。

connect_iscp() には、コネクションが閉じられたときに呼び出されるコールバックである on_close を入力引数として渡します。

async def on_close():
    print("connection closed!")

conn = await client.connect_iscp(on_close=on_close)

リアルタイムデータの送信には、 open_upstream() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.UpstreamSpec オブジェクトと、 ACK が返却されたときに呼び出されるコールバックである on_ack を入力引数として渡します。

import pandas as pd
import asyncio

import intdash
from intdash import data

basetime = pd.Timestamp.utcnow()
meas = client.measurements.create(
    edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    basetime=basetime,
    basetime_type="ntp",
)

spec = intdash.UpstreamSpec(
    src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    dst_edge_uuids=None,
    resend=False,
    store=True,
    measurement_uuid=meas.uuid,
)

async def on_ack(serial, result):
    print(f"ACK's serial number is {serial}. Result code is {result}.")

upstream = await conn.open_upstream(spec=spec, on_ack=on_ack)

d = intdash.data.Basetime(
    type=intdash.BasetimeType.ntp,
    basetime=basetime,
)
await upstream.write(
    datapoint=intdash.DataPoint(
        elapsed_time=pd.Timestamp.utcnow() - basetime,
        channel=1,
        data_type=d.data_type,
        data_payload=d.to_payload(),
    )
)

while True:
    await asyncio.sleep(0.1)

    try:
        d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07')
        await upstream.write(datapoint=intdash.DataPoint(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 1,
            data_type=d.data_type,
            data_payload=d.to_payload(),
        ))

        d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07')
        await upstream.write(datapoint=intdash.DataPoint(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 2,
            data_type=d.data_type,
            data_payload=d.to_payload(),
        ))

    except intdash.ISCPConnClosedException:
        break

送信が終了したら、ストリームを閉じます。 このとき、必要に応じて計測を終了させます。

await upstream.close()
client.measurements.update(
    uuid = meas.uuid,
    ended=True,
)

リアルタイムデータの受信には、 open_downstream() メソッドを使用します。 このとき、開くストリームの仕様を指定する intdash.DownstreamSpec オブジェクトと、 データ受信時に呼び出されるコールバックである on_msg を入力引数として渡します。

import intdash

spec = intdash.DownstreamSpec(
    src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    filters=[
        intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
        intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
    ],
)

async def on_msg(datapoint):
    print(datapoint)

downstream = await conn.open_downstream(spec=spec, on_msg=on_msg)

受信が終了したら、ストリームを閉じます。

await downstream.close()

送受信処理が終了したら、接続を閉じます。

await conn.close()

新メソッドはネイティブコルーチンを使用して実装されているため、 以上の処理は、適切にイベントループを使用して実行する必要があります。

async def coroutine():
    conn = await client.connect_iscp(on_close=on_close)

    dspec = intdash.DownstreamSpec(...)
    downstream = await conn.open_downstream(spec=dspec, on_msg=on_msg)

    basetime = pd.Timestamp.utcnow()
    meas = client.measurements.create(...)

    uspec = intdash.UpstreamSpec(...)
    upstream = await conn.open_upstream(spec=uspec, on_ack=on_ack)

    d = intdash.data.Basetime(...)
    await upstream.write(...)

    async def deadline(upstream):
        await asyncio.sleep(10)
        await upstream.close()

    asyncio.ensure_future(deadline(upstream))

    while True:
        try:
            d = data.CAN(...)
            await upstream.write(...)

            d = data.CAN(...)
            await upstream.write(...)

        except intdash.ISCPConnClosedException:
            break

    client.measurements.update(...)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()

また、接続の開始やストリームのオープンには、 async with 構文を使用することができます。

async def coroutine():
    async with await client.connect_iscp(on_close=on_close) as conn:

        dspec = intdash.DownstreamSpec(...)
        async with await conn.open_downstream(spec=dspec, on_msg=on_msg) as downstream:

            basetime = pd.Timestamp.utcnow()
            meas = client.measurements.create(...)

            uspec = intdash.UpstreamSpec(...)
            async with await conn.open_upstream(spec=uspec, on_ack=on_ack) as upstream:
                d = intdash.data.Basetime(...)
                await upstream.write(...)

                async def deadline(upstream):
                    await asyncio.sleep(10)
                    await upstream.close()

                asyncio.ensure_future(deadline(upstream))

                while True:
                    try:
                        d = data.CAN(...)
                        await upstream.write(...)

                        d = data.CAN(...)
                        await upstream.write(...)

                    except intdash.ISCPConnClosedException:
                        break

            client.measurements.update(...)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()