|
|
@@ -39,12 +39,18 @@ const wrapFunction = fn =>
|
|
|
const result = fn.call(this, chunk, encoding);
|
|
|
if (result && typeof result.then == 'function') {
|
|
|
// thenable
|
|
|
- result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
|
|
|
+ result.then(
|
|
|
+ result => (Chain.sanitize(result, this), callback(null)),
|
|
|
+ error => callback(error)
|
|
|
+ );
|
|
|
return;
|
|
|
}
|
|
|
if (result && typeof result.next == 'function') {
|
|
|
// generator
|
|
|
- runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
|
|
|
+ runAsyncGenerator(result, this).then(
|
|
|
+ () => callback(null),
|
|
|
+ error => callback(error)
|
|
|
+ );
|
|
|
return;
|
|
|
}
|
|
|
Chain.sanitize(result, this);
|
|
|
@@ -83,26 +89,21 @@ const wrapArray = fns =>
|
|
|
});
|
|
|
|
|
|
// is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
|
|
|
-const isReadableNodeStream = obj => !!(obj &&
|
|
|
+const isReadableNodeStream = obj =>
|
|
|
+ obj &&
|
|
|
typeof obj.pipe === 'function' &&
|
|
|
typeof obj.on === 'function' &&
|
|
|
(!obj._writableState || (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
|
|
|
- (!obj._writableState || obj._readableState) // Writable has .pipe.
|
|
|
-);
|
|
|
+ (!obj._writableState || obj._readableState); // Writable has .pipe.
|
|
|
|
|
|
-const isWritableNodeStream = obj => !!(
|
|
|
+const isWritableNodeStream = obj =>
|
|
|
obj &&
|
|
|
typeof obj.write === 'function' &&
|
|
|
typeof obj.on === 'function' &&
|
|
|
- (!obj._readableState || (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false) // Duplex
|
|
|
-);
|
|
|
+ (!obj._readableState || (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
|
|
|
|
|
|
-const isDuplexNodeStream = obj => !!(
|
|
|
- obj &&
|
|
|
- (typeof obj.pipe === 'function' && obj._readableState) &&
|
|
|
- typeof obj.on === 'function' &&
|
|
|
- typeof obj.write === 'function'
|
|
|
-);
|
|
|
+const isDuplexNodeStream = obj =>
|
|
|
+ obj && typeof obj.pipe === 'function' && obj._readableState && typeof obj.on === 'function' && typeof obj.write === 'function';
|
|
|
|
|
|
class Chain extends Duplex {
|
|
|
constructor(fns, options) {
|
|
|
@@ -116,11 +117,7 @@ class Chain extends Duplex {
|
|
|
.filter(fn => fn)
|
|
|
.map((fn, index, fns) => {
|
|
|
if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
|
|
|
- if (
|
|
|
- isDuplexNodeStream(fn) ||
|
|
|
- (!index && isReadableNodeStream(fn)) ||
|
|
|
- (index === fns.length - 1 && isWritableNodeStream(fn))
|
|
|
- ) {
|
|
|
+ if (isDuplexNodeStream(fn) || (!index && isReadableNodeStream(fn)) || (index === fns.length - 1 && isWritableNodeStream(fn))) {
|
|
|
return fn;
|
|
|
}
|
|
|
throw Error('Arguments should be functions, arrays or streams.');
|