3. チュートリアル2B: スマートフォンから送信されたデータをPCに保存してみよう

このチュートリアルでは、intdashサーバーに保存されたデータをAPIからダウンロードしてローカルに保存します。 センサーデータはCSVファイル、映像はJPEG画像ファイルとして保存します。このチュートリアルにより、以下を知ることができます。

  • intdashサーバーに保存されているセンサーデータをCSVファイルとしてダウンロードする方法

  • intdashサーバーに保存されている映像データをJPEG画像ファイルとしてダウンロードする方法

../_images/save-as-csv-mjpeg-tutorial.png

以降では、あらかじめ チュートリアル2A: スマートフォンから送信されたデータをリアルタイム可視化してみよう を実行して、データがサーバーに保存されているものとして解説します。

3.1. APIを使用するための事前準備を行う

まずは、APIによりデータを取得するための事前準備を行います。

3.1.1. APIアクセス用の設定情報を設定する

最初に、以下のようにAPIアクセス用の設定情報を設定します。

  • INTDASH_URL : intdashサーバーのURL

  • API_TOKEN : ユーザーのAPIトークン

  • PROJECT_UUID : 使用するプロジェクトのUUID

HEADERS は上記情報により自動的に設定されます。

INTDASH_URL = "https://example.intdash.jp"
PROJECT_UUID = "00000000-0000-0000-0000-000000000000" # Null UUIDは、Global Projectに相当します
API_TOKEN = "*************************************"

HEADERS = {"X-Intdash-Token": API_TOKEN}

また、以降で使用する各種メソッドを定義しておきます。

import requests

def get(path, params={}):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
        "params": params,
    }
    resp = requests.get(**kwargs)
    resp.raise_for_status()
    return resp.json()

def post(path, body=None):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
    }
    if body is not None:
        kwargs["json"] = body
    resp = requests.post(**kwargs)
    resp.raise_for_status()
    return resp.json()

def put(path, body=None):
    kwargs = {
        "url": INTDASH_URL + "/api" + path,
        "headers": HEADERS,
    }
    if body is not None:
        kwargs["json"] = body
    resp = requests.put(**kwargs)
    resp.raise_for_status()

3.1.2. intdash Motionからのデータ送信に使用したエッジの情報を取得する

次に、CSVファイルをアップロードする際に使用したエッジの情報を取得します。

あるプロジェクトに所属しているエッジの情報の取得には、REST APIの「AUTH > Project Edges > List Project Edges」エンドポイントを使用します。

以降の説明では、 edge1 という名前を持つエッジを使用したものとして解説します。

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_edge_by_nickname(nickname: str) -> dict:
    page = 1
    while True:
        resp = get(
            path=f"/auth/projects/{PROJECT_UUID}/edges",
            params={"page": page},
        )
        for edge in resp["items"]:
            if edge["nickname"] == nickname:
                return edge

        if resp["page"]["next"]:
            page += 1
        else:
            return None

# -----------------------------------------
# 実行
# -----------------------------------------

edge1 = get_edge_by_nickname("edge1")
print(edge1)

3.1.3. intdash Motionからのデータ送信により作成された計測を取得する

STARTEND により時間範囲を指定して、計測のリストを取得します。

あるプロジェクトに所属している計測の情報の取得には、REST APIの「MEAS > Measurements > List Project Measurements」エンドポイントを使用します。

import pandas as pd

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_measurements(edge_uuid: str, start: pd.Timestamp, end: pd.Timestamp) -> list[dict]:
    measurements = []
    page = 1
    while True:
        resp = get(
            path=f"/v1/projects/{PROJECT_UUID}/measurements",
            params={
                "edge_uuid": edge_uuid,
                "start": start.tz_convert('utc').strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "end": end.tz_convert('utc').strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "page": page,
            },
        )

        for meas in resp["items"]:
            measurements.append(meas)

        if resp["page"]["next"]:
            page += 1
        else:
            break

    return measurements

# -----------------------------------------
# 実行
# -----------------------------------------

START = "2023/06/01 00:00:00+09:00"
END = "2023/07/01 00:00:00+09:00"

measurements = get_measurements(
    edge_uuid=edge1["edge_uuid"],
    start=pd.Timestamp(START),
    end=pd.Timestamp(END),
)

measurement1 = measurements[0]
print(measurement1)

3.2. センサーデータをCSVファイルとして保存する

事前準備で取得した計測に格納されたセンサーデータを、CSVファイルとして保存する方法を解説します。

3.2.1. 時系列データを取得する

intdashサーバーから、取得した計測に格納されている時系列データを取得します。

なお、intdash Motionから送信されたセンサーデータは、 iSCPv1 General Sensor型のデータとして保存されているため、ここでは DATA_TYPE = "3" を使用します。 さらに、デフォルトではチャンネル番号として 1 が使用されるため、ここでは CHANNEL = 1 を使用します。 intdash Motionの設定項目で、他のチャンネルに設定を変更している場合は、適宜値を変更してください。

今回は、送信された時系列データのうち、加速度データ( ID 0001 )と回転速度データ( ID 0004 )を対象として取得します。

あるプロジェクトに所属しているデータポイントの取得には、REST APIの「MEAS > Data Points > List Project Data Points」エンドポイントを使用します。

取得するデータを指定するには、 idq パラメータを使用します。 このエンドポイントは、レスポンスが Transfer-Encoding: chunked 形式で、改行区切りのJSONによって返却されますので、 他のエンドポイントと比較して若干特殊なパース処理をしている点にご注意ください。

import json

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def get_data_points(meas_uuid: str, idq: list[str]) -> list[dict]:
    resp = requests.get(
        url=INTDASH_URL + f"/api/v1/projects/{PROJECT_UUID}/data",
        headers=HEADERS,
        params={
            "name": meas_uuid,
            "idq": idq,
            "time_format": "ns",
        },
        stream=True,
    )
    resp.raise_for_status()

    data_points = []
    received = ""
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        received += chunk
        for line in received.split("\n"):
            try:
                decoded = json.loads(line)
                data_points.append(decoded)
            except ValueError:
                received = line
                break

    return data_points

# -----------------------------------------
# 実行
# -----------------------------------------

DATA_TYPE = "3"
CHANNEL = 1

data_points = get_data_points(
    meas_uuid=measurement1["uuid"],
    idq=[
        f"{DATA_TYPE}:{CHANNEL}/0001",
        f"{DATA_TYPE}:{CHANNEL}/0004",
    ],
)
print(len(data_points))
print(data_points[0])

3.2.2. 取得したデータをDataFrameに変換する

取得した時系列データを、 pandas.DataFrame に変換します。

事前にintdash Motionから送信されたセンサーデータは、iSCPv1 General Sensor 型として保存されているため、デコードして値に戻します。

APIから返却されたデータポイントのフォーマットは API仕様書 を参照してください。 ペイロードに格納されたバイナリーのフォーマットについては、 詳説 iSCP 1.0 を参照してください。

import struct
import base64

# -----------------------------------------
# 関数の定義
# -----------------------------------------

def convert(data_point: dict) -> list[dict]:
    match data_point["data"]["i"]:
        case 1: # ACC
            names = ["sp_ACCX", "sp_ACCY", "sp_ACCZ"]
            factor = 1e-6
        case 4: # ROTATION RATE
            names = ["sp_Yaw", "sp_Pitch", "sp_Roll"]
            factor = 1e-5

    bin = base64.b64decode(data_point["data"]["d"])
    values = [float(v)*factor for v in struct.unpack("<iii", bin)]
    return [{
        "time": data_point["time"],
        "name": name,
        "value": value,
    } for (name, value) in zip(names, values)]

# -----------------------------------------
# 実行
# -----------------------------------------

converted = sum([convert(dp) for dp in data_points], [])
df = pd.DataFrame(
    [{
        "time": pd.Timestamp(dp["time"]).isoformat()+"Z",
        dp["name"]: dp["value"],
    } for dp in converted]
).groupby("time").last()

print(df)

3.2.3. DataFrameをCSV形式で保存する

DataFrameの機能を使用して、DataFrameをCSVファイルとして保存します。

df.to_csv("./sample.csv")

保存されたデータは、以下のような形式になります。

../_images/saved-csv.png

図 13 保存されたCSVファイルの形式

3.3. 映像データをJPEG画像ファイルとして保存する

次に、センサーデータを取り出した計測と同じ計測から、映像データを取り出してJPEG画像ファイルとして保存する方法を解説します。

3.3.1. 時系列データを取得する

intdash Motionから送信された映像データは、 iSCPv1 JPEG型のデータとして保存されているため、ここでは DATA_TYPE = "9" を使用します。 さらに、デフォルトではチャンネル番号として 1 が使用されるため、ここでは CHANNEL = 1 を使用します。 intdash Motionの設定項目で、他のチャンネルに設定を変更している場合は、適宜値を変更してください。

JPEG型データでは、IDは jpeg に固定されているため、この値を使用します。

DATA_TYPE = "9"
CHANNEL = 1

data_points = get_data_points(
    meas_uuid=measurement1["uuid"],
    idq=[
        f"{DATA_TYPE}:{CHANNEL}/jpeg",
    ],
)
print(len(data_points))

3.3.2. 取得したデータをJPEG画像ファイルとして書き出す

取得した時系列データを、JPEG画像ファイルに変換します。

事前にintdash Motionから送信されたデータは、iSCPv1 JPEG型として保存されているため、取得したデータポイントからJPEGバイナリーの部分を取り出してファイルに書き出します。

APIから返却されたデータポイントのフォーマットは API仕様書 を参照してください。 ペイロードに格納されたバイナリーのフォーマットについては、 詳説 iSCP 1.0 を参照してください。

各画像ファイルのファイル名には、そのフレームが持つタイムスタンプ値を使用します。

import os
import base64

save_dir = "./images"
os.mkdir(save_dir)

for dp in data_points:
    with open(f"{save_dir}/{pd.Timestamp(dp['time']).value}.jpg", "wb") as f:
        f.write(base64.b64decode(dp["data"]["d"]))

これで、スマートフォンのセンサーデータをローカルに保存することができました。