|
|
@@ -39,12 +39,18 @@ const wrapFunction = fn =>
|
|
|
const result = fn.call(this, chunk, encoding);
|
|
|
if (result && typeof result.then == 'function') {
|
|
|
// thenable
|
|
|
- result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
|
|
|
+ result.then(
|
|
|
+ result => (Chain.sanitize(result, this), callback(null)),
|
|
|
+ error => callback(error)
|
|
|
+ );
|
|
|
return;
|
|
|
}
|
|
|
if (result && typeof result.next == 'function') {
|
|
|
// generator
|
|
|
- runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
|
|
|
+ runAsyncGenerator(result, this).then(
|
|
|
+ () => callback(null),
|
|
|
+ error => callback(error)
|
|
|
+ );
|
|
|
return;
|
|
|
}
|
|
|
Chain.sanitize(result, this);
|
|
|
@@ -82,6 +88,23 @@ const wrapArray = fns =>
|
|
|
}
|
|
|
});
|
|
|
|
|
|
+// is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
|
|
|
+const isReadableNodeStream = obj =>
|
|
|
+ obj &&
|
|
|
+ typeof obj.pipe === 'function' &&
|
|
|
+ typeof obj.on === 'function' &&
|
|
|
+ (!obj._writableState || (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
|
|
|
+ (!obj._writableState || obj._readableState); // Writable has .pipe.
|
|
|
+
|
|
|
+const isWritableNodeStream = obj =>
|
|
|
+ obj &&
|
|
|
+ typeof obj.write === 'function' &&
|
|
|
+ typeof obj.on === 'function' &&
|
|
|
+ (!obj._readableState || (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
|
|
|
+
|
|
|
+const isDuplexNodeStream = obj =>
|
|
|
+ obj && typeof obj.pipe === 'function' && obj._readableState && typeof obj.on === 'function' && typeof obj.write === 'function';
|
|
|
+
|
|
|
class Chain extends Duplex {
|
|
|
constructor(fns, options) {
|
|
|
super(options || {writableObjectMode: true, readableObjectMode: true});
|
|
|
@@ -94,12 +117,7 @@ class Chain extends Duplex {
|
|
|
.filter(fn => fn)
|
|
|
.map((fn, index, fns) => {
|
|
|
if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
|
|
|
- if (
|
|
|
- fn instanceof Duplex ||
|
|
|
- fn instanceof Transform ||
|
|
|
- (!index && fn instanceof Readable) ||
|
|
|
- (index === fns.length - 1 && fn instanceof Writable)
|
|
|
- ) {
|
|
|
+ if (isDuplexNodeStream(fn) || (!index && isReadableNodeStream(fn)) || (index === fns.length - 1 && isWritableNodeStream(fn))) {
|
|
|
return fn;
|
|
|
}
|
|
|
throw Error('Arguments should be functions, arrays or streams.');
|
|
|
@@ -108,13 +126,13 @@ class Chain extends Duplex {
|
|
|
this.input = this.streams[0];
|
|
|
this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
|
|
|
|
|
|
- if (!(this.input instanceof Writable)) {
|
|
|
+ if (!isWritableNodeStream(this.input)) {
|
|
|
this._write = (_1, _2, callback) => callback(null);
|
|
|
this._final = callback => callback(null); // unavailable in Node 6
|
|
|
this.input.on('end', () => this.end());
|
|
|
}
|
|
|
|
|
|
- if (this.output instanceof Readable) {
|
|
|
+ if (isReadableNodeStream(this.output)) {
|
|
|
this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
|
|
|
this.output.on('end', () => this.push(null));
|
|
|
} else {
|