Rustによる独自エレメント開発

Rustで独自エレメントを開発する手順は、以下のとおりです。

Rustで開発したエレメントは プラグインとして使用する ことも、 デバイスコネクターの実行ファイルに含める ことも可能です。

どちらの場合も、以下の エレメントをプラグインにする/実行ファイルに含める の前までの手順は同じです。

準備

エレメントの開発のために、 cargo-generate用のテンプレート が用意されています。

注釈

cargo-generate は、既存のRustプロジェクトをテンプレートにして新しいプロジェクトを作成するためのツールです。 インストールしていない場合、次のコマンドでインストールしてください。

cargo install cargo-generate

次のコマンドでデバイスコネクターの新しいプロジェクトを作成します。 新しいプロジェクトの名前の入力を求められるので、任意の名前を入力してください。

cargo generate --git https://github.com/aptpod/device-connector-template.git

これで新しいプロジェクトが作成されます。

このプロジェクトには、以下の2つのエレメントのソースコードがあらかじめ含まれています。

  • src/hello.rs (hello-src)

  • src/hexdump.rs (hexdump-sink)

ElementBuildableトレイトの利用

Rustでは、型に何らかの属性や実装を付与するためにトレイトと呼ばれる機能が提供されています。 独自エレメントを定義するには、そのエレメントにおいて ElementBuildable トレイトを実装する必要があります。

ElementBuildable トレイトの定義は以下のとおりです。

pub trait ElementBuildable: Sized + Send + 'static {
    type Config: DeserializeOwned;
    const NAME: &'static str;
    const RECV_PORTS: Port = 0;
    const SEND_PORTS: Port = 0;
    fn acceptable_msg_types() -> Vec<Vec<MsgType>> {
        Vec::new()
    }
    fn new(conf: Self::Config) -> Result<Self, Error>;
    fn next(&mut self, pipeline: &mut Pipeline, receiver: &mut MsgReceiver) -> ElementResult;
    fn finalizer(&mut self) -> Result<Option<ElementFinalizer>, crate::error::Error>;
}
Config

このエレメントを構築するのに必要な設定項目を指定します。 デシリアライズ可能なデータ型が指定できますが、一般的には #[derive(Debug, Deserialize)] を指定した構造体にします。 設定を受け取らない場合、 device_connector::EmptyElementConf を指定します。

NAME

エレメントの名前を指定します。 他のエレメントとの重複は許されません。他のエレメントの名前と重複していると、実行時にエラーになります。

RECV_PORTS

メッセージを受け取るポートの数です。 srcエレメントの場合は0を指定します。filterまたはsinkエレメントの場合は1以上を指定します。デフォルトは0です。

SEND_PORTS

メッセージを送出するポートの数です。 sinkエレメントの場合は0を指定します。srcまたはfilterエレメントの場合は1以上を指定します。デフォルトは0です。

acceptable_msg_types()

受け取ることのできるメッセージの型情報の配列を返す関数です。 メッセージを受け取らないsrcエレメントの場合は実装する必要はありません。

new()

Config で指定した設定情報を受け取り、エレメントを実際に構築して返すメソッドです。

next()

データを受け取るための pipeline を受け取り、エレメントの実行結果を返します。 next() は、 MsgBuf に有効なデータを書き出すか終了するまで、処理を返さないように実装します。

finalizer()

プロセス終了時に実行されるクロージャを返すメソッドです。 このクロージャではエレメント固有の終了処理を定義します。 このメソッドを実装しない場合、終了処理を行わない挙動(デフォルト)となります。

HelloSrcElementを作る

実例として、device-connector-templateに含まれているサンプルのエレメント HelloSrcElement を見ていきます。

このエレメントは、一定時間ごとにテキストを送信します。

use device_connector::{error::Error, ElementBuildable, ElementResult, MsgType, Pipeline, Port};
use serde_derive::Deserialize;
use std::io::Write;
use std::thread::sleep;
use std::time::Duration;

// ElementBuildableを実装するための対象となる型
pub struct HelloSrcElement {
    conf: HelloSrcElementConf,
}

// HelloSrcElementが受け取る設定の定義
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct HelloSrcElementConf {
    text: String,
}

// ElementBuildableの実装
impl ElementBuildable for HelloSrcElement {
    type Config = HelloSrcElementConf;

    const NAME: &'static str = "hello-src";

    const SEND_PORTS: Port = 1;

    fn new(conf: Self::Config) -> Result<Self, Error> {
        Ok(HelloSrcElement { conf })
    }

    fn next(&mut self, pipeline: &mut Pipeline, _receiver: &mut MsgReceiver) -> ElementResult {
        pipeline.check_send_msg_type(0, || MsgType::from_mime("text/plain").unwrap())?;

        sleep(Duration::from_millis(100));

        let mut buf = pipeline.msg_buf(0);
        buf.write_all(self.conf.text.as_bytes())?;

        Ok(ElementValue::MsgBuf)
    }
}

まず、本体である HelloSrcElement を定義します。

pub struct HelloSrcElement {
    conf: HelloSrcElementConf,
}

エレメントを実装する場合、実行に必要なデータを構造体のメンバとして持ちます。 ここでは、送りたいテキストを含む HelloSrcElementConf をメンバとして持ちます。

HelloSrcElementConf は、エレメントを構築・実行するための設定を持ちます。

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct HelloSrcElementConf {
    text: String,
}

構造体の宣言に付随する各属性は、パイプライン設定ファイルからのデシリアライズを可能にするためのものです。 基本的に、この例の通りの属性を付与すれば問題ありません。

エレメントを構築可能にするために、 ElementBuildable を実装します。

impl ElementBuildable for HelloSrcElement {
    type Config = HelloSrcElementConf;

    const NAME: &'static str = "hello-src";

    const SEND_PORTS: Port = 1;

    fn new(conf: Self::Config) -> Result<Self, Error> {
        Ok(HelloSrcElement { conf })
    }

    ..
}

Config にはさきほど定義した HelloSrcElementConf を、 NAME にはエレメントの名前である "hello-src" を指定します。 この名前はパイプライン設定ファイルにおいてエレメントを指定する時に使われるもので、他のエレメントと重複しないものを選ぶ必要があります。

このエレメントは1つの送信ポートを持つので、 SEND_PORTS に1を指定します。

new() には、受け取った設定(HelloSrcElementConf)を元に、 HelloSrcElement を生成するためのコードを記述します。

new()HelloSrcElementConf 型の値 conf (パイプライン設定ファイルからパースされた設定)を受け取り、その設定に基づいて HelloSrcElement を作成して返却します。

filterエレメントまたはsinkエレメントを実装する場合、どの型のメッセージを受け取れるかを示すために acceptable_msg_types() を実装しなければなりません。 実装しない場合、どのようなデータも受け取れないエレメントとなります。

最後に、 HelloSrcElement がメッセージを生成するためのメソッド next を定義します。

impl ElementBuildable for HelloSrcElement {
    ..

    fn next(&mut self, pipeline: &mut Pipeline, _receiver: &mut MsgReceiver) -> ElementResult {
        pipeline.check_send_msg_type(0, || MsgType::from_mime("text/plain").unwrap())?;

        sleep(Duration::from_millis(100));

        let mut buf = pipeline.msg_buf(0);
        buf.write_all(self.conf.text.as_bytes())?;

        Ok(ElementValue::MsgBuf)
    }
}

srcエレメントとして実装するために、 next() メソッドの内部を記述していきます。

pipeline.check_send_msg_type(0, || MsgType::from_mime("text/plain").unwrap())?;

送出メッセージの型情報を pipeline.check_send_msg_type に渡し、受信側で受け取りができるのかを判定します。このエレメントはテキストを送出するので、 "text/plain" というMIME情報を渡します。

sleep(Duration::from_millis(100));

データ量を適当な量に制限するため、ここでは100msの間sleepします。

let mut buf = pipeline.msg_buf(0);
buf.write_all(self.conf.text.as_bytes())?;

Ok(ElementValue::MsgBuf)

設定で与えられたテキストをメッセージにするために、まずは Pipeline::msg_buf() を呼び出して、 MsgBuf 型のバッファを用意します。

Pipeline::msg_buf() では、メッセージの送信に用いるポート番号を引数にします(大抵の場合は0を指定します)。

MsgBufstd::io::Write を実装しているため、 write_all() を用いてバッファにテキストを書き込みます。

関数の最後の Ok(ElementValue::MsgBuf) は、 MsgBuf に書き込んだデータが送信先のタスクに送信されるよう指定するためのものです。

返却されたメッセージは、エレメントが独立したスレッドで動作している場合、関連付けられたタスクへチャンネルを用いて送信されます。 エレメントが同期的に呼び出されている場合、呼び出し元のタスクの実行へ戻ります。

エレメントをプラグインにする/実行ファイルに含める

Rustで実装したエレメントを動作可能にする方法は、プラグインにする場合と、デバイスコネクターの実行ファイルに含める場合とで異なります。

プラグインにする場合

実装したエレメントをプラグインにする場合、 define_dc_load!() マクロを使用します。例として、テンプレートの src/lib.rs を見てみます。

mod hello;

// Implement plugin interface.
device_connector::define_dc_load!(hello::HelloSrcElement);

ここでは、さきほど実装した HelloSrcElement をプラグインに登録しています。 define_dc_load!() マクロには、複数のエレメントを渡すことも可能です。プラグインを開発したい場合、このような記述を lib.rs に記述します。

デバイスコネクターの実行ファイルに含める場合

Rustで実装したエレメントをデバイスコネクターの実行ファイルに含めるには、 ElementBank に登録します。

ElementBuildable を実装した型を append_from_buildable() により登録し、その後に Runner を構築することで、エレメントが利用できるようになります。

例として、テンプレートの src/main.rs は以下のようになっています。

fn main() -> Result<()> {
    // ログシステムの初期化
    env_logger::init();

    // パイプライン設定ファイルの読み込み
    let opts: Opts = Opts::parse();
    let conf = Conf::read_from_file(&opts.config)?;

    // ElementBankの作成
    let mut bank = ElementBank::new();

    // プラグインのロード
    let loaded_plugin = LoadedPlugin::from_conf(&conf.plugin)?;

    // 実装したHelloSrcElementの登録
    bank.append_from_buildable::<hello::HelloSrcElement>();

    // runnerの作成
    let mut runner_builder = RunnerBuilder::new(&bank, &loaded_plugin, &conf.runner);
    runner_builder.append_from_conf(&conf.tasks)?;
    let runner = runner_builder.build()?;

    // 起動
    runner.run()?;

    Ok(())
}

これは通常のRustの main 関数であるため、ここに追記することでデバイスコネクターが動作する前後の挙動をカスタマイズすることも可能です。

ビルドする

テンプレートを使用した場合、実装した拡張は以下のコマンドでビルドできます。

cargo build --release

これにより、 target/release に、以下の2種類の成果物が生成されます。いずれかを使用してください。

  • libdc_xxxxxx.so : HelloSrcElement を含むプラグインファイル(共有ライブラリ)。 使用するには、パイプライン設定ファイルの plugin.plugin_files で、プラグインファイル名を指定してください()。

  • xxxxxx-run : HelloSrcElement を含むデバイスコネクターの実行ファイル。このデバイスコネクターを実行すれば、プラグインなしで HelloSrcElement を使用できます。 plugin.plugin_files の設定は必要ありません。

(補足)sinkエレメントの例

上の説明では例としてsrcエレメントを作成しましたが、sinkエレメントを作成する場合、例えば以下のようになります。

このエレメント(hexdump-sink)では、受け取った各メッセージを標準エラーに出力します。

注釈

このエレメント(hexdump-sink)は device-connector-template に含まれます。

use device_connector::EmptyElementConf;
use device_connector::{
    ElementBuildable, ElementResult, Error, MsgReceiver, MsgType, Pipeline, Port,
};

pub struct HexdumpSinkElement {}

impl ElementBuildable for HexdumpSinkElement {
    type Config = EmptyElementConf;

    const NAME: &'static str = "hexdump-sink";
    const RECV_PORTS: Port = 1;
    const SEND_PORTS: Port = 0;

    fn acceptable_msg_types() -> Vec<Vec<MsgType>> {
        vec![vec![MsgType::any()]]
    }

    fn new(_conf: Self::Config) -> Result<Self, Error> {
        Ok(Self {})
    }

    fn next(&mut self, _pipeline: &mut Pipeline, receiver: &mut MsgReceiver) -> ElementResult {
        loop {
            let msg = receiver.recv(0)?;
            let bytes = msg.as_bytes();
            eprintln!(
                "msg ={}",
                bytes
                    .iter()
                    .map(|x| format!(" {:02X}", x))
                    .collect::<String>()
            );
        }
    }
}