ソースを参照

Refactored the stream wrapper to handle multiple values better.

Eugene Lazutkin 3 年 前
コミット
a105331fd9
1 ファイル変更66 行追加71 行削除
  1. 66 71
      utils/Stream.js

+ 66 - 71
utils/Stream.js

@@ -4,72 +4,59 @@ const {Duplex} = require('stream');
 const defs = require('../defs');
 
 class Stream extends Duplex {
+  static resolved = Promise.resolve();
+
   constructor(fn, options) {
     super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
     if (typeof fn != 'function') throw Error('Only function is accepted as the first argument');
     this._fn = fn;
     // pump variables
-    this._pause = true;
+    this._paused = Stream.resolved;
+    this._resolvePaused = null;
     this._queue = [];
-    this._pending = false;
-    this._chunk = this._encoding = this._callback = null;
   }
 
   _write(chunk, encoding, callback) {
-    if (this._pause || this._queue.length) {
-      this._pending = true;
-      this._chunk = chunk;
-      this._encoding = encoding;
-      this._callback = callback;
-      return;
-    }
-    this._process(chunk, encoding).then(
+    this._processChunk(chunk, encoding).then(
       () => callback(null),
       error => callback(error)
     );
   }
   _final(callback) {
-    if (this._pause || this._queue.length) {
-      this._pending = true;
-      this._chunk = this._encoding = null;
-      this._callback = callback;
-      return;
-    }
+    // TODO: add isFlushable()
     this.push(null);
     callback(null);
   }
   _read() {
-    this._pause = false;
-    this._pump(this).then(() => this._pushPending());
+    this._resume();
   }
 
-  async _process(chunk, encoding) {
-    try {
-      const result = this._fn(chunk, encoding);
-      if (result && typeof result.then == 'function') {
-        // thenable
-        return await result.then(result => this._sanitize(result));
-      }
-      if (result && typeof result.next == 'function') {
-        // generator
-        this._pushResults(result);
-        return await this._pump();
-      }
-      await this._sanitize(result);
-    } catch (error) {
-      if (error instanceof defs.Stop) {
-        this.push(null);
-        this.destroy();
-        this._queue = [];
-        this._chunk = this._encoding = this._callback = null;
-        return;
-      }
-      throw error;
+  // pause/resume
+  _resume() {
+    if (!this._resolvePaused) return;
+    this._resolvePaused();
+    this._resolvePaused = null;
+    this._paused = Stream.resolved;
+  }
+  _pause() {
+    if (this._resolvePaused) return;
+    this._paused = new Promise(resolve => (this._resolvePaused = resolve));
+  }
+
+  // data processing
+  _pushResults(values) {
+    if (values && typeof values.next == 'function') {
+      // generator
+      this._queue.push(values);
+      return;
     }
+    // array
+    this._queue.push(values[Symbol.iterator]());
   }
   async _pump() {
     const queue = this._queue;
-    while (!this._pause && queue.length) {
+    while (queue.length) {
+      await this._paused;
       const gen = queue[queue.length - 1];
       let result = gen.next();
       if (result && typeof result.then == 'function') {
@@ -86,42 +73,50 @@ class Stream extends Duplex {
       await this._sanitize(value);
     }
   }
-  _pushResults(results) {
-    if (results && typeof results.next == 'function') {
-      // generator
-      this._queue.push(results);
-    } else {
-      // array
-      this._queue.push(results[Symbol.iterator]());
-    }
-  }
-  async _sanitize(result) {
-    if (result !== undefined && result !== null && result === defs.none) return;
-    if (result === defs.stop) throw new defs.Stop();
+  async _sanitize(value) {
+    if (value === undefined || value === null || value === defs.none) return;
+    if (value === defs.stop) throw new defs.Stop();
 
-    if (defs.isMany(result)) {
-      result = defs.getManyValues(result);
-      this._pushResults(result);
+    if (defs.isMany(value)) {
+      this._pushResults(defs.getManyValues(value));
       return this._pump();
     }
 
-    if (defs.isFinalValue(result)) {
-      result = defs.getFinalValue(result);
+    if (defs.isFinalValue(value)) {
+      value = defs.getFinalValue(value);
+      await this._processValue(value);
+      throw new defs.Stop(); // is it the correct handling of the final value?
     }
 
-    this._pause = !this.push(result);
+    if (!this.push(value)) {
+      this._pause();
+    }
   }
-  async _pushPending() {
-    if (this._pause || !this._pending) return;
-    const chunk = this._chunk,
-      encoding = this._encoding,
-      callback = this._callback;
-    this._pending = false;
-    this._chunk = this._encoding = this._callback = null;
-    return this._process(chunk, encoding).then(
-      () => callback(null),
-      error => callback(error)
-    );
+  async _processChunk(chunk, encoding) {
+    try {
+      const value = this._fn(chunk, encoding);
+      await this._processValue(value);
+    } catch (error) {
+      if (error instanceof defs.Stop) {
+        this.push(null);
+        this.destroy();
+        // clean up
+        return;
+      }
+      throw error;
+    }
+  }
+  async _processValue(value) {
+    if (value && typeof value.then == 'function') {
+      // thenable
+      return value.then(value => this._processValue(value));
+    }
+    if (value && typeof value.next == 'function') {
+      // generator
+      this._pushResults(value);
+      return this._pump();
+    }
+    return this._sanitize(value);
   }
 
   static make(fn, options) {