main.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  /**
   * Pushes the outcome of a processing step into a stream.
   *
   * `undefined` and `null` are treated as "no value" and are never pushed.
   * An array is unpacked one level: each of its non-null, non-undefined items
   * is pushed separately, in order. Any other value is pushed as is.
   *
   * @param {*} result - Value (or array of values) produced by a step.
   * @param {{push: Function}} stream - Target whose `push()` receives the values.
   * @returns {void}
   */
  function processData(result, stream) {
    if (result === undefined || result === null) {
      return;
    }
    // Array.isArray (unlike `instanceof Array`) also recognizes arrays that
    // were created in a different realm, e.g. inside a `vm` context.
    if (!Array.isArray(result)) {
      stream.push(result);
      return;
    }
    result.forEach(value => {
      if (value !== undefined && value !== null) {
        stream.push(value);
      }
    });
  }
  /**
   * Wraps a plain function into an object-mode Transform stream.
   *
   * The function is called with the Transform as `this` and receives
   * `(chunk, encoding)`. Its return value decides what is emitted:
   *  - a thenable: the resolved value is handed to `processData`;
   *  - an object with `next()` (e.g. a generator object): it is drained
   *    synchronously and every produced value is handed to `processData`;
   *  - anything else: handed to `processData` directly.
   * `processData` is the file-level helper that pushes values into the stream.
   *
   * @param {Function} fn - Processing function for a single chunk.
   * @returns {Transform} Stream that applies `fn` to every chunk.
   */
  function wrapFunctionAsTransform(fn) {
    return new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const result = fn.call(this, chunk, encoding);
          if (result && typeof result.then === 'function') {
            // Promise: finish this chunk once it settles.
            result.then(
              value => {
                processData(value, this);
                callback(null);
              },
              error => callback(error)
            );
            return;
          }
          if (result && typeof result.next === 'function') {
            // Generator: the value delivered together with `done: true`
            // is passed to processData as well.
            let step;
            do {
              step = result.next();
              processData(step.value, this);
            } while (!step.done);
          } else {
            processData(result, this);
          }
          callback(null);
        } catch (error) {
          callback(error);
        }
      }
    });
  }

  /**
   * Duplex stream that pipes a list of functions and/or streams together and
   * exposes the whole pipeline as one stream: writes go to the first item,
   * data produced by the last item is read from the Chain.
   *
   * Public properties set by the constructor:
   *  - `streams`: the pipeline items, with functions replaced by Transforms;
   *  - `input`: the first stream of the pipeline;
   *  - `output`: the result of piping all streams together.
   */
  class Chain extends Duplex {
    /**
     * @param {Array} fns - Non-empty array of functions and/or streams.
     *   Duplex/Transform streams are accepted anywhere, a Readable only as
     *   the first item, and a Writable only as the last item.
     * @param {Object} [options] - Passed to the Duplex constructor. When
     *   omitted, both sides of the Chain are in object mode. A truthy
     *   `skipEvents` disables forwarding of 'error' events from the items.
     *   NOTE(review): a supplied options object replaces the object-mode
     *   defaults wholesale; callers must request object mode themselves.
     * @throws {Error} If `fns` is not a non-empty array, or an item is
     *   neither a function nor an acceptable stream for its position.
     */
    constructor(fns, options) {
      super(options || {writableObjectMode: true, readableObjectMode: true});

      if (!Array.isArray(fns) || !fns.length) {
        throw new Error("Chain's argument should be a non-empty array.");
      }

      const lastIndex = fns.length - 1;
      this.streams = fns.map((fn, index) => {
        if (typeof fn === 'function') {
          return wrapFunctionAsTransform(fn);
        }
        const isAcceptableStream =
          fn instanceof Duplex ||
          fn instanceof Transform ||
          (!index && fn instanceof Readable) ||
          (index === lastIndex && fn instanceof Writable);
        if (isAcceptableStream) {
          return fn;
        }
        throw new Error('Arguments should be functions or streams.');
      });

      this.input = this.streams[0];
      // Pipe every stream into the next one; the last pipe target is the output.
      this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);

      if (this.input instanceof Writable) {
        // `_final` is unavailable in Node 6, so the input is ended from the
        // 'finish' event instead.
        this.on('finish', () => this.input.end(null, null)); // for Node 6
      } else {
        // The first item is a pure source: ignore whatever is written to the
        // Chain and end the writable side when the source ends.
        this._write = (_1, _2, callback) => callback(null);
        this.input.on('end', () => this.end());
      }

      if (this.output instanceof Readable) {
        // Relay data, pausing the output when the Chain's buffer is full.
        this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
        this.output.on('end', () => this.push(null));
      } else {
        // The last item is a pure sink: there is nothing to read.
        this._read = () => {}; // nop
        this.resume();
        this.output.on('finish', () => this.push(null));
      }

      // connect events
      if (!options || !options.skipEvents) {
        this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
      }
    }

    /**
     * Forwards a written chunk to the first stream of the pipeline.
     *
     * The callback is invoked exactly once: either by the input stream, or
     * with the exception if `input.write()` throws synchronously. Without
     * the latter the Chain would never learn that the write failed and
     * would stop accepting data.
     *
     * @param {*} chunk - Data written to the Chain.
     * @param {string} encoding - Encoding supplied by the stream machinery.
     * @param {Function} callback - Called with an optional error when done.
     */
    _write(chunk, encoding, callback) {
      let settled = false;
      const settle = error => {
        if (settled) {
          return;
        }
        settled = true;
        callback(error || null);
      };
      try {
        this.input.write(chunk, encoding, settle);
      } catch (error) {
        settle(error);
      }
    }

    /**
     * Resumes the output stream, which the constructor's 'data' listener
     * pauses whenever `push()` reports a full buffer.
     */
    _read() {
      this.output.resume();
    }

    /**
     * Factory equivalent of `new Chain(fns, options)`.
     *
     * @param {Array} fns - See the constructor.
     * @param {Object} [options] - See the constructor.
     * @returns {Chain} The newly created chain.
     */
    static chain(fns, options) {
      return new Chain(fns, options);
    }
  }
  102. module.exports = Chain;