Преглед изворни кода

Added readme, tests + minor improvements.

Eugene Lazutkin пре 7 година
родитељ
комит
bc69f17788
7 измењених фајлова са 262 додато и 11 уклоњено
  1. 65 0
      README.md
  2. 12 8
      main.js
  3. 33 0
      package-lock.json
  4. 6 3
      package.json
  5. 23 0
      tests/helper.js
  6. 116 0
      tests/test_simple.js
  7. 7 0
      tests/tests.js

+ 65 - 0
README.md

@@ -0,0 +1,65 @@
+# stream-chain
+
+[![Build status][travis-image]][travis-url]
+[![Dependencies][deps-image]][deps-url]
+[![devDependencies][dev-deps-image]][dev-deps-url]
+[![NPM version][npm-image]][npm-url]
+
+`stream-chain` is a simple helper to create a chain of object-mode transform streams out of regular functions, asynchronous functions, generator functions, and existing Transform and Duplex object-mode streams.
+
+It is distributed under New BSD license.
+
+## Intro
+
+```js
+const Chain = require('stream-chain');
+
+const chain = new Chain([
+  x => [x - 1, x, x + 1],
+  x => x * x,
+  async x => getFromDatabaseByKey(x),
+  function* (x) {
+    for (let i = x; i > 0; --i) {
+      yield i;
+    }
+    return 0;
+  }
+]);
+chain.on('data', data => console.log(data));
+dataSource.pipe(chain.input);
+```
+
+## Installation
+
+```
+npm i stream-chain
+```
+
+## Documentation
+
+The main module provides a class based on [EventEmitter](https://nodejs.org/dist/latest-v10.x/docs/api/events.html#events_class_eventemitter). It chains its arguments in a single pipeline optionally binding common stream events.
+
+The constructor accepts two parameters:
+
+* `fns` is an array of functions or instances of [Duplex](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_duplex) or [Transform](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_transform) streams.
+  * If a value is a function, a `Transform` stream is created, which calls this function with two parameters: `chunk` (an object), and an optional `encoding`. See [documentation](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_transform_transform_chunk_encoding_callback) for more details on those parameters. The function will be called in the content of created stream.
+    * If it is a regular function, it can return an array of values to pass to the next stream, a single value, `undefined` or `null`. Two latter values indicate that no value should be passed.
+    * If it is a generator function, it can yield and/or return all necessary values. Each yield/return value will be treated like a returned value from a regular function.
+    * If it is an asynchronous function, it will be waited to be resolved, and the resulting value will be treated like a returned value from a regular function.
+    * Any thrown exception will be catched and passed to a callback function effectively generating an error event.
+  * If a value is a valid stream, it is included as is in the pipeline.
+* `skipEvents` is an optional Boolean parameter. If it is `false` (the default), `'error'` events from all streams are forwarded to the created instance, `'data'` and `'end'` events are forwarded from the last stream of a pipeline. If it is `true`, no event forwarding is made.
+  * This parameter is useful for handling non-standard events. In this case the forwarding of events can be done either manually, or in a constructor of a derived class.
+
+## Release History
+
+- 1.0.0 *the initial release.*
+
+[npm-image]:      https://img.shields.io/npm/v/stream-chain.svg
+[npm-url]:        https://npmjs.org/package/stream-chain
+[deps-image]:     https://img.shields.io/david/uhop/stream-chain.svg
+[deps-url]:       https://david-dm.org/uhop/stream-chain
+[dev-deps-image]: https://img.shields.io/david/dev/uhop/stream-chain.svg
+[dev-deps-url]:   https://david-dm.org/uhop/stream-chain?type=dev
+[travis-image]:   https://img.shields.io/travis/uhop/stream-chain.svg
+[travis-url]:     https://travis-ci.org/uhop/stream-chain

+ 12 - 8
main.js

@@ -1,20 +1,20 @@
 'use strict';
 
 const EventEmitter = require('events');
-const {Transform} = require('stream');
+const {Duplex, Transform} = require('stream');
 
 const GeneratorFunction = Object.getPrototypeOf(function*() {}).constructor;
 const AsyncFunction = Object.getPrototypeOf(async function() {}).constructor;
 
 function processData(result, stream, callback) {
-  if (result !== undefined) {
+  if (result !== undefined && result !== null) {
     if (result instanceof Array) {
-      result.forEach(value => value !== undefined && stream.push(value));
+      result.forEach(value => value !== undefined && value !== null && stream.push(value));
     } else {
       stream.push(result);
     }
   }
-  callback && callback();
+  callback && callback(null);
 }
 
 class Chain extends EventEmitter {
@@ -26,7 +26,7 @@ class Chain extends EventEmitter {
     }
 
     this.streams = fns.map((fn, index) => {
-      let transform;
+      let transform, stream;
       if (fn instanceof AsyncFunction) {
         transform = function(chunk, encoding, callback) {
           fn.call(this, chunk, encoding).then(result => processData(result, this, callback), error => callback(error));
@@ -40,11 +40,13 @@ class Chain extends EventEmitter {
               processData(result.value, this);
               if (result.done) break;
             }
-            callback();
+            callback(null);
           } catch (error) {
             callback(error);
           }
         };
+      } else if (fn instanceof Duplex || fn instanceof Transform) {
+        stream = fn;
       } else if (typeof fn === 'function') {
         transform = function(chunk, encoding, callback) {
           try {
@@ -55,9 +57,11 @@ class Chain extends EventEmitter {
           }
         };
       } else {
-        throw Error('Arguments should be functions.');
+        throw Error('Arguments should be functions or streams.');
+      }
+      if (!stream) {
+        stream = new Transform({objectMode: true, transform});
       }
-      const stream = new Transform({objectMode: true, transform});
       !skipEvents && stream.on('error', error => this.emit('error', error));
       return stream;
     });

+ 33 - 0
package-lock.json

@@ -0,0 +1,33 @@
+{
+  "name": "stream-chain",
+  "version": "1.0.0",
+  "lockfileVersion": 1,
+  "requires": true,
+  "dependencies": {
+    "heya-ice": {
+      "version": "0.1.11",
+      "resolved": "https://registry.npmjs.org/heya-ice/-/heya-ice-0.1.11.tgz",
+      "integrity": "sha1-XW2lnGC1nHAjqDRw+26XcddwWEk=",
+      "dev": true
+    },
+    "heya-unify": {
+      "version": "0.2.5",
+      "resolved": "https://registry.npmjs.org/heya-unify/-/heya-unify-0.2.5.tgz",
+      "integrity": "sha1-FsNP1CbETusy+JyrI02qrymvxJs=",
+      "dev": true,
+      "requires": {
+        "heya-ice": ">=0.1"
+      }
+    },
+    "heya-unit": {
+      "version": "0.3.0",
+      "resolved": "https://registry.npmjs.org/heya-unit/-/heya-unit-0.3.0.tgz",
+      "integrity": "sha1-eXR4IIyBnUxbf+NWrEwbhO67ubc=",
+      "dev": true,
+      "requires": {
+        "heya-ice": ">=0.1",
+        "heya-unify": ">=0.2"
+      }
+    }
+  }
+}

+ 6 - 3
package.json

@@ -4,7 +4,7 @@
   "description": "Chain functions as transform streams.",
   "main": "main.js",
   "scripts": {
-    "test": "echo \"Error: no test specified\" && exit 1"
+    "test": "node tests/tests.js"
   },
   "repository": {
     "type": "git",
@@ -14,10 +14,13 @@
     "stream",
     "chain"
   ],
-  "author": "Eugene Lazutkin <eugene.lazutkin@gmail.com>",
+  "author": "Eugene Lazutkin <eugene.lazutkin@gmail.com> (http://lazutkin.com/)",
   "license": "BSD-3-Clause",
   "bugs": {
     "url": "https://github.com/uhop/stream-chain/issues"
   },
-  "homepage": "https://github.com/uhop/stream-chain#readme"
+  "homepage": "https://github.com/uhop/stream-chain#readme",
+  "devDependencies": {
+    "heya-unit": "^0.3.0"
+  }
 }

+ 23 - 0
tests/helper.js

@@ -0,0 +1,23 @@
+'use strict';
+
+const {Readable, Writable} = require('stream');
+
+const streamFromArray = array =>
+  new Readable({
+    objectMode: true,
+    read() {
+      if (isNaN(this.index)) this.index = 0;
+      this.push(this.index < array.length ? array[this.index++] : null);
+    }
+  });
+
+const streamToArray = array =>
+  new Writable({
+    objectMode: true,
+    write(chunk, encoding, callback) {
+      array.push(chunk);
+      callback(null);
+    }
+  });
+
+module.exports = {streamFromArray, streamToArray};

+ 116 - 0
tests/test_simple.js

@@ -0,0 +1,116 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../main');
+const {streamFromArray, streamToArray} = require('./helper');
+const {Transform} = require('stream');
+
+unit.add(module, [
+  function test_simple(t) {
+    const async = t.startAsync('test_simple');
+
+    const chain = new Chain([x => x * x]),
+      output1 = [],
+      output2 = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain.input);
+    chain.output.pipe(streamToArray(output1));
+
+    chain.on('data', value => output2.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 4, 9])'));
+      eval(t.TEST('t.unify(output2, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_gen(t) {
+    const async = t.startAsync('test_gen');
+
+    const chain = new Chain([
+        function*(x) {
+          yield x * x;
+          yield x * x * x;
+          return 2 * x;
+        }
+      ]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain.input);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 1, 2, 4, 8, 4, 9, 27, 6])'));
+      async.done();
+    });
+  },
+  function test_async(t) {
+    const async = t.startAsync('test_async');
+
+    const chain = new Chain([
+        async x =>
+          new Promise(resolve => {
+            setTimeout(() => resolve(x + 1), 20);
+          })
+      ]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain.input);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [2, 3, 4])'));
+      async.done();
+    });
+  },
+  function test_array(t) {
+    const async = t.startAsync('test_array');
+
+    const chain = new Chain([x => [x * x, x * x * x, 2 * x]]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain.input);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 1, 2, 4, 8, 4, 9, 27, 6])'));
+      async.done();
+    });
+  },
+  function test_chain(t) {
+    const async = t.startAsync('test_chain');
+
+    const chain = new Chain([x => x * x, x => 2 * x + 1]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain.input);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  },
+  function test_stream(t) {
+    const async = t.startAsync('test_stream');
+
+    const chain = new Chain([
+        new Transform({
+          objectMode: true,
+          transform(x, _, callback) {
+            callback(null, x * x);
+          }
+        }),
+        x => 2 * x + 1
+      ]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain.input);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  }
+]);

+ 7 - 0
tests/tests.js

@@ -0,0 +1,7 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+require('./test_simple');
+
+unit.run();