Browse Source

Sync FromIterable with AsStream.

Eugene Lazutkin 3 years ago
parent
commit
1e1ea2a96c
1 changed files with 109 additions and 63 deletions
  1. 109 63
      src/utils/FromIterable.js

+ 109 - 63
src/utils/FromIterable.js

@@ -1,94 +1,140 @@
 'use strict';
 
 const {Readable} = require('stream');
+const defs = require('../defs');
 
 class FromIterable extends Readable {
+  static resolved = Promise.resolve();
+
   constructor(options) {
     super(Object.assign({}, options, {objectMode: true}));
-    this._iterable = null;
-    this._next = null;
-    if (options) {
-      'iterable' in options && (this._iterable = options.iterable);
+    const fn = options && options.iterable;
+    if (typeof fn == 'function') {
+      this._fn = fn;
+    } else if (fn) {
+      if (typeof fn[Symbol.asyncIterator] == 'function') {
+        this._fn = fn[Symbol.asyncIterator].bind(fn);
+      } else if (typeof fn[Symbol.iterator] == 'function') {
+        this._fn = fn[Symbol.iterator].bind(fn);
+      }
     }
-    !this._iterable && (this._read = this._readStop);
+    if (!this._fn)
+      throw TypeError(
+        'Only a function or an object with an iterator is accepted as the first argument'
+      );
+
+    // pump variables
+    this._paused = FromIterable.resolved;
+    this._resolvePaused = null;
+    this._queue = [];
+
+    this._startPump();
   }
 
   _read() {
-    if (Symbol.asyncIterator && typeof this._iterable[Symbol.asyncIterator] == 'function') {
-      this._next = this._iterable[Symbol.asyncIterator]();
-      this._iterable = null;
-      this._read = this._readNext;
-      this._readNext();
+    this._resume();
+  }
+
+  // pause/resume
+  _resume() {
+    if (!this._resolvePaused) return;
+    this._resolvePaused();
+    this._resolvePaused = null;
+    this._paused = FromIterable.resolved;
+  }
+  _pause() {
+    if (this._resolvePaused) return;
+    this._paused = new Promise(resolve => (this._resolvePaused = resolve));
+  }
+
+  // data processing
+  _pushResults(values) {
+    if (values && typeof values.next == 'function') {
+      // generator
+      this._queue.push(values);
       return;
     }
-    if (Symbol.iterator && typeof this._iterable[Symbol.iterator] == 'function') {
-      this._next = this._iterable[Symbol.iterator]();
-      this._iterable = null;
-      this._read = this._readNext;
-      this._readNext();
-      return;
+    // array
+    this._queue.push(values[Symbol.iterator]());
+  }
+  async _pump() {
+    const queue = this._queue;
+    while (queue.length) {
+      await this._paused;
+      const gen = queue[queue.length - 1];
+      let result = gen.next();
+      if (result && typeof result.then == 'function') {
+        result = await result;
+      }
+      if (result.done) {
+        queue.pop();
+        continue;
+      }
+      const value = result.value;
+      if (value && typeof value.then == 'function') {
+        value = await value;
+      }
+      await this._sanitize(value);
     }
-    if (typeof this._iterable.next == 'function') {
-      this._next = this._iterable;
-      this._iterable = null;
-      this._read = this._readNext;
-      this._readNext();
-      return;
+  }
+  async _sanitize(value) {
+    if (value === undefined || value === null || value === defs.none) return;
+    if (value === defs.stop) throw new defs.Stop();
+
+    if (defs.isMany(value)) {
+      this._pushResults(defs.getManyValues(value));
+      return this._pump();
     }
-    const result = this._iterable();
-    this._iterable = null;
-    if (result && typeof result.then == 'function') {
-      result.then(value => this.push(value), error => this.emit('error', error));
-      this._read = this._readStop;
-      return;
+
+    if (defs.isFinalValue(value)) {
+      // a final value is not supported, it is treated as a regular value
+      value = defs.getFinalValue(value);
+      return this._processValue(value);
     }
-    if (result && typeof result.next == 'function') {
-      this._next = result;
-      this._read = this._readNext;
-      this._readNext();
-      return;
+
+    if (!this.push(value)) {
+      this._pause();
     }
-    this.push(result);
-    this._read = this._readStop;
   }
-
-  _readNext() {
-    for (;;) {
-      const result = this._next.next();
-      if (result && typeof result.then == 'function') {
-        result.then(
-          value => {
-            if (value.done || value.value === null) {
-              this.push(null);
-              this._next = null;
-              this._read = this._readStop;
-            } else {
-              this.push(value.value);
-            }
-          },
-          error => this.emit('error', error)
-        );
-        break;
-      }
-      if (result.done || result.value === null) {
+  async _startPump() {
+    try {
+      const value = this._fn();
+      await this._processValue(value);
+      this.push(null);
+    } catch (error) {
+      if (error instanceof defs.Stop) {
         this.push(null);
-        this._next = null;
-        this._read = this._readStop;
-        break;
+        this.destroy();
+        return;
       }
-      if (!this.push(result.value)) break;
+      throw error;
     }
   }
-
-  _readStop() {
-    this.push(null);
+  async _processValue(value) {
+    if (value && typeof value.then == 'function') {
+      // thenable
+      return value.then(value => this._processValue(value));
+    }
+    if (value && typeof value.next == 'function') {
+      // generator
+      this._pushResults(value);
+      return this._pump();
+    }
+    return this._sanitize(value);
   }
 
   static make(iterable) {
-    return new FromIterable(typeof iterable == 'object' && iterable.iterable ? iterable : {iterable});
+    return new FromIterable(
+      typeof iterable == 'object' && iterable.iterable ? iterable : {iterable}
+    );
   }
 }
+
 FromIterable.fromIterable = FromIterable.make;
 FromIterable.make.Constructor = FromIterable;
 
 module.exports = FromIterable;
+
+// to keep ESM happy:
+module.exports.fromIterable = FromIterable.make;
+module.exports.make = FromIterable.make;