| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- 'use strict';
- const {Duplex} = require('stream');
- const defs = require('./defs');
- const gen = require('./gen');
- const {asStream} = require('./AsStream');
- // is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
- const isReadableNodeStream = obj =>
- obj &&
- typeof obj.pipe === 'function' &&
- typeof obj.on === 'function' &&
- (!obj._writableState ||
- (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
- (!obj._writableState || obj._readableState); // Writable has .pipe.
- const isWritableNodeStream = obj =>
- obj &&
- typeof obj.write === 'function' &&
- typeof obj.on === 'function' &&
- (!obj._readableState ||
- (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
- const isDuplexNodeStream = obj =>
- obj &&
- typeof obj.pipe === 'function' &&
- obj._readableState &&
- typeof obj.on === 'function' &&
- typeof obj.write === 'function';
- const getIterator = x => {
- if (!x) return null;
- if (typeof x[Symbol.asyncIterator] == 'function') return x[Symbol.asyncIterator].bind(x);
- if (typeof x[Symbol.iterator] == 'function') return x[Symbol.iterator].bind(x);
- return null;
- }
- class Chain extends Duplex {
- constructor(fns, options) {
- super(options || {writableObjectMode: true, readableObjectMode: true});
- if (!(fns instanceof Array) || !fns.length) {
- throw TypeError("Chain's argument should be a non-empty array.");
- }
- this.streams = fns
- .filter(fn => fn)
- .reduce((output, fn, index, fns) => {
- if (
- isDuplexNodeStream(fn) ||
- (!index && isReadableNodeStream(fn)) ||
- (index === fns.length - 1 && isWritableNodeStream(fn))
- ) {
- output.push(fn);
- return output;
- }
- if (typeof fn != 'function') {
- const iterator = getIterator(fn);
- if (!iterator) throw TypeError('Item #' + index + ' is not a proper stream, function, nor iterable.');
- fn = iterator;
- }
- if (!output.length) output.push([]);
- const last = output[output.length - 1];
- if (Array.isArray(last)) {
- last.push(fn);
- } else {
- output.push([fn]);
- }
- return output;
- }, [])
- .map(item => {
- if (Array.isArray(item)) {
- if (!item.length) return null;
- if (item.length == 1) return item[0] && Chain.asStream(item[0]);
- return Chain.asStream(Chain.gen(...item));
- }
- return item;
- })
- .filter(s => s);
- this.input = this.streams[0];
- this.output = this.streams.reduce(
- (output, stream) => (output && output.pipe(stream)) || stream
- );
- if (!isWritableNodeStream(this.input)) {
- this._write = (_1, _2, callback) => callback(null);
- this._final = callback => callback(null); // unavailable in Node 6
- this.input.on('end', () => this.end());
- }
- if (isReadableNodeStream(this.output)) {
- this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
- this.output.on('end', () => this.push(null));
- } else {
- this._read = () => {}; // nop
- this.resume();
- this.output.on('finish', () => this.push(null));
- }
- // connect events
- if (!options || !options.skipEvents) {
- this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
- }
- }
- _write(chunk, encoding, callback) {
- let error = null;
- try {
- this.input.write(chunk, encoding, e => callback(e || error));
- } catch (e) {
- error = e;
- }
- }
- _final(callback) {
- let error = null;
- try {
- this.input.end(null, null, e => callback(e || error));
- } catch (e) {
- error = e;
- }
- }
- _read() {
- this.output.resume();
- }
- static make(fns, options) {
- return new Chain(fns, options);
- }
- }
- Chain.chain = Chain.make;
- Chain.make.Constructor = Chain;
- Chain.gen = gen;
- Chain.asStream = asStream;
- module.exports = Chain;
- // to keep ESM happy:
- module.exports.none = defs.none;
- module.exports.stop = defs.stop;
- module.exports.Stop = defs.Stop;
- module.exports.finalSymbol = defs.finalSymbol;
- module.exports.manySymbol = defs.manySymbol;
- module.exports.flushSymbol = defs.flushSymbol;
- module.exports.finalValue = defs.finalValue;
- module.exports.many = defs.many;
- module.exports.flushable = defs.flushable;
- module.exports.isFinalValue = defs.isFinalValue;
- module.exports.isMany = defs.isMany;
- module.exports.isFlushable = defs.isFlushable;
- module.exports.getFinalValue = defs.getFinalValue;
- module.exports.getManyValues = defs.getManyValues;
- module.exports.final = defs.final;
- module.exports.chain = Chain.make;
- module.exports.gen = gen;
- module.exports.asStream = asStream;
|