intdash._units のソースコード

# Copyright 2020 Aptpod, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from debtcollector import removals

from intdash import _models, _protocol, _utils, data, timeutils

__all__ = ["Units"]


[ドキュメント]class Units(object): """時系列データ(ユニット形式)へのアクセスオブジェクトです。""" def __init__(self): self.client = None
[ドキュメント] def store(self, measurement_uuid, units, serial_number=0, final=True): """時系列データ(ユニットオブジェクト)をサーバーへ保存します。 Args: measurement_uuid (str): 保存先の計測のUUID units (list[Unit]): 保存するユニットオブジェクトのリスト serial_number (int): セクションの通し番号 final (bool): 最終フラグ .. note:: 一回の処理に指定できる ``units`` の容量は2GBまでです。2GBを超える場合は、 ``store()`` を複数回実行してください。 """ req = _utils._write_req_body( elems=_utils._create_elems_storedata( units=units, serial_number=serial_number, final=final ) ) resp = self.client._request( method="post", spath="/api/v1/measurements/{measurement_uuid}/data".format( measurement_uuid=measurement_uuid ), data=req, ).content for ack in _utils._read_resp_body(resp): if type(ack) != _protocol.SectionAck: raise RuntimeError("unexpected unit received") if ack.result_code != _protocol.ResultCode.OK: raise RuntimeError( "result_code is not ok: {result_code}".format( result_code=ack.result_code ) )
[ドキュメント] def list( self, start, end, measurement_uuid=None, edge_uuid=None, edge_name=None, id_queries=None, labels=None, limit=None, iterator=False, exit_on_error=False, ): """時系列データ(ユニットオブジェクト)のリストを取得します。 Args: start (pandas.Timestamp): 取得対象期間の開始時刻 end (pandas.Timestamp): 取得対象期間の終了時刻 measurement_uuid (str): 取得元の計測のUUID edge_uuid (str): 取得元のエッジのUUID edge_name (str): 取得元のエッジ名 id_queries (list[IdQuery]): 取得対象のidのリスト labels (list[str]): 取得対象のラベル名 limit (int): 最大取得件数 iterator (bool): Trueの場合、イテレータを生成します exit_on_error (bool): Trueの場合、取得中にエラーが発生すると処理を中断し、中断前までのUnitのリストを返します Returns: list[Unit]: ユニットオブジェクトのリスト .. note:: ``measurement_uuid`` ``edge_uuid`` ``edge_name`` はいずれか1つを指定してください。 同時に指定された場合、``measurement_uuid`` > ``edge_uuid`` > ``edge_name`` の優先順位で参照し、低順位のものは無視されます。 .. note:: ``labels`` と ``id_queries`` 両方を指定した場合、双方いずれかにあてはまるデータすべてが対象となります。 (``labels`` を使用する際は、別途「信号定義」を登録する必要があります。) .. note:: ``limit`` を指定しない場合、指定範囲の全データを取得します。取得データの容量が大きい場合、``limit`` に取得数の上限を指定し ``iterator`` を ``True`` にすることで、 上限ごとに取得するイテレーターを使用することができます。詳しくは、 :doc:`guide/tutolial` の **時系列データを取得する** を参考にしてください。 .. note:: サーバー側の取得処理にて例外が発生すると、例外メッセージを格納したUnitが出力され、処理自体は正常に終了します (以下サンプル参照)。この時、Unitの ``data_type`` はStringです。 例外発生時に処理を中断したい場合、 ``exit_on_error`` に ``True`` を指定してください。 Examples: 以下は、データ取得時に例外が発生した場合のサンプルです。エラーメッセージがUnitに格納されています。エラーメッセージを格納したUnitの `data_type` はStringです。 `id` には、エラーが発生したエンドポイントのnamespaceが出力されます。 Unitの内容を確認することで、再取得や原因究明に使用できます。 >>> us = lc.units.list( measurement_uuid=sample_measurement.uuid, labels=['nmea', 'test'], start=sample_measurement.basetime, end=sample_measurement.basetime + pd.Timedelta(seconds=10), exit_on_error=False ) >>> import json >>> for u in us: if u.data.data_type.value == DataType.basetime.value: continue if 'error' in u.data.id: print(u) error_message = json.loads(u.data.value)['error_description'] raise ValueError(f'contains failed data: {error_message}') # elapsed_time: 0 days 00:00:00 # channel: 1 # measurement_uuid: 3b5b9bed-d509-4198-aea0-54ee714f7a5b # data_type: string # id: intdash/measurement/get/data/error # value: {"error":"converted_error","error_description":"Error occurred in signal conversion","error_extra":{"signal_channel":1,"signal_data_id":"GPRMC","signal_data_type":2,"signal_label":"nmea"}} ValueError: contains failed data: Error occurred in signal conversion """ name = None if measurement_uuid: name = measurement_uuid elif edge_uuid: name = edge_uuid elif edge_name: name = edge_name if name is None: raise ValueError( "please define `measurement_uuid` , `edge_uuid` or `edge_name`" ) if iterator: return self._iter_lists( name=name, start=start, end=end, id_queries=id_queries, labels=labels, limit=limit, exit_on_error=exit_on_error, ) return self._getdatapoints( name=name, start=start, end=end, labels=labels, id_queries=id_queries, limit=limit, exit_on_error=exit_on_error, )
def _iter_lists(self, name, start, end, id_queries, labels, limit, exit_on_error): return self._getdatapoints_iterator( name=name, start=start, end=end, labels=labels, id_queries=id_queries, limit=limit, exit_on_error=exit_on_error, ) def _getdatapoints( self, name, start, end, id_queries, labels, exit_on_error, limit ): query = [ ("name", name), ("start", timeutils.timestamp2str(start)), ("end", timeutils.timestamp2str(end)), ("exit_on_error", exit_on_error), ] if limit: query.append(("limit", limit)) if labels: for label in labels: query.append(("label", label)) if id_queries: query.append( ( "idq", [ "{data_type}:{channel}/{data_id}".format( data_type=item.data_type, channel=item.channel, data_id=item.data_id, ) for item in id_queries ], ) ) resp = self.client._request( method="get", spath="/api/v1/data", query=query, headers={"Accept": "application/protobuf"}, ).content data_points = _utils._read_resp_body_protobuf(resp) units = [] basetime = None for d in data_points: if basetime is None: basetime = timeutils.unix2timestamp(d.time.seconds, d.time.nanos) units.append( _models.Unit( elapsed_time=timeutils.micro2timedelta(0), channel=0, data=data.Basetime( type=_models.BasetimeType.volatile.value, basetime=basetime ), ) ) units.append( _models.Unit( elapsed_time=timeutils.unix2timestamp(d.time.seconds, d.time.nanos) - basetime, channel=int(d.data_id.split("/")[0]), data=data.type_to_data_class(int(d.data_type)).from_payload( d.data_payload ), measurement_uuid=d.measurement_uuid, ) ) return units def _getdatapoints_iterator( self, name, start, end, labels, id_queries, exit_on_error, limit ): ended = False while True: if ended: return units = self._getdatapoints( name=name, start=start, end=end, labels=labels, id_queries=id_queries, limit=limit, exit_on_error=exit_on_error, ) only_basetime = True for u in units: d = u.data if type(d) != data.Basetime: only_basetime = False break if only_basetime: return basetime = None for u in units: d = u.data if type(d) == data.Basetime: basetime = d.basetime break start = basetime + units[-1].elapsed_time + timeutils.micro2timedelta(1) if end <= start: ended = True yield units