Procházet zdrojové kódy

Added simple transducers.

Eugene Lazutkin před 7 roky
rodič
revize
8f665795d0
6 změnil soubory, kde provedl 284 přidání a 57 odebrání
  1. 81 4
      README.md
  2. 105 52
      index.js
  3. 3 1
      tests/test_readWrite.js
  4. 14 0
      tests/test_simple.js
  5. 80 0
      tests/test_transducers.js
  6. 1 0
      tests/tests.js

+ 81 - 4
README.md

@@ -81,7 +81,7 @@ The constructor accepts following arguments:
   * If a value is a function, a [Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform) stream is created, which calls this function with two parameters: `chunk` (an object), and an optional `encoding`. See [Node's documentation](https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback) for more details on those parameters. The function will be called in context of the created stream.
     * If it is a regular function, it can return:
       * Regular value:
-        * Array of values to pass several or zero values to the next stream as they are.
+        * *(depricated since 2.1.0)* Array of values to pass several or zero values to the next stream as they are.
           ```js
           // produces no values:
           x => []
@@ -104,14 +104,23 @@ The constructor accepts following arguments:
         * If it is an instance of [Promise](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise) or "thenable" (an object with a method called `then()`), it will be waited for. Its result should be a regular value.
           ```js
           // delays by 0.5s:
-          x => new Promise(resolve => setTimeout(() => resolve(x), 500))
+          x => new Promise(
+            resolve => setTimeout(() => resolve(x), 500))
           ```
         * If it is an instance of a generator or "nextable" (an object with a method called `next()`), it will be iterated according to the [generator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Generator) protocol. The results should be regular values.
           ```js
           // produces multiple values:
           class Nextable {
-            constructor(x) { this.x = x; this.i = -1; }
-            next() { return {done: this.i <= 1, value: this.x + this.i++}; }
+            constructor(x) {
+              this.x = x;
+              this.i = -1;
+            }
+            next() {
+              return {
+                done:  this.i <= 1,
+                value: this.x + this.i++
+              };
+            }
           }
           x => new Nextable(x)
           ```
@@ -140,6 +149,13 @@ The constructor accepts following arguments:
       return x;
     }
     ```
+  * *(since 2.1.0)* If a value is an array, it is assumed an array of regular functions.
+    Their values are passed in a chain. All values (including `null`, `undefined`, and arrays) are allowed
+    and passed without modifications. The last value is a subject to precessing defined above for regular functions.
+    * Empty arrays are ignored.
+    * If any function returns a value produced by `Chain.final(value)` (see below), it terminates the chain using
+      `value` as the final value of the chain.
+    * This feature bypasses streams. It is implemented for performance reasons.
   * If a value is a valid stream, it is included as is in the pipeline.
     * [Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform).
     * [Duplex](https://nodejs.org/api/stream.html#stream_class_stream_duplex).
@@ -227,9 +243,70 @@ Following static methods are available:
     fs.createWriteStream('output.txt.gz')
   ])
   ```
+* *(since 2.1.0)* `final(value)` is a helper factory function, which can be used in by chained functions (see above the array of functions).
+  It returns a special value, which terminates the chain and uses the passed value as the result of the chain.
+  ```js
+  const {chain, final} = require('stream-chain');
+
+  // simple
+  dataSource
+    .pipe(chain([[x => x * x, x => 2 * x + 1]]));
+  // faster than [x => x * x, x => 2 * x + 1]
+
+  // final
+  dataSource
+    .pipe(chain([[
+      x => x * x,
+      x => final(x),
+      x => 2 * x + 1
+    ]]));
+  // the same as [[x => x * x, x => x]]
+  // the same as [[x => x * x]]
+  // the same as [x => x * x]
+
+  // final as a terminator
+  dataSource
+    .pipe(chain([[
+      x => x * x,
+      x => final(),
+      x => 2 * x + 1
+    ]]));
+  // produces no values, because the final value is undefined,
+  // which is interpreted as "no value shall be passed"
+  // see the doc above
+
+  // final() as a filter
+  dataSource
+    .pipe(chain([[
+      x => x * x,
+      x => x % 2 ? final() : x,
+      x => 2 * x + 1
+    ]]));
+  // only even values are passed, odd values are ignored
+
+  // if you want to be really performant...
+  const none = final();
+  dataSource
+    .pipe(chain([[
+      x => x * x,
+      x => x % 2 ? none : x,
+      x => 2 * x + 1
+    ]]));
+  ```
+* *(since 2.1.0)* `many(array)` is a helper factory function, which is used to wrap arrays to be interpreted as multiple values returned from a function.
+  At the moment it is redundant: you can use a simple array to indicate that, but a naked array is being depricated and in future versions it will be passed as is.
+  The thinking is that using `many()` is better indicates the intention. Additionally, in the future versions it will be used by array of functions (see above).
+  ```js
+  const {chain, many} = require('stream-chain');
+
+  dataSource
+    .pipe(chain([x => many([x, x + 1, x + 2])]));
+  // currently the same as [x => [x, x + 1, x + 2]]
+  ```
 
 ## Release History
 
+- 2.1.0 *Added simple transducers, dropped Node 6.*
 - 2.0.3 *Added TypeScript typings and the badge.*
 - 2.0.2 *Workaround for Node 6: use `'finish'` event instead of `_final()`.*
 - 2.0.1 *Improved documentation.*

+ 105 - 52
index.js

@@ -2,7 +2,19 @@
 
 const {Readable, Writable, Duplex, Transform} = require('stream');
 
-function processData(result, stream) {
+function Final(value) {
+  this.value = value;
+}
+function Many(values) {
+  this.values = values;
+}
+
+const processData = (result, stream) => {
+  if (result instanceof Final) {
+    result = result.value;
+  } else if (result instanceof Many) {
+    result = result.values;
+  }
   if (result !== undefined && result !== null) {
     if (result instanceof Array) {
       result.forEach(value => value !== undefined && value !== null && stream.push(value));
@@ -10,7 +22,59 @@ function processData(result, stream) {
       stream.push(result);
     }
   }
-}
+};
+
+const wrapFunction = fn =>
+  new Transform({
+    writableObjectMode: true,
+    readableObjectMode: true,
+    transform(chunk, encoding, callback) {
+      try {
+        const result = fn.call(this, chunk, encoding);
+        if (result && typeof result.then == 'function') {
+          // Promise
+          result.then(result => (processData(result, this), callback(null)), error => callback(error));
+          return;
+        }
+        if (result && typeof result.next == 'function') {
+          // generator
+          while (true) {
+            const data = result.next();
+            processData(data.value, this);
+            if (data.done) break;
+          }
+        } else {
+          processData(result, this);
+        }
+        callback(null);
+      } catch (error) {
+        callback(error);
+      }
+    }
+  });
+
+const wrapArray = array =>
+  new Transform({
+    writableObjectMode: true,
+    readableObjectMode: true,
+    transform(chunk, encoding, callback) {
+      try {
+        let value = chunk;
+        for (let i = 0; i < array.length; ++i) {
+          const result = array[i].call(this, value, encoding);
+          if (result instanceof Final) {
+            value = result.value;
+            break;
+          }
+          value = result;
+        }
+        processData(value, this);
+        callback(null);
+      } catch (error) {
+        callback(error);
+      }
+    }
+  });
 
 class Chain extends Duplex {
   constructor(fns, options) {
@@ -20,53 +84,35 @@ class Chain extends Duplex {
       throw Error("Chain's argument should be a non-empty array.");
     }
 
-    this.streams = fns.map((fn, index) => {
-      if (typeof fn === 'function') {
-        return new Transform({
-          writableObjectMode: true,
-          readableObjectMode: true,
-          transform(chunk, encoding, callback) {
-            try {
-              const result = fn.call(this, chunk, encoding);
-              if (result && typeof result.then == 'function') {
-                // Promise
-                return result.then(result => (processData(result, this), callback(null)), error => callback(error));
-              }
-              if (result && typeof result.next == 'function') {
-                // generator
-                while (true) {
-                  const data = result.next();
-                  processData(data.value, this);
-                  if (data.done) break;
-                }
-              } else {
-                processData(result, this);
-              }
-              callback(null);
-            } catch (error) {
-              callback(error);
-            }
+    this.streams = fns
+      .map((fn, index) => {
+        if (typeof fn === 'function') return wrapFunction(fn);
+        if (fn instanceof Array) {
+          switch (fn.length) {
+            case 0:
+              return null;
+            case 1:
+              return wrapFunction(fn[0]);
           }
-        });
-      }
-      if (
-        fn instanceof Duplex ||
-        fn instanceof Transform ||
-        (!index && fn instanceof Readable) ||
-        (index === fns.length - 1 && fn instanceof Writable)
-      ) {
-        return fn;
-      }
-      throw Error('Arguments should be functions or streams.');
-    });
+          return fn.length ? wrapArray(fn) : 0;
+        }
+        if (
+          fn instanceof Duplex ||
+          fn instanceof Transform ||
+          (!index && fn instanceof Readable) ||
+          (index === fns.length - 1 && fn instanceof Writable)
+        ) {
+          return fn;
+        }
+        throw Error('Arguments should be functions or streams.');
+      })
+      .filter(s => s);
     this.input = this.streams[0];
     this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
 
-    if (this.input instanceof Writable) {
-      this.on('finish', () => this.input.end(null, null)); // for Node 6
-    } else {
+    if (!(this.input instanceof Writable)) {
       this._write = (_1, _2, callback) => callback(null);
-      // this._final = callback => callback(null); // unavailable in Node 6
+      this._final = callback => callback(null); // unavailable in Node 6
       this.input.on('end', () => this.end());
     }
 
@@ -92,20 +138,27 @@ class Chain extends Duplex {
       error = e;
     }
   }
-  // _final(callback) { // unavailable in Node 6
-  //   let error = null;
-  //   try {
-  //     this.input.end(null, null, e => callback(e || error));
-  //   } catch (e) {
-  //     error = e;
-  //   }
-  // }
+  _final(callback) {
+    // unavailable in Node 6
+    let error = null;
+    try {
+      this.input.end(null, null, e => callback(e || error));
+    } catch (e) {
+      error = e;
+    }
+  }
   _read() {
     this.output.resume();
   }
   static chain(fns, options) {
     return new Chain(fns, options);
   }
+  static final(value) {
+    return new Final(value);
+  }
+  static many(values) {
+    return new Many(values);
+  }
 }
 
 module.exports = Chain;

+ 3 - 1
tests/test_readWrite.js

@@ -82,7 +82,9 @@ unit.add(module, [
       output2 = [],
       chain = new Chain([streamFromArray([1, 2, 3]), streamToArray(output1)]);
 
-    streamFromArray([4, 5, 6]).pipe(chain).pipe(streamToArray(output2));
+    streamFromArray([4, 5, 6])
+      .pipe(chain)
+      .pipe(streamToArray(output2));
 
     chain.on('end', () => {
       eval(t.TEST('t.unify(output1, [1, 2, 3])'));

+ 14 - 0
tests/test_simple.js

@@ -78,6 +78,20 @@ unit.add(module, [
       async.done();
     });
   },
+  function test_simpleMany(t) {
+    const async = t.startAsync('test_simpleMany');
+
+    const chain = new Chain([x => Chain.many([x * x, x * x * x, 2 * x])]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 1, 2, 4, 8, 4, 9, 27, 6])'));
+      async.done();
+    });
+  },
   function test_simpleChain(t) {
     const async = t.startAsync('test_simpleChain');
 

+ 80 - 0
tests/test_transducers.js

@@ -0,0 +1,80 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../index');
+const {streamFromArray, streamToArray} = require('./helpers');
+const {Transform} = require('stream');
+
+unit.add(module, [
+  function test_transducers(t) {
+    const async = t.startAsync('test_transducers');
+
+    const chain = new Chain([[x => x * x, x => 2 * x + 1]]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  },
+  function test_transducersFinal(t) {
+    const async = t.startAsync('test_transducersFinal');
+
+    const chain = new Chain([[x => x * x, x => Chain.final(x), x => 2 * x + 1]]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_transducersNothing(t) {
+    const async = t.startAsync('test_transducersNothing');
+
+    const chain = new Chain([[x => x * x, () => Chain.final(), x => 2 * x + 1]]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [])'));
+      async.done();
+    });
+  },
+  function test_transducersEmpty(t) {
+    const async = t.startAsync('test_transducersEmpty');
+
+    const chain = new Chain([x => x * x, []]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_transducersOne(t) {
+    const async = t.startAsync('test_transducersOne');
+
+    const chain = new Chain([x => x * x, [x => 2 * x + 1]]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  }
+]);

+ 1 - 0
tests/tests.js

@@ -5,6 +5,7 @@ const unit = require('heya-unit');
 require('./test_simple');
 require('./test_readWrite');
 require('./test_errors');
+require('./test_transducers');
 require('./test_demo');
 
 unit.run();