index.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  // Sentinel object, exported as `Chain.none`: `Chain.sanitize` pushes nothing when a result is this value.
  const none = {};
  /**
   * Wrapper that marks a value as a final result.
   * `wrapArray` stops calling its remaining functions when one of them returns
   * an instance, and `Chain.sanitize` unwraps it before pushing.
   *
   * @param {*} value - Payload stored on the instance as `value`.
   */
  function Final(value) {
    Object.assign(this, {value});
  }
  /**
   * Wrapper around a collection of values.
   * `Chain.sanitize` replaces an instance with its `values` before pushing.
   *
   * @param {*} values - Payload stored on the instance as `values`.
   */
  function Many(values) {
    Object.assign(this, {values});
  }
  /**
   * Drains an iterator-like object, handing every produced value to
   * `Chain.sanitize` together with the target stream.
   *
   * Each `next()` result is awaited only when it is a thenable, so both
   * synchronous and asynchronous generator objects are accepted.
   *
   * @param {{next: Function}} gen - Object whose `next()` yields `{done, value}` records or promises of them.
   * @param {object} stream - Target passed through to `Chain.sanitize`.
   * @returns {Promise<void>} Settles once a record with a truthy `done` is seen.
   */
  const runAsyncGenerator = async (gen, stream) => {
    while (true) {
      const step = gen.next();
      const isThenable = step && typeof step.then === 'function';
      const record = isThenable ? await step : step;
      if (record.done) return;
      Chain.sanitize(record.value, stream);
    }
  };
  /**
   * Wraps a function in an object-mode Transform stream.
   *
   * The function is called with the stream as `this` and `(chunk, encoding)`
   * as arguments. Its return value is handled by kind:
   *  - a thenable: the resolved value is passed to `Chain.sanitize`;
   *  - an object with `next()`: it is drained with `runAsyncGenerator`;
   *  - anything else: it is passed to `Chain.sanitize` directly.
   * A thrown exception or a rejection is reported through the transform callback.
   *
   * @param {Function} fn - Function applied to every chunk.
   * @returns {Transform} Transform stream with both sides in object mode.
   */
  const wrapFunction = fn => {
    // Must stay a regular function: `this` has to be the Transform instance.
    function transform(chunk, encoding, callback) {
      const succeed = () => callback(null);
      const fail = error => callback(error);
      try {
        const result = fn.call(this, chunk, encoding);
        if (result) {
          if (typeof result.then === 'function') {
            // thenable
            result.then(value => {
              Chain.sanitize(value, this);
              callback(null);
            }, fail);
            return;
          }
          if (typeof result.next === 'function') {
            // generator
            runAsyncGenerator(result, this).then(succeed, fail);
            return;
          }
        }
        Chain.sanitize(result, this);
        callback(null);
      } catch (error) {
        callback(error);
      }
    }

    return new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform
    });
  };
  /**
   * Wraps a list of functions in a single object-mode Transform stream.
   *
   * For every chunk the functions run in order, each receiving the previous
   * function's return value (the first one receives the chunk) plus the
   * encoding, with the stream as `this`. If a function returns a
   * `Chain.Final` instance, the remaining functions are skipped and the
   * wrapped value is used. The outcome is passed to `Chain.sanitize`; a thrown
   * exception is reported through the transform callback.
   *
   * @param {Function[]} array - Functions applied in sequence.
   * @returns {Transform} Transform stream with both sides in object mode.
   */
  const wrapArray = array => {
    // Must stay a regular function: `this` has to be the Transform instance.
    function transform(chunk, encoding, callback) {
      try {
        let current = chunk;
        for (const step of array) {
          current = step.call(this, current, encoding);
          if (current instanceof Chain.Final) {
            current = current.value;
            break;
          }
        }
        Chain.sanitize(current, this);
        callback(null);
      } catch (error) {
        callback(error);
      }
    }

    return new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform
    });
  };
  /**
   * Duplex stream that presents a pipeline of functions and streams as one
   * stream: data written to the chain goes to the first stream of the
   * pipeline, and data produced by the last stream is read from the chain.
   */
  class Chain extends Duplex {
    /**
     * @param {Array} fns - Non-empty array of pipeline items. Falsy items are
     *   skipped. Functions and arrays of functions are converted with
     *   `Chain.convertToTransform`. Duplex/Transform streams are accepted at
     *   any position, a Readable only as the first item and a Writable only
     *   as the last item (positions are counted after falsy items are removed).
     * @param {object} [options] - Passed to the Duplex constructor; when
     *   omitted both sides are in object mode. A truthy `skipEvents` disables
     *   forwarding of `error` events from the inner streams.
     * @throws {Error} If `fns` is not a non-empty array, if an item is neither
     *   a function, an array nor an acceptable stream, or if no usable item
     *   remains after filtering.
     */
    constructor(fns, options) {
      super(options || {writableObjectMode: true, readableObjectMode: true});

      if (!(fns instanceof Array) || !fns.length) {
        throw Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns
        .filter(fn => fn)
        // `list` is the filtered array that `index` refers to. Comparing against
        // the unfiltered `fns.length` would reject a trailing Writable whenever
        // a falsy item had been removed.
        .map((fn, index, list) => {
          if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
          if (
            fn instanceof Duplex ||
            fn instanceof Transform ||
            (!index && fn instanceof Readable) ||
            (index === list.length - 1 && fn instanceof Writable)
          ) {
            return fn;
          }
          throw Error('Arguments should be functions or streams.');
        })
        // empty arrays are converted to a falsy placeholder: drop them
        .filter(s => s);

      if (!this.streams.length) {
        // Without this guard the reduce() below fails with an unrelated TypeError.
        throw Error("Chain's argument should contain at least one function or stream.");
      }

      this.input = this.streams[0];
      // pipe every stream into the next one; the last stream is the output
      this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);

      if (!(this.input instanceof Writable)) {
        // The pipeline starts with a read-only stream: accept and ignore writes,
        // and end the writable side when that stream ends.
        this._write = (_1, _2, callback) => callback(null);
        this._final = callback => callback(null); // unavailable in Node 6
        this.input.on('end', () => this.end());
      }

      if (this.output instanceof Readable) {
        // relay data, pausing the output when push() signals back-pressure
        this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
        this.output.on('end', () => this.push(null));
      } else {
        // The pipeline ends with a write-only stream: nothing to read, so keep
        // the readable side flowing and end it when the output finishes.
        this._read = () => {}; // nop
        this.resume();
        this.output.on('finish', () => this.push(null));
      }

      // connect events
      if (!options || !options.skipEvents) {
        this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
      }
    }

    /**
     * Forwards a written chunk to the first stream of the pipeline.
     * A synchronous exception from `input.write()` is reported through
     * `callback`; the callback is invoked at most once.
     */
    _write(chunk, encoding, callback) {
      let settled = false;
      const settle = error => {
        if (settled) return;
        settled = true;
        callback(error || null);
      };
      try {
        this.input.write(chunk, encoding, settle);
      } catch (error) {
        settle(error);
      }
    }

    /**
     * Ends the first stream of the pipeline when the writable side is ended.
     * A synchronous exception from `input.end()` is reported through
     * `callback`; the callback is invoked at most once.
     */
    _final(callback) {
      let settled = false;
      const settle = error => {
        if (settled) return;
        settled = true;
        callback(error || null);
      };
      try {
        this.input.end(null, null, settle);
      } catch (error) {
        settle(error);
      }
    }

    /** Resumes the output stream, which may have been paused on back-pressure. */
    _read() {
      this.output.resume();
    }

    /** Factory equivalent of `new Chain(fns, options)`. */
    static make(fns, options) {
      return new Chain(fns, options);
    }

    /** Wraps `value` in a `Chain.Final` instance. */
    static final(value) {
      return new Chain.Final(value);
    }

    /** Wraps `values` in a `Chain.Many` instance. */
    static many(values) {
      return new Chain.Many(values);
    }

    /**
     * Pushes a function's result into `stream`.
     * `Chain.Final` and `Chain.Many` wrappers are unwrapped first. Nothing is
     * pushed for `undefined`, `null` or `Chain.none`. An array is pushed item
     * by item, skipping `undefined` and `null` items; any other value is
     * pushed as is.
     *
     * @param {*} result - Value to emit.
     * @param {object} stream - Object whose `push()` receives the values.
     */
    static sanitize(result, stream) {
      if (result instanceof Chain.Final) {
        result = result.value;
      } else if (result instanceof Chain.Many) {
        result = result.values;
      }
      if (result !== undefined && result !== null && result !== Chain.none) {
        if (result instanceof Array) {
          result.forEach(value => value !== undefined && value !== null && stream.push(value));
        } else {
          stream.push(result);
        }
      }
    }

    /**
     * Converts a function or an array of functions to a Transform stream.
     *
     * @param {*} fn - Function, array of functions, or anything else.
     * @returns {Transform|number|null} A Transform for a function or a
     *   non-empty array, `0` for an empty array, `null` for any other input.
     */
    static convertToTransform(fn) {
      if (typeof fn === 'function') return wrapFunction(fn);
      if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
      return null;
    }
  }
  // Expose the sentinel, the wrapper constructors and the `chain` factory alias as static properties.
  Object.assign(Chain, {none, Final, Many, chain: Chain.make});
  // Give the factory function a reference to the class it instantiates.
  Chain.make.Constructor = Chain;

  module.exports = Chain;
  160. module.exports = Chain;