3. チュートリアル2B: スマートフォンから送信されたデータをPCに保存してみよう
このチュートリアルでは、intdashサーバーに保存されたデータをAPIからダウンロードしてローカルに保存します。 センサーデータはCSVファイル、映像はJPEG画像ファイルとして保存します。このチュートリアルにより、以下を知ることができます。
intdashサーバーに保存されているセンサーデータをCSVファイルとしてダウンロードする方法
intdashサーバーに保存されている映像データをJPEG画像ファイルとしてダウンロードする方法
以降では、あらかじめ チュートリアル2A: スマートフォンから送信されたデータをリアルタイム可視化してみよう を実行して、データがサーバーに保存されているものとして解説します。
3.1. APIを使用するための事前準備を行う
まずは、APIによりデータを取得するための事前準備を行います。
3.1.1. APIアクセス用の設定情報を設定する
最初に、以下のようにAPIアクセス用の設定情報を設定します。
INTDASH_URL
: intdashサーバーのURLAPI_TOKEN
: ユーザーのAPIトークンPROJECT_UUID
: 使用するプロジェクトのUUID
HEADERS
は上記情報により自動的に設定されます。
INTDASH_URL = "https://example.intdash.jp"
PROJECT_UUID = "00000000-0000-0000-0000-000000000000" # Null UUIDは、Global Projectに相当します
API_TOKEN = "*************************************"
HEADERS = {"X-Intdash-Token": API_TOKEN}
また、以降で使用する各種メソッドを定義しておきます。
import requests
def get(path, params={}):
kwargs = {
"url": INTDASH_URL + "/api" + path,
"headers": HEADERS,
"params": params,
}
resp = requests.get(**kwargs)
resp.raise_for_status()
return resp.json()
def post(path, body=None):
kwargs = {
"url": INTDASH_URL + "/api" + path,
"headers": HEADERS,
}
if body is not None:
kwargs["json"] = body
resp = requests.post(**kwargs)
resp.raise_for_status()
return resp.json()
def put(path, body=None):
kwargs = {
"url": INTDASH_URL + "/api" + path,
"headers": HEADERS,
}
if body is not None:
kwargs["json"] = body
resp = requests.put(**kwargs)
resp.raise_for_status()
3.1.2. intdash Motionからのデータ送信に使用したエッジの情報を取得する
次に、CSVファイルをアップロードする際に使用したエッジの情報を取得します。
あるプロジェクトに所属しているエッジの情報の取得には、REST APIの「AUTH > Project Edges > List Project Edges」エンドポイントを使用します。
以降の説明では、 edge1
という名前を持つエッジを使用したものとして解説します。
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def get_edge_by_nickname(nickname: str) -> dict:
page = 1
while True:
resp = get(
path=f"/auth/projects/{PROJECT_UUID}/edges",
params={"page": page},
)
for edge in resp["items"]:
if edge["nickname"] == nickname:
return edge
if resp["page"]["next"]:
page += 1
else:
return None
# -----------------------------------------
# 実行
# -----------------------------------------
edge1 = get_edge_by_nickname("edge1")
print(edge1)
3.1.3. intdash Motionからのデータ送信により作成された計測を取得する
START
と END
により時間範囲を指定して、計測のリストを取得します。
あるプロジェクトに所属している計測の情報の取得には、REST APIの「MEAS > Measurements > List Project Measurements」エンドポイントを使用します。
import pandas as pd
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def get_measurements(edge_uuid: str, start: pd.Timestamp, end: pd.Timestamp) -> list[dict]:
measurements = []
page = 1
while True:
resp = get(
path=f"/v1/projects/{PROJECT_UUID}/measurements",
params={
"edge_uuid": edge_uuid,
"start": start.tz_convert('utc').strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"end": end.tz_convert('utc').strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"page": page,
},
)
for meas in resp["items"]:
measurements.append(meas)
if resp["page"]["next"]:
page += 1
else:
break
return measurements
# -----------------------------------------
# 実行
# -----------------------------------------
START = "2023/06/01 00:00:00+09:00"
END = "2023/07/01 00:00:00+09:00"
measurements = get_measurements(
edge_uuid=edge1["edge_uuid"],
start=pd.Timestamp(START),
end=pd.Timestamp(END),
)
measurement1 = measurements[0]
print(measurement1)
3.2. センサーデータをCSVファイルとして保存する
事前準備で取得した計測に格納されたセンサーデータを、CSVファイルとして保存する方法を解説します。
3.2.1. 時系列データを取得する
intdashサーバーから、取得した計測に格納されている時系列データを取得します。
なお、intdash Motionから送信されたセンサーデータは、 iSCPv1 General Sensor型のデータとして保存されているため、ここでは DATA_TYPE = "3"
を使用します。
さらに、デフォルトではチャンネル番号として 1
が使用されるため、ここでは CHANNEL = 1
を使用します。
intdash Motionの設定項目で、他のチャンネルに設定を変更している場合は、適宜値を変更してください。
今回は、送信された時系列データのうち、加速度データ( ID 0001
)と回転速度データ( ID 0004
)を対象として取得します。
あるプロジェクトに所属しているデータポイントの取得には、REST APIの「MEAS > Data Points > List Project Data Points」エンドポイントを使用します。
取得するデータを指定するには、 idq
パラメータを使用します。
このエンドポイントは、レスポンスが Transfer-Encoding: chunked
形式で、改行区切りのJSONによって返却されますので、
他のエンドポイントと比較して若干特殊なパース処理をしている点にご注意ください。
import json
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def get_data_points(meas_uuid: str, idq: list[str]) -> list[dict]:
resp = requests.get(
url=INTDASH_URL + f"/api/v1/projects/{PROJECT_UUID}/data",
headers=HEADERS,
params={
"name": meas_uuid,
"idq": idq,
"time_format": "ns",
},
stream=True,
)
resp.raise_for_status()
data_points = []
received = ""
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
received += chunk
for line in received.split("\n"):
try:
decoded = json.loads(line)
data_points.append(decoded)
except ValueError:
received = line
break
return data_points
# -----------------------------------------
# 実行
# -----------------------------------------
DATA_TYPE = "3"
CHANNEL = 1
data_points = get_data_points(
meas_uuid=measurement1["uuid"],
idq=[
f"{DATA_TYPE}:{CHANNEL}/0001",
f"{DATA_TYPE}:{CHANNEL}/0004",
],
)
print(len(data_points))
print(data_points[0])
3.2.2. 取得したデータをDataFrameに変換する
取得した時系列データを、 pandas.DataFrame
に変換します。
事前にintdash Motionから送信されたセンサーデータは、iSCPv1 General Sensor 型として保存されているため、デコードして値に戻します。
APIから返却されたデータポイントのフォーマットは API仕様書 を参照してください。 ペイロードに格納されたバイナリーのフォーマットについては、 詳説 iSCP 1.0 を参照してください。
import struct
import base64
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def convert(data_point: dict) -> list[dict]:
match data_point["data"]["i"]:
case 1: # ACC
names = ["sp_ACCX", "sp_ACCY", "sp_ACCZ"]
factor = 1e-6
case 4: # ROTATION RATE
names = ["sp_Yaw", "sp_Pitch", "sp_Roll"]
factor = 1e-5
bin = base64.b64decode(data_point["data"]["d"])
values = [float(v)*factor for v in struct.unpack("<iii", bin)]
return [{
"time": data_point["time"],
"name": name,
"value": value,
} for (name, value) in zip(names, values)]
# -----------------------------------------
# 実行
# -----------------------------------------
converted = sum([convert(dp) for dp in data_points], [])
df = pd.DataFrame(
[{
"time": pd.Timestamp(dp["time"]).isoformat()+"Z",
dp["name"]: dp["value"],
} for dp in converted]
).groupby("time").last()
print(df)
3.2.3. DataFrameをCSV形式で保存する
DataFrameの機能を使用して、DataFrameをCSVファイルとして保存します。
df.to_csv("./sample.csv")
保存されたデータは、以下のような形式になります。
3.3. 映像データをJPEG画像ファイルとして保存する
次に、センサーデータを取り出した計測と同じ計測から、映像データを取り出してJPEG画像ファイルとして保存する方法を解説します。
3.3.1. 時系列データを取得する
intdash Motionから送信された映像データは、 iSCPv1 JPEG型のデータとして保存されているため、ここでは DATA_TYPE = "9"
を使用します。
さらに、デフォルトではチャンネル番号として 1
が使用されるため、ここでは CHANNEL = 1
を使用します。
intdash Motionの設定項目で、他のチャンネルに設定を変更している場合は、適宜値を変更してください。
JPEG型データでは、IDは jpeg
に固定されているため、この値を使用します。
DATA_TYPE = "9"
CHANNEL = 1
data_points = get_data_points(
meas_uuid=measurement1["uuid"],
idq=[
f"{DATA_TYPE}:{CHANNEL}/jpeg",
],
)
print(len(data_points))
3.3.2. 取得したデータをJPEG画像ファイルとして書き出す
取得した時系列データを、JPEG画像ファイルに変換します。
事前にintdash Motionから送信されたデータは、iSCPv1 JPEG型として保存されているため、取得したデータポイントからJPEGバイナリーの部分を取り出してファイルに書き出します。
APIから返却されたデータポイントのフォーマットは API仕様書 を参照してください。 ペイロードに格納されたバイナリーのフォーマットについては、 詳説 iSCP 1.0 を参照してください。
各画像ファイルのファイル名には、そのフレームが持つタイムスタンプ値を使用します。
import os
import base64
save_dir = "./images"
os.mkdir(save_dir)
for dp in data_points:
with open(f"{save_dir}/{pd.Timestamp(dp['time']).value}.jpg", "wb") as f:
f.write(base64.b64decode(dp["data"]["d"]))
これで、スマートフォンのセンサーデータをローカルに保存することができました。