Explorar el Código

Added async versions of user functions.

Eugene Lazutkin hace 7 años
padre
commit
d39ece57a4
Se han modificado 8 ficheros con 149 adiciones y 23 borrados
  1. 41 1
      tests/test_fold.js
  2. 15 1
      tests/test_skip.js
  3. 15 1
      tests/test_take.js
  4. 13 6
      utils/Reduce.js
  5. 13 2
      utils/fold.js
  6. 15 3
      utils/scan.js
  7. 18 4
      utils/skipWhile.js
  8. 19 5
      utils/takeWhile.js

+ 41 - 1
tests/test_fold.js

@@ -3,7 +3,7 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray} = require('./helpers');
+const {streamFromArray, delay} = require('./helpers');
 
 const fold = require('../utils/fold');
 const scan = require('../utils/scan');
@@ -24,6 +24,20 @@ unit.add(module, [
       async.done();
     });
   },
+  function test_foldAsync(t) {
+    const async = t.startAsync('test_foldAsync');
+
+    const chain = new Chain([fold(delay((acc, x) => acc + x), 0)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [6])'));
+      async.done();
+    });
+  },
   function test_foldScan(t) {
     const async = t.startAsync('test_foldScan');
 
@@ -38,6 +52,20 @@ unit.add(module, [
       async.done();
     });
   },
+  function test_foldScanAsync(t) {
+    const async = t.startAsync('test_foldScanAsync');
+
+    const chain = new Chain([scan(delay((acc, x) => acc + x), 0)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 3, 6])'));
+      async.done();
+    });
+  },
   function test_foldReduce(t) {
     const async = t.startAsync('test_foldReduce');
 
@@ -45,6 +73,18 @@ unit.add(module, [
 
     streamFromArray([1, 2, 3]).pipe(reduce);
 
+    reduce.on('finish', () => {
+      eval(t.TEST('t.unify(reduce.accumulator, 6)'));
+      async.done();
+    });
+  },
+  function test_foldReduceAsync(t) {
+    const async = t.startAsync('test_foldReduceAsync');
+
+    const reduce = new Reduce({reducer: delay((acc, x) => acc + x), initial: 0});
+
+    streamFromArray([1, 2, 3]).pipe(reduce);
+
     reduce.on('finish', () => {
       eval(t.TEST('t.unify(reduce.accumulator, 6)'));
       async.done();

+ 15 - 1
tests/test_skip.js

@@ -3,7 +3,7 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray} = require('./helpers');
+const {streamFromArray, delay} = require('./helpers');
 
 const skip = require('../utils/skip');
 const skipWhile = require('../utils/skipWhile');
@@ -31,6 +31,20 @@ unit.add(module, [
 
     streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
 
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 4, 5])'));
+      async.done();
+    });
+  },
+  function test_skipWhileAsync(t) {
+    const async = t.startAsync('test_skipWhileAsync');
+
+    const chain = new Chain([skipWhile(delay(x => x != 3))]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 4, 5])'));

+ 15 - 1
tests/test_take.js

@@ -3,7 +3,7 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray} = require('./helpers');
+const {streamFromArray, delay} = require('./helpers');
 
 const take = require('../utils/take');
 const takeWhile = require('../utils/takeWhile');
@@ -45,6 +45,20 @@ unit.add(module, [
 
     streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
 
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 2])'));
+      async.done();
+    });
+  },
+  function test_takeWhileAsync(t) {
+    const async = t.startAsync('test_takeWhileAsync');
+
+    const chain = new Chain([takeWhile(delay(x => x != 3))]),
+      output = [];
+
+    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 2])'));

+ 13 - 6
utils/Reduce.js

@@ -16,12 +16,19 @@ class Reduce extends Writable {
     }
   }
   _write(chunk, encoding, callback) {
-    this.accumulator = this._reducer.call(this, this.accumulator, chunk);
-    callback(null);
-  }
-  _writev(chunks, callback) {
-    chunks.forEach(item => (this.accumulator = this._reducer.call(this, this.accumulator, item.chunk)));
-    callback(null);
+    const result = this._reducer.call(this, this.accumulator, chunk);
+    if (result && typeof result.then == 'function') {
+      result.then(
+        value => {
+          this.accumulator = value;
+          callback(null);
+        },
+        error => callback(error)
+      );
+    } else {
+      this.accumulator = result;
+      callback(null);
+    }
   }
   static make(reducer, initial) {
     return new Reduce(typeof reducer == 'object' ? reducer : {reducer, initial});

+ 13 - 2
utils/fold.js

@@ -16,8 +16,19 @@ class Fold extends Transform {
     }
   }
   _transform(chunk, encoding, callback) {
-    this._accumulator = this._reducer.call(this, this._accumulator, chunk);
-    callback(null);
+    const result = this._reducer.call(this, this._accumulator, chunk);
+    if (result && typeof result.then == 'function') {
+      result.then(
+        value => {
+          this._accumulator = value;
+          callback(null);
+        },
+        error => callback(error)
+      );
+    } else {
+      this._accumulator = result;
+      callback(null);
+    }
   }
   _final(callback) {
     this.push(this._accumulator);

+ 15 - 3
utils/scan.js

@@ -16,9 +16,21 @@ class Scan extends Transform {
     }
   }
   _transform(chunk, encoding, callback) {
-    this._accumulator = this._reducer.call(this, this._accumulator, chunk);
-    this.push(this._accumulator);
-    callback(null);
+    const result = this._reducer.call(this, this._accumulator, chunk);
+    if (result && typeof result.then == 'function') {
+      result.then(
+        value => {
+          this._accumulator = value;
+          this.push(this._accumulator);
+          callback(null);
+        },
+        error => callback(error)
+      );
+    } else {
+      this._accumulator = result;
+      this.push(this._accumulator);
+      callback(null);
+    }
   }
   static make(reducer, initial) {
     return new Scan(typeof reducer == 'object' ? reducer : {reducer, initial});

+ 18 - 4
utils/skipWhile.js

@@ -13,11 +13,25 @@ class SkipWhile extends Transform {
     }
   }
   _transform(chunk, encoding, callback) {
-    if (!this._condition.call(this, chunk)) {
-      this._transform = this._passThrough;
-      this.push(chunk);
+    const result = this._condition.call(this, chunk);
+    if (result && typeof result.then == 'function') {
+      result.then(
+        flag => {
+          if (!flag) {
+            this._transform = this._passThrough;
+            this.push(chunk);
+          }
+          callback(null);
+        },
+        error => callback(error)
+      );
+    } else {
+      if (!result) {
+        this._transform = this._passThrough;
+        this.push(chunk);
+      }
+      callback(null);
     }
-    callback(null);
   }
   _passThrough(chunk, encoding, callback) {
     this.push(chunk);

+ 19 - 5
utils/takeWhile.js

@@ -13,12 +13,27 @@ class TakeWhile extends Transform {
     }
   }
   _transform(chunk, encoding, callback) {
-    if (this._condition.call(this, chunk)) {
-      this.push(chunk);
+    const result = this._condition.call(this, chunk);
+    if (result && typeof result.then == 'function') {
+      result.then(
+        flag => {
+          if (flag) {
+            this.push(chunk);
+          } else {
+            this._transform = this._doNothing;
+          }
+          callback(null);
+        },
+        error => callback(error)
+      );
     } else {
-      this._transform = this._doNothing;
+      if (result) {
+        this.push(chunk);
+      } else {
+        this._transform = this._doNothing;
+      }
+      callback(null);
     }
-    callback(null);
   }
   _doNothing(chunk, encoding, callback) {
     callback(null);
@@ -29,5 +44,4 @@ class TakeWhile extends Transform {
 }
 TakeWhile.make.Constructor = TakeWhile;
 
-
 module.exports = TakeWhile.make;