stream-chain creates a chain of object mode transform streams out of regular functions, asynchronous functions, generator functions, and existing Transform and Duplex object mode streams, while properly handling backpressure. The resulting chain is represented as a Duplex stream, which can be combined with other streams in the usual way. It eliminates boilerplate, helping to concentrate on functionality without losing performance.

Originally stream-chain was used internally with stream-fork and stream-json to create flexible data processing pipelines.

stream-chain is a lightweight micro-package with no dependencies. It is distributed under the New BSD license.
```js
const Chain = require('stream-chain');

const fs = require('fs');
const zlib = require('zlib');
const {Transform} = require('stream');

// the chain will work on a stream of number objects
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i > 0; --i) {
      yield i;
    }
    return 0;
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);

// log errors
chain.on('error', error => console.log(error));

// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));
```
Making processing pipelines appears to be easy: just chain functions one after another, and we are done. Real-life pipelines filter objects out and/or produce more objects out of fewer ones. On top of that, we have to deal with asynchronous operations while processing or producing data: networking, databases, files, user responses, and so on. An unequal number of values per stage and unequal throughput between stages introduce problems like backpressure, which require the algorithms implemented by streams.

While a lot of API improvements were made to make streams easier to use, in reality a lot of boilerplate is still required when creating a pipeline. stream-chain eliminates most of it.
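To illustrate the difference, below is a minimal sketch of one and the same squaring stage, written by hand as a Transform stream and as a stream-chain function (the stage itself is a made-up example):

```js
const {Transform} = require('stream');
const Chain = require('stream-chain');

// by hand: every stage needs a Transform with object mode flags
// and a callback-based transform() method
const squaresByHand = new Transform({
  writableObjectMode: true,
  readableObjectMode: true,
  transform(x, _, callback) {
    callback(null, x * x);
  }
});

// with stream-chain: the same stage is a one-line function
const squares = new Chain([x => x * x]);
```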
```
npm i --save stream-chain
```
Chain, which is returned by require('stream-chain'), is based on Duplex. It chains its constituent streams into a single pipeline, optionally forwarding their error events.
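Because a Chain instance is a regular Duplex stream, it also drops into standard stream utilities. A minimal sketch using Node's stream.pipeline (the file names are illustrative):

```js
const {pipeline} = require('stream');
const fs = require('fs');
const zlib = require('zlib');
const Chain = require('stream-chain');

pipeline(
  fs.createReadStream('input.txt'),
  // upper-cases the text chunk by chunk
  new Chain([chunk => chunk.toString().toUpperCase()]),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  error => {
    if (error) console.error(error);
  }
);
```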
Many details about this package can be discovered by looking at the test files located in tests/ and at the source code (main.js).
`new Chain(fns[, options])`

The constructor accepts the following arguments:

fns is an array of functions or stream instances. If a value is a function, a Transform stream is created, which calls this function with two parameters: chunk (an object) and an optional encoding. See Node's documentation for more details on those parameters. The function is called in the context of the created stream.

If the function returns an array, its items are passed to the next stream one by one:

```js
// produces no values:
x => []
// produces two values:
x => [x, x + 1]
// produces one array value:
x => [[x, x + 1]]
```

If the returned value is undefined or null, no value is passed:

```js
// produces no values:
x => null
x => undefined
// produces one value:
x => x
```
If the returned value is a Promise, or any then-able object (anything with a then() method), it will be waited for. Its result should be a regular value:

```js
// delays by 0.5s:
x => new Promise(resolve => setTimeout(() => resolve(x), 500))
```
If the returned value is a generator object, or anything with a next() method, it will be iterated according to the generator protocol. The results should be regular values:

```js
// produces multiple values:
class Nextable {
  constructor(x) { this.x = x; this.i = -1; }
  next() {
    // yields x - 1, x, x + 1, then reports completion
    return this.i <= 1 ? {done: false, value: this.x + this.i++} : {done: true};
  }
}
x => new Nextable(x)
```
If a function throws an error, it is caught and emitted as the stream's 'error' event:

```js
// fails
x => { throw new Error('Bad!'); }
```
Asynchronous functions are supported natively; the awaited result is processed like a regular returned value:

```js
// delays by 0.5s:
async x => {
  await new Promise(resolve => setTimeout(() => resolve(), 500));
  return x;
}
```
Generator functions are supported natively as well; every yielded value is processed like a regular returned value:

```js
// produces multiple values:
function* (x) {
  for (let i = -1; i <= 1; ++i) {
    if (i) yield x + i;
  }
  return x;
}
```
If the first stream is not Writable (e.g., it is a pure Readable data source), the Chain instance ignores all possible writes to the front, and ends when the first stream ends.

If the last stream is not Readable (e.g., it is a pure Writable data sink), the Chain instance does not produce any output, and finishes when the last stream finishes. The 'data' event is not used in this case; the instance resumes itself automatically. Read about it in Node's documentation.
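For example, a chain can be completely self-contained when its first stream is a pure Readable and its last stream is a pure Writable. A minimal sketch, assuming the 'finish' behavior described above (the file names are made up):

```js
const fs = require('fs');
const zlib = require('zlib');
const Chain = require('stream-chain');

// the ends are supplied by the first and the last streams,
// so the chain itself is neither written to nor read from
const chain = new Chain([
  fs.createReadStream('numbers.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('numbers.txt')
]);

// finishes when the last stream finishes
chain.on('finish', () => console.log('done'));
```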
options is an optional object detailed in Node's documentation on Duplex. If options is not specified, or falsy, it is assumed to be:

```js
{writableObjectMode: true, readableObjectMode: true}
```
writableObjectMode is the same as the corresponding object mode of the first stream, and readableObjectMode is the same as the corresponding object mode of the last stream.

skipEvents is an optional flag. If it is falsy (the default), 'error' events from all streams are forwarded to the created instance. If it is truthy, no events are forwarded. A user can always do so externally or in a constructor of derived classes.

An instance can be used to attach handlers for stream events:
```js
const chain = new Chain([x => x * x, x => [x - 1, x, x + 1]]);
chain.on('error', error => console.error(error));
dataSource.pipe(chain);
```
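When event forwarding is switched off with skipEvents, handlers can be attached manually, e.g., to every constituent stream via the streams property described below. A minimal sketch:

```js
const chain = new Chain(
  [x => x * x, x => [x - 1, x, x + 1]],
  {skipEvents: true}
);

// no 'error' forwarding was set up, so attach handlers
// to the individual streams directly
chain.streams.forEach(stream =>
  stream.on('error', error => console.error('stage failed:', error))
);
```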
The following public properties are available:

- streams is an array of streams created by the constructor. Its values are either Transform streams that use the corresponding functions from the constructor's parameter, or user-provided streams. All streams are piped sequentially, starting from the beginning.
- input is the beginning of the pipeline. Effectively it is the first item of streams.
- output is the end of the pipeline. Effectively it is the last item of streams.

Generally, a Chain instance should be used to represent the whole chain:
```js
const chain = new Chain([
  x => x * x,
  x => [x - 1, x, x + 1],
  new Transform({
    writableObjectMode: true,
    transform(chunk, _, callback) {
      callback(null, chunk.toString());
    }
  })
]);

dataSource
  .pipe(chain)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
```
But in some cases input and output provide better control over how a data processing pipeline is organized:
```js
chain.output
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('output.txt.gz'));
dataSource.pipe(chain.input);
```
Please select which style you want to use, and never mix them on the same object.
The following static methods are available:

chain(fns[, options]) is a factory function, which takes the same arguments as the constructor and returns a Chain instance:
```js
const {chain} = require('stream-chain');

// simple
dataSource.pipe(chain([x => x * x, x => [x - 1, x, x + 1]]));

// all inclusive
chain([
  dataSource,
  x => x * x,
  x => [x - 1, x, x + 1],
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz')
]);
```