4. チュートリアル2C: スマートフォンから送信されたデータをリアルタイムに加工処理してみよう

このチュートリアルでは、スマートフォンからのデータをSDKでリアルタイムに取得したり、加工したりする方法を解説します。

加工後のデータを再度intdashに送信することによって、加工前後のデータを同時に可視化できます。 このチュートリアルにより、以下を知ることができます。

  • intdash MotionからのリアルタイムデータをSDKにより取得する

  • 取得したデータをリアルタイムに加工処理する

  • 加工後のデータをSDKからintdashに送信し、Data Visualizerで表示する

../_images/motion-live-moving-average-tutorial.ja.png

4.1. SDKからのデータ送信に使用するエッジアカウントを準備する

このチュートリアルでは、SDKからリアルタイムデータを送信します。 リアルタイムデータを送信する際に使用するエッジアカウントを事前に準備します。

手順は エッジアカウントの作成 と同様です。 こちらの手順を参照して、 sdk_edge というエッジアカウントを新たに作成してください。

本手順においても、UUIDとクライアントシークレットをメモしておく必要はありません。

4.2. Data Visualizerの設定を行う

Data Visualizerで可視化用ダッシュボードを作成します。

4.2.1. ダッシュボードをインポートする

  1. データ設定ファイルとScreenファイルを、以下からダウンロードします。

  2. Data Visualizer画面左側の[Data Settings]( data-settings ) > [Add Group] でグループを作成し、データ設定ファイルをインポートします。

  3. 画面下部のScreen list( screen-list )ボタン > [Import]をクリックして、スクリーン定義ファイルをインポートします。

    インポートしたダッシュボードは、リストの一番右側に追加されますので、クリックして表示させます。

    ../_images/dashboard-realtime-processing.png

    図 14 ダッシュボード

4.2.2. 加工前のデータを表示させるビジュアルパーツを設定する

上側のパネル「Acceleration raw」は、Edge Switcherで指定されたエッジからのデータを表示する設定になっているため、画面下部のEdge Switcher (edge-switcher-icon) をクリックし、 edge1 を選択します。

../_images/edge-switcher-stream-process-select-edge1.png

図 15 Edge Switcherでedge1を選択

注釈

Edge Switcher (edge-switcher-icon) ボタンが表示されていない場合、画面左側の misc-icon MiscメニューでEdge Switcher機能をオンにしてください。

4.2.3. 加工後のデータを表示させるビジュアルパーツを設定する

  1. 下側のパネル「Acceleration Converted」を右クリックし、[Panel Settings]を開きます。

  2. バインドされているデータ「1_sp_ACCX」をクリックします。

    ../_images/panel-settings-sdk-edge.png

    図 16 バインドされているデータ

  3. [Select Edge]で、 sdk_edge (加工後のデータを送信するエッジ)を指定します。

    ../_images/bind-edge-sdk-edge.png

    図 17 パネルにデータ送信元を設定

    注釈

    • 目的のエッジが表示されない場合は、ブラウザーでData Visualizerを再読み込みしてください。

    • [Select Data]はすでに設定されているため、変更の必要はありません。

  4. 同様に、「1_sp_ACCY」「1_sp_ACCZ」についても、 sdk_edge を指定します。

  5. データ送信元エッジの設定が完了したら、[x]をクリックしてPanel Settingsを閉じます。

4.3. SDKでintdash Motionのデータをリアルタイムに加工する

intdash Motionから送信されているリアルタイムデータをSDKを使用して取得し、 リアルタイム加工処理を行ってからintdashに送り返す方法について解説します。

4.3.1. APIアクセス用の設定情報を設定する

最初に、以下のようにAPIアクセス用の設定情報を設定します。

  • INTDASH_URL : intdashサーバーのURL

  • API_TOKEN : ユーザーのAPIトークン

  • PROJECT_UUID : 使用するプロジェクトのUUID

HEADERS は上記情報により自動的に設定されます。

INTDASH_URL = "https://example.intdash.jp"
PROJECT_UUID = "00000000-0000-0000-0000-000000000000" # Null UUIDは、Global Projectに相当します
API_TOKEN = "*************************************"

HEADERS = {"X-Intdash-Token": API_TOKEN}

また、以降で使用する各種メソッドを定義しておきます。

import requests

def get(path, params={}):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
        "params": params,
    }
    resp = requests.get(**kwargs)
    resp.raise_for_status()
    return resp.json()

def post(path, body=None):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
    }
    if body is not None:
        kwargs["json"] = body
    resp = requests.post(**kwargs)
    resp.raise_for_status()
    return resp.json()

def put(path, body=None):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
    }
    if body is not None:
        kwargs["json"] = body
    resp = requests.put(**kwargs)
    resp.raise_for_status()

4.3.2. SDKからのデータ送信に使用するエッジの情報を取得する

次に、SDKからデータを送り返す際に使用するエッジの情報を取得します。

あるプロジェクトに所属しているエッジの情報の取得には、REST APIの「AUTH > Project Edges > List Project Edges」エンドポイントを使用します。

ここでは、 sdk_edge という名前を持つエッジの情報を取得します。

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_edge_by_nickname(nickname: str) -> dict:
    page = 1
    while True:
        resp = get(
            path=f"/auth/projects/{PROJECT_UUID}/edges",
            params={"page": page},
        )
        for edge in resp["items"]:
            if edge["nickname"] == nickname:
                return edge

        if resp["page"]["next"]:
            page += 1
        else:
            return None

# -----------------------------------------
# 実行
# -----------------------------------------

edge1 = get_edge_by_nickname("edge1")
print(edge1)

sdk_edge = get_edge_by_nickname("sdk_edge")
print(sdk_edge)

4.3.3. リアルタイム加工処理の内容を定義する

次に、リアルタイム加工処理の内容を実装した関数を定義します。

import struct

import numpy as np
import iscp

# -----------------------------------------
# 移動平均の算出関数の定義
# -----------------------------------------

WINDOW_SIZE = 10

def to_moving_average(new: float, history: np.array) -> float:
    history[:-1], history[-1] = history[1:], new
    return np.nanmean(history)


# -----------------------------------------
# センサーデータを格納したデータポイントから値を抽出して移動平均に変換、新しいデータポイントを生成する関数の定義
# -----------------------------------------

ax_hist = np.nan * np.empty((1, WINDOW_SIZE)).T
ay_hist = np.nan * np.empty((1, WINDOW_SIZE)).T
az_hist = np.nan * np.empty((1, WINDOW_SIZE)).T

def convert_data_point(data_id, data_point):
    (ax, ay, az) = [float(v)*1e-6 for v in struct.unpack("<iii", data_point.payload)]
    ax_ma = to_moving_average(ax, ax_hist)
    ay_ma = to_moving_average(ay, ay_hist)
    az_ma = to_moving_average(az, az_hist)

    return [
        (iscp.DataID(name="v1/1/sp_ACCX", type="float64"), iscp.DataPoint(elapsed_time=data_point.elapsed_time, payload=struct.pack(">d", ax_ma))),
        (iscp.DataID(name="v1/1/sp_ACCY", type="float64"), iscp.DataPoint(elapsed_time=data_point.elapsed_time, payload=struct.pack(">d", ay_ma))),
        (iscp.DataID(name="v1/1/sp_ACCZ", type="float64"), iscp.DataPoint(elapsed_time=data_point.elapsed_time, payload=struct.pack(">d", az_ma))),
    ]

4.3.4. データの送受信を行う非同期関数を定義する

リアルタイムAPIに接続し、ダウンストリームとアップストリームを行う非同期関数を定義します。 iscp ライブラリの処理は asyncio を使用した非同期処理によって実行されるため、非同期関数を定義して実行する必要があります。

import requests
from urllib.parse import urlparse
import asyncio

url = urlparse(INTDASH_URL)
WEBSOCKET_ADDRESS = url.netloc
ENABLE_TLS = url.scheme == "https"

DATA_TYPE_ISCPV2_GENERAL_SENSOR = "general_sensor"
DATA_ID_ACC = "0001"
CHANNEL = 1

async def run_iscp(meas_uuid: str, sdk_edge_uuid: str, edge1_uuid: str):

    # コネクションを開く
    conn_config = {
        "address": WEBSOCKET_ADDRESS,
        "connector": iscp.WebSocketConnector(enable_tls=ENABLE_TLS),
        "token_source": lambda: API_TOKEN,
        "node_id": sdk_edge_uuid,
        "project_uuid": PROJECT_UUID,
    }
    async with await iscp.Conn.connect(**conn_config) as conn:

        # アップストリームを開く
        ustream_config = {
            "session_id": meas_uuid,
            "persist": True,
            "flush_policy": iscp.Immediately(),
        }
        async with await conn.open_upstream(**ustream_config) as ustream:

            # ダウンストリームを開く
            dstream_config = {
                "filters": [iscp.DownstreamFilter(
                    source_node_id=edge1_uuid,
                    data_filters=[
                        iscp.DataFilter(
                            name=f"v1/{CHANNEL}/{DATA_ID_ACC}",
                            type=DATA_TYPE_ISCPV2_GENERAL_SENSOR,
                        ),
                    ],
                )]
            }
            async with await conn.open_downstream(**dstream_config) as dstream:

                # 下り方向でメタデータを受信するループ
                async def start_receive_metadatas():
                    async for meta in dstream.metadatas():
                        if type(meta.metadata) == iscp.BaseTime:
                            bt = meta.metadata

                            # 上り方向へ書き込み
                            await conn.send_base_time(
                                persist=True,
                                base_time=iscp.BaseTime(
                                    session_id=meas_uuid,
                                    name=bt.name,
                                    priority=bt.priority,
                                    elapsed_time=bt.elapsed_time,
                                    base_time=bt.base_time,
                                ),
                            )

                # 下り方向からチャンクを受信するループ
                async def start_receive_chunks():
                    async for chunk in dstream.chunks():
                        for group in chunk.data_point_groups:
                            for point in group.data_points:

                                # データポイントの変換
                                converted_data_points = convert_data_point(group.data_id, point)

                                # 上り方向へ書き込み
                                for new_data_id, new_point in converted_data_points:
                                    await ustream.write_data_points(new_data_id, new_point)

                await asyncio.gather(start_receive_metadatas(), start_receive_chunks())

4.3.5. 返送データを格納するための計測を作成する

送り返したデータを格納するための計測をintdashサーバーに作成します。

あるプロジェクトへの計測の作成には、REST APIの「MEAS > Measurements > Create Project Measurement」エンドポイントを使用します。

基準時刻(Base Time、計測が開始された時刻)は、データ送信時に自動的に設定されるため、このタイミングでは undefined としてUnix時刻の起点を設定します。

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def create_measurement(edge_uuid: str):
    return post(
        path=f"/v1/projects/{PROJECT_UUID}/measurements",
        body={
            "edge_uuid": edge_uuid,
            "basetime_type": "undefined",
            "basetime": "1970-01-01T00:00:00.000000Z",
        },
    )

# -----------------------------------------
# 実行
# -----------------------------------------

new_meas = create_measurement(sdk_edge["edge_uuid"])
print(new_meas)

4.3.6. リアルタイム加工処理を開始する

事前に定義したメソッドを使用して、リアルタイム加工処理を開始します。 Jupyter Notebookでは直接 await により非同期関数を実行できるため以下のように記載されていますが、 Jupyter Notebook以外の環境では、適切に asyncio を使用して非同期関数 run_iscp を実行してください。

await run_iscp(
    meas_uuid=new_meas["uuid"],
    sdk_edge_uuid=sdk_edge["edge_uuid"],
    edge1_uuid=edge1["edge_uuid"],
)

4.4. 加工前後のデータをData Visualizerで可視化する

Data Visualizerで表示を確認します。

intdash Motionから送信されている加速度のデータと、intdash SDKで加工した結果(移動平均)が大きな遅延なく表示されることが確認できます。

../_images/dashboard-realtime-processing-with-data.png

図 18 加速度とその移動平均を表示しているダッシュボード

注釈

リアルタイム加工処理を停止するには、Jupyter Notebook上で、[Kernel] → [Interrupt Kernel]を選択してください。