// main.js (2.1 KB)
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const {Duplex, Transform} = require('stream');
  /**
   * Pushes the value(s) produced by a chain step into a stream.
   *
   * - `null` / `undefined` results are ignored entirely.
   * - An array result is expanded: each non-nullish element is pushed separately,
   *   in order.
   * - Any other result (including falsy values such as `0`, `''` or `false`)
   *   is pushed as a single item.
   *
   * Nullish values are never pushed because, in Node.js streams, `push(null)`
   * signals end-of-data.
   *
   * @param {*} result - Value returned by a chain step.
   * @param {{push: function(*): *}} stream - Target; only its `push` method is used.
   * @returns {void}
   */
  function processData(result, stream) {
    if (result === undefined || result === null) {
      return;
    }

    // Array.isArray (rather than `instanceof Array`) also recognises arrays
    // created in a different realm, e.g. inside a `vm` context.
    if (!Array.isArray(result)) {
      stream.push(result);
      return;
    }

    for (const value of result) {
      if (value !== undefined && value !== null) {
        stream.push(value);
      }
    }
  }
  /**
   * Builds a pipeline out of functions and streams and exposes it as an
   * EventEmitter.
   *
   * Every function is wrapped in an object-mode Transform stream. The function
   * is called with the Transform as `this` and `(chunk, encoding)` as arguments
   * and may return:
   *   - a plain value or an array of values,
   *   - a thenable (its resolved value is handled the same way),
   *   - an object with a `next()` method (it is drained synchronously; the
   *     `value` of every step, including the final `done` step, is emitted).
   * Results are handed to `processData`, which pushes them into the Transform.
   *
   * Instance properties:
   *   - `streams`: the resulting list of streams, in order,
   *   - `input`:   the first stream,
   *   - `output`:  the result of piping all streams together (the last stream).
   */
  class Chain extends EventEmitter {
    /**
     * @param {Array<Function|Duplex|Transform>} fns - Non-empty list of
     *   functions and/or Duplex/Transform streams.
     * @param {boolean} [skipEvents] - When truthy, 'error', 'data' and 'end'
     *   events of the underlying streams are NOT re-emitted on this object.
     * @throws {Error} If `fns` is not a non-empty array, or if one of its items
     *   is neither a function nor a Duplex/Transform stream.
     */
    constructor(fns, skipEvents) {
      super();

      // Array.isArray also accepts arrays created in another realm.
      if (!Array.isArray(fns) || !fns.length) {
        throw new Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns.map(fn => {
        if (typeof fn === 'function') {
          return new Transform({
            objectMode: true,
            transform(chunk, encoding, callback) {
              try {
                const result = fn.call(this, chunk, encoding);

                if (result && typeof result.then === 'function') {
                  // Thenable: finish this chunk once it settles.
                  result.then(
                    value => {
                      try {
                        processData(value, this);
                      } catch (error) {
                        // Report instead of leaving an unhandled rejection
                        // and a callback that is never invoked.
                        callback(error);
                        return;
                      }
                      callback(null);
                    },
                    error => callback(error)
                  );
                  return;
                }

                if (result && typeof result.next === 'function') {
                  // Generator/iterator: drain it; the value of the final
                  // (`done`) step is emitted as well.
                  let step;
                  do {
                    step = result.next();
                    processData(step.value, this);
                  } while (!step.done);
                } else {
                  processData(result, this);
                }
              } catch (error) {
                callback(error);
                return;
              }
              // Kept outside the try block so that an exception thrown by the
              // callback itself does not lead to a second callback invocation.
              callback(null);
            }
          });
        }
        if (fn instanceof Duplex || fn instanceof Transform) {
          return fn;
        }
        throw new Error('Arguments should be functions or streams.');
      });

      this.input = this.streams[0];
      // Pipe each stream into the next one; with a single stream, reduce()
      // simply returns that stream.
      this.output = this.streams.reduce((output, stream) => output.pipe(stream) || stream);

      // connect events
      if (!skipEvents) {
        for (const stream of this.streams) {
          stream.on('error', error => this.emit('error', error));
        }
        this.output.on('data', item => this.emit('data', item));
        this.output.on('end', () => this.emit('end'));
      }
    }
  }
  62. module.exports = Chain;