index.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  3. // const {none, Final, Many, final, many} = require('./defs');
  4. const none = Symbol.for('stream-chain.none');
  5. const finalSymbol = Symbol.for('stream-chain.final');
  6. const manySymbol = Symbol.for('stream-chain.many');
  7. const final = value => ({[finalSymbol]: value});
  8. const many = values => ({[manySymbol]: values});
  9. const isFinal = o => o && typeof o == 'object' && finalSymbol in o;
  10. const isMany = o => o && typeof o == 'object' && manySymbol in o;
  11. const getFinalValue = o => o[finalSymbol];
  12. const getManyValues = o => o[manySymbol];
  13. const runAsyncGenerator = async (gen, stream) => {
  14. for (;;) {
  15. let data = gen.next();
  16. if (data && typeof data.then == 'function') {
  17. data = await data;
  18. }
  19. if (data.done) break;
  20. const value = data.value;
  21. if (value && typeof value.then == 'function') {
  22. value = await value;
  23. }
  24. Chain.sanitize(value, stream);
  25. }
  26. };
  27. const wrapFunction = fn =>
  28. new Transform({
  29. writableObjectMode: true,
  30. readableObjectMode: true,
  31. transform(chunk, encoding, callback) {
  32. try {
  33. const result = fn.call(this, chunk, encoding);
  34. if (result && typeof result.then == 'function') {
  35. // thenable
  36. result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
  37. return;
  38. }
  39. if (result && typeof result.next == 'function') {
  40. // generator
  41. runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
  42. return;
  43. }
  44. Chain.sanitize(result, this);
  45. callback(null);
  46. } catch (error) {
  47. callback(error);
  48. }
  49. }
  50. });
  51. const wrapArray = fns =>
  52. new Transform({
  53. writableObjectMode: true,
  54. readableObjectMode: true,
  55. transform(chunk, encoding, callback) {
  56. try {
  57. let value = chunk;
  58. for (let i = 0; i < fns.length; ++i) {
  59. const result = fns[i].call(this, value, encoding);
  60. if (result === Chain.none) {
  61. callback(null);
  62. return;
  63. }
  64. if (Chain.isFinal(result)) {
  65. value = Chain.getFinalValue(result);
  66. break;
  67. }
  68. value = result;
  69. }
  70. Chain.sanitize(value, this);
  71. callback(null);
  72. } catch (error) {
  73. callback(error);
  74. }
  75. }
  76. });
  77. class Chain extends Duplex {
  78. constructor(fns, options) {
  79. super(options || {writableObjectMode: true, readableObjectMode: true});
  80. if (!(fns instanceof Array) || !fns.length) {
  81. throw Error("Chain's argument should be a non-empty array.");
  82. }
  83. this.streams = fns
  84. .filter(fn => fn)
  85. .map((fn, index, fns) => {
  86. if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
  87. if (
  88. fn instanceof Duplex ||
  89. fn instanceof Transform ||
  90. (!index && fn instanceof Readable) ||
  91. (index === fns.length - 1 && fn instanceof Writable)
  92. ) {
  93. return fn;
  94. }
  95. throw Error('Arguments should be functions, arrays or streams.');
  96. })
  97. .filter(s => s);
  98. this.input = this.streams[0];
  99. this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
  100. if (!(this.input instanceof Writable)) {
  101. this._write = (_1, _2, callback) => callback(null);
  102. this._final = callback => callback(null); // unavailable in Node 6
  103. this.input.on('end', () => this.end());
  104. }
  105. if (this.output instanceof Readable) {
  106. this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
  107. this.output.on('end', () => this.push(null));
  108. } else {
  109. this._read = () => {}; // nop
  110. this.resume();
  111. this.output.on('finish', () => this.push(null));
  112. }
  113. // connect events
  114. if (!options || !options.skipEvents) {
  115. this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
  116. }
  117. }
  118. _write(chunk, encoding, callback) {
  119. let error = null;
  120. try {
  121. this.input.write(chunk, encoding, e => callback(e || error));
  122. } catch (e) {
  123. error = e;
  124. }
  125. }
  126. _final(callback) {
  127. let error = null;
  128. try {
  129. this.input.end(null, null, e => callback(e || error));
  130. } catch (e) {
  131. error = e;
  132. }
  133. }
  134. _read() {
  135. this.output.resume();
  136. }
  137. static make(fns, options) {
  138. return new Chain(fns, options);
  139. }
  140. static sanitize(result, stream) {
  141. if (Chain.isFinal(result)) {
  142. result = Chain.getFinalValue(result);
  143. } else if (Chain.isMany(result)) {
  144. result = Chain.getManyValues(result);
  145. }
  146. if (result !== undefined && result !== null && result !== Chain.none) {
  147. if (result instanceof Array) {
  148. result.forEach(value => value !== undefined && value !== null && stream.push(value));
  149. } else {
  150. stream.push(result);
  151. }
  152. }
  153. }
  154. static convertToTransform(fn) {
  155. if (typeof fn === 'function') return wrapFunction(fn);
  156. if (fn instanceof Array) return fn.length ? wrapArray(fn) : null;
  157. return null;
  158. }
  159. }
  160. Chain.none = none;
  161. Chain.final = final;
  162. Chain.isFinal = isFinal;
  163. Chain.getFinalValue = getFinalValue;
  164. Chain.many = many;
  165. Chain.isMany = isMany;
  166. Chain.getManyValues = getManyValues;
  167. Chain.chain = Chain.make;
  168. Chain.make.Constructor = Chain;
  169. module.exports = Chain;