stream-chain creates a chain of object-mode transform streams out of regular functions, asynchronous functions, generator functions, and existing Transform and Duplex object-mode streams. It eliminates boilerplate, helping you concentrate on functionality without losing performance.

It is a lightweight, no-dependencies package distributed under the New BSD license.
```js
const Chain = require('stream-chain');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    objectMode: true,
    transform(x, _, callback) {
      callback(null, x + 1);
    }
  })
]);
chain.on('data', data => console.log(data));
dataSource.pipe(chain.input);
```
Making processing pipelines appears easy: just chain functions one after another, and we are done. Real-life pipelines, however, filter objects out and/or produce more objects from a few, and on top of that we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. Unequal numbers of values per stage and unequal throughput of stages introduce problems like backpressure, which require the algorithms implemented by streams.

While a lot of API improvements were made to make streams easier to use, in reality a lot of boilerplate is required when creating a pipeline. stream-chain eliminates most of it.
```bash
npm i stream-chain
```
The main module provides a class based on EventEmitter. It chains its arguments into a single pipeline, optionally binding common stream events.
The constructor accepts two parameters:

- `fns` is an array of functions or instances of `Duplex` or `Transform` streams.
  - For each plain function, a `Transform` stream is created, which calls the function with two parameters: `chunk` (an object) and an optional encoding. See the Node.js stream documentation for more details on those parameters. The function is called in the context of the created stream. Its return value determines what is passed downstream:
    - If it is `undefined` or `null`, no value is passed.
    - If it is a thenable (an object with a `then()` method), it is waited for. Its result should be a regular value.
    - If it is a generator object (an object with a `next()` method), it is iterated according to the generator protocol. The results should be regular values.
- `skipEvents` is an optional Boolean parameter. If it is `false` (the default), `'error'` events from all streams are forwarded to the created instance, and `'data'` and `'end'` events are forwarded from the last stream of the pipeline. If it is `true`, no event forwarding is done.