|
|
@@ -1,17 +1,29 @@
|
|
|
'use strict';
|
|
|
|
|
|
const {Duplex} = require('stream');
|
|
|
-const defs = require('../defs');
|
|
|
+const defs = require('./defs');
|
|
|
|
|
|
-class Stream extends Duplex {
|
|
|
+class AsStream extends Duplex {
|
|
|
static resolved = Promise.resolve();
|
|
|
|
|
|
constructor(fn, options) {
|
|
|
super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
|
|
|
- if (typeof fn != 'function') throw Error('Only function is accepted as the first argument');
|
|
|
- this._fn = fn;
|
|
|
+ if (typeof fn == 'function') {
|
|
|
+ this._fn = fn;
|
|
|
+ } else if (fn) {
|
|
|
+ if (typeof fn[Symbol.asyncIterator] == 'function') {
|
|
|
+ this._fn = fn[Symbol.asyncIterator].bind(fn);
|
|
|
+ } else if (typeof fn[Symbol.iterator] == 'function') {
|
|
|
+ this._fn = fn[Symbol.iterator].bind(fn);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!this._fn)
|
|
|
+ throw TypeError(
|
|
|
+ 'Only a function or an object with an iterator is accepted as the first argument'
|
|
|
+ );
|
|
|
+
|
|
|
// pump variables
|
|
|
- this._paused = Stream.resolved;
|
|
|
+ this._paused = AsStream.resolved;
|
|
|
this._resolvePaused = null;
|
|
|
this._queue = [];
|
|
|
}
|
|
|
@@ -42,7 +54,7 @@ class Stream extends Duplex {
|
|
|
if (!this._resolvePaused) return;
|
|
|
this._resolvePaused();
|
|
|
this._resolvePaused = null;
|
|
|
- this._paused = Stream.resolved;
|
|
|
+ this._paused = AsStream.resolved;
|
|
|
}
|
|
|
_pause() {
|
|
|
if (this._resolvePaused) return;
|
|
|
@@ -125,11 +137,15 @@ class Stream extends Duplex {
|
|
|
}
|
|
|
|
|
|
static make(fn, options) {
|
|
|
- return new Stream(fn, options);
|
|
|
+ return new AsStream(fn, options);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-Stream.stream = Stream.make;
|
|
|
-Stream.make.Constructor = Stream;
|
|
|
+AsStream.stream = AsStream.make;
|
|
|
+AsStream.make.Constructor = AsStream;
|
|
|
+
|
|
|
+module.exports = AsStream;
|
|
|
|
|
|
-module.exports = Stream;
|
|
|
+// to keep ESM happy:
|
|
|
+module.exports.asStream = AsStream.make;
|
|
|
+module.exports.make = AsStream.make;
|