index.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  3. const {none, finalValue, many, isFinalValue, isMany, getFinalValue, getManyValues} = require('./defs');
  4. const runAsyncGenerator = async (gen, stream) => {
  5. for (;;) {
  6. let data = gen.next();
  7. if (data && typeof data.then == 'function') {
  8. data = await data;
  9. }
  10. if (data.done) break;
  11. const value = data.value;
  12. if (value && typeof value.then == 'function') {
  13. value = await value;
  14. }
  15. Chain.sanitize(value, stream);
  16. }
  17. };
  18. const wrapFunction = fn =>
  19. new Transform({
  20. writableObjectMode: true,
  21. readableObjectMode: true,
  22. transform(chunk, encoding, callback) {
  23. try {
  24. const result = fn.call(this, chunk, encoding);
  25. if (result && typeof result.then == 'function') {
  26. // thenable
  27. result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
  28. return;
  29. }
  30. if (result && typeof result.next == 'function') {
  31. // generator
  32. runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
  33. return;
  34. }
  35. Chain.sanitize(result, this);
  36. callback(null);
  37. } catch (error) {
  38. callback(error);
  39. }
  40. }
  41. });
  42. const wrapArray = fns =>
  43. new Transform({
  44. writableObjectMode: true,
  45. readableObjectMode: true,
  46. transform(chunk, encoding, callback) {
  47. try {
  48. let value = chunk;
  49. for (let i = 0; i < fns.length; ++i) {
  50. const result = fns[i].call(this, value, encoding);
  51. if (result === Chain.none) {
  52. callback(null);
  53. return;
  54. }
  55. if (Chain.isFinalValue(result)) {
  56. value = Chain.getFinalValue(result);
  57. break;
  58. }
  59. value = result;
  60. }
  61. Chain.sanitize(value, this);
  62. callback(null);
  63. } catch (error) {
  64. callback(error);
  65. }
  66. }
  67. });
  68. class Chain extends Duplex {
  69. constructor(fns, options) {
  70. super(options || {writableObjectMode: true, readableObjectMode: true});
  71. if (!(fns instanceof Array) || !fns.length) {
  72. throw Error("Chain's argument should be a non-empty array.");
  73. }
  74. this.streams = fns
  75. .filter(fn => fn)
  76. .map((fn, index, fns) => {
  77. if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
  78. if (
  79. fn instanceof Duplex ||
  80. fn instanceof Transform ||
  81. (!index && fn instanceof Readable) ||
  82. (index === fns.length - 1 && fn instanceof Writable)
  83. ) {
  84. return fn;
  85. }
  86. throw Error('Arguments should be functions, arrays or streams.');
  87. })
  88. .filter(s => s);
  89. this.input = this.streams[0];
  90. this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
  91. if (!(this.input instanceof Writable)) {
  92. this._write = (_1, _2, callback) => callback(null);
  93. this._final = callback => callback(null); // unavailable in Node 6
  94. this.input.on('end', () => this.end());
  95. }
  96. if (this.output instanceof Readable) {
  97. this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
  98. this.output.on('end', () => this.push(null));
  99. } else {
  100. this._read = () => {}; // nop
  101. this.resume();
  102. this.output.on('finish', () => this.push(null));
  103. }
  104. // connect events
  105. if (!options || !options.skipEvents) {
  106. this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
  107. }
  108. }
  109. _write(chunk, encoding, callback) {
  110. let error = null;
  111. try {
  112. this.input.write(chunk, encoding, e => callback(e || error));
  113. } catch (e) {
  114. error = e;
  115. }
  116. }
  117. _final(callback) {
  118. let error = null;
  119. try {
  120. this.input.end(null, null, e => callback(e || error));
  121. } catch (e) {
  122. error = e;
  123. }
  124. }
  125. _read() {
  126. this.output.resume();
  127. }
  128. static make(fns, options) {
  129. return new Chain(fns, options);
  130. }
  131. static sanitize(result, stream) {
  132. if (Chain.isFinalValue(result)) {
  133. result = Chain.getFinalValue(result);
  134. } else if (Chain.isMany(result)) {
  135. result = Chain.getManyValues(result);
  136. }
  137. if (result !== undefined && result !== null && result !== Chain.none) {
  138. if (result instanceof Array) {
  139. result.forEach(value => value !== undefined && value !== null && stream.push(value));
  140. } else {
  141. stream.push(result);
  142. }
  143. }
  144. }
  145. static convertToTransform(fn) {
  146. if (typeof fn === 'function') return wrapFunction(fn);
  147. if (fn instanceof Array) return fn.length ? wrapArray(fn) : null;
  148. return null;
  149. }
  150. }
  151. Chain.none = none;
  152. Chain.finalValue = finalValue;
  153. Chain.isFinalValue = isFinalValue;
  154. Chain.getFinalValue = getFinalValue;
  155. Chain.many = many;
  156. Chain.isMany = isMany;
  157. Chain.getManyValues = getManyValues;
  158. Chain.chain = Chain.make;
  159. Chain.make.Constructor = Chain;
  160. module.exports = Chain;