stream-chain

stream-chain creates a chain of object mode transform streams out of regular functions, asynchronous functions, generator functions, and existing Transform and Duplex object mode streams, while properly handling backpressure. It eliminates boilerplate, helping developers concentrate on functionality without losing performance.

Originally stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.

stream-chain is a lightweight micro-package with no dependencies. It is distributed under the New BSD license.

Intro

const {Transform} = require('stream');
const Chain = require('stream-chain');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      callback(null, x + 1);
    }
  })
]);
chain.on('data', data => console.log(data));
dataSource.pipe(chain.input);

Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real-life pipelines filter objects out and/or produce more objects out of a few. On top of that, we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. An unequal number of values per stage and unequal throughput of stages introduce problems like backpressure, which require the algorithms implemented by streams.

While a lot of API improvements were made to make streams easy to use, in reality a lot of boilerplate is required when creating a pipeline. stream-chain eliminates most of it.

Installation

npm i stream-chain

Documentation

Chain, which is returned by require('stream-chain'), is based on EventEmitter. It chains its constituent streams into a single pipeline, optionally forwarding common stream events.

Many details about this package can be discovered by looking at test files located in tests/ and in the source code (main.js).

Constructor: new Chain(fns[, skipEvents])

The constructor accepts the following arguments:

  • fns is an array of functions or stream instances.
    • If a value is a function, a Transform stream is created, which calls this function with two parameters: chunk (an object) and an optional encoding. See the Node.js stream documentation for more details on those parameters. The function is called in the context of the created stream.
      • If it is a regular function, it can return:
        • A regular value:
          • An array of values, to pass several values (or none) to the next stream as they are.
          • A single value. If it is undefined or null, no value is passed; otherwise, the value is passed to the next stream.
        • A special value:
          • If it is an instance of Promise or a "thenable" (an object with a method called then()), it is waited for. Its result should be a regular value.
          • If it is a generator object or a "nextable" (an object with a method called next()), it is iterated according to the generator protocol. The results should be regular values.
        • Any thrown exception is caught and passed to a callback function, effectively generating an error event (see the sketch after the example below).
      • If it is an asynchronous function, it can return a regular value.
        • In essence, it is covered under "special values" as a function that returns a promise.
      • If it is a generator function, each yield or return should produce a regular value.
        • In essence, it is covered under "special values" as a function that returns a generator object.
    • If a value is a valid stream, it is included as is in the pipeline.
      • The very first stream can be Readable.
      • The very last stream can be Writable.
      • Transform or Duplex streams can go anywhere.
        • Both Transform and Duplex are always Readable and Writable.
  • skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance, and 'data' and 'end' events are forwarded from the last stream of the pipeline. If it is truthy, no events are forwarded.
    • This is useful for handling non-standard events. In this case, the forwarding of events can be done either manually (a sketch follows below) or in the constructor of a derived class.

const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain.input);
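
As noted above, exceptions thrown by a chained function are converted to 'error' events. A minimal sketch (the number-checking function is arbitrary):

const Chain = require('stream-chain');

const chain = new Chain([
  x => {
    // a thrown exception is caught and surfaces as an 'error' event
    if (typeof x != 'number') throw new TypeError('expected a number');
    return x * 2;
  }
]);
chain.on('data', data => console.log(data));
chain.on('error', error => console.error('caught:', error.message));
chain.input.write(21);     // logs 42
chain.input.write('oops'); // triggers the 'error' event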

An instance can be used to attach handlers for stream events.
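
When skipEvents is truthy, the wiring is done by hand. A minimal sketch (dataSource stands for any Readable source, as in the examples above):

// truthy skipEvents: no events are forwarded automatically
const chain = new Chain([x => x * 2], true);
// forward errors from every constituent stream manually
chain.streams.forEach(stream => stream.on('error', error => console.error(error)));
// consume data and the end event from the last stream of the pipeline
chain.output.on('data', data => console.log(data));
chain.output.on('end', () => console.log('done'));
dataSource.pipe(chain.input);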

Properties

The following public properties are available:

  • streams is an array of streams created by the constructor. Its values are either Transform streams that wrap corresponding functions from the constructor parameter, or user-provided streams. All streams are piped sequentially, starting from the beginning.
  • input is the beginning of the pipeline. Effectively it is the first item of streams.
  • output is the end of the pipeline. Effectively it is the last item of streams.

input and output are helpers used to combine the processing pipeline with other streams, which usually provide I/O for the pipeline.
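
For example, a pipeline can be sandwiched between standard I/O streams. A minimal sketch (the transform itself is arbitrary):

const chain = new Chain([x => x.toString().toUpperCase()]);
process.stdin.pipe(chain.input);   // feed the first stream of the pipeline
chain.output.pipe(process.stdout); // consume from the last stream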

Release History

  • 1.0.3 Improved documentation.
  • 1.0.2 Better README.
  • 1.0.1 Fixed the README.
  • 1.0.0 The initial release.