Tutorial

This tutorial describes the basic procedure for using the intdash SDK for Python. You will retrieve measurements of a particular edge and get its time series data.

Retrieve an edge

After creating the client, retrieve the edge you want to reference. Use Edges - Access object to edges to access edge resources.

Use the list() method to refer to the list of edges registered on the server. If you assign a character string to the name parameter, you will get a list of edges whose names start with that string.

import intdash

lst = edges.list(name="admin", limit=10)
for edge in lst:
   print(edge, "\n")

# uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# name: admin
# nickname: administrator
# description: this is admin user
# type: user
# disabled: False
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00
# last_login_at: 2020-01-01 00:00:00.000000+00:00

If there are many edges, specify the iterator option to generate an iterator. You can get a list of edges for each iteration.

it = edges.list(limit=10, iterator=True)
for lst in it:
    for edge in lst:
        print(edge, "\n")

Retrieve measurements associated with the edge

Retrieve the measurements associated with the edge. Use Measurements - Access object to measurements to access the measurement resources.

Get the access object to the measurements.

measurements = client.measurements

Use list() to retrieve a list of measurements. By asigning the UUID of the edge retrieved earlier to the parameter edge_uuid, a list of measurements is retrieved.

from intdash import timeutils

lst = measurements.list(
    edge_uuid=edge.uuid,
    start=timeutils.str2timestamp('2020-01-01 00:00:00.000000+00:00'),
    end=timeutils.str2timestamp('2020-01-02 00:00:00.000000+00:00')
    limit=10
  )

for m in lst:
    print(m, "\n")

# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00

If you already know the UUID of the measurement, you can also retrieve the measurement by executing get() with uuid.

measurement = measurements.get(
    uuid="1cef9b31-a356-4415-8b71-84273bbeb1fd",
)
print(measurement)

# uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# name: measurement
# description: this is updated description
# edge_uuid: fb7bd20c-a455-4736-b554-477fbb06bc2e
# duration: 0:00:00
# ended: False
# basetime: 1970-01-01 00:00:00+00:00
# basetime_type: undefined
# created_at: 2020-01-01 00:00:00.000000+00:00
# updated_at: 2020-01-01 00:00:00.000000+00:00

Retrieving time series data

Retrieve the time series data associated with the measurement. Use one of the following two to access time series data resources. Refer to each document for the detailed format because the response formats are different.

This tutorial describes an example using client.data_points.

data_points = client.data_points

Use list() to retrieve time series data associated with the measurement. If you specify measurement_uuid, you can retrieve the list of time series data associated with that measurement.

import intdash
from intdash import timeutils

it = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = True
)

for lst in it:
    for dp in lst:
        print(dp, "\n")

# time: 2020-01-01T05:08:25.676942+00:00
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'

Warning

When using the data point format, the time series data payload is stored in data_payload in binary format. The binary format depends on the data type. The class that represents each data type (data type class) is defined in the time series data object. See Time series data objects for more information. To convert the binary stored in the payload into the corresponding class, use the static method from_payload() of each class.

import intdash

dps = data_points.list(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = False
)

intdash.data.String.from_payload(dps[0].data_payload)
#  data_type: str
#  id: str_test
#  value: test

If you want to retrieve time series data associated with one edge across multiple measurements, specify edge_name or edge_uuid.

from intdash import timeutils

it = data_points.list(
    edge_name = "admin",
    start = timeutils.str2timestamp("2020-01-01T00:00:00.000000Z"),
    end = timeutils.str2timestamp("2020-01-02T00:00:00.000000Z"),
    id_queries = [
      intdash.IdQuery(
        data_type=intdash.DataType.string
      )
    ]
    limit = 1000,
    iterator = True
)

for dpslst in it:
    for dps in dpslst:
        print(dps, "\n")

# time: 2020-01-01T05:08:25.676942+00:00
# measurement_uuid: 1cef9b31-a356-4415-8b71-84273bbeb1fd
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test'

# time: 2020-01-01T06:10:25.879654+00:00
# measurement_uuid: 10bf1795-0ae4-4e5e-a88b-b2fa0822a393
# data_type: 10
# channel: 1
# data_id: str_test
# data_payload: b'\x08test2'

Save time series data to the server

When saving time series data, use either client.data_points or client.units that accesses the time series data resource. The registration procedure varies depending on the access object used. If you need more detailed information, refer to the documentation of each access object.

  • client.data_points

    • Create the data as an array of intdash.DataPoint.

    • data_payload must be binary data. The format of binary data differs depending on the data type. See Time series data objects if you don’t know the binary format. Use the to_payload() method to convert an object of data type class to binary format.

import pandas as pd
import intdash

dps = [
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=0),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test1').to_payload(),
  ),
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=1),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test2').to_payload(),
  ),
  intdash.DataPoint(
    elapsed_time = pd.Timedelta(seconds=2),
    data_type=intdash.DataType.string,
    channel = 1,
    data_payload = intdash.data.String(data_id='str_test',  value='test3').to_payload(),
  ),
  ...
]

To register the list of created data, execute datapoints.store().

data_points = client.data_points

data_points.store(
    measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    data_points = dps,
)

Send and receive real-time data

Use the client to connect to the WebSocket endpoint.

wsconn = client.connect_websocket()

Use open_upstreams() method to send real-time data. At this point, set two arguments: intdash.UpstreamSpec object that specifies the specification of the stream and an iterator that generates data.

import pandas as pd
from time import sleep

import intdash
from intdash import data

specs = [
    intdash.UpstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        dst_edge_uuids = None,
        resend = False,
        store = True,
        measurement_uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ),
]

def gen():
    basetime = pd.Timestamp.utcnow()

    yield intdash.Unit(
      elapsed_time = pd.Timestamp.utcnow() - basetime,
      channel = 1,
      data = data.Basetime(type = intdash.BasetimeType.ntp, basetime = basetime)
    )

    while True:
        sleep(0.1)

        yield intdash.Unit(
          elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
          channel = 1,
          data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )
        yield intdash.Unit(
          elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
          channel = 2,
          data = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07'),
        )

iterators = [gen()]

wsconn.open_upstreams(
    specs = specs,
    iterators = iterators,
)

To receive real-time data, use open_downstreams() method. At this point, set two arguments: intdash.DownstreamSpec object that specifies the specification of the stream and a callback method that is executed when the data is received.

import intdash

specs = [
    intdash.DownstreamSpec(
        src_edge_uuid = "fb7bd20c-a455-4736-b554-477fbb06bc2e",
        filters = [
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
            intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
        ],
    ),
]

def callback(unit):
    print(unit)

callbacks = [callback]

wsconn.open_downstreams(
    specs = specs,
    callbacks = callbacks,
)

When the process is finished, close the connection and finish the measurement.

wsconn.close()

measurements.update(
    uuid = "1cef9b31-a356-4415-8b71-84273bbeb1fd",
    ended=True,
)

Send and receive real-time data (Part2)

This section describes how to use the newly added method connect_iscp() to connect to the intdash real-time API. This is a new API for real-time connections that supports writing in async/await syntax.

connect_iscp() is passed on_close, a callback to be called when the connection is closed, as an input argument.

async def on_close():
    print("connection closed!")

conn = await client.connect_iscp(on_close=on_close)

Use open_upstreams() method to send real-time data. At this point, set two arguments: intdash.UpstreamSpec object that specifies the specification of the stream and the on_ack callback to be called when the ACK is returned.

import pandas as pd
import asyncio

import intdash
from intdash import data

basetime = pd.Timestamp.utcnow()
meas = client.measurements.create(
    edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    basetime=basetime,
    basetime_type="ntp",
)

spec = intdash.UpstreamSpec(
    src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    dst_edge_uuids=None,
    resend=False,
    store=True,
    measurement_uuid=meas.uuid,
)

async def on_ack(serial, result):
    print(f"ACK's serial number is {serial}. Result code is {result}.")

upstream = await conn.open_upstream(spec=spec, on_ack=on_ack)

d = intdash.data.Basetime(
    type=intdash.BasetimeType.ntp,
    basetime=basetime,
)
await upstream.write(
    datapoint=intdash.DataPoint(
        elapsed_time=pd.Timestamp.utcnow() - basetime,
        channel=1,
        data_type=d.data_type,
        data_payload=d.to_payload(),
    )
)

while True:
    await asyncio.sleep(0.1)

    try:
        d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07')
        await upstream.write(datapoint=intdash.DataPoint(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 1,
            data_type=d.data_type,
            data_payload=d.to_payload(),
        ))

        d = data.CAN(decimal_id = 1, data = b'\x00\x01\x02\x03\x04\x05\x06\x07')
        await upstream.write(datapoint=intdash.DataPoint(
            elapsed_time = pd.Timedelta(pd.Timestamp.utcnow()-basetime),
            channel = 2,
            data_type=d.data_type,
            data_payload=d.to_payload(),
        ))

    except intdash.ISCPConnClosedException:
        break

When the transmission is finished, close the stream. At this time, terminate the measurement if necessary.

await upstream.close()
client.measurements.update(
    uuid = meas.uuid,
    ended=True,
)

To receive real-time data, use open_downstreams() method. At this point, set two arguments: intdash.DownstreamSpec object that specifies the specification of the stream and on_msg, the callback method that is executed when the data is received.

import intdash

spec = intdash.DownstreamSpec(
    src_edge_uuid="fb7bd20c-a455-4736-b554-477fbb06bc2e",
    filters=[
        intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=1),
        intdash.DataFilter(data_type=intdash.DataType.can, data_id="00000001", channel=2),
    ],
)

async def on_msg(datapoint):
    print(datapoint)

downstream = await conn.open_downstream(spec=spec, on_msg=on_msg)

When finished receiving, close the stream.

await downstream.close()

When the process is finished, close the connection.

await conn.close()

Since the new method is implemented using a native coroutine, the above process should be executed using an event loop as appropriate.

async def coroutine():
    conn = await client.connect_iscp(on_close=on_close)

    dspec = intdash.DownstreamSpec(...)
    downstream = await conn.open_downstream(spec=dspec, on_msg=on_msg)

    basetime = pd.Timestamp.utcnow()
    meas = client.measurements.create(...)

    uspec = intdash.UpstreamSpec(...)
    upstream = await conn.open_upstream(spec=uspec, on_ack=on_ack)

    d = intdash.data.Basetime(...)
    await upstream.write(...)

    async def deadline(upstream):
        await asyncio.sleep(10)
        await upstream.close()

    asyncio.ensure_future(deadline(upstream))

    while True:
        try:
            d = data.CAN(...)
            await upstream.write(...)

            d = data.CAN(...)
            await upstream.write(...)

        except intdash.ISCPConnClosedException:
            break

    client.measurements.update(...)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()

You can also use the async with syntax to start a connection or open a stream.

async def coroutine():
    async with await client.connect_iscp(on_close=on_close) as conn:

        dspec = intdash.DownstreamSpec(...)
        async with await conn.open_downstream(spec=dspec, on_msg=on_msg) as downstream:

            basetime = pd.Timestamp.utcnow()
            meas = client.measurements.create(...)

            uspec = intdash.UpstreamSpec(...)
            async with await conn.open_upstream(spec=uspec, on_ack=on_ack) as upstream:
                d = intdash.data.Basetime(...)
                await upstream.write(...)

                async def deadline(upstream):
                    await asyncio.sleep(10)
                    await upstream.close()

                asyncio.ensure_future(deadline(upstream))

                while True:
                    try:
                        d = data.CAN(...)
                        await upstream.write(...)

                        d = data.CAN(...)
                        await upstream.write(...)

                    except intdash.ISCPConnClosedException:
                        break

            client.measurements.update(...)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()