|
|
@@ -33,7 +33,56 @@ const getIterator = x => {
|
|
|
if (typeof x[Symbol.asyncIterator] == 'function') return x[Symbol.asyncIterator].bind(x);
|
|
|
if (typeof x[Symbol.iterator] == 'function') return x[Symbol.iterator].bind(x);
|
|
|
return null;
|
|
|
-}
|
|
|
+};
|
|
|
+
|
|
|
+const groupFunctions = (output, fn, index, fns) => {
|
|
|
+ if (
|
|
|
+ isDuplexNodeStream(fn) ||
|
|
|
+ (!index && isReadableNodeStream(fn)) ||
|
|
|
+ (index === fns.length - 1 && isWritableNodeStream(fn))
|
|
|
+ ) {
|
|
|
+ output.push(fn);
|
|
|
+ return output;
|
|
|
+ }
|
|
|
+ if (typeof fn != 'function') {
|
|
|
+ const iterator = getIterator(fn);
|
|
|
+ if (!iterator)
|
|
|
+ throw TypeError('Item #' + index + ' is not a proper stream, function, nor iterable.');
|
|
|
+ fn = iterator;
|
|
|
+ }
|
|
|
+ if (!output.length) output.push([]);
|
|
|
+ const last = output[output.length - 1];
|
|
|
+ if (Array.isArray(last)) {
|
|
|
+ last.push(fn);
|
|
|
+ } else {
|
|
|
+ output.push([fn]);
|
|
|
+ }
|
|
|
+ return output;
|
|
|
+};
|
|
|
+
|
|
|
+const produceStreams = item => {
|
|
|
+ if (Array.isArray(item)) {
|
|
|
+ if (!item.length) return null;
|
|
|
+ if (item.length == 1) return item[0] && Chain.asStream(item[0]);
|
|
|
+ return Chain.asStream(Chain.gen(...item));
|
|
|
+ }
|
|
|
+ return item;
|
|
|
+};
|
|
|
+
|
|
|
+const wrapFunctions = (fn, index, fns) => {
|
|
|
+ if (
|
|
|
+ isDuplexNodeStream(fn) ||
|
|
|
+ (!index && isReadableNodeStream(fn)) ||
|
|
|
+ (index === fns.length - 1 && isWritableNodeStream(fn))
|
|
|
+ ) {
|
|
|
+ return fn; // an acceptable stream
|
|
|
+ }
|
|
|
+ if (typeof fn == 'function') return Chain.asStream(fn); // a function
|
|
|
+ const iterator = getIterator(fn);
|
|
|
+ if (!iterator)
|
|
|
+ throw TypeError('Item #' + index + ' is not a proper stream, function, nor iterable.');
|
|
|
+ return Chain.asStream(iterator);
|
|
|
+};
|
|
|
|
|
|
class Chain extends Duplex {
|
|
|
constructor(fns, options) {
|
|
|
@@ -43,40 +92,13 @@ class Chain extends Duplex {
|
|
|
throw TypeError("Chain's argument should be a non-empty array.");
|
|
|
}
|
|
|
|
|
|
- this.streams = fns
|
|
|
- .filter(fn => fn)
|
|
|
- .reduce((output, fn, index, fns) => {
|
|
|
- if (
|
|
|
- isDuplexNodeStream(fn) ||
|
|
|
- (!index && isReadableNodeStream(fn)) ||
|
|
|
- (index === fns.length - 1 && isWritableNodeStream(fn))
|
|
|
- ) {
|
|
|
- output.push(fn);
|
|
|
- return output;
|
|
|
- }
|
|
|
- if (typeof fn != 'function') {
|
|
|
- const iterator = getIterator(fn);
|
|
|
- if (!iterator) throw TypeError('Item #' + index + ' is not a proper stream, function, nor iterable.');
|
|
|
- fn = iterator;
|
|
|
- }
|
|
|
- if (!output.length) output.push([]);
|
|
|
- const last = output[output.length - 1];
|
|
|
- if (Array.isArray(last)) {
|
|
|
- last.push(fn);
|
|
|
- } else {
|
|
|
- output.push([fn]);
|
|
|
- }
|
|
|
- return output;
|
|
|
- }, [])
|
|
|
- .map(item => {
|
|
|
- if (Array.isArray(item)) {
|
|
|
- if (!item.length) return null;
|
|
|
- if (item.length == 1) return item[0] && Chain.asStream(item[0]);
|
|
|
- return Chain.asStream(Chain.gen(...item));
|
|
|
- }
|
|
|
- return item;
|
|
|
- })
|
|
|
- .filter(s => s);
|
|
|
+ fns = fns.filter(fn => fn); // remove nulls
|
|
|
+
|
|
|
+ this.streams = (
|
|
|
+ options && options.noGrouping
|
|
|
+ ? fns.reduce(groupFunctions, []).map(produceStreams)
|
|
|
+ : fns.map(wrapFunctions)
|
|
|
+ ).filter(s => s);
|
|
|
this.input = this.streams[0];
|
|
|
this.output = this.streams.reduce(
|
|
|
(output, stream) => (output && output.pipe(stream)) || stream
|