فهرست منبع

Added comp().

Eugene Lazutkin 7 سال پیش
والد
کامیت
4d1489361b
13فایلهای تغییر یافته به همراه310 افزوده شده و 26 حذف شده
  1. 19 3
      README.md
  2. 26 22
      index.js
  3. 190 0
      tests/test_comp.js
  4. 1 1
      tests/test_simple.js
  5. 1 0
      tests/tests.js
  6. 2 0
      utils/Reduce.js
  7. 59 0
      utils/comp.js
  8. 2 0
      utils/fold.js
  9. 2 0
      utils/scan.js
  10. 2 0
      utils/skip.js
  11. 2 0
      utils/skipWhile.js
  12. 2 0
      utils/take.js
  13. 2 0
      utils/takeWhile.js

+ 19 - 3
README.md

@@ -124,7 +124,8 @@ The constructor accepts following arguments:
           }
           x => new Nextable(x)
           ```
-      * Any thrown exception will be catched and passed to a callback function effectively generating an error event.
+          `next()` can return a `Promise` according to the [asynchronous generator](https://zaiste.net/nodejs_10_asynchronous_iteration_async_generators/) protocol.
+      * Any thrown exception will be caught and passed to a callback function effectively generating an error event.
         ```js
         // fails
         x => { throw new Error('Bad!'); }
@@ -138,7 +139,7 @@ The constructor accepts following arguments:
       return x;
     }
     ```
-  * If it is a generator function, each yield or return should produce a regular value.
+  * If it is a generator function, each yield should produce a regular value.
     * In essence, it is covered under "special values" as a function that returns a generator object.
     ```js
     // produces multiple values:
@@ -149,6 +150,20 @@ The constructor accepts following arguments:
       return x;
     }
     ```
+  * *(since 2.2.0)* If it is an asynchronous generator function, each yield should produce a regular value.
+    * In essence, it is covered under "special values" as a function that returns a generator object.
+    ```js
+    // produces multiple values:
+    async function* (x) {
+      for (let i = -1; i <= 1; ++i) {
+        if (i) {
+          await new Promise(resolve => setTimeout(() => resolve(), 50));
+          yield x + i;
+        }
+      }
+      return x;
+    }
+    ```
   * *(since 2.1.0)* If a value is an array, it is assumed an array of regular functions.
     Their values are passed in a chain. All values (including `null`, `undefined`, and arrays) are allowed
     and passed without modifications. The last value is a subject to precessing defined above for regular functions.
@@ -294,7 +309,7 @@ Following static methods are available:
     ]]));
   ```
 * *(since 2.1.0)* `many(array)` is a helper factory function, which is used to wrap arrays to be interpreted as multiple values returned from a function.
-  At the moment it is redundant: you can use a simple array to indicate that, but a naked array is being depricated and in future versions it will be passed as is.
+  At the moment it is redundant: you can use a simple array to indicate that, but a naked array is being deprecated and in future versions it will be passed as is.
   The thinking is that using `many()` is better indicates the intention. Additionally, in the future versions it will be used by array of functions (see above).
   ```js
   const {chain, many} = require('stream-chain');
@@ -306,6 +321,7 @@ Following static methods are available:
 
 ## Release History
 
+- 2.2.0 *Added utilities: `take`, `takeWhile`, `skip`, `skipWhile`, `fold`, `scan`, `Reduce`, `comp`.*
 - 2.1.0 *Added simple transducers, dropped Node 6.*
 - 2.0.3 *Added TypeScript typings and the badge.*
 - 2.0.2 *Workaround for Node 6: use `'finish'` event instead of `_final()`.*

+ 26 - 22
index.js

@@ -10,9 +10,9 @@ function Many(values) {
 }
 
 const processData = (result, stream) => {
-  if (result instanceof Final) {
+  if (result instanceof Chain.Final) {
     result = result.value;
-  } else if (result instanceof Many) {
+  } else if (result instanceof Chain.Many) {
     result = result.values;
   }
   if (result !== undefined && result !== null) {
@@ -24,6 +24,17 @@ const processData = (result, stream) => {
   }
 };
 
+const runAsyncGenerator = async (gen, stream) => {
+  for (;;) {
+    let data = gen.next();
+    if (data && typeof data.then == 'function') {
+      data = await data;
+    }
+    if (data.done) break;
+    processData(data.value, stream);
+  }
+};
+
 const wrapFunction = fn =>
   new Transform({
     writableObjectMode: true,
@@ -32,20 +43,16 @@ const wrapFunction = fn =>
       try {
         const result = fn.call(this, chunk, encoding);
         if (result && typeof result.then == 'function') {
-          // Promise
+          // thenable
           result.then(result => (processData(result, this), callback(null)), error => callback(error));
           return;
         }
         if (result && typeof result.next == 'function') {
           // generator
-          while (true) {
-            const data = result.next();
-            processData(data.value, this);
-            if (data.done) break;
-          }
-        } else {
-          processData(result, this);
+          runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
+          return;
         }
+        processData(result, this);
         callback(null);
       } catch (error) {
         callback(error);
@@ -62,7 +69,7 @@ const wrapArray = array =>
         let value = chunk;
         for (let i = 0; i < array.length; ++i) {
           const result = array[i].call(this, value, encoding);
-          if (result instanceof Final) {
+          if (result instanceof Chain.Final) {
             value = result.value;
             break;
           }
@@ -76,12 +83,6 @@ const wrapArray = array =>
     }
   });
 
-const convertToTransform = fn => {
-  if (typeof fn === 'function') return wrapFunction(fn);
-  if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
-  return null;
-};
-
 class Chain extends Duplex {
   constructor(fns, options) {
     super(options || {writableObjectMode: true, readableObjectMode: true});
@@ -91,6 +92,7 @@ class Chain extends Duplex {
     }
 
     this.streams = fns
+      .filter(fn => fn)
       .map((fn, index) => {
         if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
         if (
@@ -136,7 +138,6 @@ class Chain extends Duplex {
     }
   }
   _final(callback) {
-    // unavailable in Node 6
     let error = null;
     try {
       this.input.end(null, null, e => callback(e || error));
@@ -147,14 +148,14 @@ class Chain extends Duplex {
   _read() {
     this.output.resume();
   }
-  static chain(fns, options) {
+  static make(fns, options) {
     return new Chain(fns, options);
   }
   static final(value) {
-    return new Final(value);
+    return new Chain.Final(value);
   }
   static many(values) {
-    return new Many(values);
+    return new Chain.Many(values);
   }
   static convertToTransform(fn) {
     if (typeof fn === 'function') return wrapFunction(fn);
@@ -163,7 +164,10 @@ class Chain extends Duplex {
   }
 }
 
-Chain.make = Chain.chain;
+Chain.Final = Final;
+Chain.Many = Many;
+
+Chain.chain = Chain.make;
 Chain.make.Constructor = Chain;
 
 module.exports = Chain;

+ 190 - 0
tests/test_comp.js

@@ -0,0 +1,190 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../index');
+const {streamFromArray} = require('./helpers');
+
+const comp = require('../utils/comp');
+
+const {final, many} = Chain;
+const none = final();
+
+unit.add(module, [
+  function test_comp(t) {
+    const async = t.startAsync('test_comp');
+
+    const chain = new Chain([comp(x => x * x, x => 2 * x + 1)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  },
+  function test_compFinal(t) {
+    const async = t.startAsync('test_compFinal');
+
+    const chain = new Chain([comp(x => x * x, x => final(x), x => 2 * x + 1)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_compNothing(t) {
+    const async = t.startAsync('test_compNothing');
+
+    const chain = new Chain([comp(x => x * x, () => final(), x => 2 * x + 1)]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [])'));
+      async.done();
+    });
+  },
+  function test_compEmpty(t) {
+    const async = t.startAsync('test_compEmpty');
+
+    const chain = new Chain([x => x * x, comp()]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_compAsync(t) {
+    const async = t.startAsync('test_compAsync');
+
+    const chain = new Chain([
+        comp(
+          async x =>
+            await new Promise(resolve => {
+              setTimeout(() => resolve(x * x), 20);
+            }),
+          x => 2 * x + 1
+        )
+      ]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  },
+  function test_compGenerator(t) {
+    const async = t.startAsync('test_compGenerator');
+
+    const chain = new Chain([
+        comp(
+          x => x * x,
+          function*(x) {
+            yield x;
+            yield x + 1;
+            yield x + 2;
+          },
+          x => 2 * x + 1
+        )
+      ]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 5, 7, 9, 11, 13, 19, 21, 23])'));
+      async.done();
+    });
+  },
+  function test_compMany(t) {
+    const async = t.startAsync('test_compMany');
+
+    const chain = new Chain([
+        comp(
+          x => x * x,
+          x => many([x, x + 1, x + 2]),
+          x => 2 * x + 1
+        )
+      ]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 5, 7, 9, 11, 13, 19, 21, 23])'));
+      async.done();
+    });
+  },
+  function test_compCombined(t) {
+    const async = t.startAsync('test_compCombined');
+
+    const chain = new Chain([
+        comp(
+          async x =>
+            await new Promise(resolve => {
+              setTimeout(() => resolve(-x), 20);
+            }),
+          x => many([x, x * 10]),
+          function*(x) {
+            yield x;
+            yield x - 1;
+          },
+          x => -x
+        )
+      ]),
+      output = [];
+
+    streamFromArray([1, 2]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 2, 10, 11, 2, 3, 20, 21])'));
+      async.done();
+    });
+  },
+  function test_compCombinedFinal(t) {
+    const async = t.startAsync('test_compCombinedFinal');
+
+    const chain = new Chain([
+        comp(
+          async x =>
+            await new Promise(resolve => {
+              setTimeout(() => resolve(-x), 20);
+            }),
+          x => many([x, x * 10]),
+          function*(x) {
+            yield x;
+            yield final(x - 1);
+          },
+          x => -x
+        )
+      ]),
+      output = [];
+
+    streamFromArray([1, 2]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, -2, 10, -11, 2, -3, 20, -21])'));
+      async.done();
+    });
+  }
+]);

+ 1 - 1
tests/test_simple.js

@@ -32,7 +32,7 @@ unit.add(module, [
         function*(x) {
           yield x * x;
           yield x * x * x;
-          return 2 * x;
+          yield 2 * x;
         }
       ]),
       output = [];

+ 1 - 0
tests/tests.js

@@ -7,6 +7,7 @@ require('./test_readWrite');
 require('./test_errors');
 
 require('./test_transducers');
+require('./test_comp');
 
 require('./test_take');
 require('./test_skip');

+ 2 - 0
utils/Reduce.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Writable} = require('stream');
 
 const defaultInitial = 0;

+ 59 - 0
utils/comp.js

@@ -0,0 +1,59 @@
+'use strict';
+
+const {Final, Many} = require('../index');
+
+const next = async (value, fns, index) => {
+  for (let i = index; i <= fns.length; ++i) {
+    if (value && typeof value.then == 'function') {
+      // thenable
+      value = await value;
+    }
+    if (value instanceof Final) {
+      return value.value;
+    }
+    if (value instanceof Many) {
+      if (i == fns.length) return value;
+      const results = [],
+        values = value.values;
+      for (let j = 0; j < values.length; ++j) {
+        const result = await next(values[j], fns, i);
+        if (result instanceof Many) {
+          results.push(...result.values);
+        } else {
+          results.push(result);
+        }
+      }
+      return new Many(results);
+    }
+    if (value && typeof value.next == 'function') {
+      // generator
+      const results = [];
+      for (;;) {
+        let data = value.next();
+        if (data && typeof data.then == 'function') {
+          data = await data;
+        }
+        if (data.done) break;
+        const result = await next(data.value, fns, i);
+        if (result instanceof Many) {
+          results.push(...result.values);
+        } else {
+          results.push(result);
+        }
+      }
+      return new Many(results);
+    }
+    const fn = fns[i];
+    if (!fn) break;
+    value = fn(value);
+  }
+  return value;
+};
+
+const comp = (...fns) => {
+  fns = fns.filter(fn => fn);
+  if (!fns.length) return null;
+  return async value => next(value, fns, 0);
+};
+
+module.exports = comp;

+ 2 - 0
utils/fold.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Transform} = require('stream');
 
 const defaultInitial = 0;

+ 2 - 0
utils/scan.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Transform} = require('stream');
 
 const defaultInitial = 0;

+ 2 - 0
utils/skip.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Transform} = require('stream');
 
 class Skip extends Transform {

+ 2 - 0
utils/skipWhile.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Transform} = require('stream');
 
 const alwaysFalse = () => false;

+ 2 - 0
utils/take.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Transform} = require('stream');
 
 class Take extends Transform {

+ 2 - 0
utils/takeWhile.js

@@ -1,3 +1,5 @@
+'use strict';
+
 const {Transform} = require('stream');
 
 const alwaysTrue = () => true;