Tutorial
This tutorial describes the basic procedure for using the intdash SDK for Python. You will retrieve measurements of a particular edge and get its time series data.
Retrieve an edge
After creating the client, retrieve the edge you want to reference. Use the access object for edges (see "Edges - Access object to edges") to access edge resources.
Use the list() method to list the edges registered on the server. If you pass a string as the name parameter, you get the edges whose names start with that string.
import intdash
edges = client.edges  # access object for edges; assumes `client` was created earlier
lst = edges.list(name="admin", limit=10)
for edge in lst:
    print(edge, "\n")
# uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# name: admin
# nickname: administrator
# description: this is admin user
# type: user
# disabled: False
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
# last_login_at: 2020-01-01 00:00:00.000000+00:00
If there are many edges, set the iterator option to obtain an iterator. Each iteration returns one list of edges.
it = edges.list(limit=10, iterator=True)
for lst in it:
    for edge in lst:
        print(edge, "\n")
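With the iterator option, each iteration yields one page (a list), so visiting every item takes a nested loop. The sketch below shows one way to flatten such pages into a single stream with itertools.chain.from_iterable. The pages() generator and the sample names are hypothetical stand-ins for a paged list call; they are not part of the SDK and only mimic the page-by-page shape.

```python
from itertools import chain


def pages(items, limit):
    """Hypothetical stand-in for a paged list call: yields lists of at most `limit` items."""
    for start in range(0, len(items), limit):
        yield items[start:start + limit]


# Assumed sample data; real code would iterate over edge objects from the server.
names = ["edge-%02d" % i for i in range(25)]

# Each iteration of pages() is one list; chain.from_iterable walks them as one stream.
flat = list(chain.from_iterable(pages(names, limit=10)))
page_sizes = [len(page) for page in pages(names, limit=10)]
```

Flattening keeps the memory benefit of paging as long as the chained iterator is consumed lazily rather than wrapped in list() as done here for demonstration.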
Retrieve measurements associated with the edge
Retrieve the measurements associated with the edge. Use the access object for measurements (see "Measurements - Access object to measurements") to access measurement resources.
Get the access object to the measurements.
measurements = client.measurements
Use list() to retrieve a list of measurements. Pass the UUID of the edge retrieved earlier as the edge_uuid parameter to get the measurements of that edge.
from intdash import timeutils
lst = measurements.list(
    edge_uuid=edge.uuid,
    start=timeutils.str2timestamp('2020-01-01 00:00:00.000000+00:00'),
    end=timeutils.str2timestamp('2020-01-02 00:00:00.000000+00:00'),
    limit=10
)
for m in lst:
    print(m, "\n")
# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
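The snippets in this tutorial write timestamps in two spellings: a space separator with a "+00:00" offset, and a "T" separator with a trailing "Z". As a minimal standard-library sketch (it does not use timeutils and makes no claim about how str2timestamp() is implemented), the following checks that both spellings denote the same instant. The helper parse_utc() is a hypothetical name introduced only for this illustration.

```python
from datetime import datetime, timezone


def parse_utc(text):
    """Parse an ISO 8601 style string into a timezone-aware datetime in UTC."""
    # datetime.fromisoformat() before Python 3.11 rejects a trailing "Z",
    # so normalise it to an explicit +00:00 offset first.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    return datetime.fromisoformat(text).astimezone(timezone.utc)


a = parse_utc("2020-01-01 00:00:00.000000+00:00")
b = parse_utc("2020-01-01T00:00:00.000000Z")
one_day_later = parse_utc("2020-01-02T00:00:00.000000Z")
range_seconds = (one_day_later - a).total_seconds()
```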
If you already know the UUID of the measurement, you can also retrieve the measurement by executing get() with uuid.
measurement = measurements.get(
    uuid="1cef9b31-a356-4415-8b71-84273bbeb1fd",
)
print(measurement)
# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
Retrieving time series data
Retrieve the time series data associated with the measurement. Two access objects give access to time series data resources: client.data_points and client.units. Their response formats differ, so refer to the documentation of each for the detailed format.
This tutorial describes an example using client.data_points.
data_points = client.data_points
Use list() to retrieve time series data. If you specify measurement_uuid, you get the time series data associated with that measurement.
import intdash
from intdash import timeutils
it = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
        intdash.IdQuery(
            data_type=intdash.DataType.string
        )
    ],
    limit = 1000,
    iterator = True
)
for lst in it:
    for dp in lst:
        print(dp, "\n")
# time: 2020-01-01T05:08:25.676942+00:00
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'
Warning
When using the data point format, the time series data payload is stored in data_payload in binary format. The binary format depends on the data type. A class that represents each data type (a data type class) is defined among the time series data objects. See Time series data objects for more information. To convert the binary stored in the payload into the corresponding class, use the static method from_payload() of that class.
import intdash
from intdash import timeutils
dps = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
        intdash.IdQuery(
            data_type=intdash.DataType.string
        )
    ],
    limit = 1000,
    iterator = False
)
print(intdash.data.String.from_payload(dps[0].data_payload))
# data_type: str
# id: str_test
# value: test
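The to_payload() / from_payload() pairing can be illustrated without the SDK. The sketch below defines a hypothetical ToyString class whose byte layout is invented for this example; it is not the intdash binary format, which is described in Time series data objects. It only shows the shape of the pattern: an instance method that serializes to bytes and a static method that rebuilds an object from those bytes.

```python
import struct


class ToyString:
    """Hypothetical data type class; its byte layout is invented for illustration
    and is NOT the intdash binary format."""

    def __init__(self, data_id, value):
        self.data_id = data_id
        self.value = value

    def to_payload(self):
        # Invented layout: 1-byte ID length, UTF-8 ID, then UTF-8 value.
        raw_id = self.data_id.encode("utf-8")
        return struct.pack("B", len(raw_id)) + raw_id + self.value.encode("utf-8")

    @staticmethod
    def from_payload(payload):
        # Reverse of to_payload(): read the length byte, then split ID and value.
        id_len = payload[0]
        data_id = payload[1:1 + id_len].decode("utf-8")
        value = payload[1 + id_len:].decode("utf-8")
        return ToyString(data_id, value)


payload = ToyString("str_test", "test").to_payload()
restored = ToyString.from_payload(payload)
```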
If you want to retrieve time series data associated with one edge across multiple measurements, specify edge_name or edge_uuid.
import intdash
from intdash import timeutils
it = data_points.list(
    edge_name = "admin",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
        intdash.IdQuery(
            data_type=intdash.DataType.string
        )
    ],
    limit = 1000,
    iterator = True
)
for dpslst in it:
    for dps in dpslst:
        print(dps, "\n")
# time: 2020-01-01T05:08:25.676942+00:00
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'
# time: 2020-01-01T06:10:25.879654+00:00
# measurement_uuid: 10bf1795-0ae4-4e5e-a88b-b2fa0822a393
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test2'
Save time series data to the server
When saving time series data, use either client.data_points or client.units, the access objects for the time series data resource. The registration procedure depends on the access object used. For details, refer to the documentation of each access object.
client.data_points
Create the data as a list of intdash.DataPoint objects. data_payload must be binary data. The format of the binary data depends on the data type. See Time series data objects if you don't know the binary format. Use the to_payload() method to convert a data type class object to binary format.
import pandas as pd
import intdash
dps = [
    intdash.DataPoint(
        elapsed_time = pd.Timedelta(seconds=0),
        data_type = intdash.DataType.string,
        channel = 1,
        data_payload = intdash.data.String(data_id='str_test', value='test1').to_payload(),
    ),
    intdash.DataPoint(
        elapsed_time = pd.Timedelta(seconds=1),
        data_type = intdash.DataType.string,
        channel = 1,
        data_payload = intdash.data.String(data_id='str_test', value='test2').to_payload(),
    ),
    intdash.DataPoint(
        elapsed_time = pd.Timedelta(seconds=2),
        data_type = intdash.DataType.string,
        channel = 1,
        data_payload = intdash.data.String(data_id='str_test', value='test3').to_payload(),
    ),
    # ...
]
To register the created list of data points, execute data_points.store().
data_points = client.data_points
data_points.store(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    data_points = dps,
)
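Each data point carries an elapsed_time rather than an absolute time. Assuming elapsed_time is the offset from the base time of the measurement (an assumption suggested by the real-time examples in this tutorial, which compute the current time minus basetime), source data with absolute timestamps has to be converted first. The standard-library sketch below shows that subtraction; the sample timestamps and values are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Assumed base time of the measurement and made-up absolute sample times.
basetime = datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
samples = [
    (datetime(2020, 1, 1, 0, 0, 0, tzinfo=timezone.utc), "test1"),
    (datetime(2020, 1, 1, 0, 0, 1, tzinfo=timezone.utc), "test2"),
    (datetime(2020, 1, 1, 0, 0, 2, tzinfo=timezone.utc), "test3"),
]

# Subtracting two aware datetimes gives a timedelta: the elapsed time since basetime.
elapsed = [(timestamp - basetime, value) for timestamp, value in samples]
```

A datetime.timedelta can be passed to pandas.Timedelta() when a pandas object is needed.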
Send and receive real-time data
Use the client to connect to the WebSocket endpoint.
wsconn = client.connect_websocket()
Use the open_upstreams() method to send real-time data. Pass two arguments: a list of intdash.UpstreamSpec objects that specify the streams, and a list of iterators that generate the data.
import pandas as pd
from time import sleep
import intdash
from intdash import data
upstream_specs = [
    intdash.UpstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        dst_edge_uuids = None,
        resend = False,
        store = True,
        measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ),
]
def generate_units():
    basetime = pd.Timestamp.utcnow()
    yield intdash.Unit(
        elapsed_time = pd.Timestamp.utcnow() - basetime,
        channel = 1,
        data = data.Basetime(type = intdash.BasetimeType.ntp, basetime = basetime)
    )
    while True:
        sleep(0.1)
        yield intdash.Unit(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 1,
            data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )
        yield intdash.Unit(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 2,
            data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )
iterators = [generate_units()]
wsconn.open_upstreams(
    specs = upstream_specs,
    iterators = iterators,
)
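The generate_units() generator never returns because of its while True loop. For a quick trial it can help to bound such a generator. The sketch below does this with itertools.islice on a hypothetical stand-in, generate_samples(), which yields plain tuples instead of intdash.Unit objects. How open_upstreams() behaves when an iterator is exhausted is not covered here and should be checked against the SDK reference.

```python
import time
from itertools import islice


def generate_samples(interval=0.0):
    """Hypothetical stand-in for generate_units(): yields (elapsed_seconds, channel) forever."""
    start = time.monotonic()
    while True:
        time.sleep(interval)
        for channel in (1, 2):
            yield (time.monotonic() - start, channel)


# Take only the first six items from the otherwise endless generator.
bounded = list(islice(generate_samples(), 6))
channels = [channel for _, channel in bounded]
elapsed_values = [elapsed for elapsed, _ in bounded]
```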
To receive real-time data, use the open_downstreams() method. Pass two arguments: a list of intdash.DownstreamSpec objects that specify the streams, and a list of callbacks that are executed when data is received.
import intdash
downstream_specs = [
    intdash.DownstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        filters = [
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
        ],
    ),
]
def callback(unit):
    print(unit)
callbacks = [callback]
wsconn.open_downstreams(
    specs = downstream_specs,
    callbacks = callbacks,
)
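The upstream example sends CAN data with decimal_id = 1, while the filters above select data_id="00000001". The tutorial shows only this one pairing. The sketch below assumes the string is the numeric ID written as 8 zero-padded hexadecimal digits; that convention, including the lowercase letters, is an assumption to verify against the data type reference, and can_data_id() is a hypothetical helper, not an SDK function.

```python
def can_data_id(numeric_id):
    """Format a numeric ID as an 8-digit, zero-padded hexadecimal string.

    Assumed convention for illustration; confirm it against the SDK reference.
    """
    return format(numeric_id, "08x")


# The only pairing confirmed by this tutorial is 1 -> "00000001".
filter_ids = {numeric_id: can_data_id(numeric_id) for numeric_id in (1, 0x1A, 0x7FF)}
```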
When the process is finished, close the connection and finish the measurement.
wsconn.close()
client.measurements.update(
    uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ended = True,
)
The above process should be executed using an event loop as appropriate.
from tornado import gen, ioloop
@gen.coroutine
def main():
    try:
        wsconn = client.connect_websocket()
        wsconn.open_upstreams(
            specs = upstream_specs,
            iterators = iterators,
        )
        wsconn.open_downstreams(
            specs = downstream_specs,
            callbacks = callbacks,
        )
        yield gen.sleep(5)
    finally:
        wsconn.close()
        measurements.update(
            uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
            ended = True,
        )
ioloop.IOLoop.current().run_sync(lambda: main())
Send and receive real-time data (Part 2)
This section describes how to connect to the intdash real-time API using the newly added connect_iscp() method. It is a new API for real-time connections that supports async/await syntax.
connect_iscp() takes on_close, a callback that is called when the connection is closed, as an argument.
async def on_close():
    print("connection closed!")
conn = await client.connect_iscp(on_close=on_close)
Use the open_upstream() method to send real-time data. Pass two arguments: an intdash.UpstreamSpec object that specifies the stream, and on_ack, a callback that is called when an ACK is returned.
import pandas as pd
import asyncio
import intdash
from intdash import data
basetime = pd.Timestamp.utcnow()
meas = client.measurements.create(
    edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    basetime=basetime,
    basetime_type="ntp",
)
spec = intdash.UpstreamSpec(
    src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    dst_edge_uuids=None,
    resend=False,
    store=True,
    measurement_uuid=meas.uuid,
)
async def on_ack(serial, result):
    print(f"ACK's serial number is {serial}. Result code is {result}.")
upstream = await conn.open_upstream(spec=spec, on_ack=on_ack)
d = intdash.data.Basetime(
    type=intdash.BasetimeType.ntp,
    basetime=basetime,
)
await upstream.write(
    datapoint=intdash.DataPoint(
        elapsed_time=pd.Timestamp.utcnow() - basetime,
        channel=1,
        data_type=d.data_type,
        data_payload=d.to_payload(),
    )
)
while True:
    await asyncio.sleep(0.1)
    try:
        d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07')
        await upstream.write(datapoint=intdash.DataPoint(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 1,
            data_type=d.data_type,
            data_payload=d.to_payload(),
        ))
        d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07')
        await upstream.write(datapoint=intdash.DataPoint(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 2,
            data_type=d.data_type,
            data_payload=d.to_payload(),
        ))
    except intdash.ISCPConnClosedException:
        break
When the transmission is finished, close the stream. At this time, terminate the measurement if necessary.
await upstream.close()
client.measurements.update(
    uuid = meas.uuid,
    ended=True,
)
To receive real-time data, use the open_downstream() method. Pass two arguments: an intdash.DownstreamSpec object that specifies the stream, and on_msg, a callback that is executed when data is received.
import intdash
spec = intdash.DownstreamSpec(
    src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    filters=[
        intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
        intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
    ],
)
async def on_msg(datapoint):
    print(datapoint)
downstream = await conn.open_downstream(spec=spec, on_msg=on_msg)
When finished receiving, close the stream.
await downstream.close()
When the process is finished, close the connection.
await conn.close()
Since the new method is implemented using a native coroutine, the above process should be executed using an event loop as appropriate.
async def coroutine():
    conn = await client.connect_iscp(on_close=on_close)
    dspec = intdash.DownstreamSpec(...)
    downstream = await conn.open_downstream(spec=dspec, on_msg=on_msg)
    basetime = pd.Timestamp.utcnow()
    meas = client.measurements.create(...)
    uspec = intdash.UpstreamSpec(...)
    upstream = await conn.open_upstream(spec=uspec, on_ack=on_ack)
    d = intdash.data.Basetime(...)
    await upstream.write(...)
    async def deadline(upstream):
        await asyncio.sleep(10)
        await upstream.close()
    asyncio.ensure_future(deadline(upstream))
    while True:
        try:
            d = data.CAN(...)
            await upstream.write(...)
            d = data.CAN(...)
            await upstream.write(...)
        except intdash.ISCPConnClosedException:
            break
    client.measurements.update(...)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()
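The control flow of the coroutine above (a background task closes the stream after a delay, and the write loop ends when a write fails) can be tried without a server. The sketch below uses hypothetical stand-ins, FakeUpstream and StreamClosed, in place of the SDK's upstream object and exception. It shows only the asyncio pattern and makes no claim about the SDK's behavior.

```python
import asyncio


class StreamClosed(Exception):
    """Hypothetical stand-in for the SDK's connection-closed exception."""


class FakeUpstream:
    """Hypothetical stand-in for an upstream; write() fails once close() was called."""

    def __init__(self):
        self.closed = False
        self.written = 0

    async def write(self, item):
        if self.closed:
            raise StreamClosed()
        self.written += 1
        await asyncio.sleep(0.01)

    async def close(self):
        self.closed = True


async def deadline(upstream, seconds):
    # Close the stream after a fixed delay, mirroring the deadline() helper.
    await asyncio.sleep(seconds)
    await upstream.close()


async def run(seconds=0.1):
    upstream = FakeUpstream()
    task = asyncio.ensure_future(deadline(upstream, seconds))
    while True:
        try:
            await upstream.write(b"\x00")
        except StreamClosed:
            break
    await task  # make sure the background task has finished
    return upstream


upstream = asyncio.run(run())
```

On Python 3.7 and later, asyncio.run() creates, runs, and closes the event loop in one call, which is an alternative to the explicit get_event_loop() / run_until_complete() / close() sequence.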
You can also use the async with syntax to start a connection or open a stream.
async def coroutine():
    async with await client.connect_iscp(on_close=on_close) as conn:
        dspec = intdash.DownstreamSpec(...)
        async with await conn.open_downstream(spec=dspec, on_msg=on_msg) as downstream:
            basetime = pd.Timestamp.utcnow()
            meas = client.measurements.create(...)
            uspec = intdash.UpstreamSpec(...)
            async with await conn.open_upstream(spec=uspec, on_ack=on_ack) as upstream:
                d = intdash.data.Basetime(...)
                await upstream.write(...)
                async def deadline(upstream):
                    await asyncio.sleep(10)
                    await upstream.close()
                asyncio.ensure_future(deadline(upstream))
                while True:
                    try:
                        d = data.CAN(...)
                        await upstream.write(...)
                        d = data.CAN(...)
                        await upstream.write(...)
                    except intdash.ISCPConnClosedException:
                        break
            client.measurements.update(...)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()
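The form "async with await ..." combines two steps: await resolves the coroutine to an object, and async with then manages that object's lifetime. The sketch below demonstrates the order of events with hypothetical stand-ins, FakeConn and connect(). That the context manager calls close() on exit is an assumption of this sketch; the tutorial does not show how the SDK's objects implement their exit.

```python
import asyncio


class FakeConn:
    """Hypothetical stand-in for a connection object usable with `async with`."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
        return False  # do not swallow exceptions

    async def close(self):
        self.events.append("close")


async def connect():
    """Hypothetical coroutine that resolves to a connection object."""
    await asyncio.sleep(0)
    return FakeConn()


async def main():
    # `await connect()` produces the object; `async with` then manages its lifetime.
    async with await connect() as conn:
        conn.events.append("work")
    return conn.events


events = asyncio.run(main())
```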