# Copyright 2020 Aptpod, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from debtcollector import removals
from intdash import _models, _protocol, _utils, timeutils
__all__ = ["DataPoints"]
[docs]class DataPoints(object):
"""時系列データ(データポイントリソース)へのアクセスオブジェクトです。"""
def __init__(self):
self.client = None
[docs] def store(self, measurement_uuid, data_points, serial_number=0, final=True):
"""時系列データ(データポイント)をサーバーへ保存します。
Args:
measurement_uuid (str): 保存先の計測のUUID
data_points (list[DataPoint]): 保存するデータポイントオブジェクトのリスト
serial_number (int): セクションの通し番号
final (bool): 最終フラグ
Examples:
`DataPoint` の `data_payload` には、バイナリデータを指定します。バイナリデータは、使用する `data_type` に合わせて `intdash.data.Data` の `to_payload()` を使用することにより作成できます。
>>> data_points = [
DataPoint(
elapsed_time=pd.Timedelta(seconds=0) ,
data_type=DataType.int.value,
channel=1,
data_payload=intdash.data.Int(data_id='test', value=2).to_payload()
)
]
>>> client.data_points.store(
measurement_uuid=sample_measurement.uuid,
data_points=data_points
)
.. note::
一回の処理で指定できる ``data_points`` の容量は2GBまでです。2GBを超える場合は、 ``store()`` を複数回実行してください。
"""
req = _utils._create_req_store_protobuf(
data_points=data_points,
serial_number=0,
measurement_uuid=measurement_uuid,
final=final,
)
resp = self.client._request(
method="post",
spath="/api/v1/measurements/data",
headers={"Content-Type": "application/protobuf"},
data=req,
).content
for ack in _utils._read_resp_body(resp):
if type(ack) != _protocol.SectionAck:
raise RuntimeError("unexpected datapoint received")
if ack.result_code != _protocol.ResultCode.OK:
raise RuntimeError(
"result_code is not ok: {result_code}".format(
result_code=ack.result_code
)
)
[docs] def list(
self,
start,
end,
measurement_uuid=None,
edge_uuid=None,
edge_name=None,
id_queries=None,
labels=None,
limit=None,
iterator=False,
exit_on_error=False,
):
"""時系列データ(データポイント)のリストを取得します。
Args:
start (pandas.Timestamp): 取得対象範囲の始点
end (pandas.Timestamp): 取得対象範囲の終点
measurement_uuid (str): 取得元の計測のUUID
edge_uuid (str): 取得元のエッジUUID
edge_name (str): 取得元のエッジ名
id_queries (list[IdQuery]): 取得対象のidのリスト
labels (list[str]): 取得対象のラベル名
limit (int): 最大取得件数
iterator (bool): Trueの場合、イテレータを生成します
exit_on_error (bool): Trueの場合、取得中にエラーが発生すると処理を中断し、中断前までのDataResponseのリストを返します
Returns:
list[DataResponse]: データオブジェクト
Examples:
出力されたintdash.DataResponseの `data_payload` はバイナリデータです。使用する `data_type` に合わせて、 `intdash.data.Data` の `from_payload()` を使用することで、物理値を得ることができます。
>>> dps = client.data_points.list(
measurement_uuid=sample_measurement.uuid,
id_queries=[
intdash.IdQuery(
data_type=intdash.DataType.int.value,
data_id='test'
)
],
start=timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
end=timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
)
>>> print(intdash.data.Int.from_payload(dps[0].data_payload))
data_type: int
data_id: test
value: 2
.. note::
``measurement_uuid`` ``edge_uuid`` ``edge_name`` はいずれか1つを指定してください。
同時に指定された場合、``measurement_uuid`` > ``edge_uuid`` > ``edge_name`` の優先順位で参照し、低順位のものは無視されます。
.. note::
``labels`` と ``id_queries`` 両方を指定した場合、双方いずれかにあてはまるデータすべてが対象となります。
(ただし ``labels`` については、別途「信号定義」を登録する必要があります)
.. note::
``limit`` を指定しない場合、指定範囲の全データを取得します。取得データの容量が大きい場合、``limit`` に取得数の上限を指定し ``iterator`` を ``True`` にすることで、
上限ごとに繰り返し取得処理を実行するイテレーターを使用することができます。詳しくは、 :doc:`guide/tutolial` の **時系列データを取得する** を参考にしてください。
.. note::
サーバー側の取得処理にて例外が発生すると、例外メッセージを格納したDataResponseが出力され、処理自体は正常に終了します (以下サンプル参照)。この時、Unitの ``data_type`` はStringです。
例外発生時に処理を中断したい場合、 ``exit_on_error`` に ``True`` を指定してください。
Examples:
以下は、データ取得時に例外が発生した場合のサンプルです。
エラーメッセージがDataResponseに格納されています。エラーメッセージを格納したDataResponseの `data_type` はStringです。 `data_id` には、エラーが発生したエンドポイントのnamespaceが出力されます。
DataResponseの内容を確認することで、再取得や原因究明に使用できます。
>>> dps = lc.data_points.list(
measurement_uuid=sample_measurement.uuid,
labels=['nmea'],
start=sample_measurement.basetime,
end=sample_measurement.basetime + pd.Timedelta(seconds=10)
)
>>> import json
>>> for p in dps:
if 'error' in p.data_id:
print(p)
data = intdash.data.String.from_payload(p.data_payload)
error_message = json.loads(data.value)['error_description']
raise ValueError(f'contains failed data: {error_message}')
# time: 2020-06-23T05:08:25.676942+00:00
# measurement_uuid: 3b5b9bed-d509-4198-aea0-54ee714f7a5b
# data_type: 10
# channel: 1
# data_id: intdash/measurement/get/data/error
# data_payload: b'\x05error{"error":"converted_error","error_description":"Error occurred in signal conversion","error_extra":{"signal_channel":1,"signal_data_id":"GPRMC","signal_data_type":2,"signal_label":"nmea"}}'
ValueError: contains failed data: Error occurred in signal conversion
"""
name = None
if measurement_uuid:
name = measurement_uuid
elif edge_uuid:
name = edge_uuid
elif edge_name:
name = edge_name
if name is None:
raise ValueError(
"please define `measurement_uuid` , `edge_uuid` or `edge_name`"
)
if iterator:
return self._iter_lists(
name=name,
start=start,
end=end,
id_queries=id_queries,
labels=labels,
limit=limit,
exit_on_error=exit_on_error,
)
return self._getdatapoints(
name=name,
start=start,
end=end,
labels=labels,
id_queries=id_queries,
limit=limit,
exit_on_error=exit_on_error,
)
def _iter_lists(self, name, start, end, id_queries, labels, limit, exit_on_error):
return self._getdatapoints_iterator(
name=name,
start=start,
end=end,
labels=labels,
id_queries=id_queries,
limit=limit,
exit_on_error=exit_on_error,
)
def _getdatapoints(
self, name, start, end, id_queries, labels, exit_on_error, limit=1000
):
query = [
("name", name),
("start", timeutils.timestamp2str(start)),
("end", timeutils.timestamp2str(end)),
("exit_on_error", exit_on_error),
]
if limit:
query.append(("limit", limit))
if labels:
for label in labels:
query.append(("label", label))
if id_queries:
query.append(
(
"idq",
[
"{data_type}:{channel}/{data_id}".format(
data_type=item.data_type,
channel=item.channel,
data_id=item.data_id,
)
for item in id_queries
],
)
)
resp = self.client._request(
method="get",
spath="/api/v1/data",
query=query,
headers={"Accept": "application/protobuf"},
).content
data_points = _utils._read_resp_body_protobuf(resp)
data = []
for d in data_points:
data.append(
_models.DataResponse(
time=timeutils.unix2timestamp(d.time.seconds, d.time.nanos),
measurement_uuid=d.measurement_uuid,
data_type=_models.DataType(int(d.data_type)),
data_id="/".join(d.data_id.split("/")[1:]),
channel=int(d.data_id.split("/")[0]),
data_payload=d.data_payload,
)
)
return data
def _getdatapoints_iterator(
self, name, start, end, labels, id_queries, exit_on_error, limit=1000
):
ended = False
while True:
if ended:
return
data_points = self._getdatapoints(
name=name,
start=start,
end=end,
labels=labels,
id_queries=id_queries,
limit=limit,
exit_on_error=exit_on_error,
)
if len(data_points) == 0:
return
start = timeutils.str2timestamp(
data_points[-1].time
) + timeutils.micro2timedelta(1)
if end <= start:
ended = True
yield data_points