undefined

bokuweb.me

Denoを読む(1)

正月にDenoを読んでたメモです。いろいろ間違ってる可能性が高いのでご注意ください。

Denoとは

deno.land

Node.jsの作者Ryan Dahl氏による新しいTypeSciprtのランタイム。Node.jsの反省点を生かして作られてる。 おおきく分けてTypeScript、V8、Rustの三層で構成されていてTypeScriptとRust間はFlatBuffersでやり取りされ、仲介としてC++で書かれたlibdenoが存在する。

参考資料

scrapbox.io

denolib.gitbook.io

yosuke-furukawa.hatenablog.com

読んでいく

前提

実装は日に日に変化しているのでひとまず以下のバージョンについてのメモとする

github.com

Cargo.toml

まずはCargo.tomlを眺めてみる。package.jsonみたいなやつです。dependenciesは以下のような感じ。特段目を引くようなものは見当たらないようにみえる。

[dependencies]
atty = "=0.2.11"
dirs = "=1.0.4"
flatbuffers = "=0.5.0"
futures = "=0.1.25"
getopts = "=0.2.18"
http = "=0.1.14"
hyper = "=0.12.19"
hyper-rustls = "=0.15.0"
kernel32-sys = "=0.2.2"
lazy_static = "=1.2.0"
libc = "=0.2.46"
log = "=0.4.6"
rand = "=0.6.3"
remove_dir_all = "=0.5.1"
ring = "=0.13.5"
rustyline = "=2.1.0"
serde_json = "1.0.34"
source-map-mappings = "0.5.0"
tempfile = "=3.0.5"
tokio = "=0.1.13"
tokio-executor = "=0.1.5"
tokio-fs = "=0.1.4"
tokio-io = "=0.1.10"
tokio-process = "=0.2.3"
tokio-threadpool = "=0.1.9"
url = "=1.7.2"
winapi = "=0.3.6"

Rust側を見てく

エントリポイントはsrc/main.rsぽいのでここから読んでいく。

  • src/main.rs
fn main() {
  // ... ommited ... 基本的にはロガーの設定

  let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
  let snapshot = snapshot::deno_snapshot();
  let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
  tokio_util::init(|| {
    isolate
      .execute("denoMain();")
      .unwrap_or_else(print_err_and_exit);
    isolate.event_loop().unwrap_or_else(print_err_and_exit);
  });
}

前半はロガーの設定などをぼちぼちやる感じ。

isolate::IsolateStateisolate用のフラグやworker用channelsの保持用ぽい。まずこいつを作る。そもそもisolateは何かというとコンテキストが隔離されたJS実行環境と思えばいいのだろうか。chromeでのタブやworkerをイメージすれば良さそう(多分)。実際、最近入ったworker対応でもやはりworker作成時にisolateを作成している。

github.com

let snapshot = snapshot::deno_snapshot()ではv8のsnapshotを作成している。deno_snapshot()は以下。

  • src/snapshot.rs
pub fn deno_snapshot() -> deno_buf {
  #[cfg(not(feature = "check-only"))]
  let data =
    include_bytes!(concat!(env!("GN_OUT_DIR"), "/gen/snapshot_deno.bin"));
  // ... ommited .../

  unsafe { deno_buf::from_raw_parts(data.as_ptr(), data.len()) }
}

deno_snapshotはこれだけでinclude_bytes!でファイルをごそっと読んでそのポインタと長さを返しているだけの様子。snapshotはなんぞやという話は以下を読むと良さそう。

v8.dev

コンテキスト作成時にV8のヒープにロードするのには時間がかかるので、ロード後のsnapshotを撮っておいてそれを使用することで起動を速くする仕組みっぽい。上の記事でもまさにTypeScriptのコンパイラの話をしている。Denoではtools/build.py実行時にdeno/js配下のファイルがトランスパイルかつV8のヒープロードされた状態でスナップショットにされるぽい。なのでjs/*.tsを変更した場合は再ビルドしないと反映されない。ちなみにnew Date()Math.random()は値が焼き付くようなことが書いてある。

あとはtokioの中でdenoMainを実行して、isolate.event_loop()でタスクがなくなるまで待つことになっているぽい。タスクがなくなったらループを抜けて終了する。

tokio_util::init(|| {
  isolate
    .execute("denoMain();")
    .unwrap_or_else(print_err_and_exit);
  isolate.event_loop().unwrap_or_else(print_err_and_exit);
});

tokioの初期化は以下のようになっている。tokioのチュートリアルもやったがこの辺何をやってるのかまだちゃんとわかってない。宿題。

pub fn init<F>(f: F)
where
  F: FnOnce(),
{
  let rt = tokio::runtime::Runtime::new().unwrap();
  let mut executor = rt.executor();
  let mut enter = tokio_executor::enter().expect("Multiple executors at once");
  tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
}

そもそもtokioってなにかというとRustの非同期I/Oライブラリで、イベントループを作ってLinuxであればepoll、BSDであればkqueueを使ってディスクリプタを監視し適宜処理を行うやつでNode.jsでいうところのlibuvの役割を果たしているようにみえる。違ったら指摘いただけると。。。 denoを読み始めたんだけど、結局tokioを学ばなければならないとなって正月はほぼ以下を読んでいた。以下はtokioの学習用の簡易実装でいろいろ勉強になる。ひとまずこれを読めばどんなことをやっているかはわかる。(tokioのおもちゃ実装ということで昔はtoykioという名前だった) fahrenheitでは簡素化とポータビリティのためepollではなくselectを使用している。と書いてある。

github.com

ブログ記事もある。

rust-lang-nursery.github.io

ただ、まだ理解できていないのでもう少し勉強して理解できたら別途まとめたい。

isolate.event_loop()がどうなってるかというと以下のようになっていて、self.is_idle()が真になるまでループを抜けてこない。self.is_idle()は非同期タスクが0かつ設定されたtimeoutがなくなると真となる。なので非同期タスクがない(たとえば、console.log("hello");などを実行した)場合は待ちタスクがないのですぐアイドルと判定されループを抜けて終了する。

  pub fn event_loop(&self) -> Result<(), JSError> {
    while !self.is_idle() {
      match recv_deadline(&self.rx, self.get_timeout_due()) {
        Ok((req_id, buf)) => self.complete_op(req_id, buf),
        Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
        Err(e) => panic!("recv_deadline() failed: {:?}", e),
      }
      // ommited... promise error check
    }
    // ommited... promise error check
    Ok(())
  }

ループ内では、recv_deadline(&self.rx, self.get_timeout_due())で非同期タスク完了のメッセージを待ち続けることになる。

では送信元はどこかというとdeno/src/isolate.rsextern "C" fn pre_dispatchの以下の箇所っぽい。タスクを登録して、その完了時にsender.sendでメッセージを送信している。

let task = op
  .and_then(move |buf| {
    let sender = tx; // tx is moved to new thread
    sender.send((req_id, buf)).expect("tx.send error");
    Ok(())
  }).map_err(|_| ());
  tokio::spawn(task);

extern "C"がついていることからもC++で書かれたlibsenoから叩かれる箇所だと推測できる。追ってみるとIsorate::newlibdeno::configに受信コールバックとして渡されている。

let config = libdeno::deno_config {
  will_snapshot: 0,
  load_snapshot: snapshot,
  shared: libdeno::deno_buf::empty(), // TODO Use for message passing.
  recv_cb: pre_dispatch,
  resolve_cb,
};

let task = op.and_then(...)opは何かというと、以下のようなシグネチャになってる。

pub type Op = Future<Item = Buf, Error = DenoError> + Send;

deno/src/ops.rsdispatchの返り値となっており、dispatchでメッセージのデシリアライズ後matchでファイルの読み書きやフェッチなどの処理に振り分けられる。例えばメッセージの種別がReadFileであれば以下のようにop_read_fileに振り分けられる。

pub fn dispatch(
  isolate: &Isolate,
  control: libdeno::deno_buf,
  data: libdeno::deno_buf,
) -> (bool, Box<Op>) {
  let base = msg::get_root_as_base(&control);
  let is_sync = base.sync();
  let inner_type = base.inner_type();
  let cmd_id = base.cmd_id();
  let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
    // ... ommited ...
  } else {
    // Handle regular ops.
    let op_creator: OpCreator = match inner_type {
      msg::Any::ReadFile => op_read_file,
      // ... 他の実処理に分岐される ...

たとえば一番シンプルな処理っぽいchdirであれば以下のような感じ。該当する処理を行ってBox<Op>を返すという感じ。

fn op_chdir(
  _state: &Arc<IsolateState>,
  base: &msg::Base,
  data: libdeno::deno_buf,
) -> Box<Op> {
  assert_eq!(data.len(), 0);
  let inner = base.inner_as_chdir().unwrap();
  let directory = inner.directory().unwrap();
  Box::new(futures::future::result(|| -> OpResult {
    std::env::set_current_dir(&directory)?;
    Ok(empty_buf())
  }()))

ここでの結果がpre_dispatchis_syncフラグと一緒に戻されて、非同期/同期で処理が分岐される。

例えば同期モードであれば(https://github.com/denoland/deno/blob/6f79ad721a9f8c9d66d79f21ea479286f3ca5374/src/isolate.rs#L416-L425) のようにbloking_onで処理の完了を待ってからレスポンスメッセージが送られる。

let buf = tokio_util::block_on(op).unwrap();
let buf_size = buf.len();

if buf_size == 0 {
  // FIXME
 isolate.state.metrics_op_completed(buf.len());
} else {
  // Set the synchronous response, the value returned from isolate.send().
  isolate.respond(req_id, buf);

非同期の場合は先に記載したように処理の完了を待って完了後、完了が通知される。この通知は先のisolate.event_loop()内で受信されて非同期タスクの完了処理が実行される。完了処理は現在待機中のタスク数のデクリメント(tokio側のAPIを使いたい旨のコメントがあったが、問題があるのか現在は手動で行っている。)とV8側へのレスポンス

let tx = isolate.tx.clone();
isolate.ntasks_increment();
let task = op
  .and_then(move |buf| {
    let sender = tx; // tx is moved to new thread
    sender.send((req_id, buf)).expect("tx.send error");
    Ok(())
  }).map_err(|_| ());
tokio::spawn(task);

TypeScript側を見てく

Rust側の大枠の流れはわかったのでTypeScript側を見てみる エントリポイントはjs/main.ts。ここにRust側から呼ばれていたdenoMainがある。

export default function denoMain() {
  libdeno.recv(handleAsyncMsgFromRust);
  const startResMsg = sendStart();

  // ... ommited ...

  os.setPid(startResMsg.pid());

  const cwd = startResMsg.cwd();
  log("cwd", cwd);

  for (let i = 1; i < startResMsg.argvLength(); i++) {
    args.push(startResMsg.argv(i));
  }
  log("args", args);
  Object.freeze(args);
  const inputFn = args[0];

  compiler.recompile = startResMsg.recompileFlag();

  if (inputFn) {
    compiler.run(inputFn, `${cwd}/`);
  } else {
    replLoop();
  }
}

まずはlibdeno.recv(handleAsyncMsgFromRust);でRust側からの受信コールバックを設定する。

const promiseTable = new Map<number, util.Resolvable<msg.Base>>();

export function handleAsyncMsgFromRust(ui8: Uint8Array) {
  // If a the buffer is empty, recv() on the native side timed out and we
  // did not receive a message.
  if (ui8.length) {
    const bb = new flatbuffers.ByteBuffer(ui8);
    const base = msg.Base.getRootAsBase(bb);
    const cmdId = base.cmdId();
    const promise = promiseTable.get(cmdId);
    util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
    promiseTable.delete(cmdId);
    const err = errors.maybeError(base);
    if (err != null) {
      promise!.reject(err);
    } else {
      promise!.resolve(base);
    }
  }
  // Fire timers that have become runnable.
  fireTimers();
}

基本的にここに到達するのはTypeScript側から非同期処理を呼んでその応答がRust側から返ってきたケース(だと思う)。非同期処理開始メッセージを送る祭にcommandIdをキーにPromisepromiseTableに登録しておいて、返ってきたメッセージのcommandIdをキーにそれを回収、resolve/rejectを実行してるっぽい。

ちょうど下にsendAsyncというのがいた。promiseTable.set(cmdId, promise);してる。

export function sendAsync(
  builder: flatbuffers.Builder,
  innerType: msg.Any,
  inner: flatbuffers.Offset,
  data?: ArrayBufferView
): Promise<msg.Base> {
  const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false);
  util.assert(resBuf == null);
  const promise = util.createResolvable<msg.Base>();
  promiseTable.set(cmdId, promise);
  return promise;
}

次にconst startResMsg = sendStart(); でスタートメッセージを同期モードで送信している。Rust側で各メッセージに対して何をやっているかはops.rsを見ればいいのがわかったいるので覗いてみる。

let inner = msg::StartRes::create(
  &mut builder,
  &msg::StartResArgs {
    cwd: Some(cwd_off),
    pid: std::process::id(),
    argv: Some(argv_off),
    debug_flag: state.flags.log_debug,
    recompile_flag: state.flags.recompile,
    types_flag: state.flags.types,
    version_flag: state.flags.version,
    v8_version: Some(v8_version_off),
    deno_version: Some(deno_version_off),
    ..Default::default()
  },
);

基本的にはRust側でもている基本的情報を返信しているだけっぽい。返してるメッセージは上記のようなものでフラグとか引数、プロセスIDやバージョンなどを詰めて返している模様。 あとは返ってきたメッセージの引数やフラグなどを処理して以下のようにファイルが指定されていればコンパイルして実行。なければREPLモードに入るっぽい。

if (inputFn) {
  compiler.run(inputFn, `${cwd}/`);
} else {
  replLoop();
}

compiler.runの先はどうなってるかまだちゃんとみてないけど、CodeFetchというメッセージが同期が飛んでるのでRust側で該当ファイルを読んで返却後トランスパイルしてどこかにキャッシュしてるのかな。今度みる。

FlatBuffers

メッセージのやり取りにはFlatBuffersが使用されているが、定義はsrc/msg.fbsにいる。

github.com

tools/build.pyを実行するとTypeScriptとRustのコードがtarget/debug/gen/配下にmsg_generated.rsmsg_generated.tsとして生成される。

たとえば先のstartメッセージのレスポンスであれば以下のように定義されている。

table StartRes {
  cwd: string;
  argv: [string];
  debug_flag: bool;
  deps_flag: bool;
  recompile_flag: bool;
  types_flag: bool;
  version_flag: bool;
  deno_version: string;
  v8_version: string;
}

FlatBuffersはロード時にパースせず値が必要なときまで後回しするなどオーバーヘッドが少なく速いらしい。 このへんもまた今度詳しく調べてみる。

qiita.com

setTimeoutを実行してみる

だいたいの流れはわかったのでひとまず何か非同期処理を実行してみる。まずはsetTimeoutを試してみる。あとこの辺試してて気づいたんですが、microtaskのqueueはV8側で面倒見てくれるっぽい。知らなかった。

setTimeout(() => console.log("hello"), 1000);

を実行してみてその流れをみてみる。

setTimeoutjs/timer.tsに定義されている。

export function setTimeout(
  cb: (...args: Args) => void,
  delay: number,
  ...args: Args
): number {
  return setTimer(cb, delay, args, false);
}

これをたどっていくとsetGlobalTimeoutでメッセージを作って送信しているのがわかる。ただし、sendSync で送られている。timeout周りは非同期ながら若干特別扱いされてるっぽい。

function setGlobalTimeout(due: number | null, now: number) {
  // ... ommitted...
  // Send message to the backend.
  const builder = flatbuffers.createBuilder();
  msg.SetTimeout.startSetTimeout(builder);
  msg.SetTimeout.addTimeout(builder, timeout);
  const inner = msg.SetTimeout.endSetTimeout(builder);
  const res = sendSync(builder, msg.Any.SetTimeout, inner);

  globalTimeoutDue = due;
}

これはsrc/ops.rsdispatchのメッセージから各処理への分岐部分に書いてあった。例外的に同期処理として扱われメインスレッドで更新されるとのこと。

let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
  // SetTimeout is an exceptional op: the global timeout field is part of the
  // Isolate state (not the IsolateState state) and it must be updated on the
  // main thread.
  assert_eq!(is_sync, true);
  op_set_timeout(isolate, &base, data)
}

op_set_timeoutを見るとどうもisolate側にtimeout値を設定しているだけのよう。そして同期モードのメッセージなのでdummyの空bufferをひとまず返してTypeScript側がブロックしないようにしてるっぽい。

fn op_set_timeout(
  isolate: &Isolate,
  base: &msg::Base,
  data: libdeno::deno_buf,
) -> Box<Op> {
  let inner = base.inner_as_set_timeout().unwrap();
  let val = inner.timeout() as i64;
  let timeout_due = if val >= 0 {
    Some(Instant::now() + Duration::from_millis(val as u64))
  } else {
    None
  };
  isolate.set_timeout_due(timeout_due);
  ok_future(empty_buf())
}

timeout_dueがセットされると最初の方で記載したisolate.eventloopself.is_idleが偽になってrecv_deadlineで受信待ちになる。

pub fn event_loop(&self) -> Result<(), JSError> {
  while !self.is_idle() {
    match recv_deadline(&self.rx, self.get_timeout_due()) {
      Ok((req_id, buf)) => self.complete_op(req_id, buf),
      Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
      Err(e) => panic!("recv_deadline() failed: {:?}", e),
    }
    // ... ommited ...
 }

recv_deadlineは以下のようになっている。dueが設定されていれば、rx.recv_timeout(timeout)でタイムアウトを待つ。が、その後非同期メッセージを受信した場合は一旦rx.recv_timeoutから抜けてきてしまうので、後続の非同期タスクを登録したあと、次ループで再度rx.recv_timeoutで待つんだと思う。

fn recv_deadline<T>(
  rx: &mpsc::Receiver<T>,
  maybe_due: Option<Instant>,
) -> Result<T, mpsc::RecvTimeoutError> {
  match maybe_due {
    None => rx.recv().map_err(|e| e.into()),
    Some(due) => {
        let now = Instant::now();
      let timeout = if due > now {
        due - now
      } else {
        Duration::new(0, 0)
      };
      rx.recv_timeout(timeout)
    }
  }
}
let now = Instant::now();
let timeout = if due > now {
  due - now
} else {
  Duration::new(0, 0)
};

とやっているのは一度ループから抜けた際に経過してしまった時間を吸収してるっぽい。なので、常に設定されるタイマーは一個になる気がする。 なので以下を実行した場合も1,000msと2,000msのタイマーが設定されるわけではなく1,000msのタイマーを待ったあと再度差分の1,000ms(実際には997とか微妙に減った値だと思う)が設定されるぽい。

setTimeout(() => {...}, 1000);
setTimeout(() => {...}, 2000);

そうするとTypeScript側も工夫が必要で複数のタイマーはひとまずconst dueMap: { [due: number]: Timer[] } = Object.create(null); に管理されるっぽい。稼働中のタイマーのみglobalTimeoutDueにセットされて管理される。現在のタイマーが完了前に次のタイマー設定が来た場合はglobalTimeoutDueが未設定、もしくは globalTimeoutDueより期限が近いタイマーが新たに設定されるっぽい。そのへんをやってるのが以下。

function schedule(timer: Timer, now: number) {
  assert(!timer.scheduled);
  assert(now <= timer.due);
  let list = dueMap[timer.due];
  if (list === undefined) {
    list = dueMap[timer.due] = [];
  }
  list.push(timer);
  timer.scheduled = true;
  if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
    setGlobalTimeout(timer.due, now);
  }
}

なのでタイマーがセットされると同時にタイムアウト完了コールバックも同時に設定され、コールバック内で次に設定すべきタイムアウトがあれば経過時間を調整して設定、なければ完了メッセージ(timeout = -1)を送っている。Rust側では完了メッセージを受けたら、timeout_dueNoneを設定して(他にまちタスクがなければ)isolate.eventloopを抜けて終了という流れっぽい。

なのでhandleAsyncMsgFromRustfireTimersはまさにそれようなんですね。バッファが空の場合はタイムアウトという取り決めのもと次のタイマーをセットしにいってるんだと思う。

export function handleAsyncMsgFromRust(ui8: Uint8Array) {
  if (ui8.length) {
    // ... ommitted ...
  }
  // Fire timers that have become runnable.
  fireTimers();
}

かなり昔にtokio_timerを使ってタイマーを実装するというissueを見かけた気がしたけど、そのような作りではなく、どのような議論を経てこの実装になっているのかはちょっとわからん。

非同期処理を確認したかったんだけどsetTimeoutはちょっと特殊だったっぽい。次はreadFileとかcodeFetch周りを読めたら読みたい。

ひとまずここまで。

Deno用のpretty_assertを作った

あけましておめでとうございます。

tl;dr

Denoの入門に以下を作った

github.com

f:id:bokuweb:20190104183220p:plain

Deno?

https://deno.land/

概要

Ryan DahlがRustでDenoというものを作っていると聞いたとき貢献したいなーと思っていたけど、忙しさを言い訳に長い間ビルドすらできずにいた。

そんな中最近になってTLでDenoを楽しそうに触ってる方々がでてきて、みんなあまりに楽しそうなので触発されて自分の始めることにした。特にhashrockさんの記事をみて自分もやるぞ!となった。

hashrock.hatenablog.com

Denoを読んで見る

最近なかなかまとまった時間が取れてなかったけど正月は時間がとれそうだったのでdenoを読むというのをハイプライオリティなタスクとしてスケジュールし、12/31〜1/1はDenoとその周辺を読んでた。

まだまだ理解が怪しいがどのように動いているかは把握できた気がするのでもうちょっとアップデートしてできれば記事にしたい。

Denoのテスト周り

Denoをきりのいいところまで読んだあとまずは人間に見やすいアサートを書いてみるか。ってことになった。 Denoにはテスト用のモジュールがあり以下のように書ける。

import { test, assertEqual } from 'https://deno.land/x/testing/testing.ts';

test({
  name: 'example',
  fn() {
    assertEqual(10, 10);
  },
});

が、テスト失敗時の出力が見にくかった為だ。

実装

モジュールを作りはじめると「あれもない、これもない」となる。具体的にはjestのもっているpretty-formatを使いたかったのだが、直接は使えないのでひとまず export { default } from 'pretty-format'; を書いてrollupでバンドル後@ts-ignore を付加する方法をとった。anyになるしあまりいい方法ではないので今後何かしらいい方法が提案されるんじゃないかな。

これで一応import prettyFormat from './pretty-format/dist/index.js';として使用できる。

あと自分はたまたまno dependenciesのdiffライブラリを作っていたのでこれらを使用してassert結果を色付してやった。(こっちはまったく手をいれずimport diff, { DiffType } from 'https://denopkg.com/bokuweb/wu-diff-js@0.1.6/lib/index.ts';として使用できた。便利。)

denolandにはregistoryが用意してあって、PRを送ってマージされるとhttps://deno.land/x/pretty_assert@0.1.1/index.tsのようなURLで使用できるようになる。

github.com

多くのnpmモジュールは何かしらの方法で使用することはできるけど、NodeのAPIに依存したものはやはり移植する必要があるので、そのあたりから貢献してみるのは勉強にもなるし良さそうだと思った。

wasm-bindgenを使ってRustのモジュールをnode_modulesに持ってくる

この記事はWebAssembly Advent Calendar 2018の21日目です。wasm-bindgenを使用して何かしてみたいと思っていたので、今回は以前Rustで実装した画像の差分を取るツールをwasm-bindgenを使用してnode_modulesとして使用できるようにしてみたいと思います。

adventar.org

移植元

github.com

これはもともと、go-diff-image(https://github.com/murooka/go-diff-image)というgolang製のツールをRustへポーティングしたものになります。

github.com

同じピクセル同士を比較して差分を出力するのではなく、githubのdiffのような感じで画像の差分を可視化するツールです。 以下のような比較画像を生成します。

f:id:bokuweb:20181221223051p:plain

成果物

github.com

手順

さっそくミニマムなプロジェクトを作ってみます。

  • cargo.toml
[package]
name = "node-lcs-img-diff"
version = "0.1.0"
authors = ["bokuweb"]
edition = "2018"

[lib]
crate-type = ["cdylib"]

[dependencies]
wasm-bindgen = "0.2"

wasm-bindgen-cliが入ってない場合はインストールします。

rustup target add wasm32-unknown-unknown --toolchain nightly
cargo +nightly install wasm-bindgen-cli

まずは1を加算する関数で試してみます。

  • src/lib.rs
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
pub fn add_one(n: usize) -> usize {
    n + 1
}

次にMakefileを用意しておきます。 wasm-bindgenはデフォルトブラウザ向けのコードを吐きますが、今回はnodejs向けに--nodejsつけて実行するようにします。

build:
    cargo +nightly build --target wasm32-unknown-unknown --release
    mkdir -p dist
    wasm-bindgen ./target/wasm32-unknown-unknown/release/node_lcs_img_diff.wasm --out-dir ./dist --nodejs

以下でビルド。

$ make build
  • node_lcs_img_diff_bg.d.ts
  • node_lcs_img_diff_bg.js
  • node_lcs_img_diff_bg.wasm
  • node_lcs_img_diff.d.ts
  • node_lcs_img_diff.js

が吐かれる

  • node_lcs_img_diff_bg.d.ts
/* tslint:disable */
export const memory: WebAssembly.Memory;
export function add_one(a: number): number;

内部で使用される定義

  • nodde-lcs_img_diff.d.ts
/* tslint:disable */
export function add_one(arg0: number): number;

公開関数の定義

  • node_lcs_img_diff_bg.js
const path = require('path').join(__dirname, 'node_lcs_img_diff_bg.wasm');
const bytes = require('fs').readFileSync(path);
let imports = {};

const wasmModule = new WebAssembly.Module(bytes);
const wasmInstance = new WebAssembly.Instance(wasmModule, imports);
module.exports = wasmInstance.exports;

wasmの読み込みからinstanciateまで。

  • node_lcs_img_diff.js
/* tslint:disable */
var wasm;

/**
* @param {number} arg0
* @returns {number}
*/
module.exports.add_one = function(arg0) {
    return wasm.add_one(arg0);
};

wasm = require('./node_lcs_img_diff_bg');

使用方法は以下のように呼ぶだけ。

  • index.ts
import { add_one } from './dist/node_lcs_img_diff';

add_one(1); // -> 2

良さそうです。 後はせっせと移植していきます。 注意点としてはVecは返り値として返せないので、そのような場合JSONにしStringを返すことになりそうです。

github.com

細かい部分は省略しますが、Rust側は以下のようになりました。去年以下の記事を書きましたがwasm-bindgenのおかげで受け取る値も返す値もシンプルになっています。以前はArrayBufferのオフセットやデータの長さを受け取り、自分でバッファに変換する必要がありましたが、そのあたりの処理をwasm-bindgenが受け持ってくれているからですね。

qiita.com

どういうことをやっているかざっくり言うと、画層データを2枚受け取ってデコード。差分が発生した領域を計算して、元画像に緑/赤色をブレンドしたあとpngにエンコードして返しています。細かい処理は省略していますが、mainとなるdiff関数は以下のような感じです。

  • lib.rs
#[wasm_bindgen]
pub fn diff(before: &[u8], after: &[u8]) -> String {
    let mut before = load_from_memory(before).expect("Unable to load image from memory");
    let mut after = load_from_memory(after).expect("Unable to load image from memory");
    let encoded_before = create_encoded_rows(&before.raw_pixels(), before.dimensions().0 as usize);
    let encoded_after = create_encoded_rows(&after.raw_pixels(), after.dimensions().0 as usize);
    let result = lcs_diff::diff(&encoded_before, &encoded_after);
    let mut added: Vec<usize> = Vec::new();
    let mut removed: Vec<usize> = Vec::new();
    for d in result.iter() {
        match d {
            &lcs_diff::DiffResult::Added(ref a) => added.push(a.new_index.unwrap()),
            &lcs_diff::DiffResult::Removed(ref r) => removed.push(r.old_index.unwrap()),
            _ => (),
        }
    }
    create_marked_image(&mut after, (99, 195, 99), RATE, &added);
    create_marked_image(&mut before, (255, 119, 119), RATE, &removed);
    serde_json::to_string(&Result {
        after: to_png(&after),
        before: to_png(&before)
    }).unwrap()
}

typescriptの型まで吐いてくれるので以下のように使用できます。

  • index.ts
import { diff } from './dist/node_lcs_img_diff';

const [before, after] = await Promise.all([readFile("YOUR_IMAGE"), readFile("YOUR_IMAGE")]);
JSON.parse(diff(before, after));

実際にはcliや画像の読み書き処理を追加しています。詳しくは以下を参照してみてください。

github.com

あとはbuildしてnpn publishすれば完了です。

速度

自分はよくJavaScriptとwasmの速度比較を行うのですが、今回はJavaScript実装がないのでRust版と速度比較をしてお茶を濁しときます。

  • wasm(node v10.11.0)
Benchmark #1: node . test/images/before.png test/images/after.png --dist test/expected
  Time (mean ± σ):     720.2 ms ±  57.1 ms    [User: 1.150 s, System: 0.145 s]
  Range (min … max):   687.9 ms … 821.9 ms    5 runs
  • Rust 1.31
Benchmark #1: lcs-image-diff test/images/before.png test/images/after.png aaa.png
  Time (mean ± σ):      29.3 ms ±   0.8 ms    [User: 26.2 ms, System: 5.0 ms]
  Range (min … max):    28.2 ms …  32.4 ms    89 runs

ベンチマークにはhyperfineを使用しました。(MacBook Air (11-inch, Early 2015), 1.6 GHz Intel Core i5, 8 GB 1600 MHz DDR3)です。 結構な差がでましたね。どうもwasmの方はwasmファイルのリードからinstansiateまでで200msくらい持ってかれてるようです。diff関数は185msくらいですね。

ファミコンのエミュレータをRust / WebAssembly で書き直した

f:id:bokuweb:20180208090512p:plain

概要

以前、JSで書いた(ファミコンのエミュレータを書いた - undefined)ファミコンのエミュレータをRustで書き直してみた。 また、技術的な内容はQiitaの方にも書いているので興味のある方は参照してみてください。(まだ Hello, World!までしか書けてませんが。)

qiita.com

もともとファミコンのエミュレータって新しい言語を習得するのにちょうどいい題材だったりするのでは、って話しからスタートしてて、よくわからないのでJSで書いてみて、ようやくRustで一通りは実装できた感じ。まだバグや未実装(音声周りやマッパー)も多いんですが、ひとまずはお腹いっぱいな感じ。

成果物

github.com

あと、いくつかのROMは以下で遊べるようにしてます。音が出るので注意してください。 またAPUの実装にまだバグが残っているのDCMチャンネルが未実装なので音が変だったり出てなかったりします。 描画のほうはマッパー0であれば、ほぼほぼ問題ないと思ってます。

また、現状chromeでのみ動きます。firefoxは(多分)esmodulesのフラグを立てれば動きます。

https://bokuweb.github.io/rustynes/

CPUやPPUやRAM / ROM周りをRustで書いて、描画や音はemscriptenを介してJS側でcanvas / WebAudioのAPIを叩くようにしています。 これに着手し始めたときは、wasm32-unknown-emscripten しか選択肢がなかったのですが、今なら、wasm32-unknown-unknown が選択できます。 今から着手するなら、wasm32-unknown-unknown 前提で、stdwebwasm-bindgen辺りを使用しemscriptenを剥がしたほうが良い面が多いような気がします。

過程

移植するだけなので強くてニューゲーム気分だったんですが、駄目ですね。 前回よりわけの分からない状態は減ったものの、それでもおかしなバグには遭遇しました。

Hello world

ここまでは、とにかくコンパイラに怒られまくったりどうやってブラウザで描画するのがいいのか悩んだりで大変でした。Qiitaかどこかにも書きましたがHello, World!を表示するにはCPUがほぼほぼ実装できていないといけないのでやはりここまでは苦労しますね。

GIKO005

スプライトを表示するサンプル。Hello, Worldからここまではすんなり。

GIKO016

スクロールが絡んでくるとやはりよくわからないこと現象に遭遇する

GIKO017

前回対応していなかった8 * 16形式のスプライトに対応した、が、ばぐってて歩く度に顔が上下するキャラクターが産まれてしまった。

Super mario bros

さすがにハマりどこは抑えてたので着手後早い段階でそこそこ描画されてた。

思わぬところがスクロール

隠しブロックが隠れてない

土管が仕事しない

音の伸びが良すぎる このバグまだ解決できてなくて、地下でブロック叩いたときになんとも言えない音が鳴る

falling

f:id:bokuweb:20180208090512p:plain

今回見つけた謎のROM。ひたすらおっさん(?)が落ちていくゲーム。 実はオープンソースで、initial commitが2018年の1月7日になっている。

github.com

つまり2018年産まれのファミコンROMということになる。タイトルの音楽が好きだったりする(ちょっとAPUの実装の都合でおかしいとこありますが)

速度

WebAssemblyというと速度が気になると思います。JS版との比較を行ってみました。 WebAudio周辺は共通かつJSは側の処理なので比較はAPUをディセーブルにして、giko017.nesでメインループの処理時間20回分の平均値を取っています。マシンはMacBook Air (11-inch, Early 2015) , 1.6 GHz Intel Core i5, 8 GB 1600 MHz DDR3

ブラウザ JS版 Wasm版
Chrome 63 4.36ms 5.68ms
Firefox 58 5.76ms 3.98ms
Safari 11 9.98ms 4.21ms

Firefox, Safariではwasmのほうが速かったんですが、chromeではJS版の方が速くwasm版はイマイチでした。emscriptenのグルーコードのオーバーヘッドやrust / wasm周りに不備、チューニング不足も当然あるでしょうが、firefoxでそこそこ速いこと考えると、v8すごいって話しとchrome x wasmはまだまだってことになるんでしょうか。このあたり正直なところよくわかってないです。

さいごに

まだ未実装箇所やバグも多いんですが、お腹いっぱいなのでひとまずここまでにして、何か次に取り掛かりたいなーという思い。 一応Rustで書いたものの、分からないことだらけでもう少し、いろいろ書いてみないとなーという気持ち。

なにかおかしなことしてたらプルリクエストなどいただけると泣いて喜びます

題材としては、以前お世話になったARM7のエミュレータを書いてみるとか、ラズパイにOSを移植してみるとかが楽しそうだなーと思いつつ、goも勉強したいので時間はかかりそうです。

ファミコンのエミュレータを書いた

f:id:bokuweb:20170919214338p:plain

概要

ファミコンのエミュレータをJSでだらだらと作ってた。そこそこ遊べるようになったので公開しておく。技術的な内容は、またどこかで発表したり、Qiitaなどにまとめたい。(忘れないうちに。需要があるかは怪しいが。)

随分昔に作ってみたいなーと思いFPGAでの実装を開始したんだけど、早々に挫折した覚えがある。今思うとFPGAの場合タイミングの問題が付き纏うのでJSで書くより圧倒的に難易度も高いし、ハードエミュレータを実装するにしても前段階としてソフトウェミュレータを実装するのが定石っぽいので無謀だったっぽい。

ひとまずMapper0という基本的なカセット形式のみに対応し、スーパーマリオブラザーズがそこそこ遊べるくらいを目標とした。

成果物

github.com

ファミコンのスペック

  • MPU 6502(RP2A03), 8bit
  • WRAM2KB
  • VRAM 2KB
  • 最大発色数 52色
  • 画面解像度 256x240

MPUは6502にAPUと呼ばれるオーディオプロセッサを搭載したカスタム品。メモリマップを覗くとわかるがAPUがブチ込まれた感が表現されていて良い。結構無茶したんじゃなかろうか。

解像度は256x240。デモを見せた人が口をそろえて「小さい」というが確かに小さい。

上記に加えてPPU(ピクチャープロセッシングユニット)という独自ICが実装されていて、各ハードウェアをひとつずつ再現していくことになる。

過程

Sprite2png

まずはどのように描画すべきなのか理解するためにカセットの中のスプライト領域をpngで出力するツールを書いてみた。

github.com

たとえばスーパーマリオブラザーズのスプライトは以下。これだけであの世界が構築されているのはすごい。

f:id:bokuweb:20170919222738p:plain

Hello world

まずはHello worldだけどCPUとPPU(ピクチャープロセッシングユニット)の背景レンダリングくらいは出来上がっていないといけないのでHello worldまでもそこそこ大変。

このツイート前はcanvasに描画してたんだけど、上のtweetは遊びで1div + cssでレンダリングしたときの。10FPS前後出てた。

ROMは以下で手に入る。C言語版もあるのでわかりやすい。

NES研究室

GIKO005

Hello worldのあとは「ギコ猫でもわかるファミコンプログラミング」を順にやっていくとよい。

gikofami.fc2web.com

これはGIKO005のスプライトを表示するサンプル。パレットがまだ実装されていないのか色がついていない。

GIKO013

これはGIKO013。GIKO013はAPUのサンプルでAPU(オーディオプロセッシングユニット)にどのような矩形波を出力すべきかが書き込まれるのでWebAudioで矩形波を作って音を鳴らしている。WebAudio自体あまり触ったこと無いし、音楽の知識もないのでここは本当にきつかった。

ここまでのサンプルで1Playerのキーパッドや背景スクロール、キャラクター移動などは実装済だった。 これだけ動いていたのでかなり順調で「CPUなんて完璧では?」と思ったりもしたが、この後に大量のバグと不可解な挙動に遭遇する。逆に言うとCPUがボロボロでもこの程度は動く。

GIKO016

縦スクロールのサンプルなんだけど全然だめだった。このあたりからパッと見つかるようなバグは減っていて、ひたすらアセンブラを読みながらデバッグするようになってた。結局「あとちょっとだから」っといって最後までやらなかったんだけど、デバッガの実装を早めにやっておけばよかった。CPUができていれば難しくない。結局全然「あとちょっと」じゃなかった。

GIKO017

これは横スクロールのサンプルで身体がないのは8 * 16形式のスプライトに対応してないから。マリオは8 * 8で動いたので、結局まだ8 * 16は対応してない。比較的このサンプルはすんなり動いた。

nestest

nes用のテストROMを発見してテストが通るようにデバッグを開始した。このROMの存在は知ったのはマリオに着手し始めたあとなので、この記事の内容の順番は実際とは多少前後している。このROMは最高でもっと早く試すべきだった。注意点としてはテスト対象のCPU上でこのテストROMが走るため、エラーとなった箇所を鵜呑みにはできない点。とは言えこいつのおかげでかなり進んだ。

CPU / PPU / keypadを早めに優先してこいつをGIKOたちより先に動作させるのが手順としては良さそう。

http://www.qmtpro.com/~nes/misc/nestest.log

テストログも落ちているのでデバッグもしやすい。

Super mario bros

だれもいない。

マリオが出たが、1マス浮いてる。ガタガタ。

味がある。

パレットをいじっていたらクリボーにコインの点滅が移ってしまった。

斜めになりながらものおばあちゃんを思い出した

重力無視してた。左上のスコアの0に黒いものが移っているけど、これはSprite0と呼ばれるもので、こいつが描画された瞬間PPUのあるビットがtrueになる。この画像では描画位置ズレて見えてしまってるが、本当はコインの裏くらいに隠してあるっぽい。

多分プレイ中はスコアやタイムなどのヘッダは固定されていて、コインより下の部分のみスクロール処理が必要なため、この位置においてあるんだと思う。CPUはPPUのsprite 0 hit フラグが立つのをポーリングしていてこいつがtrueになったらスクロールなどを始めるんだと思う。

ちなみにマリオのソースが以下で読める。

A Comprehensive Super Mario Bros. Disassembly · GitHub

未実装

未実装な箇所はたくさんあって例えば以下。 Audioはまだまだバグってるっぽくて変な音なる場合がある。

  • 8 * 16 スプライト
  • 各Mapper
  • Noise audio
  • 2 player keypad
  • DCM

Mapper3くらいは対応してやりたい。

現状

ファイナルファンタジー3の高速飛空艇はCPUのバグを突いてあの速度を実現しているらしく、どんな仕組みなのか解析したいと思ってたけど、そもそもファイナルファンタジー3を単に動かすだけでも道のりは遠そう。まだ実装しなきゃいけない箇所が結構ある。 ひとまずJS版はもういいや、という気持ちになったので今はRust + wasmで書き直してる。楽しいけどRust難しい。

詳細をどっかにまとめようかとは思うが、需要ありますか?

Rust+wasmでライフゲーム

Rustとwasmの入門にライフゲームを書いてみた

成果物

github.com

以下のURLで動作を確認できますが、わらわらしてますので苦手な方は注意してください。 windowサイズを小さくすると60FPSでて楽しいです。

Game of life with rust + wasm

環境構築

環境構築は以下の記事を参考にさせてもらっています。

sbfl.net

また「Think Web」の「Rust + WebAssembly でブラウザで動くLisp処理系を作る」も合わせて参考にさせてもらってます。

techbooster.booth.pm

実装

JS側からRust側へポインタを渡しておき、JS側のrequestAnimaionFrameからRust側から公開されているupdate関数を叩き、更新されたメモリをcanvasに反映するという構成を取っています。

Rust側とメモリを共有

簡略化してますが以下のようにwindowサイズ分の領域を確保して初期化(実際にはランダムにtrue / falseで埋めてますが省略)ポインタとサイズをupdateに渡してます。

  • js
const bufsize = window.innerHeight * window.innerWidth;
const bufptr = Module._malloc(bufsize);

Module._memset(bufptr, 0, bufsize);
let buf = new Uint8Array(Module.HEAPU8.buffer, bufptr, bufsize);

  ... 省略 ...
  
  update(bufsize, buf.byteOffset, column);

Rust側でupdateは以下のようになっていて、もらったポインタ、サイズからsliceを作成して次のステートを作成、バッファに戻す、という処理を行っています。

  • Rust
#[no_mangle]
pub extern "C" fn update(len: usize, ptr: *mut bool, col: usize) {
    let row = len / col;
    let buf: &mut [bool] = unsafe { std::slice::from_raw_parts_mut(ptr, len) };
    let game: Vec<bool> = Game::new(buf, row, col).next();
    buf.clone_from_slice(game.as_slice())
}

また、jsからupdateをを使用するために、公開する関数をbuild時に指定してやる必要があります。 具体的には -C link-args="-s EXPORTED_FUNCTIONS=['_update']"のように指定する必要があります。 実際buildオプションは以下から確認できます。

github.com

Rust側のGame

数十行なので載せときます。 もらったSliceをVec<Vec>に変換して、あとはゲームのルールに従い次のステートを算出しています。

この間、コンパイラにはめちゃめちゃ叱られたし、未だ書ける気がしてこないが、入門にはいい題材だったっぽい。 パターンマッチ好きです。

type Field<T> = Vec<Vec<T>>;

pub struct Game {
    field: Field<bool>,
}

impl Game {
    pub fn new(buf: &[bool], row_size: usize, col_size: usize) -> Game {
        let field = Game::create(buf, row_size, col_size);
        Game { field }
    }

    pub fn next(self) -> Vec<bool> {
        self.field
            .iter()
            .enumerate()
            .map(|(y, r)| self.next_row(r, y))
            .flat_map(|x| x)
            .collect()
    }


    fn next_row(&self, row: &Vec<bool>, y: usize) -> Vec<bool> {
        row.iter()
            .enumerate()
            .map(|(x, _)| self.next_cell(y as i32, x as i32))
            .collect()
    }

    fn next_cell(&self, y: i32, x: i32) -> bool {
        let alive_num = [
            (y - 1, x - 1),
            (y, x - 1),
            (y + 1, x - 1),
            (y - 1, x),
            (y + 1, x),
            (y - 1, x + 1),
            (y, x + 1),
            (y + 1, x + 1),
        ].iter()
            .map(|&(y, x)| self.get_cell_state(y, x))
            .filter(|cell| *cell)
            .collect::<Vec<_>>()
            .len();
        match alive_num {
            3 => true,
            2 if self.is_alive(y as usize, x as usize) => true,
            _ => false,
        }
    }

    fn is_alive(&self, y: usize, x: usize) -> bool {
        self.field[y][x]
    }

    fn create(buf: &[bool], row_size: usize, col_size: usize) -> Field<bool> {
        (0..row_size)
            .into_iter()
            .map(|i| {
                let start = i * col_size;
                let end = start + col_size;
                buf[start..end].to_vec()
            })
            .collect()
    }

    fn get_cell_state(&self, row: i32, column: i32) -> bool {
        match self.field.iter().nth(row as usize) {
            Some(r) => {
                match r.iter().nth(column as usize) {
                    Some(c) => *c,
                    None => false,
                }
            }
            None => false,
        }
    }
}

速度

Rustのコードが完成してから、そのコードをJSにざくっと移植して、速度を測ってみたところ約5倍ほどwasmの方が早い結果となってる。 この結果はJSの最適化がされていないのが主要因だと思ってるので眉唾なんですが、フィボナッチで比較した際3倍程度との記事を見かけたことがあるので最適化を施していくとその辺に落ち着くのかもしれない。この辺は宿題。

今後

書く量が圧倒的が足りないのでテーマアップして継続していきたい。ひとまずはテトリスとか、ファミコンエミュレータを考えてる。キーボードも自作したいしテーマはが尽きなそう。時間が足りない。

wasm化したOpenCVでカメラ入力に笑い男を加えて描画する

OpenCVで試したいことがあり、OpenCV + wasmで入門がてら顔認識を試して遊んでみました。

OpenCVのビルド

wasmへのビルドは参考になるような記事を見つけられず、いろいろ試したものの成功しなかったんですが、ビルドを成功させているリポジトリが発見でき手順通り(cloneするリポジトリ名のみ間違ってて修正しましたが)にやることでビルドできました。

github.com

例えば顔認識分類器のデータの追加変更や任意のモジュールの追加、削除などを行うことを考えると自前でビルドできないと今後きつそうなんですが、正直良くわからないってのが現状です。このあたりは課題。 ビルド筋を鍛えようにも取っ掛かりもないような状態なので、おすすめの資料などがあれば、是非教えていただけると嬉しいです。

wasm

Rustでいろいろ試しているが、その際以下の記事をよく参考にさせてもらったりしてます。

sbfl.net

具体的には以下の用な感じで事前にビルドしたopencvを読み込んで使用する。

    "use strict"
    const name = "wasm/cv-wasm";
    const Module = {
      preRun: [],
      postRun: [],
      wasmBinaryFile: `${name}.wasm`,
      print: text => console.log(text),
      printErr: text => console.error(text),
    };
    fetch(`${name}.wasm`)
      .then(res => res.arrayBuffer())
      .then(buffer => {
        Module.wasmBinary = buffer;
        const script = document.createElement("script");
        script.src = `${name}.js`;
        script.addEventListener('load', (e) => {
          const main = document.createElement('script');
          main.src = 'main.js';
          document.body.appendChild(main);
        });
        document.body.appendChild(script);
      });

入力画像の顔認識

上記のリポジトリにはありがたいことに多くのサンプル、テストが含まれており、その中の顔認識のサンプルを書き直しながら試していきました。

github.com

wasmに対応したブラウザであれば以下で試すことができます。(loadに時間かかります)

https://bokuweb.github.io/cv-wasm-face-detect-sample/index.html

検知できれば以下のように顔部分が赤枠でマークされると思います。

f:id:bokuweb:20170727205411p:plain

カメラ入力の対応と笑い男のレンダリング

ここまでできれば、あとはカメラ入力に対応するだけです。 navigator.getUserMediaで映像を取得しvideoタグに繋いで、rafvideoからcanvsに描画し、顔が検出されていれば、座標とサイズを元に笑い男を描画しています。以下から試すことができますが、雑にcustomElements.defineを使用したがために先のサンプルとは違い動くのは最新のchromeぐらいだと思います。飽きたので最大笑い男数1。

https://bokuweb.github.io/wasm-cv-with-laughing-man/ (loadに時間かかります、カメラ必要)

ちゃんと動くとこんなこんな感じ。

f:id:bokuweb:20170727210000g:plain

顔を傾けると結構認識できなかったり、服のシワをご検知したりします。精度をあげる方法はいくつかあるようでちらっと見かけはしましたが、最終目的としては顔認識したいわけではないので、ひとまずここまで。MacBook Pro (Retina 13-inch、Early 2015) 2.9 GHz Intel Core i5 RAM 8 GB + chrome 59で4FPSとかそんなもんでした。画像を200 × 200くらいにすると60FPSでてた。参考まで。

実装して学ぶRxJS

実際にいくつかのオペレータを実装してみたらRxの気持ちがわかるかと思い実践してみました。 簡素化するために以下の方針とします。

  • unsubscribeしない
  • errorハンドリングしない

実装してみたのは以下です。

  • of
  • map
  • subject
  • filter
  • delay
  • fromPromise
  • combineLatest
  • switchMap

Observable

何はともあれ、まずはObservableを実装します。

class Observable {
  constructor(producer) {
    this.subscribe = producer
  }
}

コードはこれだけで、producerを受け取って、自身のsubscribeに接続します。 producerobserverを引数にとって、次に、どんなタイミングで、どんな値を流すか決定する関数です。 現時点ではイメージもわかないと思うので次ofを眺めたほうがわかりやすいかと思います。

of

次にofを実装します。ofは引数で受け取った値を順に流していくだけの最もシンプルなOperatorの一つです。

github.com

Observable.prototype.of = function (...values) {
  const producer = observer => {
    values.forEach(v => observer.next(v));
    observer.complete();
  }
  return new Observable(producer)
}

observerを引数にとるproducerを作成し、引数の値を順にobserver.nextで流し。完了すればobserber.completeします。 使用例は以下のようになります。1,2,3と値が流れ、completeします。冒頭で述べましたが、observerは本来errorをハンドリングする関数を含みますが簡素化のため削除しています。

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1,2,3).subscribe(observer);

実際の動作を以下で確認することができます。

runkit.com

observersubscribeすることで初めて、producerが実行され、値が流れます。

map

もうひとつシンプルかつ、多用するoperatorであるmapを実装してみます。

Observable.prototype.map = function (f) {
  const producer = observer => {
    return this.subscribe({
      next(value) { observer.next(f(value)) },
      complete() { observer.complete() }
    })
  };
  return new Observable(producer);
}

関数を引数にとり、producerの中で受け取った関数を適用した値をobserver.nextで流します。 先程のofと合わせて以下のように使用します。

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1,2,3).map(x => x * x).subscribe(observer); // 1, 4, 9, complete

1, 4, 9と値が流れcompleteします。 動作は以下で確認できます。

runkit.com

Subject

ここまで来るとあとは応用で、粛々とOperatorを追加していくだけなんですが、先にHotであるSubjectを実装しておきます。

class Subject extends Observable {
  constructor() {
    super(function (observer) {
      this.observers.push(observer);
    });
    this.observers = [];
  }

  next(x) {
    this.observers.forEach((observer) => observer.next(x));
  }

  complete() {
    this.observers.forEach((observer) => observer.complete());
  }
}

SubjectObservableを継承しsubscribeされると配信先を自身のリストに登録されるようにします。 また、登録された配信先に値を流せるようnextを生やします。nextでは登録済の配信先全てに値を流します。

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

const subject$ = new Subject();
subject$.subscribe(observer);
subject$.next('hoge'); // hoge
subject$.next('fuga'); // fuga
subject$.complete();   // complete

良さそう。

runkit.com

filter

後は上記の応用でほとんどのものは実装できます。 filterは名前の通りなんですが、マーブルダイアグラムがあるとよりわかりやすいですね。

f:id:bokuweb:20170413231104p:plain

Observable.prototype.filter = function (f) {
  const producer = observer => {
    return this.subscribe({
      next(value) {
        if (f(value)) observer.next(value)
      },
      complete() { observer.complete() }
    })
  };
  return new Observable(producer);
}
const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1,2,3).map(x => x * x).filter(x => x % 2 === 0).subscribe(observer); // 4, complete

delay

こちらも名前の通り指定時間出力を遅延させるoperatorです。

Observable.prototype.delay = function (time) {
  const producer = observer => {
    return this.subscribe({
      next(value) {
        setTimeout(() => observer.next(value), time)
      },
      complete() {
        setTimeout(() => observer.complete(), time)
      }
    })
  };
  return new Observable(producer);
}

1秒後に1が流れてきます

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable().of(1).delay(1000).subscribe(observer); // 1 after 1sec

fromPromise

promiseからobservableに変換するoperator

Observable.prototype.fromPromise = function (promised) {
  const producer = observer => {
    return this.subscribe({
      next(value) {
        promised.then((a) => {
          observer.next(a)
        })
      },
      complete() {
        promised.then(() => {
          observer.complete()
        })
      }
    })
  };
  return new Observable(producer);
}

5秒後に1が流れてきます

const observer = {
  next: (v) => console.log(v),
  complete: () => console.log('complete'),
}

new Observable()
  .of(null)
  .fromPromise(new Promise(resolve => {
    setTimeout(() => { resolve(1)}, 5000)
  }))
  .subscribe(observer); // 1 after 5sec

combineLatest

頻出ですが、このあたりから結構複雑ですね。 みんなだいすきcombineLatest

本来combineLatestは最後の引数が値を変換するtransform関数なんですが、省略も可能になっていて、今回はObservableの結合のみ行うOperatorとして実装しています。

f:id:bokuweb:20170413224908p:plain * http://rxmarbles.com/#combineLatestより

Observable.prototype.combineLatest = function (...observables) {

  const length = observables.length + 1;
  const producer = outObserver => {
    const values = [...Array(length)].map(_ => undefined);
    const hasValue = [...Array(length)].map(_ => false);
    const hasComplete = [...Array(length)].map(_ => false);

    const next = (x, index) => {
      values[index] = x;
      hasValue[index] = true;
      if (hasValue.every(x => x === true)) outObserver.next(values);
    };

    const complete = (index) => {
      hasComplete[index] = true;
      if (hasComplete.every(x => x === true)) outObserver.complete();
    };

    observables.forEach((observable, index) => {
      observable.subscribe({
        next: (x) => next(x, index + 1),
        complete: () => complete(index + 1),
      });
    });
    this.subscribe({
      next: (x) => next(x, 0),
      complete: () => complete(0),
    });
  };
  return new Observable(producer);
}

以下使用例。

new Observable()
   .of(0)
   .combineLatest(
     new Observable().of(1, 4),
     new Observable().of(2),
     new Observable().of(3).delay(1000),  
   )
   .subscribe(observer);  // [0, 4, 2, 3] after 1sec

動作を文書で説明するのがなかなか難しいOperatorだと思います。 1の値が流れていってしまうことに注意してください。初回のみ各streamの値が出揃うまで値は流れません。 次回以降値が流れてくる度に、他のstreamの最新の値と合わせて配列にパッキングされます。

以下のマーブルダイアグラムを動かしてみるのが一番しっくりくるかもしれません。

rxmarbles.com

動作確認は以下。

runkit.com

こっちはRx

runkit.com

switchMap

こちらも頻出、みんなだいすきswitchMapです。 実装も泥臭くて若干怪しいですが、おもちゃレベルでは動いてそうです。

switchMapは分かりやすいマーブルダイアグラムがないですね。。

Observable.prototype.switchMap = function (f) {
  const producer = outObserver => {
    let i = 0;
    let hasSourceCompleted = false;
    const completed = [];
    this.subscribe({
      next: (x) => {      
        i++;
        completed[i] = false;
        f(x).subscribe({
          next: ((index, y) => {
            if (index === i) {
              outObserver.next(y)
            }
          }).bind(this, i),
          complete: ((index) => {
            completed[index] = true;
            if (hasSourceCompleted && completed.every(x => x)) outObserver.complete();
          }).bind(this, i),
        });
      },
      complete: (() => {
        hasSourceCompleted = true;
        if (hasSourceCompleted && completed.every(x => x)) outObserver.complete();
      }),
    });
  };
  return new Observable(producer);
};

RxのswitchMapPromiseも展開してつないでくれますが、今回はObservableのみ対応しています。 使用例は以下。

const observer = {
  next(value) { console.log(value) },
  complete() { console.log('Done') }
}

new Observable().of(1, 2).switchMap((v) => {
  if (v === 1) return new Observable().of(v).delay(400);
  if (v === 2) return new Observable().of(v).delay(200);
}).subscribe(observer); // 2 after 200ms

12の値が順次流れてきて、1のときは400ms後に1が返るObservableが、2のときは200ms後に2が返るObservableがreturnされます。 200ms後に2が400ms後に1が流れてきそうですが、2switchMapに流れてきた時点でstreamは200ms後に2が返るObservableswitchされるのでobserverまで1が流れてくることはありません。

動作確認用

runkit.com

こっちはRx

runkit.com

さいごに

業務ではAngularを使用しているため、Observableの扱いにはいつも悩んでいて、もう少し仲良くなるために今回は実装してみました。 若干複雑なOperatorもありますが、mapfilterはかなりシンプルで、仕組みを知るにはちょうどいい題材ではないかと思います。 また趣味ではredux-observableを使用していて、わりと気に入っているのでもう少し使いこなせるようになりたいですね。