4. チュートリアル1C: API/SDKを使ってサーバーのデータを加工してみよう

3つめのチュートリアルでは、intdashサーバーに保存されているデータにAPIを使ってアクセスし、簡単な加工を行います。 さらに、加工したデータを再度intdashサーバーに格納し、Data Visualizer上で加工前後のデータを比較します。

../_images/stored-data-moving-average-tutorial.ja.png

4.1. ダッシュボードを変更する

4.1.1. 加工後のデータ用のデータ設定を登録する

加工後のデータをバインドできるように、加工後のデータに適したデータ設定を登録します。

  1. 以下のリンク先からデータ設定ファイルをダウンロードします。

    1_3_converted_sensor.dat

  2. データ設定ファイルを読み込む と同様の手順で登録します。

4.1.2. 加工後のデータが入ったスクリーンを読み込む

加工前と加工後の両方のデータを表示できるダッシュボードに変更します。

  1. 以下のリンク先からスクリーン定義ファイルをダウンロードします。

    1_3_converted_sensor.scrn

  2. ダッシュボードの設定ファイルを読み込む と同様の手順で登録します。すべてのパネルのすべてのデータの取得元として edge1 を指定してください。

4.2. CSVファイルをアップロードする

可視化したいCSVデータをintdashに格納します。CSVファイルのアップロード方法は、前のチュートリアルの CSVファイルをアップロードする と同一のため、ここでは説明を省略します。

事前に チュートリアル1A: サンプルデータを可視化してみよう を実施済みの方は、intdashサーバー上にデータが存在するため次の手順に進んで構いません。

4.3. データを取得して加工する

intdashサーバーに格納されたデータにAPIを用いてアクセスし、加工したうえで、結果を再度intdashのサーバーに格納します。 以降、具体的なソースコードを交えて解説します。

4.3.1. APIアクセス用の設定情報を設定する

まず最初に、以下のようにAPIアクセス用の設定情報を設定します。

  • INTDASH_URL : intdashサーバーのURL

  • API_TOKEN : ユーザーのAPIトークン

  • PROJECT_UUID : 使用するプロジェクトのUUID

HEADERS は上記情報により自動的に設定されます。

INTDASH_URL = "https://example.intdash.jp"
PROJECT_UUID = "00000000-0000-0000-0000-000000000000" # Null UUIDは、Global Projectに相当します
API_TOKEN = "*************************************"

HEADERS = {"X-Intdash-Token": API_TOKEN}

また、以降で使用する各種メソッドを定義しておきます。

import requests

def get(path, params={}):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
        "params": params,
    }
    resp = requests.get(**kwargs)
    resp.raise_for_status()
    return resp.json()

def post(path, body=None):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
    }
    if body is not None:
        kwargs["json"] = body
    resp = requests.post(**kwargs)
    resp.raise_for_status()
    return resp.json()

def put(path, body=None):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
    }
    if body is not None:
        kwargs["json"] = body
    resp = requests.put(**kwargs)
    resp.raise_for_status()

4.3.2. CSVアップロードに使用したエッジの情報を取得する

次に、CSVファイルをアップロードする際に使用したエッジの情報を取得します。

あるプロジェクトに所属しているエッジの情報の取得には、REST APIの「AUTH > Project Edges > List Project Edges」エンドポイントを使用します。

以降の説明では、 edge1 という名前を持つエッジを使用したものとして解説します。

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_edge_by_nickname(nickname: str) -> dict:
    page = 1
    while True:
        resp = get(
            path=f"/auth/projects/{PROJECT_UUID}/edges",
            params={"page": page},
        )
        for edge in resp["items"]:
            if edge["nickname"] == nickname:
                return edge

        if resp["page"]["next"]:
            page += 1
        else:
            return None

# -----------------------------------------
# 実行
# -----------------------------------------

edge1 = get_edge_by_nickname("edge1")
print(edge1)

4.3.3. CSVアップロードにより作成された計測を取得する

STARTEND により時間範囲を指定して、計測のリストを取得します。

あるプロジェクトに所属している計測の情報の取得には、REST APIの「MEAS > Measurements > List Project Measurements」エンドポイントを使用します。

import pandas as pd

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_measurements(edge_uuid: str, start: pd.Timestamp, end: pd.Timestamp) -> list[dict]:
    measurements = []
    page = 1
    while True:
        resp = get(
            path=f"/v1/projects/{PROJECT_UUID}/measurements",
            params={
                "edge_uuid": edge_uuid,
                "start": start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "end": end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "page": page,
            },
        )

        for meas in resp["items"]:
            measurements.append(meas)

        if resp["page"]["next"]:
            page += 1
        else:
            break

    return measurements

# -----------------------------------------
# 実行
# -----------------------------------------

START = "2020/07/01 00:00:00+09:00"
END = "2020/08/01 00:00:00+09:00"

measurements = get_measurements(
    edge_uuid=edge1["edge_uuid"],
    start=pd.Timestamp(START),
    end=pd.Timestamp(END),
)

measurement1 = measurements[0]
print(measurement1)

4.3.4. 時系列データを取得する

計測に格納されている時系列データをintdashサーバーから取得します。

ここでは、データタイプ、チャンネル番号、データIDなどを指定することにより、目的のデータポイントだけを取得します。

今回 CSVファイルをアップロードする でアップロードしたデータは、 iSCPv1 Float型、チャンネル番号 1 のデータとして保存されているため、 DATA_TYPE = "11"CHANNEL = 1 を指定します。

また、CSVファイルをアップロードした場合、データIDとしてCSVファイルの列名が使用されるため、取得対象のデータIDとして sp_ACCX , sp_ACCY 等の列名を指定します。 ここでは、加速度データのみ( sp_ACCX , sp_ACCY , sp_ACCZ )を取得することにします。

あるプロジェクトに所属しているデータポイントの取得には、REST APIの「MEAS > Data Points > List Project Data Points」エンドポイントを使用します。

取得するデータを指定するには、 idq パラメータを使用します。 このエンドポイントは、レスポンスが Transfer-Encoding: chunked 形式で、改行区切りのJSONによって返却されますので、 他のエンドポイントと比較して若干特殊なパース処理をしている点にご注意ください。

import json

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_data_points(meas_uuid: str, idq: list[str]) -> list[dict]:
    resp = requests.get(
        url=INTDASH_URL + f"/api/v1/projects/{PROJECT_UUID}/data",
        headers=HEADERS,
        params={
            "name": meas_uuid,
            "idq": idq,
            "time_format": "ns",
        },
        stream=True,
    )
    resp.raise_for_status()

    data_points = []
    received = ""
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        received += chunk
        for line in received.split("\n"):
            try:
                decoded = json.loads(line)
                data_points.append(decoded)
            except ValueError:
                received = line
                break

    return data_points

# -----------------------------------------
# 実行
# -----------------------------------------

DATA_TYPE = "11"
CHANNEL = 1

data_points = get_data_points(
    meas_uuid=measurement1["uuid"],
    idq=[
        f"{DATA_TYPE}:{CHANNEL}/sp_ACCX",
        f"{DATA_TYPE}:{CHANNEL}/sp_ACCY",
        f"{DATA_TYPE}:{CHANNEL}/sp_ACCZ",
    ],
)
print(len(data_points))
print(data_points[0])

4.3.5. 取得したデータを可視化する

取得した時系列データを、 pandas.DataFrame に変換し、 plot メソッドにより可視化します。

まず、取得したデータを pandas.DataFrame 形式に変換します。 事前にCSVアップロードにより保存されたデータは、iSCPv1 Float 型として保存されているため、デコードして値に戻します。 APIから返却されたデータポイントのフォーマットについては API仕様書 を参照してください。 ペイロードに格納されたバイナリのフォーマットについては、 詳説 iSCP 1.0 を参照してください。

df = pd.DataFrame(
    [{
        "time": pd.Timestamp(dp["time"]).isoformat()+"Z",
        dp["data"]["i"]: dp["data"]["d"],
    } for dp in data_points]
).groupby("time").last().reindex(columns=["sp_ACCX", "sp_ACCY", "sp_ACCZ"])

print(df)

次に、 plot メソッドを用いてデータを描画します。 描画した結果、以下のように表示されます。

from matplotlib import pyplot as plt

df.plot(figsize=(15,6), grid=True, ylim=[-10, 10])
plt.xticks(rotation=-90)
plt.show()
plt.close()
../_images/plot_original.png

図 28 CSVアップロードにより作成されたデータ

4.3.6. 取得したデータを加工する

次に、取得したデータに対して加工処理を実行し、データの変化を確認します。 本チュートリアルでは、加工処理として移動平均を計算してみます。

df_converted = df.rolling(5).mean()

df_converted.plot(figsize=(15,6), grid=True, ylim=[-10, 10])
plt.xticks(rotation=-90)
plt.show()
plt.close()
../_images/plot_converted.png

図 29 移動平均のグラフ

移動平均を実行したグラフは、元のグラフよりなめらかになっていることが分かります。

4.3.7. 加工済みのデータをintdashにアップロードする

最後に、加工済みのデータを新しい計測としてintdashにアップロードします。

まず、計測を作成します。あるプロジェクトへの計測の作成には、REST APIの「MEAS > Measurements > Create Project Measurement」エンドポイントを使用します。

基準時刻(Base Time、計測が開始された時刻)は、最初のデータポイントが持つ時刻にします。

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def create_measurement(edge_uuid: str, base_time: pd.Timestamp):
    return post(
        path=f"/v1/projects/{PROJECT_UUID}/measurements",
        body={
            "basetime": base_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "basetime_type": "manual",
            "edge_uuid": edge_uuid,
        },
    )

# -----------------------------------------
# 実行
# -----------------------------------------

base_time = pd.Timestamp(df_converted.index[0])
new_meas = create_measurement(edge1["edge_uuid"], base_time)
print(new_meas)

ある計測へのデータポイントの保存は以下の手順で行います。

  1. REST APIの「MEAS > Measurements > Replace Project Measurement Sequence」エンドポイントを使い、新しいシーケンスを作成する。

  2. REST APIの「MEAS > Measurements > Create Project Measurement Sequence Chunk」エンドポイントで、上記のシーケンス内にチャンクを作成する(このチャンク内にデータポイントグループが作成され、データポイントグループ内にデータポイントが作成されます)。

なお、アップロードするデータは、iSCPv1 互換形式の iSCPv2 Float64型、チャンネル番号 1 のデータとして保存するため、 ここでは DATA_TYPE = "float64"CHANNEL = 1 を使用します。 アップロードに使用するデータポイントのペイロードフォーマットの詳細は iSCPv2 のプロトコル仕様書 を参照してください。 iSCPv2 の iSCPv1 との互換形式の詳細については、 こちらの解説ドキュメント を参照してください。

import uuid
import base64
import struct

# -----------------------------------------
# 関数の定義
# -----------------------------------------

CHANNEL = 1
DATA_TYPE = "float64"
names_converted = ["cov_ACCX", "cov_ACCY", "cov_ACCZ"]

def store_data_points(meas_uuid: str, df: pd.DataFrame):
    seq_uuid = str(uuid.uuid4())
    put(
        path=f"/v1/projects/{PROJECT_UUID}/measurements/{meas_uuid}/sequences/{seq_uuid}",
        body={},
    )

    resp = post(
        path=f"/v1/projects/{PROJECT_UUID}/measurements/sequences/chunks",
        body={
            "meas_uuid": meas_uuid,
            "sequence_uuid": seq_uuid,
            "chunks": [{
                "sequence_number": 1,
                "data_point_groups": [{
                    "data_id": {"name": f"v1/{CHANNEL}/{name}", "type": DATA_TYPE},
                    "data_points": [{
                        "elapsed_time": (pd.Timestamp(time)-base_time).value,
                        "payload": base64.b64encode(struct.pack(">d", value)).decode(),
                    } for time, value in cols.items()],
                } for name, (_, cols) in zip(names_converted, df.items())],
            }],
        },
    )

    for item in resp["items"]:
        if item["result"] != "ok":
            print(f"サーバーでの保存処理に失敗しました: シーケンス番号={item['sequence_number']}")

# -----------------------------------------
# 実行
# -----------------------------------------

store_data_points(new_meas["uuid"], df_converted)

最後に、計測のステータスを完了に変更します。

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def complete_measurement(meas_uuid: str):
    put(
        path=f"/v1/projects/{PROJECT_UUID}/measurements/{meas_uuid}/complete",
    )

# -----------------------------------------
# 実行
# -----------------------------------------

complete_measurement(new_meas["uuid"])

これでアップロードは完了です。

4.4. 加工したデータを表示する

intdashサーバーに格納した加工前のデータと加工後のデータの両方を、Data Visualizerを用いて可視化してみます。 本手順は アップロードしたデータをダッシュボードで可視化する と同一の手順のため、ここでは説明を省略します。

../_images/original-and-converted-sensor-values.png

図 30 加工前と加工後のデータの可視化