|
|
@@ -84,75 +84,85 @@ const wrapFunctions = (fn, index, fns) => {
|
|
|
return chain.asStream(iterator);
|
|
|
};
|
|
|
|
|
|
-let chain = null; // will be assigned later
|
|
|
+// default implementation of required stream methods
|
|
|
+
|
|
|
+const write = (input, chunk, encoding, callback) => {
|
|
|
+ let error = null;
|
|
|
+ try {
|
|
|
+ input.write(chunk, encoding, e => callback(e || error));
|
|
|
+ } catch (e) {
|
|
|
+ error = e;
|
|
|
+ }
|
|
|
+};
|
|
|
|
|
|
-class Chain extends Duplex {
|
|
|
- constructor(fns, options) {
|
|
|
- super(Object.assign({}, {writableObjectMode: true, readableObjectMode: true}, options));
|
|
|
+const final = (input, callback) => {
|
|
|
+ let error = null;
|
|
|
+ try {
|
|
|
+ input.end(null, null, e => callback(e || error));
|
|
|
+ } catch (e) {
|
|
|
+ error = e;
|
|
|
+ }
|
|
|
+};
|
|
|
|
|
|
- if (!(fns instanceof Array) || !fns.length) {
|
|
|
- throw TypeError("Chain's argument should be a non-empty array.");
|
|
|
- }
|
|
|
+const read = output => {
|
|
|
+ output.resume();
|
|
|
+};
|
|
|
|
|
|
- fns = fns.filter(fn => fn); // remove nulls
|
|
|
+// the chain creator
|
|
|
|
|
|
- this.streams = (
|
|
|
+const chain = (fns, options) => {
|
|
|
+ if (!Array.isArray(fns) || !fns.length) {
|
|
|
+ throw TypeError("Chain's argument should be a non-empty array.");
|
|
|
+ }
|
|
|
+
|
|
|
+ fns = fns.filter(fn => fn); // remove nulls
|
|
|
+
|
|
|
+ const streams = (
|
|
|
options && options.noGrouping
|
|
|
? fns.reduce(groupFunctions, []).map(produceStreams)
|
|
|
: fns.map(wrapFunctions)
|
|
|
- ).filter(s => s);
|
|
|
- this.input = this.streams[0];
|
|
|
- this.output = this.streams.reduce(
|
|
|
- (output, stream) => (output && output.pipe(stream)) || stream
|
|
|
- );
|
|
|
-
|
|
|
- if (!isWritableNodeStream(this.input)) {
|
|
|
- this._write = (_1, _2, callback) => callback(null);
|
|
|
- this._final = callback => callback(null); // unavailable in Node 6
|
|
|
- this.input.on('end', () => this.end());
|
|
|
- }
|
|
|
-
|
|
|
- if (isReadableNodeStream(this.output)) {
|
|
|
- this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
|
|
|
- this.output.on('end', () => this.push(null));
|
|
|
- } else {
|
|
|
- this._read = () => {}; // nop
|
|
|
- this.resume();
|
|
|
- this.output.on('finish', () => this.push(null));
|
|
|
- }
|
|
|
-
|
|
|
- // connect events
|
|
|
- if (!options || !options.skipEvents) {
|
|
|
- this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
|
|
|
- }
|
|
|
- }
|
|
|
- _write(chunk, encoding, callback) {
|
|
|
- let error = null;
|
|
|
- try {
|
|
|
- this.input.write(chunk, encoding, e => callback(e || error));
|
|
|
- } catch (e) {
|
|
|
- error = e;
|
|
|
- }
|
|
|
- }
|
|
|
- _final(callback) {
|
|
|
- let error = null;
|
|
|
- try {
|
|
|
- this.input.end(null, null, e => callback(e || error));
|
|
|
- } catch (e) {
|
|
|
- error = e;
|
|
|
- }
|
|
|
+ ).filter(s => s),
|
|
|
+ input = streams[0],
|
|
|
+ output = streams.reduce((output, item) => (output && output.pipe(item)) || item);
|
|
|
+
|
|
|
+ let stream = null; // will be assigned later
|
|
|
+
|
|
|
+ let writeMethod = (chunk, encoding, callback) => write(input, chunk, encoding, callback),
|
|
|
+ finalMethod = callback => final(input, callback),
|
|
|
+ readMethod = () => read(output);
|
|
|
+
|
|
|
+ if (!isWritableNodeStream(input)) {
|
|
|
+ writeMethod = (_1, _2, callback) => callback(null);
|
|
|
+ finalMethod = callback => callback(null); // unavailable in Node 6
|
|
|
+ input.on('end', () => stream.end());
|
|
|
}
|
|
|
- _read() {
|
|
|
- this.output.resume();
|
|
|
+
|
|
|
+ if (isReadableNodeStream(output)) {
|
|
|
+ output.on('data', chunk => !stream.push(chunk) && output.pause());
|
|
|
+ output.on('end', () => stream.push(null));
|
|
|
+ } else {
|
|
|
+ readMethod = () => {}; // nop
|
|
|
+ output.on('finish', () => stream.push(null));
|
|
|
}
|
|
|
- static make(fns, options) {
|
|
|
- return new Chain(fns, options);
|
|
|
+
|
|
|
+ stream = new Duplex(
|
|
|
+ Object.assign({}, {writableObjectMode: true, readableObjectMode: true}, options, {
|
|
|
+ write: writeMethod,
|
|
|
+ final: finalMethod,
|
|
|
+ read: readMethod
|
|
|
+ })
|
|
|
+ );
|
|
|
+ if (!isReadableNodeStream(output)) {
|
|
|
+ stream.resume();
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-Chain.make.Constructor = Chain;
|
|
|
+ // connect events
|
|
|
+ if (!options || !options.skipEvents) {
|
|
|
+ streams.forEach(item => item.on('error', error => stream.emit('error', error)));
|
|
|
+ }
|
|
|
|
|
|
-chain = Chain.make;
|
|
|
+ return stream;
|
|
|
+};
|
|
|
|
|
|
module.exports = chain;
|
|
|
|
|
|
@@ -172,9 +182,7 @@ module.exports.isFlushable = chain.isFlushable = defs.isFlushable;
|
|
|
module.exports.getFinalValue = chain.getFinalValue = defs.getFinalValue;
|
|
|
module.exports.getManyValues = chain.getManyValues = defs.getManyValues;
|
|
|
module.exports.final = chain.final = defs.final;
|
|
|
-module.exports.make = chain.make = Chain.make;
|
|
|
|
|
|
module.exports.chain = chain.chain = chain; // for compatibility with 2.x
|
|
|
-module.exports.Chain = chain.Chain = Chain;
|
|
|
module.exports.gen = chain.gen = gen;
|
|
|
module.exports.asStream = chain.asStream = asStream;
|