|
|
@@ -3,64 +3,57 @@
|
|
|
const {Readable} = require('stream');
|
|
|
const defs = require('../defs');
|
|
|
|
|
|
-class FromIterable extends Readable {
|
|
|
- static resolved = Promise.resolve();
|
|
|
-
|
|
|
- constructor(options) {
|
|
|
- super(Object.assign({}, {objectMode: true}, options));
|
|
|
- const fn = options && options.iterable;
|
|
|
- if (typeof fn == 'function') {
|
|
|
- this._fn = fn;
|
|
|
- } else if (fn) {
|
|
|
- if (typeof fn[Symbol.asyncIterator] == 'function') {
|
|
|
- this._fn = fn[Symbol.asyncIterator].bind(fn);
|
|
|
- } else if (typeof fn[Symbol.iterator] == 'function') {
|
|
|
- this._fn = fn[Symbol.iterator].bind(fn);
|
|
|
- }
|
|
|
+const fromIterable = options => {
|
|
|
+ if (!options || !options.iterable) {
|
|
|
+ options = {iterable: options};
|
|
|
+ }
|
|
|
+ let fn = options && options.iterable;
|
|
|
+ if (fn && typeof fn != 'function') {
|
|
|
+ if (typeof fn[Symbol.asyncIterator] == 'function') {
|
|
|
+ fn = fn[Symbol.asyncIterator].bind(fn);
|
|
|
+ } else if (typeof fn[Symbol.iterator] == 'function') {
|
|
|
+ fn = fn[Symbol.iterator].bind(fn);
|
|
|
+ } else {
|
|
|
+ fn = null;
|
|
|
}
|
|
|
- if (!this._fn)
|
|
|
- throw TypeError(
|
|
|
- 'Only a function or an object with an iterator is accepted as the first argument'
|
|
|
- );
|
|
|
-
|
|
|
- // pump variables
|
|
|
- this._paused = FromIterable.resolved;
|
|
|
- this._resolvePaused = null;
|
|
|
- this._queue = [];
|
|
|
-
|
|
|
- this._startPump();
|
|
|
}
|
|
|
+ if (!fn)
|
|
|
+ throw TypeError(
|
|
|
+ 'Only a function or an object with an iterator is accepted as the first argument'
|
|
|
+ );
|
|
|
|
|
|
- _read() {
|
|
|
- this._resume();
|
|
|
- }
|
|
|
+ // pump variables
|
|
|
+ let paused = Promise.resolve(),
|
|
|
+ resolvePaused = null;
|
|
|
+ const queue = [];
|
|
|
|
|
|
// pause/resume
|
|
|
- _resume() {
|
|
|
- if (!this._resolvePaused) return;
|
|
|
- this._resolvePaused();
|
|
|
- this._resolvePaused = null;
|
|
|
- this._paused = FromIterable.resolved;
|
|
|
- }
|
|
|
- _pause() {
|
|
|
- if (this._resolvePaused) return;
|
|
|
- this._paused = new Promise(resolve => (this._resolvePaused = resolve));
|
|
|
- }
|
|
|
+ const resume = () => {
|
|
|
+ if (!resolvePaused) return;
|
|
|
+ resolvePaused();
|
|
|
+ resolvePaused = null;
|
|
|
+ paused = Promise.resolve();
|
|
|
+ };
|
|
|
+ const pause = () => {
|
|
|
+ if (resolvePaused) return;
|
|
|
+ paused = new Promise(resolve => (resolvePaused = resolve));
|
|
|
+ };
|
|
|
+
|
|
|
+ let stream = null; // will be assigned later
|
|
|
|
|
|
// data processing
|
|
|
- _pushResults(values) {
|
|
|
+ const pushResults = values => {
|
|
|
if (values && typeof values.next == 'function') {
|
|
|
// generator
|
|
|
- this._queue.push(values);
|
|
|
+ queue.push(values);
|
|
|
return;
|
|
|
}
|
|
|
// array
|
|
|
- this._queue.push(values[Symbol.iterator]());
|
|
|
- }
|
|
|
- async _pump() {
|
|
|
- const queue = this._queue;
|
|
|
+ queue.push(values[Symbol.iterator]());
|
|
|
+ };
|
|
|
+ const pump = async () => {
|
|
|
while (queue.length) {
|
|
|
- await this._paused;
|
|
|
+ await paused;
|
|
|
const gen = queue[queue.length - 1];
|
|
|
let result = gen.next();
|
|
|
if (result && typeof result.then == 'function') {
|
|
|
@@ -74,67 +67,65 @@ class FromIterable extends Readable {
|
|
|
if (value && typeof value.then == 'function') {
|
|
|
value = await value;
|
|
|
}
|
|
|
- await this._sanitize(value);
|
|
|
+ await sanitize(value);
|
|
|
}
|
|
|
- }
|
|
|
- async _sanitize(value) {
|
|
|
+ };
|
|
|
+ const sanitize = async value => {
|
|
|
if (value === undefined || value === null || value === defs.none) return;
|
|
|
if (value === defs.stop) throw new defs.Stop();
|
|
|
|
|
|
if (defs.isMany(value)) {
|
|
|
- this._pushResults(defs.getManyValues(value));
|
|
|
- return this._pump();
|
|
|
+ pushResults(defs.getManyValues(value));
|
|
|
+ return pump();
|
|
|
}
|
|
|
|
|
|
if (defs.isFinalValue(value)) {
|
|
|
// a final value is not supported, it is treated as a regular value
|
|
|
value = defs.getFinalValue(value);
|
|
|
- return this._processValue(value);
|
|
|
+ return processValue(value);
|
|
|
}
|
|
|
|
|
|
- if (!this.push(value)) {
|
|
|
- this._pause();
|
|
|
+ if (!stream.push(value)) {
|
|
|
+ pause();
|
|
|
}
|
|
|
- }
|
|
|
- async _startPump() {
|
|
|
+ };
|
|
|
+ const startPump = async () => {
|
|
|
try {
|
|
|
- const value = this._fn();
|
|
|
- await this._processValue(value);
|
|
|
- this.push(null);
|
|
|
+ const value = fn();
|
|
|
+ await processValue(value);
|
|
|
+ stream.push(null);
|
|
|
} catch (error) {
|
|
|
if (error instanceof defs.Stop) {
|
|
|
- this.push(null);
|
|
|
- this.destroy();
|
|
|
+ stream.push(null);
|
|
|
+ stream.destroy();
|
|
|
return;
|
|
|
}
|
|
|
throw error;
|
|
|
}
|
|
|
- }
|
|
|
- async _processValue(value) {
|
|
|
+ };
|
|
|
+ const processValue = async value => {
|
|
|
if (value && typeof value.then == 'function') {
|
|
|
// thenable
|
|
|
- return value.then(value => this._processValue(value));
|
|
|
+ return value.then(value => processValue(value));
|
|
|
}
|
|
|
if (value && typeof value.next == 'function') {
|
|
|
// generator
|
|
|
- this._pushResults(value);
|
|
|
- return this._pump();
|
|
|
+ pushResults(value);
|
|
|
+ return pump();
|
|
|
}
|
|
|
- return this._sanitize(value);
|
|
|
- }
|
|
|
+ return sanitize(value);
|
|
|
+ };
|
|
|
|
|
|
- static make(iterable) {
|
|
|
- return new FromIterable(
|
|
|
- typeof iterable == 'object' && iterable.iterable ? iterable : {iterable}
|
|
|
- );
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-FromIterable.fromIterable = FromIterable.make;
|
|
|
-FromIterable.make.Constructor = FromIterable;
|
|
|
+ stream = new Readable(
|
|
|
+ Object.assign({}, {objectMode: true}, options, {
|
|
|
+ read() {
|
|
|
+ resume();
|
|
|
+ }
|
|
|
+ })
|
|
|
+ );
|
|
|
|
|
|
-module.exports = FromIterable;
|
|
|
+ startPump();
|
|
|
+ return stream;
|
|
|
+};
|
|
|
|
|
|
-// to keep ESM happy:
|
|
|
-module.exports.fromIterable = FromIterable.make;
|
|
|
-module.exports.make = FromIterable.make;
|
|
|
+module.exports = fromIterable;
|