iscp
iSCP 2.0 Client Library for Kotlin
iSCP Client for Kotlin は、iSCP version 2を用いたリアルタイムAPIにアクセスするためのクライアントライブラリです。
Dependencies
Installation
// build.gradle
...
dependencies {
...
// Install iSCP
implementation 'com.aptpod.github:iscp:0.11.0'
}
Content copied to clipboard
// settings.gradle
...
dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
repositories {
google()
mavenCentral()
// Library references for iSCP
maven { url "https://aptpod.github.io/iscp-kt" }
}
}
Content copied to clipboard
Implementation
Connect to intdash API
このサンプルではiscp-ktを使ってintdash APIに接続します。
import android.app.Activity
import com.aptpod.iscp.connection.Connection
import com.aptpod.iscp.connection.ConnectionCallbacks
import com.aptpod.iscp.transport.ITransportConfig
import com.aptpod.iscp.transport.WebSocketConfig
class ExampleActivity : Activity() {
/**
* 接続するintdashサーバー
*/
var targetServer: String = "https://example.com"
/**
* ノードUUID(ここで指定されたノードとして送受信を行います。
*
* intdash APIでノードを生成した際に発行されたノードUUIDを指定します。)
*/
var nodeId = "00000000-0000-0000-0000-000000000000"
/**
* アクセストークン
*
* intdash APIで取得したアクセストークンを指定して下さい。
*/
var accessToken = ""
/**
* コネクション
*/
var connection: Connection? = null
fun connect() {
// 接続情報のセットアップをします。
var urls = targetServer.split("://")
var address: String
var enableTls: Boolean = false
if (urls.size == 1)
{
address = urls[0]
}
else
{
enableTls = urls[0] == "https"
address = urls[1]
}
// WebSocketを使って接続するように指定します。
var transportConfig: ITransportConfig = WebSocketConfig(enableTls = enableTls)
Connection.connectAsync(
address = address,
transportConfig = transportConfig,
tokenSource = {
// アクセス用のトークンを指定します。接続時に発生するイベントにより使用されます。
// ここでは固定のトークンを返していますが随時トークンの更新を行う実装にするとトークンの期限切れを考える必要がなくなります。
accessToken
},
nodeId = nodeId,
completion = { con, ex ->
if (con == null) {
// 接続失敗
return@connectAsync
}
// 接続成功.
connection = con
con.callbacks = this.connectionCallbacks // ConnectionCallbacks
// 以降、startUpstreamやstartDownstreamなどが実行可能になります。
}
)
}
val connectionCallbacks : ConnectionCallbacks
get() = object : ConnectionCallbacks {
override fun onReconnect(connection: Connection) {
// Connectionが再オープンされた際にコールされます。
}
override fun onDisconnect(connection: Connection) {
// Connectionがクローズされた際にコールされます。
}
override fun onFailWithError(connection: Connection, exception: Exception) {
// Connection内部で何らかのエラーが発生した際にコールされます。
}
}
}
Content copied to clipboard
Start Upstream
アップストリームの送信サンプルです。
このサンプルでは、基準時刻のメタデータと、文字列型のデータポイントをiSCPサーバーへ送信しています。
import com.aptpod.iscp.model.BaseTime
import com.aptpod.iscp.model.DataId
import com.aptpod.iscp.model.DataPoint
import com.aptpod.iscp.model.UpstreamChunk
import com.aptpod.iscp.model.UpstreamChunkAck
import com.aptpod.iscp.stream.Upstream
import com.aptpod.iscp.stream.UpstreamCallbacks
import java.time.ZonedDateTime
import java.util.UUID
/**
* 送信するデータを永続化するかどうか
*/
var upstreamPersist: Boolean = false
/**
* オープンしたストリーム一覧
*/
var upstreams: MutableList<Upstream> = mutableListOf()
fun ExampleActivity.startUpstream() {
// セッションIDを払い出します。
var sessionId = UUID.randomUUID().toString().lowercase()
// Upstreamをオープンします。
connection?.openUpstreamAsync(
sessionId = sessionId,
persist = upstreamPersist,
completion = { upstream, ex ->
if (upstream == null) {
// オープン失敗。
return@openUpstreamAsync
}
// オープン成功
upstreams.add(upstream)
// 送信するデータポイントを保存したい場合や、アップストリームのエラーをハンドリングしたい場合はコールバックを設定します。
upstream.callbacks = this.upstreamCallbacks // UpstreamCallbacks
var date = ZonedDateTime.now() // ※マイクロ秒以下をサポートするには修正が必要です。
var baseTime = (date.toEpochSecond() * 1000_000_000) + date.nano // 基準時刻です。
// 基準時刻をiSCPサーバーへ送信します。
connection?.sendBaseTimeAsync(
baseTime = BaseTime(
sessionId = sessionId,
name = "manual",
priority = 1000,
elapsedTime = 0,
baseTime = baseTime),
persist = upstreamPersist,
completion = { sendBaseTimeEx ->
if (sendBaseTimeEx != null) {
// 基準時刻の送信に失敗。
return@sendBaseTimeAsync
}
// 基準時刻の送信に成功。
// 文字列型のデータポイントをiSCPサーバーへ送信します。
var now = ZonedDateTime.now() // ※マイクロ秒以下をサポートするには修正が必要です。
upstream.writeDataPoint(
dataId = DataId(
name = "greeting",
type = "string"),
dataPoint = DataPoint(
elapsedTime = ((now.toEpochSecond() * 1000_000_000) + now.nano) - baseTime, // 基準時刻からの経過時間をデータポイントの経過時間として打刻します。
payload = "hello".toByteArray()
)
)
}
)
}
)
}
val ExampleActivity.upstreamCallbacks : UpstreamCallbacks
get() = object : UpstreamCallbacks {
override fun onGenerateChunk(upstream: Upstream, message: UpstreamChunk) {
// バッファへ書き込んだデータポイントが実際に送信される直前にコールされます。
}
override fun onReceiveAck(upstream: Upstream, message: UpstreamChunkAck) {
// データポイントの送信後に返却されるACKを受信できた場合にコールされます。
}
override fun onFailWithError(upstream: Upstream, error: Exception) {
// 内部でエラーが発生した場合にコールされます。
}
override fun onCloseWithError(upstream: Upstream, error: Exception) {
// 何らかの理由でストリームがクローズした場合にコールされます。
// 再度アップストリームをオープンしたい場合は、 `Connection.reopenUpstream()` を使用することにより、ストリームの設定を引き継いで別のストリームを開くことが可能です。
}
override fun onResume(upstream: Upstream) {
// 自動再接続機能が働き、再接続が行われた場合にコールされます。
}
}
Content copied to clipboard
Start Downstream
アップストリームで送信されたデータをダウンストリームで受信するサンプルです。
このサンプルでは、アップストリーム開始のメタデータ、基準時刻のメタデータ、文字列型のデータポイントを受信しています。
import android.util.Log
import com.aptpod.iscp.model.DataFilter
import com.aptpod.iscp.model.DownstreamChunk
import com.aptpod.iscp.model.DownstreamFilter
import com.aptpod.iscp.model.DownstreamMetadata
import com.aptpod.iscp.stream.Downstream
import com.aptpod.iscp.stream.DownstreamCallbacks
import java.time.Instant
import java.time.ZoneId
import java.time.ZonedDateTime
/**
* 受信したいデータを送信している送信元ノードのUUID
*
* (アップストリームを行っている送信元でConnection.Configで設定したnodeIdを指定してください。)
*/
var targetDownstreamNodeID = "00000000-0000-0000-0000-000000000000"
/**
* オープンしたダウンストリーム一覧
*/
var downstreams: MutableList<Downstream> = mutableListOf()
fun ExampleActivity.startDownstream() {
// ダウンストリームをオープンします。
connection?.openDownstreamAsync(
downstreamFilters = listOf(
DownstreamFilter(
sourceNodeId = targetDownstreamNodeID, // // 送信元ノードのIDを指定します。
dataFilters = listOf(
DataFilter(
name = "#", type = "#") // 受信したいデータを名称と型で指定します。この例では、ワイルドカード `#` を使用して全てのデータを取得します。
)
)
),
completion = { downstream, ex ->
if (downstream == null) {
// オープン失敗。
return@openDownstreamAsync
}
// オープン成功。
downstreams.add(downstream)
// 受信データを取り扱うためにデリゲートを設定します。
downstream.callbacks = this.downstreamCallbacks // DownstreamCallbacks
}
)
}
val ExampleActivity.downstreamCallbacks: DownstreamCallbacks
get() = object : DownstreamCallbacks {
override fun onReceiveChunk(downstream: Downstream, message: DownstreamChunk) {
// データポイントを読み込むことができた際にコールされます。
Log.d(this.javaClass.name, "Received dataPoints sequenceNumber[${message.sequenceNumber}], sessionId[${message.upstreamInfo.sessionId}]")
for (g in message.dataPointGroups) {
for (dp in g.dataPoints) {
Log.d(this.javaClass.name, "Received a dataPoint dataName[${g.dataId.name}], dataType[${g.dataId.type}], payload[${String(dp.payload)}]")
}
}
}
override fun onReceiveMetadata(downstream: Downstream, message: DownstreamMetadata) {
// メタデータを受信した際にコールされます。
Log.d(this.javaClass.name, "Received a metadata sourceNodeId[${message.sourceNodeId}], metadataType:${message.type}")
when (message.type) {
DownstreamMetadata.MetadataType.BASE_TIME -> {
var baseTime = message.baseTime!!
var date = ZonedDateTime.ofInstant(Instant.ofEpochSecond(baseTime.baseTime / 1000_000_000, baseTime.baseTime % 1000_000_000), ZoneId.systemDefault())
Log.d(this.javaClass.name, "Received baseTime[$date], priority[${baseTime.priority}], name[${baseTime.name}]")
}
else -> {}
}
}
override fun onFailWithError(downstream: Downstream, error: Exception) {
// 内部でエラーが発生した場合にコールされます。
}
override fun onCloseWithError(downstream: Downstream, error: Exception) {
// 何らかの理由でストリームがクローズした場合にコールされます。
// 再度ダウンストリームをオープンしたい場合は、 `Connection.reopenDownstream()` を使用することにより、ストリームの設定を引き継いで別のストリームを開くことが可能です。
}
override fun onResume(downstream: Downstream) {
// 自動再接続機能が働き、再接続が行われた場合にコールされます。
}
}
Content copied to clipboard
E2E Call
E2E(エンドツーエンド)コールのサンプルです。
コントローラノードが対象ノードに対して指示を出し、対象ノードは受信完了のリプライを行う簡単なサンプルです。
class E2ECallExampleActivity : Activity() {
/**
* 接続するintdashサーバー
*/
var targetServer: String = "https://example.com"
/**
* コントローラーノードのUUID
*/
var controllerNodeID: String = "00000000-0000-0000-0000-000000000000"
/**
* 対象ノードのUUID
*/
var targetNodeID: String = "11111111-1111-1111-1111-111111111111"
/**
* コントローラーノード用のアクセストークン
*
* intdash APIで取得したアクセストークンを指定して下さい。
*/
var accessTokenForController : String = ""
/**
* 対象ノード用のアクセストークン
*
* intdash APIで取得したアクセストークンを指定して下さい。
*/
var accessTokenForTarget: String = ""
/**
* コントローラーノード用のコネクション
*/
var connectionForController: Connection? = null
/**
* 対象ノード用のコネクション
*/
var connectionForTarget: Connection? = null
}
//region コントローラーノードからメッセージを送信するサンプルです。このサンプルでは文字列メッセージを対象ノードに対して送信し、対象ノードからのリプライを待ちます。
fun E2ECallExampleActivity.connectForController() {
// 接続情報のセットアップをします。
var urls = targetServer.split("://")
var address: String
var enableTls: Boolean = false
if (urls.size == 1)
{
address = urls[0]
}
else
{
enableTls = urls[0] == "https"
address = urls[1]
}
// WebSocketを使って接続するように指定します。
var transportConfig: ITransportConfig = WebSocketConfig(enableTls = enableTls)
Connection.connectAsync(
address = address,
transportConfig = transportConfig,
tokenSource = {
// アクセス用のトークンを指定します。接続時に発生するイベントにより使用されます。
// ここでは固定のトークンを返していますが随時トークンの更新を行う実装にするとトークンの期限切れを考える必要がなくなります。
accessTokenForController
},
nodeId = controllerNodeID,
completion = { con, ex ->
if (con == null) {
// 接続失敗
return@connectAsync
}
// 接続成功.
connectionForController = con
}
)
}
fun E2ECallExampleActivity.sendCall() {
// コールを送信し、リプライコールを受信するとコールバックが発生します。
connectionForController?.sendCallAndWaitReplyCallAsync(
upstreamCall = UpstreamCall(
destinationNodeId = targetNodeID,
name = "greeting",
type = "string",
payload = "hello".toByteArray()
),
completion = { downstreamReplyCall, ex ->
if (ex != null) {
// コールの送信もしくはリプライの受信に失敗。
return@sendCallAndWaitReplyCallAsync
}
// コールの送信及びリプライの受信に成功。
}
)
}
//endregion
//region コントローラーノードからのコールを受け付け、すぐにリプライするサンプルです。
fun E2ECallExampleActivity.connectForTarget() {
// 接続情報のセットアップをします。
var urls = targetServer.split("://")
var address: String
var enableTls: Boolean = false
if (urls.size == 1)
{
address = urls[0]
}
else
{
enableTls = urls[0] == "https"
address = urls[1]
}
// WebSocketを使って接続するように指定します。
var transportConfig: ITransportConfig = WebSocketConfig(enableTls = enableTls)
Connection.connectAsync(
address = address,
transportConfig = transportConfig,
tokenSource = {
// アクセス用のトークンを指定します。接続時に発生するイベントにより使用されます。
// ここでは固定のトークンを返していますが随時トークンの更新を行う実装にするとトークンの期限切れを考える必要がなくなります。
accessTokenForTarget
},
nodeId = targetNodeID,
completion = { con, ex ->
if (con == null) {
// 接続失敗
return@connectAsync
}
// 接続成功.
this.connectionForTarget = con
// DownstreamCallの受信を監視するためにコールバックを設定します。
con.e2ECallCallbacks = this.e2ECallCallbacks // ConnectionE2ECallCallbacks
}
)
}
//endregion
val E2ECallExampleActivity.e2ECallCallbacks: ConnectionE2ECallCallbacks
get() = object : ConnectionE2ECallCallbacks {
override fun onReceiveCall(connection: Connection, downstreamCall: DownstreamCall) {
// DownstreamCallを受信した際にコールされます。
// このサンプルではDownstreamCallを受信したらすぐにリプライコールを送信します。
connection.sendReplyCallAsync(
upstreamReplyCall = UpstreamReplyCall(
requestCallId = downstreamCall.callId,
destinationNodeId = downstreamCall.sourceNodeId,
name = "reply_greeting",
type = "string",
payload = "world".toByteArray()
),
completion = { ex ->
if (ex != null) {
// リプライコールの送信に失敗。
return@sendReplyCallAsync
}
// リプライコールの送信に成功。
}
)
}
override fun onReceiveReplyCall(connection: Connection, downstreamReplyCall: DownstreamReplyCall) {
// DownstreamReplyCallを受信した際にコールされます。
}
}
Content copied to clipboard