main.js 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const {Duplex, Transform} = require('stream');
  4. const GeneratorFunction = Object.getPrototypeOf(function*() {}).constructor;
  5. const AsyncFunction = Object.getPrototypeOf(async function() {}).constructor;
  6. function processData(result, stream, callback) {
  7. if (result !== undefined && result !== null) {
  8. if (result instanceof Array) {
  9. result.forEach(value => value !== undefined && value !== null && stream.push(value));
  10. } else {
  11. stream.push(result);
  12. }
  13. }
  14. callback && callback(null);
  15. }
  16. class Chain extends EventEmitter {
  17. constructor(fns, skipEvents) {
  18. super();
  19. if (!(fns instanceof Array) || !fns.length) {
  20. throw Error("Chain's argument should be a non-empty array.");
  21. }
  22. this.streams = fns.map((fn, index) => {
  23. let transform, stream;
  24. if (fn instanceof AsyncFunction) {
  25. transform = function(chunk, encoding, callback) {
  26. fn.call(this, chunk, encoding).then(result => processData(result, this, callback), error => callback(error));
  27. };
  28. } else if (fn instanceof GeneratorFunction) {
  29. transform = function(chunk, encoding, callback) {
  30. try {
  31. const generator = fn(chunk, encoding);
  32. while (true) {
  33. const result = generator.next();
  34. processData(result.value, this);
  35. if (result.done) break;
  36. }
  37. callback(null);
  38. } catch (error) {
  39. callback(error);
  40. }
  41. };
  42. } else if (fn instanceof Duplex || fn instanceof Transform) {
  43. stream = fn;
  44. } else if (typeof fn === 'function') {
  45. transform = function(chunk, encoding, callback) {
  46. try {
  47. const result = fn.call(this, chunk, encoding);
  48. processData(result, this, callback);
  49. } catch (error) {
  50. callback(error);
  51. }
  52. };
  53. } else {
  54. throw Error('Arguments should be functions or streams.');
  55. }
  56. if (!stream) {
  57. stream = new Transform({objectMode: true, transform});
  58. }
  59. !skipEvents && stream.on('error', error => this.emit('error', error));
  60. return stream;
  61. });
  62. this.input = this.streams[0];
  63. this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
  64. // connect events
  65. if (!skipEvents) {
  66. this.output.on('data', item => this.emit('data', item));
  67. this.output.on('end', () => this.emit('end'));
  68. }
  69. }
  70. }
  71. module.exports = Chain;