main.js 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const {Duplex, Transform} = require('stream');
  /**
   * Forwards the value produced by one processing step to a stream.
   *
   * - `undefined` and `null` results are dropped (nothing is pushed).
   * - An array result is flattened one level: every element that is neither
   *   `undefined` nor `null` is pushed separately, in order.
   * - Any other result is pushed as a single value.
   *
   * @param {*} result - Value produced by the step.
   * @param {{push: Function}} stream - Target whose `push()` receives the values.
   * @param {Function} [callback] - When provided, called with `null` after all
   *   values have been pushed.
   */
  function processData(result, stream, callback) {
    if (result !== undefined && result !== null) {
      // Array.isArray (rather than `instanceof Array`) also recognizes arrays
      // created in another realm, e.g. through the `vm` module.
      if (Array.isArray(result)) {
        result.forEach(value => {
          if (value !== undefined && value !== null) {
            stream.push(value);
          }
        });
      } else {
        stream.push(result);
      }
    }
    if (callback) {
      callback(null);
    }
  }
  /**
   * Builds a pipeline out of functions and streams and exposes it through a
   * single event emitter.
   *
   * Every function is wrapped in an object-mode Transform stream; stream
   * arguments are used as they are. The resulting streams are piped together
   * in the given order.
   *
   * Instance properties:
   * - `streams`: the streams making up the pipeline, in order.
   * - `input`: the first stream (write here).
   * - `output`: the result of piping all streams together (read here).
   */
  class Chain extends EventEmitter {
    /**
     * @param {Array<Function|Duplex>} fns - Non-empty array of functions and/or
     *   Duplex (including Transform) streams. A function is called as
     *   `fn.call(transformStream, chunk, encoding)` and may return a plain
     *   value, an array of values, a thenable, or an iterator/generator object.
     * @param {boolean} [skipEvents] - When truthy, stream `error` events and
     *   the output's `data`/`end` events are NOT re-emitted on this instance.
     * @throws {Error} If `fns` is not a non-empty array, or if one of its items
     *   is neither a function nor a Duplex stream.
     */
    constructor(fns, skipEvents) {
      super();

      // Array.isArray also accepts arrays created in another realm, which
      // `instanceof Array` would wrongly reject.
      if (!Array.isArray(fns) || !fns.length) {
        throw new Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns.map(fn => {
        let stream;

        if (typeof fn === 'function') {
          // A regular function (not an arrow) is required here: `this` must be
          // the Transform stream the function is installed on.
          const transform = function(chunk, encoding, callback) {
            try {
              const result = fn.call(this, chunk, encoding);
              if (result && typeof result.then === 'function') {
                // Thenable: push the resolved value, or report the rejection.
                result.then(
                  value => processData(value, this, callback),
                  error => callback(error)
                );
              } else if (result && typeof result.next === 'function') {
                // Iterator/generator: drain it synchronously. The value that
                // accompanies `done: true` is forwarded as well.
                while (true) {
                  const data = result.next();
                  processData(data.value, this);
                  if (data.done) break;
                }
                callback(null);
              } else {
                processData(result, this, callback);
              }
            } catch (error) {
              callback(error);
            }
          };
          stream = new Transform({objectMode: true, transform});
        } else if (fn instanceof Duplex) {
          // Transform extends Duplex, so this single check covers both.
          stream = fn;
        } else {
          throw new Error('Arguments should be functions or streams.');
        }

        if (!skipEvents) {
          stream.on('error', error => this.emit('error', error));
        }
        return stream;
      });

      this.input = this.streams[0];
      // Without an initial value, reduce() starts from the first stream and
      // returns it unchanged when it is the only one.
      this.output = this.streams.reduce((output, stream) => output.pipe(stream) || stream);

      // connect events
      if (!skipEvents) {
        this.output.on('data', item => this.emit('data', item));
        this.output.on('end', () => this.emit('end'));
      }
    }
  }
  64. module.exports = Chain;