|
|
@@ -0,0 +1,75 @@
|
|
|
+'use strict';
|
|
|
+
|
|
|
+const EventEmitter = require('events');
|
|
|
+const {Transform} = require('stream');
|
|
|
+
|
|
|
+const GeneratorFunction = Object.getPrototypeOf(function*() {}).constructor;
|
|
|
+const AsyncFunction = Object.getPrototypeOf(async function() {}).constructor;
|
|
|
+
|
|
|
+function processData(result, stream, callback) {
|
|
|
+ if (result !== undefined) {
|
|
|
+ if (result instanceof Array) {
|
|
|
+ result.forEach(value => value !== undefined && stream.push(value));
|
|
|
+ } else {
|
|
|
+ stream.push(result);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ callback && callback();
|
|
|
+}
|
|
|
+
|
|
|
+class Chain extends EventEmitter {
|
|
|
+ constructor(fns, skipEvents) {
|
|
|
+ super();
|
|
|
+
|
|
|
+ if (!(fns instanceof Array) || !fns.length) {
|
|
|
+ throw Error("Chain's argument should be a non-empty array.");
|
|
|
+ }
|
|
|
+
|
|
|
+ this.streams = fns.map((fn, index) => {
|
|
|
+ let transform;
|
|
|
+ if (fn instanceof AsyncFunction) {
|
|
|
+ transform = function(chunk, encoding, callback) {
|
|
|
+ fn.call(this, chunk, encoding).then(result => processData(result, this, callback), error => callback(error));
|
|
|
+ };
|
|
|
+ } else if (fn instanceof GeneratorFunction) {
|
|
|
+ transform = function(chunk, encoding, callback) {
|
|
|
+ try {
|
|
|
+ const generator = fn(chunk, encoding);
|
|
|
+ while (true) {
|
|
|
+ const result = generator.next();
|
|
|
+ processData(result.value, this);
|
|
|
+ if (result.done) break;
|
|
|
+ }
|
|
|
+ callback();
|
|
|
+ } catch (error) {
|
|
|
+ callback(error);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ } else if (typeof fn === 'function') {
|
|
|
+ transform = function(chunk, encoding, callback) {
|
|
|
+ try {
|
|
|
+ const result = fn.call(this, chunk, encoding);
|
|
|
+ processData(result, this, callback);
|
|
|
+ } catch (error) {
|
|
|
+ callback(error);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ } else {
|
|
|
+ throw Error('Arguments should be functions.');
|
|
|
+ }
|
|
|
+ const stream = new Transform({objectMode: true, transform});
|
|
|
+ !skipEvents && stream.on('error', error => this.emit('error', error));
|
|
|
+ return stream;
|
|
|
+ });
|
|
|
+ this.input = this.streams[0];
|
|
|
+ this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
|
|
|
+
|
|
|
+ // connect events
|
|
|
+ if (!skipEvents) {
|
|
|
+ this.output.on('data', item => this.emit('data', item));
|
|
|
+ this.output.on('end', () => this.emit('end'));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+module.exports = Chain;
|