|
|
@@ -5,40 +5,39 @@ const {Writable} = require('stream');
|
|
|
const defaultInitial = 0;
|
|
|
const defaultReducer = (acc, value) => value;
|
|
|
|
|
|
-class ReduceStream extends Writable {
|
|
|
- constructor(options) {
|
|
|
- super(Object.assign({}, {objectMode: true}, options));
|
|
|
- this.accumulator = defaultInitial;
|
|
|
- this._reducer = defaultReducer;
|
|
|
- if (options) {
|
|
|
- 'initial' in options && (this.accumulator = options.initial);
|
|
|
- 'reducer' in options && (this._reducer = options.reducer);
|
|
|
- }
|
|
|
+const reduceStream = (options, initial) => {
|
|
|
+ if (!options || !options.reducer) {
|
|
|
+ options = {reducer: options, initial};
|
|
|
}
|
|
|
- _write(chunk, encoding, callback) {
|
|
|
- const result = this._reducer.call(this, this.accumulator, chunk);
|
|
|
- if (result && typeof result.then == 'function') {
|
|
|
- result.then(
|
|
|
- value => {
|
|
|
- this.accumulator = value;
|
|
|
- callback(null);
|
|
|
- },
|
|
|
- error => callback(error)
|
|
|
- );
|
|
|
- } else {
|
|
|
- this.accumulator = result;
|
|
|
- callback(null);
|
|
|
- }
|
|
|
- }
|
|
|
- static make(reducer, initial) {
|
|
|
- return new ReduceStream(typeof reducer == 'object' && reducer.reducer ? reducer : {reducer, initial});
|
|
|
+ let accumulator = defaultInitial,
|
|
|
+ reducer = defaultReducer;
|
|
|
+ if (options) {
|
|
|
+ 'initial' in options && (accumulator = options.initial);
|
|
|
+ 'reducer' in options && (reducer = options.reducer);
|
|
|
}
|
|
|
-}
|
|
|
-ReduceStream.reduceStream = ReduceStream.make;
|
|
|
-ReduceStream.make.Constructor = ReduceStream;
|
|
|
|
|
|
-module.exports = ReduceStream;
|
|
|
+ const stream = new Writable(
|
|
|
+ Object.assign({}, {objectMode: true}, options, {
|
|
|
+ write(chunk, encoding, callback) {
|
|
|
+ const result = reducer.call(this, this.accumulator, chunk);
|
|
|
+ if (result && typeof result.then == 'function') {
|
|
|
+ result.then(
|
|
|
+ value => {
|
|
|
+ this.accumulator = value;
|
|
|
+ callback(null);
|
|
|
+ },
|
|
|
+ error => callback(error)
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ this.accumulator = result;
|
|
|
+ callback(null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ );
|
|
|
+ stream.accumulator = accumulator;
|
|
|
+
|
|
|
+ return stream;
|
|
|
+};
|
|
|
|
|
|
-// to keep ESM happy:
|
|
|
-module.exports.reduceStream = ReduceStream.make;
|
|
|
-module.exports.make = ReduceStream.make;
|
|
|
+module.exports = reduceStream;
|