|
|
@@ -82,6 +82,28 @@ const wrapArray = fns =>
|
|
|
}
|
|
|
});
|
|
|
|
|
|
+// is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
|
|
|
+const isReadableNodeStream = obj => !!(obj &&
|
|
|
+ typeof obj.pipe === 'function' &&
|
|
|
+ typeof obj.on === 'function' &&
|
|
|
+ (!obj._writableState || (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
|
|
|
+ (!obj._writableState || obj._readableState) // Writable has .pipe.
|
|
|
+);
|
|
|
+
|
|
|
+const isWritableNodeStream = obj => !!(
|
|
|
+ obj &&
|
|
|
+ typeof obj.write === 'function' &&
|
|
|
+ typeof obj.on === 'function' &&
|
|
|
+ (!obj._readableState || (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false) // Duplex
|
|
|
+);
|
|
|
+
|
|
|
+const isDuplexNodeStream = obj => !!(
|
|
|
+ obj &&
|
|
|
+ (typeof obj.pipe === 'function' && obj._readableState) &&
|
|
|
+ typeof obj.on === 'function' &&
|
|
|
+ typeof obj.write === 'function'
|
|
|
+);
|
|
|
+
|
|
|
class Chain extends Duplex {
|
|
|
constructor(fns, options) {
|
|
|
super(options || {writableObjectMode: true, readableObjectMode: true});
|
|
|
@@ -95,10 +117,9 @@ class Chain extends Duplex {
|
|
|
.map((fn, index, fns) => {
|
|
|
if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
|
|
|
if (
|
|
|
- fn instanceof Duplex ||
|
|
|
- fn instanceof Transform ||
|
|
|
- (!index && fn instanceof Readable) ||
|
|
|
- (index === fns.length - 1 && fn instanceof Writable)
|
|
|
+ isDuplexNodeStream(fn) ||
|
|
|
+ (!index && isReadableNodeStream(fn)) ||
|
|
|
+ (index === fns.length - 1 && isWritableNodeStream(fn))
|
|
|
) {
|
|
|
return fn;
|
|
|
}
|
|
|
@@ -108,13 +129,13 @@ class Chain extends Duplex {
|
|
|
this.input = this.streams[0];
|
|
|
this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
|
|
|
|
|
|
- if (!(this.input instanceof Writable)) {
|
|
|
+ if (!isWritableNodeStream(this.input)) {
|
|
|
this._write = (_1, _2, callback) => callback(null);
|
|
|
this._final = callback => callback(null); // unavailable in Node 6
|
|
|
this.input.on('end', () => this.end());
|
|
|
}
|
|
|
|
|
|
- if (this.output instanceof Readable) {
|
|
|
+ if (isReadableNodeStream(this.output)) {
|
|
|
this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
|
|
|
this.output.on('end', () => this.push(null));
|
|
|
} else {
|