チュートリアル =================================== intdash SDK for Python を使用するための基本的な使用手順を解説します。 このチュートリアルでは、特定のエッジの計測を取得し、その時系列データを取得します。 .. contents:: :local: エッジを取得する ---------------- クライアント生成後、参照したいエッジを取得します。エッジリソースへのアクセスには、 :doc:`../edges` を用います。 サーバーに登録されているエッジの一覧を参照する場合、 `list()` メソッドを使用します。 ``name`` パラメーターに文字列を指定すると、その文字列で始まる名前を持つエッジの一覧を取得します。 .. code-block:: python import intdash lst = edges.list(name="admin", limit=10) for edge in lst: print(edge, "\n") # uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e # name: admin # nickname: administrator # description: this is admin user # type: user # disabled: False # created_at: 2020-01-01 00:00:00.000000+00:00 # updated_at: 2020-01-01 00:00:00.000000+00:00 # last_login_at: 2020-01-01 00:00:00.000000+00:00 エッジが多数存在する場合には、 ``iterator`` オプションを指定しイテレータを生成します。 イテレーションごとに、エッジの一覧を取得します。 .. code-block:: python it = edges.list(limit=10, iterator=True) for lst in it: for edge in lst: print(edge, "\n") エッジに紐づく計測を取得する ---------------------------------------- 先程取得したエッジに紐づいている計測(Mesureament)を取得します。 計測リソースへのアクセスは、 :doc:`../measurements` を使用します。 計測へのアクセスオブジェクトを取得します。 .. code-block:: python measurements = client.measurements 計測の一覧の取得には `list()` を使用します。 先程取得したエッジのUUIDを ``edge_uuid`` に指定することで、計測の一覧を取得します。 .. code-block:: python from intdash import timeutils lst = measurements.list( edge_uuid=edge.uuid, start=timeutils.str2timestamp('2020-01-01 00:00:00.000000+00:00'), end=timeutils.str2timestamp('2020-01-02 00:00:00.000000+00:00'), limit=10 ) for m in lst: print(m, "\n") # uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd # name: measurement # description: this is updated description # edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e # duration: 0:00:00 # ended: False # basetime: 1970-01-01 00:00:00+00:00 # basetime_type: undefined # created_at: 2020-01-01 00:00:00.000000+00:00 # updated_at: 2020-01-01 00:00:00.000000+00:00 取得したい計測のUUIDが事前に分かっている場合は、 ``uuid`` を指定して `get()` を実行することにより計測を取得することもできます。 .. code-block:: python measurement = measurements.get( uuid="1cef9b31-a356-4415-8b71-84273bbeb1fd", ) print(measurement) # uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd # name: measurement # description: this is updated description # edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e # duration: 0:00:00 # ended: False # basetime: 1970-01-01 00:00:00+00:00 # basetime_type: undefined # created_at: 2020-01-01 00:00:00.000000+00:00 # updated_at: 2020-01-01 00:00:00.000000+00:00 時系列データを取得する ---------------------------------------- 先程取得した計測に紐づいている時系列データを取得します。時系列データリソースへのアクセスには、以下2つのうちいずれかを使用します。 レスポンス形式が異なるため、詳細な形式は各ドキュメントを参照してください。 * :doc:`../datapoints` * :doc:`../units` このチュートリアルでは、 `client.data_points` を使用した例を解説します。 .. code-block:: python data_points = client.data_points 計測に紐づく時系列データを取得するには、`list()` を使用します。 ``measurement_uuid`` を指定すると、その計測の時系列データの一覧を取得できます。 .. code-block:: python import intdash from intdash import timeutils it = data_points.list( measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd", start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"), end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"), id_queries = [ intdash.IdQuery( data_type=intdash.DataType.string ) ], limit = 1000, iterator = True ) for lst in it: for dp in lst: print(dp, "\n") # time: 2020-01-01T05:08:25.676942+00:00 # measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd # data_type: 10 # channel: 1 # data_id: str_test # data_payload: b'\x08test' .. warning:: データポイント形式を使用する場合、時系列データのペイロードは、 ``data_payload`` にバイナリ形式で格納されます。バイナリのフォーマットはデータ型によって異なります。各データ型を表すクラス(データ型クラス)は時系列データオブジェクトに定義してあります。詳細は、:doc:`../data` を参照してください。ペイロードに格納されたバイナリを対応するクラスに変換するには、各クラスの静的メソッド `from_payload()` を使用してください。 .. code-block:: python import intdash dps = data_points.list( measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd", start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"), end = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"), id_queries = [ intdash.IdQuery( data_type=intdash.DataType.string ) ], limit = 1000, iterator = False ) intdash.data.String.from_payload(dps[0].data_payload) # data_type: str # id: str_test # value: test 1つのエッジに紐づく時系列データを、複数の計測を横断して取得したい場合、 ``edge_name`` もしくは ``edge_uuid`` を指定します。 .. code-block:: python from intdash import timeutils it = data_points.list( edge_name = "admin", start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"), end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"), id_queries = [ intdash.IdQuery( data_type=intdash.DataType.string ) ] limit = 1000, iterator = True ) for dpslst in it: for dps in dpslst: print(dps, "\n") # time: 2020-01-01T05:08:25.676942+00:00 # measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd # data_type: 10 # channel: 1 # data_id: str_test # data_payload: b'\x08test' # time: 2020-01-01T06:10:25.879654+00:00 # measurement_uuid: 10bf1795-0ae4-4e5e-a88b-b2fa0822a393 # data_type: 10 # channel: 1 # data_id: str_test # data_payload: b'\x08test2' 時系列データをサーバーに保存する ---------------------------------------- 時系列データを保存する際は、時系列データリソースへのアクセスを行う `client.data_points` あるいは `client.units` のいずれかを使用します。 使用するアクセスオブジェクトにより、登録の手順が異なります。さらに詳細な情報が必要な場合、各アクセスオブジェクトのドキュメントを参照してください。 - **client.data_points** - `intdash.DataPoint` の配列としてデータを作成します。 - ``data_payload`` はバイナリデータを指定する必要があります。バイナリデータのフォーマットはデータ型ごとに異なります。バイナリフォーマットが不明な場合は :doc:`../data` を参照してください。データ型クラスのオブジェクトからバイナリ形式への変換には、to_payload()メソッドを使用してください。 .. code-block:: python import pandas as pd import intdash dps = [ intdash.DataPoint( elapsed_time = pd.Timedelta(seconds=0), data_type = intdash.DataType.string, channel = 1, data_payload = intdash.data.String(data_id='str_test', value='test1').to_payload(), ), intdash.DataPoint( elapsed_time = pd.Timedelta(seconds=1), data_type = intdash.DataType.string, channel = 1, data_payload = intdash.data.String(data_id='str_test', value='test2').to_payload(), ), intdash.DataPoint( elapsed_time = pd.Timedelta(seconds=2), data_type = intdash.DataType.string, channel = 1, data_payload = intdash.data.String(data_id='str_test', value='test3').to_payload(), ), ... ] 作成したデータのリストを保存するには、 `datapoints.store()` を実行します。 .. code-block:: python data_points = client.data_points data_points.store( measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd", data_points = dps, ) リアルタイムデータを送受信する ---------------------------------------- クライアントを使用して、 WebSocket エンドポイントへ接続します。 .. code-block:: python wsconn = client.connect_websocket() リアルタイムデータの送信には、 `open_upstreams()` メソッドを使用します。 このとき、開くストリームの仕様を指定する `intdash.UpstreamSpec` オブジェクトと、 送信データの生成に使用するイテレータを入力引数として渡します。 .. code-block:: python import pandas as pd from time import sleep import intdash from intdash import data upstream_specs = [ intdash.UpstreamSpec( src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e", dst_edge_uuids = None, resend = False, store = True, measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd", ), ] def generate_units(): basetime = pd.Timestamp.utcnow() yield intdash.Unit( elapsed_time = pd.Timestamp.utcnow() - basetime, channel = 1, data = data.Basetime(type = intdash.BasetimeType.ntp, basetime = basetime) ) while True: sleep(0.1) yield intdash.Unit( elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime), channel = 1, data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'), ) yield intdash.Unit( elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime), channel = 2, data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'), ) iterators = [generate_units()] wsconn.open_upstreams( specs = upstream_specs, iterators = iterators, ) リアルタイムデータの受信には、 `open_downstreams()` メソッドを使用します。 このとき、開くストリームの仕様を指定する `intdash.DownstreamSpec` オブジェクトと、 データ受信時に実行するコールバックメソッドを入力引数として渡します。 .. code-block:: python import intdash downstream_specs = [ intdash.DownstreamSpec( src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e", filters = [ intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1), intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2), ], ), ] def callback(unit): print(unit) callbacks = [callback] wsconn.open_downstreams( specs = downstream_specs, callbacks = callbacks, ) 処理が終了したら、接続を閉じ、計測を終了します。 .. code-block:: python wsconn.close() client.measurements.update( uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd", ended = True, ) 以上の処理は、適切にイベントループを使用して実行する必要があります。 .. code-block:: python from tornado import gen, ioloop @gen.coroutine def main(): try: wsconn = client.connect_websocket() wsconn.open_upstreams( specs = upstream_specs, iterators = iterators, ) wsconn.open_downstreams( specs = downstream_specs, callbacks = callbacks, ) yield gen.sleep(5) finally: wsconn.close() measurements.update( uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd", ended = True, ) ioloop.IOLoop.current().run_sync(lambda: main()) リアルタイムデータを送受信する その2 ---------------------------------------- 新たに追加されたメソッド `connect_iscp()` を使用して、intdashのリアルタイムAPIへ接続する方法を解説します。 これは、 `async/await` 構文での記述に対応した、リアルタイム接続用の新しいAPIです。 `connect_iscp()` には、コネクションが閉じられたときに呼び出されるコールバックである `on_close` を入力引数として渡します。 .. code-block:: python async def on_close(): print("connection closed!") conn = await client.connect_iscp(on_close=on_close) リアルタイムデータの送信には、 `open_upstream()` メソッドを使用します。 このとき、開くストリームの仕様を指定する `intdash.UpstreamSpec` オブジェクトと、 ACK が返却されたときに呼び出されるコールバックである `on_ack` を入力引数として渡します。 .. code-block:: python import pandas as pd import asyncio import intdash from intdash import data basetime = pd.Timestamp.utcnow() meas = client.measurements.create( edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e", basetime=basetime, basetime_type="ntp", ) spec = intdash.UpstreamSpec( src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e", dst_edge_uuids=None, resend=False, store=True, measurement_uuid=meas.uuid, ) async def on_ack(serial, result): print(f"ACK's serial number is {serial}. Result code is {result}.") upstream = await conn.open_upstream(spec=spec, on_ack=on_ack) d = intdash.data.Basetime( type=intdash.BasetimeType.ntp, basetime=basetime, ) await upstream.write( datapoint=intdash.DataPoint( elapsed_time=pd.Timestamp.utcnow() - basetime, channel=1, data_type=d.data_type, data_payload=d.to_payload(), ) ) while True: await asyncio.sleep(0.1) try: d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07') await upstream.write(datapoint=intdash.DataPoint( elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime), channel = 1, data_type=d.data_type, data_payload=d.to_payload(), )) d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07') await upstream.write(datapoint=intdash.DataPoint( elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime), channel = 2, data_type=d.data_type, data_payload=d.to_payload(), )) except intdash.ISCPConnClosedException: break 送信が終了したら、ストリームを閉じます。 このとき、必要に応じて計測を終了させます。 .. code-block:: python await upstream.close() client.measurements.update( uuid = meas.uuid, ended=True, ) リアルタイムデータの受信には、 `open_downstream()` メソッドを使用します。 このとき、開くストリームの仕様を指定する `intdash.DownstreamSpec` オブジェクトと、 データ受信時に呼び出されるコールバックである `on_msg` を入力引数として渡します。 .. code-block:: python import intdash spec = intdash.DownstreamSpec( src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e", filters=[ intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1), intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2), ], ) async def on_msg(datapoint): print(datapoint) downstream = await conn.open_downstream(spec=spec, on_msg=on_msg) 受信が終了したら、ストリームを閉じます。 .. code-block:: python await downstream.close() 送受信処理が終了したら、接続を閉じます。 .. code-block:: python await conn.close() 新メソッドはネイティブコルーチンを使用して実装されているため、 以上の処理は、適切にイベントループを使用して実行する必要があります。 .. code-block:: python async def coroutine(): conn = await client.connect_iscp(on_close=on_close) dspec = intdash.DownstreamSpec(...) downstream = await conn.open_downstream(spec=dspec, on_msg=on_msg) basetime = pd.Timestamp.utcnow() meas = client.measurements.create(...) uspec = intdash.UpstreamSpec(...) upstream = await conn.open_upstream(spec=uspec, on_ack=on_ack) d = intdash.data.Basetime(...) await upstream.write(...) async def deadline(upstream): await asyncio.sleep(10) await upstream.close() asyncio.ensure_future(deadline(upstream)) while True: try: d = data.CAN(...) await upstream.write(...) d = data.CAN(...) await upstream.write(...) except intdash.ISCPConnClosedException: break client.measurements.update(...) loop = asyncio.get_event_loop() loop.run_until_complete(coroutine()) loop.close() また、接続の開始やストリームのオープンには、 `async with` 構文を使用することができます。 .. code-block:: python async def coroutine(): async with await client.connect_iscp(on_close=on_close) as conn: dspec = intdash.DownstreamSpec(...) async with await conn.open_downstream(spec=dspec, on_msg=on_msg) as downstream: basetime = pd.Timestamp.utcnow() meas = client.measurements.create(...) uspec = intdash.UpstreamSpec(...) async with await conn.open_upstream(spec=uspec, on_ack=on_ack) as upstream: d = intdash.data.Basetime(...) await upstream.write(...) async def deadline(upstream): await asyncio.sleep(10) await upstream.close() asyncio.ensure_future(deadline(upstream)) while True: try: d = data.CAN(...) await upstream.write(...) d = data.CAN(...) await upstream.write(...) except intdash.ISCPConnClosedException: break client.measurements.update(...) loop = asyncio.get_event_loop() loop.run_until_complete(coroutine()) loop.close()