index.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  3. const none = {};
  /**
   * Wrapper that marks a value as a final result.
   *
   * In this file, `wrapArray` stops calling the remaining functions as soon as
   * one of them returns a `Final`, and `Chain.sanitize` unwraps it before
   * pushing the wrapped value downstream.
   *
   * @param {*} value - The value to carry; stored as `this.value`.
   */
  function Final(value) {
    this.value = value;
  }
  /**
   * Wrapper that carries several values at once.
   *
   * `Chain.sanitize` unwraps it to `values`; when that is an array, each
   * element that is neither `null` nor `undefined` is pushed separately.
   *
   * @param {*} values - The values to carry; stored as `this.values`.
   */
  function Many(values) {
    this.values = values;
  }
  /**
   * Drains a generator (synchronous or asynchronous) and hands every produced
   * value to `Chain.sanitize`, which pushes it into `stream`.
   *
   * Both the iteration result returned by `gen.next()` and the yielded value
   * itself may be thenables; each is awaited before use.
   *
   * @param {{next: Function}} gen - Iterator-like object exposing `next()`.
   * @param {object} stream - Target stream passed through to `Chain.sanitize`.
   * @returns {Promise<void>} Settles when the generator reports `done`;
   *   rejects if the generator, an awaited thenable or `Chain.sanitize` fails.
   */
  const runAsyncGenerator = async (gen, stream) => {
    for (;;) {
      let data = gen.next();
      if (data && typeof data.then === 'function') {
        // async generators return a promise of the iteration result
        data = await data;
      }
      if (data.done) break;
      // Must be `let`: a yielded thenable is replaced by its resolved value.
      // Declaring it `const` made every yielded promise throw a TypeError.
      let value = data.value;
      if (value && typeof value.then === 'function') {
        value = await value;
      }
      Chain.sanitize(value, stream);
    }
  };
  /**
   * Wraps a plain function into an object-mode Transform stream.
   *
   * The function is called with the stream as `this` and receives
   * `(chunk, encoding)`. Its result is handled according to its shape:
   *  - a thenable: the fulfilled value is passed to `Chain.sanitize`;
   *  - an object with `next()` (a generator): drained by `runAsyncGenerator`;
   *  - anything else: passed to `Chain.sanitize` directly.
   * Every failure is reported through the transform callback.
   *
   * @param {Function} fn - The function to wrap.
   * @returns {Transform} A Transform with object mode on both sides.
   */
  const wrapFunction = fn =>
    new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        try {
          const result = fn.call(this, chunk, encoding);
          if (result && typeof result.then === 'function') {
            // thenable
            result.then(
              value => {
                // Catch here as well: a throw inside a fulfilment handler
                // would otherwise become an unhandled rejection and the
                // callback would never fire, stalling the stream.
                try {
                  Chain.sanitize(value, this);
                } catch (error) {
                  callback(error);
                  return;
                }
                callback(null);
              },
              error => callback(error)
            );
            return;
          }
          if (result && typeof result.next === 'function') {
            // generator
            runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
            return;
          }
          Chain.sanitize(result, this);
          callback(null);
        } catch (error) {
          callback(error);
        }
      }
    });
  /**
   * Wraps an array of synchronous functions into one object-mode Transform.
   *
   * For each chunk the functions run in order, each one receiving the previous
   * result (and the encoding) with the stream as `this`:
   *  - returning `Chain.none` drops the chunk: nothing is pushed;
   *  - returning a `Chain.Final` ends the sequence early with its `value`;
   *  - any other result is fed to the next function.
   * The last value is handed to `Chain.sanitize`. Thrown errors are reported
   * through the transform callback.
   *
   * @param {Function[]} fns - Functions to apply in sequence.
   * @returns {Transform} A Transform with object mode on both sides.
   */
  const wrapArray = fns =>
    new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        try {
          let current = chunk;
          for (const step of fns) {
            const produced = step.call(this, current, encoding);
            if (produced === Chain.none) {
              // the chunk was filtered out
              callback(null);
              return;
            }
            if (produced instanceof Chain.Final) {
              // short-circuit: skip the remaining functions
              current = produced.value;
              break;
            }
            current = produced;
          }
          Chain.sanitize(current, this);
          callback(null);
        } catch (error) {
          callback(error);
        }
      }
    });
  // Returns a wrapper around a stream callback that forwards to it at most
  // once, so an error reported from a `catch` cannot be followed by a second
  // invocation from the underlying stream.
  const settleOnce = callback => {
    let isSettled = false;
    return error => {
      if (isSettled) return;
      isSettled = true;
      callback(error || null);
    };
  };

  /**
   * A Duplex stream that pipes a list of functions and streams together and
   * exposes the result as one stream: writes go to the first stream
   * (`input`), reads come from the last one (`output`).
   */
  class Chain extends Duplex {
    /**
     * @param {Array} fns - Non-empty array of functions, arrays of functions
     *   and streams. Falsy items are skipped. Functions and arrays are
     *   converted with `Chain.convertToTransform`. A Readable is accepted only
     *   in the first position and a Writable only in the last one (positions
     *   are counted after falsy items are removed).
     * @param {object} [options] - Passed to the Duplex constructor; when
     *   omitted both sides use object mode. A truthy `skipEvents` disables
     *   forwarding of `error` events from the inner streams.
     * @throws {Error} If `fns` is not a non-empty array or contains an item
     *   of an unsupported kind.
     */
    constructor(fns, options) {
      super(options || {writableObjectMode: true, readableObjectMode: true});

      if (!(fns instanceof Array) || !fns.length) {
        throw Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns
        .filter(fn => fn)
        .map((fn, index, fns) => {
          if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
          if (
            fn instanceof Duplex ||
            fn instanceof Transform ||
            (!index && fn instanceof Readable) ||
            (index === fns.length - 1 && fn instanceof Writable)
          ) {
            return fn;
          }
          throw Error('Arguments should be functions, arrays or streams.');
        })
        // empty arrays convert to null and are dropped here
        .filter(s => s);
      this.input = this.streams[0];
      // pipe every stream into the next one; the last stream is the output
      this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);

      if (!(this.input instanceof Writable)) {
        // The chain starts with a read-only source: discard anything written
        // to the chain and end the writable side together with the source.
        this._write = (_1, _2, callback) => callback(null);
        this._final = callback => callback(null); // unavailable in Node 6
        this.input.on('end', () => this.end());
      }

      if (this.output instanceof Readable) {
        // relay data, pausing the output when our buffer is full (see _read)
        this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
        this.output.on('end', () => this.push(null));
      } else {
        // The chain ends with a write-only sink: there is nothing to read, so
        // keep the readable side flowing and end it when the sink finishes.
        this._read = () => {}; // nop
        this.resume();
        this.output.on('finish', () => this.push(null));
      }

      // connect events
      if (!options || !options.skipEvents) {
        this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
      }
    }

    /**
     * Forwards a written chunk to the first stream of the chain.
     *
     * A synchronous throw from `write()` is reported through `callback`;
     * previously it was stored and lost, leaving the chain stalled.
     */
    _write(chunk, encoding, callback) {
      const settle = settleOnce(callback);
      try {
        this.input.write(chunk, encoding, settle);
      } catch (error) {
        settle(error);
      }
    }

    /**
     * Ends the first stream of the chain when the writable side is finished.
     *
     * A synchronous throw from `end()` is reported through `callback`.
     */
    _final(callback) {
      const settle = settleOnce(callback);
      try {
        this.input.end(null, null, settle);
      } catch (error) {
        settle(error);
      }
    }

    /** Resumes the output stream, which is paused when `push()` returns false. */
    _read() {
      this.output.resume();
    }

    /** Factory equivalent of `new Chain(fns, options)`. */
    static make(fns, options) {
      return new Chain(fns, options);
    }

    /** Wraps `value` in a `Chain.Final`. */
    static final(value) {
      return new Chain.Final(value);
    }

    /** Wraps `values` in a `Chain.Many`. */
    static many(values) {
      return new Chain.Many(values);
    }

    /**
     * Pushes a function result into `stream`.
     *
     * `Chain.Final` and `Chain.Many` wrappers are unwrapped first. `undefined`,
     * `null` and `Chain.none` push nothing. An array is pushed element by
     * element, skipping `undefined` and `null` items; any other value is
     * pushed as is.
     *
     * @param {*} result - The value produced by a chain function.
     * @param {object} stream - The stream whose `push()` receives the values.
     */
    static sanitize(result, stream) {
      if (result instanceof Chain.Final) {
        result = result.value;
      } else if (result instanceof Chain.Many) {
        result = result.values;
      }
      if (result !== undefined && result !== null && result !== Chain.none) {
        if (result instanceof Array) {
          result.forEach(value => value !== undefined && value !== null && stream.push(value));
        } else {
          stream.push(result);
        }
      }
    }

    /**
     * Converts a function or an array of functions into a Transform stream.
     *
     * @param {Function|Function[]} fn - The item to convert.
     * @returns {?Transform} The stream, or `null` for an empty array or an
     *   unsupported argument.
     */
    static convertToTransform(fn) {
      if (typeof fn === 'function') return wrapFunction(fn);
      if (fn instanceof Array) return fn.length ? wrapArray(fn) : null;
      return null;
    }
  }
  // Expose the `none` sentinel and the wrapper constructors as static
  // properties; `Chain.chain` is an alias of the `Chain.make` factory.
  Object.assign(Chain, {none, Final, Many, chain: Chain.make});

  // Let the factory function point back at the class it instantiates.
  Chain.make.Constructor = Chain;

  module.exports = Chain;