// index.js
  'use strict';

  const {Readable, Writable, Duplex, Transform} = require('stream');
  // Sentinel object meaning "no value". It is exposed as Chain.none, and
  // Chain.sanitize() pushes nothing when a result is this exact object
  // (identity comparison, so any other `{}` is treated as a real value).
  const none = {};
  /**
   * Wrapper that marks a value as a final result.
   * Exposed as `Chain.Final`; `Chain.sanitize()` unwraps `.value` before
   * pushing, and `wrapArray` stops calling further functions when it sees one.
   *
   * @constructor
   * @param {*} value the wrapped value, stored as `this.value`
   */
  function Final(value) {
    this.value = value;
  }
  /**
   * Wrapper that marks a collection of values as multiple results.
   * Exposed as `Chain.Many`; `Chain.sanitize()` unwraps `.values` and, when it
   * is an array, pushes each non-null/undefined item separately.
   *
   * @constructor
   * @param {*} values the wrapped values, stored as `this.values`
   */
  function Many(values) {
    this.values = values;
  }
  /**
   * Drains a generator-like object into a stream.
   *
   * Calls `gen.next()` until it reports `done`, awaiting the step itself when
   * `next()` returns a thenable (async generator) and awaiting the yielded
   * value when that is a thenable. Every value is handed to `Chain.sanitize()`,
   * which decides what gets pushed to `stream`.
   *
   * @param {{next: Function}} gen object with a `next()` method (sync or async generator)
   * @param {object} stream target passed through to `Chain.sanitize()`
   * @returns {Promise<void>} settles when the generator is exhausted; rejects
   *   if `next()`, an awaited value, or `Chain.sanitize()` throws
   */
  const runAsyncGenerator = async (gen, stream) => {
    for (;;) {
      let data = gen.next();
      if (data && typeof data.then === 'function') {
        data = await data;
      }
      if (data.done) break;
      // Must be `let`: a yielded thenable is replaced by its resolved value below.
      let value = data.value;
      if (value && typeof value.then === 'function') {
        value = await value;
      }
      Chain.sanitize(value, stream);
    }
  };
  /**
   * Wraps a plain function as an object-mode Transform stream.
   *
   * For every chunk, `fn` is called with the Transform as `this` and
   * `(chunk, encoding)` as arguments. Its result is handled by shape:
   *  - a thenable is awaited and the resolved value goes to `Chain.sanitize()`;
   *  - an object with a `next()` method is drained by `runAsyncGenerator()`;
   *  - anything else goes straight to `Chain.sanitize()`.
   * A synchronous throw, a rejection, or a generator failure is reported
   * through the transform callback.
   *
   * @param {Function} fn function to call per chunk
   * @returns {Transform} the wrapping stream
   */
  const wrapFunction = fn =>
    new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const result = fn.call(this, chunk, encoding);
          if (result && typeof result.then === 'function') {
            // thenable
            result.then(
              value => {
                Chain.sanitize(value, this);
                callback(null);
              },
              error => callback(error)
            );
            return;
          }
          if (result && typeof result.next === 'function') {
            // generator
            runAsyncGenerator(result, this).then(
              () => callback(null),
              error => callback(error)
            );
            return;
          }
          Chain.sanitize(result, this);
          callback(null);
        } catch (error) {
          callback(error);
        }
      }
    });
  /**
   * Wraps an array of functions as a single object-mode Transform stream.
   *
   * For every chunk the functions are called in order, each with the Transform
   * as `this` and `(value, encoding)` as arguments, where `value` is the
   * previous function's result (the chunk for the first one).
   *  - A `Chain.Final` result stops the sequence and its `.value` is emitted.
   *  - A `Chain.none` result stops the sequence and nothing is emitted, so the
   *    sentinel object is never fed to the remaining functions.
   * The last value is handed to `Chain.sanitize()`. A throw from any function
   * is reported through the transform callback.
   *
   * @param {Function[]} array functions to apply in order
   * @returns {Transform} the wrapping stream
   */
  const wrapArray = array =>
    new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        try {
          let value = chunk;
          for (const fn of array) {
            const result = fn.call(this, value, encoding);
            if (result === Chain.none) {
              // "no value": skip the rest of the functions and emit nothing
              callback(null);
              return;
            }
            if (result instanceof Chain.Final) {
              value = result.value;
              break;
            }
            value = result;
          }
          Chain.sanitize(value, this);
          callback(null);
        } catch (error) {
          callback(error);
        }
      }
    });
  // Wraps a stream callback so that it runs at most once, always receiving
  // either an error or null.
  const callOnce = callback => {
    let called = false;
    return error => {
      if (called) return;
      called = true;
      callback(error || null);
    };
  };

  /**
   * Duplex stream that connects a list of functions, arrays of functions and
   * streams with `pipe()` and presents them as one stream: what is written to
   * the Chain goes to the first link (`this.input`), what the last link
   * (`this.output`) produces is pushed out of the Chain.
   */
  class Chain extends Duplex {
    /**
     * @param {Array} fns non-empty array of links. Falsy items and empty arrays
     *   are dropped. Functions and arrays are converted with
     *   `Chain.convertToTransform()`. Duplex/Transform streams are accepted
     *   anywhere, a Readable only as the first link and a Writable only as the
     *   last link (positions are counted after falsy items were dropped).
     * @param {object} [options] passed to the Duplex constructor; when omitted
     *   both sides run in object mode. A truthy `options.skipEvents` disables
     *   forwarding of 'error' events from the links to the Chain.
     * @throws {Error} if `fns` is not a non-empty array or contains an
     *   unsupported item
     */
    constructor(fns, options) {
      super(options || {writableObjectMode: true, readableObjectMode: true});

      if (!(fns instanceof Array) || !fns.length) {
        throw Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns
        .filter(fn => fn)
        // `links` is the filtered list, so "last link" is judged with the same
        // numbering as `index` even when falsy entries were removed.
        .map((fn, index, links) => {
          if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
          if (
            fn instanceof Duplex ||
            fn instanceof Transform ||
            (!index && fn instanceof Readable) ||
            (index === links.length - 1 && fn instanceof Writable)
          ) {
            return fn;
          }
          throw Error('Arguments should be functions, arrays or streams.');
        })
        .filter(s => s); // convertToTransform() yields 0 for an empty array

      this.input = this.streams[0];
      // pipe every link into the next one; the result is the last link
      this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);

      if (!(this.input instanceof Writable)) {
        // The first link cannot be written to: accept and discard writes, and
        // end the writable side when that link ends.
        this._write = (_1, _2, callback) => callback(null);
        this._final = callback => callback(null); // unavailable in Node 6
        this.input.on('end', () => this.end());
      }

      if (this.output instanceof Readable) {
        // pause the last link when push() signals back-pressure; _read() resumes it
        this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
        this.output.on('end', () => this.push(null));
      } else {
        // The last link produces no data: keep the readable side flowing and
        // end it when that link finishes.
        this._read = () => {}; // nop
        this.resume();
        this.output.on('finish', () => this.push(null));
      }

      // connect events
      if (!options || !options.skipEvents) {
        this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
      }
    }

    /**
     * Forwards a written chunk to the first link. The callback is invoked
     * exactly once: with the write callback's error, or with the exception if
     * `write()` throws synchronously.
     */
    _write(chunk, encoding, callback) {
      const done = callOnce(callback);
      try {
        this.input.write(chunk, encoding, done);
      } catch (error) {
        done(error);
      }
    }

    /**
     * Ends the first link when the writable side of the Chain is ended. The
     * callback is invoked exactly once, also when `end()` throws synchronously.
     */
    _final(callback) {
      const done = callOnce(callback);
      try {
        this.input.end(null, null, done);
      } catch (error) {
        done(error);
      }
    }

    /** Resumes the last link, which the 'data' handler may have paused. */
    _read() {
      this.output.resume();
    }

    /** Factory equivalent of `new Chain(fns, options)`. */
    static make(fns, options) {
      return new Chain(fns, options);
    }

    /** Wraps `value` in a `Chain.Final`. */
    static final(value) {
      return new Chain.Final(value);
    }

    /** Wraps `values` in a `Chain.Many`. */
    static many(values) {
      return new Chain.Many(values);
    }

    /**
     * Pushes a function result to `stream`.
     * `Chain.Final` and `Chain.Many` wrappers are unwrapped first. Nothing is
     * pushed for `undefined`, `null` or `Chain.none`. An array is pushed item
     * by item, skipping `undefined` and `null` items; any other value is
     * pushed as is.
     *
     * @param {*} result value returned by a link function
     * @param {object} stream object with a `push()` method
     */
    static sanitize(result, stream) {
      if (result instanceof Chain.Final) {
        result = result.value;
      } else if (result instanceof Chain.Many) {
        result = result.values;
      }
      if (result !== undefined && result !== null && result !== Chain.none) {
        if (result instanceof Array) {
          result.forEach(value => value !== undefined && value !== null && stream.push(value));
        } else {
          stream.push(result);
        }
      }
    }

    /**
     * Converts a link description to a Transform stream.
     *
     * @param {*} fn a function or an array of functions
     * @returns {Transform|number|null} a Transform for a function or a
     *   non-empty array, `0` for an empty array, `null` for anything else
     */
    static convertToTransform(fn) {
      if (typeof fn === 'function') return wrapFunction(fn);
      if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
      return null;
    }
  }
  // Expose the sentinel and the wrapper constructors as static members of
  // Chain; Chain.sanitize() and wrapArray look them up through these names.
  Chain.none = none;
  Chain.Final = Final;
  Chain.Many = Many;

  // Aliases: `Chain.chain` is the same factory as `Chain.make`, and the
  // factory carries a reference to the class as `make.Constructor`.
  Chain.chain = Chain.make;
  Chain.make.Constructor = Chain;

  // The class itself is the module's export.
  module.exports = Chain;