index.js 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  /**
   * Wrapper marking a result as final. `processData` unwraps it and pushes
   * `value`; inside an array of functions it also stops the remaining functions
   * from running for the current chunk.
   *
   * @constructor
   * @param {*} value - payload carried by the wrapper.
   */
  function Final(value) {
    Object.assign(this, {value});
  }
  /**
   * Wrapper marking a result as a collection of values. `processData` unwraps
   * it and handles `values` the same way as a plain result.
   *
   * @constructor
   * @param {*} values - payload carried by the wrapper (normally an array).
   */
  function Many(values) {
    Object.assign(this, {values});
  }
  /**
   * Pushes the outcome of a processing step into a stream.
   *
   * `Chain.Final` and `Chain.Many` wrappers are unwrapped first. After that:
   * `undefined`/`null` produce nothing, an array has each of its non-nullish
   * items pushed in order, and anything else is pushed as a single value.
   *
   * @param {*} result - value returned by a processing function.
   * @param {object} stream - target exposing `push(value)`.
   */
  const processData = (result, stream) => {
    let payload = result;
    if (payload instanceof Chain.Final) {
      payload = payload.value;
    } else if (payload instanceof Chain.Many) {
      payload = payload.values;
    }

    // Nothing to emit.
    if (payload === undefined || payload === null) return;

    // A single value goes out as is.
    if (!(payload instanceof Array)) {
      stream.push(payload);
      return;
    }

    // Arrays are flattened one level; nullish items are dropped.
    for (const item of payload) {
      if (item === undefined || item === null) continue;
      stream.push(item);
    }
  };
  /**
   * Drains a generator object, passing every produced value to `processData`.
   *
   * Works with both synchronous and asynchronous generators: a step is awaited
   * only when `next()` returned a thenable.
   *
   * @param {object} gen - generator object exposing `next()`.
   * @param {object} stream - stream handed to `processData` for each value.
   * @returns {Promise<void>} settles once the generator reports `done`.
   */
  const runAsyncGenerator = async (gen, stream) => {
    while (true) {
      const step = gen.next();
      const isThenable = step && typeof step.then === 'function';
      const data = isThenable ? await step : step;
      if (data.done) return;
      processData(data.value, stream);
    }
  };
  /**
   * Wraps a function into an object-mode Transform stream.
   *
   * The function is called with the stream as `this` and `(chunk, encoding)` as
   * arguments. Its result may be:
   *  - a thenable: the resolved value is passed to `processData`;
   *  - a generator object (anything with `next()`): it is drained with
   *    `runAsyncGenerator`;
   *  - any other value: passed to `processData` directly.
   *
   * Every failure (a throw from `fn`, a rejection, or a throw while pushing
   * results) is reported through the transform callback exactly once.
   *
   * @param {Function} fn - processing function.
   * @returns {Transform} stream applying `fn` to every chunk.
   */
  const wrapFunction = fn =>
    new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        const succeed = () => callback(null);
        const fail = error => callback(error);
        try {
          const result = fn.call(this, chunk, encoding);
          if (result && typeof result.then === 'function') {
            // thenable: results are pushed in their own step so that a throw
            // there is routed to `fail` instead of becoming an unhandled
            // rejection that leaves the stream waiting forever
            Promise.resolve(result)
              .then(value => processData(value, this))
              .then(succeed, fail);
            return;
          }
          if (result && typeof result.next === 'function') {
            // generator
            runAsyncGenerator(result, this).then(succeed, fail);
            return;
          }
          processData(result, this);
        } catch (error) {
          fail(error);
          return;
        }
        // kept outside of `try` so the callback can never be invoked twice
        succeed();
      }
    });
  /**
   * Wraps a list of functions into one object-mode Transform stream.
   *
   * For every chunk the functions run in order, each one receiving the previous
   * result (called with the stream as `this` and `(value, encoding)`). A
   * `Chain.Final` result stops the sequence early and its `value` is used. The
   * last value is handed to `processData`; any thrown error is reported through
   * the transform callback.
   *
   * @param {Function[]} array - functions applied in sequence.
   * @returns {Transform} stream applying the whole sequence to every chunk.
   */
  const wrapArray = array =>
    new Transform({
      writableObjectMode: true,
      readableObjectMode: true,
      transform(chunk, encoding, callback) {
        try {
          let current = chunk;
          for (const step of array) {
            current = step.call(this, current, encoding);
            if (current instanceof Chain.Final) {
              // short-circuit: skip the remaining functions
              current = current.value;
              break;
            }
          }
          processData(current, this);
          callback(null);
        } catch (error) {
          callback(error);
        }
      }
    });
  /**
   * Duplex stream that assembles functions, arrays of functions and existing
   * streams into a single pipeline.
   *
   * Whatever is written to the chain is forwarded to the first stream
   * (`input`); whatever the last stream (`output`) produces is made readable
   * from the chain. All streams in between are connected with `pipe()`.
   */
  class Chain extends Duplex {
    /**
     * @param {Array} fns - non-empty list of functions, arrays of functions
     *   and/or streams. Falsy items are skipped. A plain Readable is accepted
     *   only as the first item and a plain Writable only as the last one.
     * @param {object} [options] - passed to the Duplex constructor as is. When
     *   omitted, both sides work in object mode. A truthy `skipEvents` turns
     *   off forwarding of 'error' events from the inner streams.
     * @throws {Error} when `fns` is not an array, holds no usable items, or
     *   contains an item that is neither a function nor a suitable stream.
     */
    constructor(fns, options) {
      super(options || {writableObjectMode: true, readableObjectMode: true});

      if (!(fns instanceof Array) || !fns.length) {
        throw new Error("Chain's argument should be a non-empty array.");
      }

      this.streams = fns
        .filter(fn => fn)
        .map((fn, index, items) => {
          if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
          // `items` is the list with falsy entries already removed, so the
          // first/last positions are judged against what is actually chained
          const isFirst = index === 0;
          const isLast = index === items.length - 1;
          if (
            fn instanceof Duplex ||
            fn instanceof Transform ||
            (isFirst && fn instanceof Readable) ||
            (isLast && fn instanceof Writable)
          ) {
            return fn;
          }
          throw new Error('Arguments should be functions or streams.');
        })
        .filter(s => s); // drops empty arrays, which convert to a falsy value

      if (!this.streams.length) {
        // every item was falsy or an empty array: fail with a clear message
        // instead of crashing in reduce() below
        throw new Error("Chain's argument should be a non-empty array.");
      }

      this.input = this.streams[0];
      this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);

      if (!(this.input instanceof Writable)) {
        // the pipeline starts with a pure source: accept and discard writes,
        // and end the chain when that source ends
        this._write = (_1, _2, callback) => callback(null);
        this._final = callback => callback(null); // unavailable in Node 6
        this.input.on('end', () => this.end());
      }

      if (this.output instanceof Readable) {
        // pause the output while the readable buffer of the chain is full;
        // _read() resumes it
        this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
        this.output.on('end', () => this.push(null));
      } else {
        // the pipeline ends with a pure sink: nothing to read, so just signal
        // the end of the readable side when the sink finishes
        this._read = () => {}; // nop
        this.resume();
        this.output.on('finish', () => this.push(null));
      }

      // connect events
      if (!options || !options.skipEvents) {
        this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
      }
    }

    /**
     * Forwards a chunk to the first stream of the pipeline.
     * A synchronous exception from `input.write()` is reported through
     * `callback`; the callback is invoked at most once.
     */
    _write(chunk, encoding, callback) {
      let settled = false;
      const settle = error => {
        if (settled) return;
        settled = true;
        callback(error || null);
      };
      try {
        this.input.write(chunk, encoding, settle);
      } catch (error) {
        settle(error);
      }
    }

    /**
     * Ends the first stream of the pipeline when the writable side is closed.
     * A synchronous exception from `input.end()` is reported through
     * `callback`; the callback is invoked at most once.
     */
    _final(callback) {
      let settled = false;
      const settle = error => {
        if (settled) return;
        settled = true;
        callback(error || null);
      };
      try {
        this.input.end(null, null, settle);
      } catch (error) {
        settle(error);
      }
    }

    /** Resumes the last stream when the consumer asks for more data. */
    _read() {
      this.output.resume();
    }

    /**
     * Factory equivalent of `new Chain(fns, options)`.
     * @returns {Chain}
     */
    static make(fns, options) {
      return new Chain(fns, options);
    }

    /**
     * Wraps a value in `Chain.Final`.
     * @param {*} value
     */
    static final(value) {
      return new Chain.Final(value);
    }

    /**
     * Wraps values in `Chain.Many`.
     * @param {*} values
     */
    static many(values) {
      return new Chain.Many(values);
    }

    /**
     * Converts a function or an array of functions into a Transform stream.
     * @param {Function|Function[]} fn
     * @returns {Transform|number|null} a stream; `0` for an empty array;
     *   `null` for any other kind of argument.
     */
    static convertToTransform(fn) {
      if (typeof fn === 'function') return wrapFunction(fn);
      if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
      return null;
    }
  }
  // Public extras attached to the class: the result wrappers, a `chain` alias
  // for the `make` factory, and a back-reference from the factory to the class.
  Object.assign(Chain, {Final, Many, chain: Chain.make});
  Chain.make.Constructor = Chain;

  module.exports = Chain;