4. チュートリアル1C: API/SDKを使ってサーバーのデータを加工してみよう
3つめのチュートリアルでは、intdashサーバーに保存されているデータにAPIを使ってアクセスし、簡単な加工を行います。 さらに、加工したデータを再度intdashサーバーに格納し、Data Visualizer上で加工前後のデータを比較します。
4.1. ダッシュボードを変更する
4.1.1. 加工後のデータ用のデータ設定を登録する
加工後のデータをバインドできるように、加工後のデータに適したデータ設定を登録します。
以下のリンク先からデータ設定ファイルをダウンロードします。
データ設定ファイルを読み込む と同様の手順で登録します。
4.1.2. 加工後のデータが入ったスクリーンを読み込む
加工前と加工後の両方のデータを表示できるダッシュボードに変更します。
以下のリンク先からスクリーン定義ファイルをダウンロードします。
ダッシュボードの設定ファイルを読み込む と同様の手順で登録します。すべてのパネルのすべてのデータの取得元として
edge1
を指定してください。
4.2. CSVファイルをアップロードする
可視化したいCSVデータをintdashに格納します。CSVファイルのアップロード方法は、前のチュートリアルの CSVファイルをアップロードする と同一のため、ここでは説明を省略します。
事前に チュートリアル1A: サンプルデータを可視化してみよう を実施済みの方は、intdashサーバー上にデータが存在するため次の手順に進んで構いません。
4.3. データを取得して加工する
intdashサーバーに格納されたデータにAPIを用いてアクセスし、加工したうえで、結果を再度intdashのサーバーに格納します。 以降、具体的なソースコードを交えて解説します。
4.3.1. APIアクセス用の設定情報を設定する
まず最初に、以下のようにAPIアクセス用の設定情報を設定します。
INTDASH_URL
: intdashサーバーのURLAPI_TOKEN
: ユーザーのAPIトークンPROJECT_UUID
: 使用するプロジェクトのUUID
HEADERS
は上記情報により自動的に設定されます。
INTDASH_URL = "https://example.intdash.jp"
PROJECT_UUID = "00000000-0000-0000-0000-000000000000" # Null UUIDは、Global Projectに相当します
API_TOKEN = "*************************************"
HEADERS = {"X-Intdash-Token": API_TOKEN}
また、以降で使用する各種メソッドを定義しておきます。
import requests
def get(path, params={}):
kwargs = {
"url": INTDASH_URL + "/api" + path,
"headers": HEADERS,
"params": params,
}
resp = requests.get(**kwargs)
resp.raise_for_status()
return resp.json()
def post(path, body=None):
kwargs = {
"url": INTDASH_URL + "/api" + path,
"headers": HEADERS,
}
if body is not None:
kwargs["json"] = body
resp = requests.post(**kwargs)
resp.raise_for_status()
return resp.json()
def put(path, body=None):
kwargs = {
"url": INTDASH_URL + "/api" + path,
"headers": HEADERS,
}
if body is not None:
kwargs["json"] = body
resp = requests.put(**kwargs)
resp.raise_for_status()
4.3.2. CSVアップロードに使用したエッジの情報を取得する
次に、CSVファイルをアップロードする際に使用したエッジの情報を取得します。
あるプロジェクトに所属しているエッジの情報の取得には、REST APIの「AUTH > Project Edges > List Project Edges」エンドポイントを使用します。
以降の説明では、 edge1
という名前を持つエッジを使用したものとして解説します。
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def get_edge_by_nickname(nickname: str) -> dict:
page = 1
while True:
resp = get(
path=f"/auth/projects/{PROJECT_UUID}/edges",
params={"page": page},
)
for edge in resp["items"]:
if edge["nickname"] == nickname:
return edge
if resp["page"]["next"]:
page += 1
else:
return None
# -----------------------------------------
# 実行
# -----------------------------------------
edge1 = get_edge_by_nickname("edge1")
print(edge1)
4.3.3. CSVアップロードにより作成された計測を取得する
START
と END
により時間範囲を指定して、計測のリストを取得します。
あるプロジェクトに所属している計測の情報の取得には、REST APIの「MEAS > Measurements > List Project Measurements」エンドポイントを使用します。
import pandas as pd
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def get_measurements(edge_uuid: str, start: pd.Timestamp, end: pd.Timestamp) -> list[dict]:
measurements = []
page = 1
while True:
resp = get(
path=f"/v1/projects/{PROJECT_UUID}/measurements",
params={
"edge_uuid": edge_uuid,
"start": start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"end": end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"page": page,
},
)
for meas in resp["items"]:
measurements.append(meas)
if resp["page"]["next"]:
page += 1
else:
break
return measurements
# -----------------------------------------
# 実行
# -----------------------------------------
START = "2020/07/01 00:00:00+09:00"
END = "2020/08/01 00:00:00+09:00"
measurements = get_measurements(
edge_uuid=edge1["edge_uuid"],
start=pd.Timestamp(START),
end=pd.Timestamp(END),
)
measurement1 = measurements[0]
print(measurement1)
4.3.4. 時系列データを取得する
計測に格納されている時系列データをintdashサーバーから取得します。
ここでは、データタイプ、チャンネル番号、データIDなどを指定することにより、目的のデータポイントだけを取得します。
今回 CSVファイルをアップロードする でアップロードしたデータは、 iSCPv1 Float型、チャンネル番号 1
のデータとして保存されているため、
DATA_TYPE = "11"
、 CHANNEL = 1
を指定します。
また、CSVファイルをアップロードした場合、データIDとしてCSVファイルの列名が使用されるため、取得対象のデータIDとして sp_ACCX
, sp_ACCY
等の列名を指定します。
ここでは、加速度データのみ( sp_ACCX
, sp_ACCY
, sp_ACCZ
)を取得することにします。
あるプロジェクトに所属しているデータポイントの取得には、REST APIの「MEAS > Data Points > List Project Data Points」エンドポイントを使用します。
取得するデータを指定するには、 idq
パラメータを使用します。
このエンドポイントは、レスポンスが Transfer-Encoding: chunked
形式で、改行区切りのJSONによって返却されますので、
他のエンドポイントと比較して若干特殊なパース処理をしている点にご注意ください。
import json
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def get_data_points(meas_uuid: str, idq: list[str]) -> list[dict]:
resp = requests.get(
url=INTDASH_URL + f"/api/v1/projects/{PROJECT_UUID}/data",
headers=HEADERS,
params={
"name": meas_uuid,
"idq": idq,
"time_format": "ns",
},
stream=True,
)
resp.raise_for_status()
data_points = []
received = ""
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
received += chunk
for line in received.split("\n"):
try:
decoded = json.loads(line)
data_points.append(decoded)
except ValueError:
received = line
break
return data_points
# -----------------------------------------
# 実行
# -----------------------------------------
DATA_TYPE = "11"
CHANNEL = 1
data_points = get_data_points(
meas_uuid=measurement1["uuid"],
idq=[
f"{DATA_TYPE}:{CHANNEL}/sp_ACCX",
f"{DATA_TYPE}:{CHANNEL}/sp_ACCY",
f"{DATA_TYPE}:{CHANNEL}/sp_ACCZ",
],
)
print(len(data_points))
print(data_points[0])
4.3.5. 取得したデータを可視化する
取得した時系列データを、 pandas.DataFrame
に変換し、 plot
メソッドにより可視化します。
まず、取得したデータを pandas.DataFrame
形式に変換します。
事前にCSVアップロードにより保存されたデータは、iSCPv1 Float 型として保存されているため、デコードして値に戻します。
APIから返却されたデータポイントのフォーマットについては API仕様書 を参照してください。
ペイロードに格納されたバイナリのフォーマットについては、 詳説 iSCP 1.0 を参照してください。
df = pd.DataFrame(
[{
"time": pd.Timestamp(dp["time"]).isoformat()+"Z",
dp["data"]["i"]: dp["data"]["d"],
} for dp in data_points]
).groupby("time").last().reindex(columns=["sp_ACCX", "sp_ACCY", "sp_ACCZ"])
print(df)
次に、 plot
メソッドを用いてデータを描画します。
描画した結果、以下のように表示されます。
from matplotlib import pyplot as plt
df.plot(figsize=(15,6), grid=True, ylim=[-10, 10])
plt.xticks(rotation=-90)
plt.show()
plt.close()
4.3.6. 取得したデータを加工する
次に、取得したデータに対して加工処理を実行し、データの変化を確認します。 本チュートリアルでは、加工処理として移動平均を計算してみます。
df_converted = df.rolling(5).mean()
df_converted.plot(figsize=(15,6), grid=True, ylim=[-10, 10])
plt.xticks(rotation=-90)
plt.show()
plt.close()
移動平均を実行したグラフは、元のグラフよりなめらかになっていることが分かります。
4.3.7. 加工済みのデータをintdashにアップロードする
最後に、加工済みのデータを新しい計測としてintdashにアップロードします。
まず、計測を作成します。あるプロジェクトへの計測の作成には、REST APIの「MEAS > Measurements > Create Project Measurement」エンドポイントを使用します。
基準時刻(Base Time、計測が開始された時刻)は、最初のデータポイントが持つ時刻にします。
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def create_measurement(edge_uuid: str, base_time: pd.Timestamp):
return post(
path=f"/v1/projects/{PROJECT_UUID}/measurements",
body={
"basetime": base_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"basetime_type": "manual",
"edge_uuid": edge_uuid,
},
)
# -----------------------------------------
# 実行
# -----------------------------------------
base_time = pd.Timestamp(df_converted.index[0])
new_meas = create_measurement(edge1["edge_uuid"], base_time)
print(new_meas)
ある計測へのデータポイントの保存は以下の手順で行います。
REST APIの「MEAS > Measurements > Replace Project Measurement Sequence」エンドポイントを使い、新しいシーケンスを作成する。
REST APIの「MEAS > Measurements > Create Project Measurement Sequence Chunk」エンドポイントで、上記のシーケンス内にチャンクを作成する(このチャンク内にデータポイントグループが作成され、データポイントグループ内にデータポイントが作成されます)。
なお、アップロードするデータは、iSCPv1 互換形式の iSCPv2 Float64型、チャンネル番号 1 のデータとして保存するため、 ここでは DATA_TYPE = "float64" 、 CHANNEL = 1 を使用します。 アップロードに使用するデータポイントのペイロードフォーマットの詳細は iSCPv2 のプロトコル仕様書 を参照してください。 iSCPv2 の iSCPv1 との互換形式の詳細については、 こちらの解説ドキュメント を参照してください。
import uuid
import base64
import struct
# -----------------------------------------
# 関数の定義
# -----------------------------------------
CHANNEL = 1
DATA_TYPE = "float64"
names_converted = ["cov_ACCX", "cov_ACCY", "cov_ACCZ"]
def store_data_points(meas_uuid: str, df: pd.DataFrame):
seq_uuid = str(uuid.uuid4())
put(
path=f"/v1/projects/{PROJECT_UUID}/measurements/{meas_uuid}/sequences/{seq_uuid}",
body={},
)
resp = post(
path=f"/v1/projects/{PROJECT_UUID}/measurements/sequences/chunks",
body={
"meas_uuid": meas_uuid,
"sequence_uuid": seq_uuid,
"chunks": [{
"sequence_number": 1,
"data_point_groups": [{
"data_id": {"name": f"v1/{CHANNEL}/{name}", "type": DATA_TYPE},
"data_points": [{
"elapsed_time": (pd.Timestamp(time)-base_time).value,
"payload": base64.b64encode(struct.pack(">d", value)).decode(),
} for time, value in cols.items()],
} for name, (_, cols) in zip(names_converted, df.items())],
}],
},
)
for item in resp["items"]:
if item["result"] != "ok":
print(f"サーバーでの保存処理に失敗しました: シーケンス番号={item['sequence_number']}")
# -----------------------------------------
# 実行
# -----------------------------------------
store_data_points(new_meas["uuid"], df_converted)
最後に、計測のステータスを完了に変更します。
# -----------------------------------------
# 関数の定義
# -----------------------------------------
def complete_measurement(meas_uuid: str):
put(
path=f"/v1/projects/{PROJECT_UUID}/measurements/{meas_uuid}/complete",
)
# -----------------------------------------
# 実行
# -----------------------------------------
complete_measurement(new_meas["uuid"])
これでアップロードは完了です。
4.4. 加工したデータを表示する
intdashサーバーに格納した加工前のデータと加工後のデータの両方を、Data Visualizerを用いて可視化してみます。 本手順は アップロードしたデータをダッシュボードで可視化する と同一の手順のため、ここでは説明を省略します。