stream-chain creates a chain of object mode transform streams out of regular functions, asynchronous functions, generator functions, and existing Transform and Duplex object mode streams, while properly handling backpressure. It eliminates boilerplate, helping you concentrate on functionality without losing performance.
Originally stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.
stream-chain is a lightweight micro-package with no dependencies. It is distributed under the New BSD license.
```js
const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip(),
  // save to a file
  fs.createWriteStream('output.txt.gz')
]);

// log errors
chain.on('error', error => console.log(error));

// use the chain
dataSource.pipe(chain.input);
```
Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real-life pipelines filter objects out and/or produce more objects out of a few. On top of that, we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. An unequal number of values per stage and unequal throughput between stages introduce problems like backpressure, which require the algorithms implemented by streams.
While a lot of API improvements were made to make streams easier to use, in reality a lot of boilerplate is still required when creating a pipeline. stream-chain eliminates most of it.
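For comparison, here is a minimal sketch (the variable names are illustrative, not part of the package) of what a single squaring stage looks like when hand-written as an object mode Transform stream, versus the plain function stream-chain accepts:

```js
const {Transform} = require('stream');
const Chain = require('stream-chain');

// hand-rolled: a full object mode Transform stream just to square numbers
const square = new Transform({
  objectMode: true,
  transform(chunk, _, callback) {
    callback(null, chunk * chunk);
  }
});

// with stream-chain the same stage is a one-line function
const chain = new Chain([x => x * x]);
```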
```bash
npm i stream-chain
```
Chain, which is returned by require('stream-chain'), is based on EventEmitter. It chains its dependents in a single pipeline, optionally binding common stream events.
Many details about this package can be discovered by looking at test files located in tests/ and in the source code (main.js).
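Because common events are forwarded (per the event forwarding described below), handlers can be attached directly to a chain instance. A minimal sketch:

```js
const Chain = require('stream-chain');

const chain = new Chain([x => x * x]);

// 'data' and 'end' are forwarded from the last stream of the pipeline
chain.on('data', data => console.log(data)); // logs 4, then 9
chain.on('end', () => console.log('done'));
// 'error' events are forwarded from all streams
chain.on('error', error => console.error(error));

chain.input.write(2);
chain.input.write(3);
chain.input.end();
```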
new Chain(fns[, skipEvents])

The constructor accepts the following arguments:

- fns is an array of functions or stream instances.
  - If a value is a function, a Transform stream is created, which calls this function with two parameters: chunk (an object) and an optional encoding. See the Node.js stream documentation for more details on those parameters. The function will be called in the context of the created stream.
    - If the returned value is undefined or null, no value shall be passed.
    - If it is a regular value, it is passed on to the next stream.
    - If it is an array, its elements are passed on individually.
    - If it is a promise (or any "thenable" with then()), it will be waited for. Its result should be a regular value.
    - If it is a generator object (anything with next()), it will be iterated according to the generator protocol. The results should be regular values.
  - If a value is a stream, it is used as is. Transform and Duplex streams are always Readable and Writable.
- skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance, and 'data' and 'end' events are forwarded from the last stream of the pipeline. If it is truthy, no event forwarding is made.
```js
const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain.input);
```
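As a sketch of the skipEvents flag (assuming events are then wired manually), a truthy value suppresses all forwarding, so handlers have to be attached to the individual streams, e.g., via the streams property described below:

```js
// pass a truthy skipEvents: no events are forwarded to the chain instance
const chain = new Chain([x => x * x], true);

// errors must now be handled on the individual streams
chain.streams.forEach(stream => stream.on('error', error => console.error(error)));

dataSource.pipe(chain.input);
```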
An instance can be used to attach handlers for stream events.
The following public properties are available:

- streams is an array of streams created by the constructor. Its values are either Transform streams that use the corresponding functions from the constructor parameter, or user-provided streams. All streams are piped sequentially, starting from the beginning.
- input is the beginning of the pipeline. Effectively it is the first item of streams.
- output is the end of the pipeline. Effectively it is the last item of streams.

input and output are helpers used to combine the processing pipeline with other streams, which usually provide I/O for the pipeline.
```js
const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);

chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));

dataSource.pipe(chain.input);
```