main.js 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const {Transform} = require('stream');
  // Neither constructor is exposed as a global, so each one is read off the
  // prototype of a throwaway function of the matching kind. They exist so that
  // `fn instanceof GeneratorFunction` / `fn instanceof AsyncFunction` can tell
  // native generator and async functions apart from ordinary ones.
  const {constructor: GeneratorFunction} = Object.getPrototypeOf(function*() {});
  const {constructor: AsyncFunction} = Object.getPrototypeOf(async function() {});
  /**
   * Pushes the value(s) produced by one step of a chain into `stream`, then
   * signals completion through `callback` when one is supplied.
   *
   * - `undefined` and `null` produce no output. `null` must never reach
   *   `stream.push()`: on a readable stream it means end-of-stream, so a
   *   function that merely returned `null` would shut the stream down.
   * - An array is expanded: every element that is not `undefined`/`null` is
   *   pushed individually, in order.
   * - Any other value is pushed as is.
   *
   * @param {*} result - Value returned (or yielded) by a user function.
   * @param {{push: Function}} stream - Target; only its `push()` method is used.
   * @param {Function} [callback] - Called with no arguments after pushing.
   */
  function processData(result, stream, callback) {
    // `== null` intentionally matches both `undefined` and `null`.
    const isEmpty = value => value == null;

    if (Array.isArray(result)) {
      for (const value of result) {
        if (!isEmpty(value)) {
          stream.push(value);
        }
      }
    } else if (!isEmpty(result)) {
      stream.push(result);
    }

    if (callback) {
      callback();
    }
  }
  /**
   * Turns a list of functions into a pipeline of object-mode Transform streams.
   *
   * Every function is wrapped in its own Transform; the transforms are piped
   * together in array order. Instances expose:
   *   - `streams`: all created Transform streams, in order;
   *   - `input`:   the first stream (write data here);
   *   - `output`:  the last stream (read results here).
   *
   * Unless `skipEvents` is truthy, the chain re-emits `'error'` from every
   * stream and `'data'` / `'end'` from the output stream on itself.
   */
  class Chain extends EventEmitter {
    /**
     * @param {Function[]} fns - Non-empty array of functions. Each one is
     *   invoked as `fn(chunk, encoding)` with `this` bound to its Transform
     *   stream. Native async functions are awaited, native generator functions
     *   are drained (every yielded value and the final return value are
     *   forwarded), and any other function's return value is forwarded
     *   directly. Forwarding is delegated to `processData`.
     * @param {boolean} [skipEvents] - When truthy, no stream events are
     *   forwarded to the chain itself.
     * @throws {Error} If `fns` is not a non-empty array, or if one of its
     *   items is not a function.
     */
    constructor(fns, skipEvents) {
      super();

      if (!Array.isArray(fns) || !fns.length) {
        throw new Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns.map(fn => {
        let transform;

        // The wrappers below are regular functions on purpose: the Transform
        // calls them with `this` set to the stream, which is then handed to
        // the user function and to processData().
        if (fn instanceof AsyncFunction) {
          transform = function(chunk, encoding, callback) {
            fn.call(this, chunk, encoding).then(
              result => processData(result, this, callback),
              error => callback(error)
            );
          };
        } else if (fn instanceof GeneratorFunction) {
          transform = function(chunk, encoding, callback) {
            try {
              // Bind `this` to the stream, like the other two kinds of functions.
              const generator = fn.call(this, chunk, encoding);
              let step;
              do {
                step = generator.next();
                // The last step carries the generator's return value, which is
                // forwarded as well (processData ignores it when it is empty).
                processData(step.value, this);
              } while (!step.done);
              callback();
            } catch (error) {
              callback(error);
            }
          };
        } else if (typeof fn === 'function') {
          transform = function(chunk, encoding, callback) {
            try {
              processData(fn.call(this, chunk, encoding), this, callback);
            } catch (error) {
              callback(error);
            }
          };
        } else {
          throw new Error('Arguments should be functions.');
        }

        const stream = new Transform({objectMode: true, transform});
        if (!skipEvents) {
          stream.on('error', error => this.emit('error', error));
        }
        return stream;
      });

      this.input = this.streams[0];
      // pipe() returns its destination, so folding without an initial value
      // links stream[0] -> stream[1] -> ... and yields the last stream.
      this.output = this.streams.reduce((upstream, stream) => upstream.pipe(stream));

      // connect events
      if (!skipEvents) {
        this.output.on('data', item => this.emit('data', item));
        this.output.on('end', () => this.emit('end'));
      }
    }
  }
  67. module.exports = Chain;