Sfoglia il codice sorgente

Added utilities.

take, takeWhile, skip, skipWhile, fold, scan, Reduce
Eugene Lazutkin 7 anni fa
parent
commit
646cac8809
13 ha cambiato i file con 385 aggiunte e 12 eliminazioni
  1. 15 10
      index.js
  2. 53 0
      tests/test_fold.js
  3. 40 0
      tests/test_skip.js
  4. 54 0
      tests/test_take.js
  5. 1 2
      tests/test_transducers.js
  6. 6 0
      tests/tests.js
  7. 31 0
      utils/Reduce.js
  8. 30 0
      utils/fold.js
  9. 27 0
      utils/scan.js
  10. 30 0
      utils/skip.js
  11. 30 0
      utils/skipWhile.js
  12. 37 0
      utils/take.js
  13. 31 0
      utils/takeWhile.js

+ 15 - 10
index.js

@@ -76,6 +76,12 @@ const wrapArray = array =>
     }
   });
 
+const convertToTransform = fn => {
+  if (typeof fn === 'function') return wrapFunction(fn);
+  if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
+  return null;
+};
+
 class Chain extends Duplex {
   constructor(fns, options) {
     super(options || {writableObjectMode: true, readableObjectMode: true});
@@ -86,16 +92,7 @@ class Chain extends Duplex {
 
     this.streams = fns
       .map((fn, index) => {
-        if (typeof fn === 'function') return wrapFunction(fn);
-        if (fn instanceof Array) {
-          switch (fn.length) {
-            case 0:
-              return null;
-            case 1:
-              return wrapFunction(fn[0]);
-          }
-          return fn.length ? wrapArray(fn) : 0;
-        }
+        if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
         if (
           fn instanceof Duplex ||
           fn instanceof Transform ||
@@ -159,6 +156,14 @@ class Chain extends Duplex {
   static many(values) {
     return new Many(values);
   }
+  static convertToTransform(fn) {
+    if (typeof fn === 'function') return wrapFunction(fn);
+    if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
+    return null;
+  }
 }
 
+Chain.make = Chain.chain;
+Chain.make.Constructor = Chain;
+
 module.exports = Chain;

+ 53 - 0
tests/test_fold.js

@@ -0,0 +1,53 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../index');
+const {streamFromArray} = require('./helpers');
+
+const fold = require('../utils/fold');
+const scan = require('../utils/scan');
+const Reduce = require('../utils/Reduce');
+
+unit.add(module, [
+  function test_fold(t) {
+    const async = t.startAsync('test_fold');
+
+    const chain = new Chain([fold((acc, x) => acc + x, 0)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [6])'));
+      async.done();
+    });
+  },
+  function test_foldScan(t) {
+    const async = t.startAsync('test_foldScan');
+
+    const chain = new Chain([scan((acc, x) => acc + x, 0)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 3, 6])'));
+      async.done();
+    });
+  },
+  function test_foldReduce(t) {
+    const async = t.startAsync('test_foldReduce');
+
+    const reduce = new Reduce({reducer: (acc, x) => acc + x, initial: 0});
+
+    streamFromArray([1, 2, 3]).pipe(reduce);
+
+    reduce.on('finish', () => {
+      eval(t.TEST('t.unify(reduce.accumulator, 6)'));
+      async.done();
+    });
+  }
+]);

+ 40 - 0
tests/test_skip.js

@@ -0,0 +1,40 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../index');
+const {streamFromArray} = require('./helpers');
+
+const skip = require('../utils/skip');
+const skipWhile = require('../utils/skipWhile');
+
+unit.add(module, [
+  function test_skip(t) {
+    const async = t.startAsync('test_skip');
+
+    const chain = new Chain([skip(2)]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 4, 5])'));
+      async.done();
+    });
+  },
+  function test_skipWhile(t) {
+    const async = t.startAsync('test_skipWhile');
+
+    const chain = new Chain([skipWhile(x => x != 3)]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 4, 5])'));
+      async.done();
+    });
+  }
+]);

+ 54 - 0
tests/test_take.js

@@ -0,0 +1,54 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../index');
+const {streamFromArray} = require('./helpers');
+
+const take = require('../utils/take');
+const takeWhile = require('../utils/takeWhile');
+
+unit.add(module, [
+  function test_take(t) {
+    const async = t.startAsync('test_take');
+
+    const chain = new Chain([take(2)]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 2])'));
+      async.done();
+    });
+  },
+  function test_takeWithSkip(t) {
+    const async = t.startAsync('test_takeWithSkip');
+
+    const chain = new Chain([take({n: 2, skip: 2})]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 4])'));
+      async.done();
+    });
+  },
+  function test_takeWhile(t) {
+    const async = t.startAsync('test_takeWhile');
+
+    const chain = new Chain([takeWhile(x => x != 3)]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 2])'));
+      async.done();
+    });
+  }
+]);

+ 1 - 2
tests/test_transducers.js

@@ -3,8 +3,7 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, streamToArray} = require('./helpers');
-const {Transform} = require('stream');
+const {streamFromArray} = require('./helpers');
 
 unit.add(module, [
   function test_transducers(t) {

+ 6 - 0
tests/tests.js

@@ -5,7 +5,13 @@ const unit = require('heya-unit');
 require('./test_simple');
 require('./test_readWrite');
 require('./test_errors');
+
 require('./test_transducers');
+
+require('./test_take');
+require('./test_skip');
+require('./test_fold');
+
 require('./test_demo');
 
 unit.run();

+ 31 - 0
utils/Reduce.js

@@ -0,0 +1,31 @@
+const {Writable} = require('stream');
+
+const defaultInitial = 0;
+const defaultReducer = (acc, value) => value;
+
+class Reduce extends Writable {
+  constructor(options) {
+    super(Object.assign({}, options, {objectMode: true}));
+    this.accumulator = defaultInitial;
+    this._reducer = defaultReducer;
+    if (options) {
+      'initial' in options && (this.accumulator = options.initial);
+      'reducer' in options && (this._reducer = options.reducer);
+    }
+  }
+  _write(chunk, encoding, callback) {
+    this.accumulator = this._reducer.call(this, this.accumulator, chunk);
+    callback(null);
+  }
+  _writev(chunks, callback) {
+    chunks.forEach(item => (this.accumulator = this._reducer.call(this, this.accumulator, item.chunk)));
+    callback(null);
+  }
+  static make(reducer, initial) {
+    return new Reduce(typeof reducer == 'object' ? reducer : {reducer, initial});
+  }
+}
+Reduce.reduce = Reduce.make;
+Reduce.make.Constructor = Reduce;
+
+module.exports = Reduce;

+ 30 - 0
utils/fold.js

@@ -0,0 +1,30 @@
+const {Transform} = require('stream');
+
+const defaultInitial = 0;
+const defaultReducer = (acc, value) => value;
+
+class Fold extends Transform {
+  constructor(options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    this._accumulator = defaultInitial;
+    this._reducer = defaultReducer;
+    if (options) {
+      'initial' in options && (this._accumulator = options.initial);
+      'reducer' in options && (this._reducer = options.reducer);
+    }
+  }
+  _transform(chunk, encoding, callback) {
+    this._accumulator = this._reducer.call(this, this._accumulator, chunk);
+    callback(null);
+  }
+  _final(callback) {
+    this.push(this._accumulator);
+    callback(null);
+  }
+  static make(reducer, initial) {
+    return new Fold(typeof reducer == 'object' ? reducer : {reducer, initial});
+  }
+}
+Fold.make.Constructor = Fold;
+
+module.exports = Fold.make;

+ 27 - 0
utils/scan.js

@@ -0,0 +1,27 @@
+const {Transform} = require('stream');
+
+const defaultInitial = 0;
+const defaultReducer = (acc, value) => value;
+
+class Scan extends Transform {
+  constructor(options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    this._accumulator = defaultInitial;
+    this._reducer = defaultReducer;
+    if (options) {
+      'initial' in options && (this._accumulator = options.initial);
+      'reducer' in options && (this._reducer = options.reducer);
+    }
+  }
+  _transform(chunk, encoding, callback) {
+    this._accumulator = this._reducer.call(this, this._accumulator, chunk);
+    this.push(this._accumulator);
+    callback(null);
+  }
+  static make(reducer, initial) {
+    return new Scan(typeof reducer == 'object' ? reducer : {reducer, initial});
+  }
+}
+Scan.make.Constructor = Scan;
+
+module.exports = Scan.make;

+ 30 - 0
utils/skip.js

@@ -0,0 +1,30 @@
+const {Transform} = require('stream');
+
+class Skip extends Transform {
+  constructor(options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    this._n = 0;
+    if (options) {
+      'n' in options && (this._n = options.n);
+    }
+    if (this._n <= 0) {
+      this._transform = this._passThrough;
+    }
+  }
+  _transform(chunk, encoding, callback) {
+    if (--this._n <= 0) {
+      this._transform = this._passThrough;
+    }
+    callback(null);
+  }
+  _passThrough(chunk, encoding, callback) {
+    this.push(chunk);
+    callback(null);
+  }
+  static make(n) {
+    return new Skip(typeof n == 'object' ? n : {n});
+  }
+}
+Skip.make.Constructor = Skip;
+
+module.exports = Skip.make;

+ 30 - 0
utils/skipWhile.js

@@ -0,0 +1,30 @@
+const {Transform} = require('stream');
+
+const alwaysFalse = () => false;
+
+class SkipWhile extends Transform {
+  constructor(options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    this._condition = alwaysFalse;
+    if (options) {
+      'condition' in options && (this._condition = options.condition);
+    }
+  }
+  _transform(chunk, encoding, callback) {
+    if (!this._condition.call(this, chunk)) {
+      this._transform = this._passThrough;
+      this.push(chunk);
+    }
+    callback(null);
+  }
+  _passThrough(chunk, encoding, callback) {
+    this.push(chunk);
+    callback(null);
+  }
+  static make(condition) {
+    return new SkipWhile(typeof condition == 'object' ? condition : {condition});
+  }
+}
+SkipWhile.make.Constructor = SkipWhile;
+
+module.exports = SkipWhile.make;

+ 37 - 0
utils/take.js

@@ -0,0 +1,37 @@
+const {Transform} = require('stream');
+
+class Take extends Transform {
+  constructor(options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    this._n = this._skip = 0;
+    if (options) {
+      'n' in options && (this._n = options.n);
+      'skip' in options && (this._skip = options.skip);
+    }
+    if (this._skip <= 0) {
+      this._transform = this._n > 0 ? this._countValues : this._doNothing;
+    }
+  }
+  _transform(chunk, encoding, callback) {
+    if (--this._skip <= 0) {
+      this._transform = this._n > 0 ? this._countValues : this._doNothing;
+    }
+    callback(null);
+  }
+  _countValues(chunk, encoding, callback) {
+    if (--this._n <= 0) {
+      this._transform = this._doNothing;
+    }
+    this.push(chunk);
+    callback(null);
+  }
+  _doNothing(chunk, encoding, callback) {
+    callback(null);
+  }
+  static make(n) {
+    return new Take(typeof n == 'object' ? n : {n});
+  }
+}
+Take.make.Constructor = Take;
+
+module.exports = Take.make;

+ 31 - 0
utils/takeWhile.js

@@ -0,0 +1,31 @@
+const {Transform} = require('stream');
+
+const alwaysTrue = () => true;
+
+class TakeWhile extends Transform {
+  constructor(options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    this._condition = alwaysTrue;
+    if (options) {
+      'condition' in options && (this._condition = options.condition);
+    }
+  }
+  _transform(chunk, encoding, callback) {
+    if (this._condition.call(this, chunk)) {
+      this.push(chunk);
+    } else {
+      this._transform = this._doNothing;
+    }
+    callback(null);
+  }
+  _doNothing(chunk, encoding, callback) {
+    callback(null);
+  }
+  static make(condition) {
+    return new TakeWhile(typeof condition == 'object' ? condition : {condition});
+  }
+}
+TakeWhile.make.Constructor = TakeWhile;
+
+
+module.exports = TakeWhile.make;