index.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. 'use strict';
  2. const {Readable, Writable, Duplex, Transform} = require('stream');
  3. const none = Symbol.for('object-stream.none');
  4. const finalSymbol = Symbol.for('object-stream.final');
  5. const manySymbol = Symbol.for('object-stream.many');
  6. const final = value => ({[finalSymbol]: value});
  7. const many = values => ({[manySymbol]: values});
  8. const isFinal = o => o && typeof o == 'object' && finalSymbol in o;
  9. const isMany = o => o && typeof o == 'object' && manySymbol in o;
  10. const getFinalValue = o => o[finalSymbol];
  11. const getManyValues = o => o[manySymbol];
  12. const runAsyncGenerator = async (gen, stream) => {
  13. for (;;) {
  14. let data = gen.next();
  15. if (data && typeof data.then == 'function') {
  16. data = await data;
  17. }
  18. if (data.done) break;
  19. let value = data.value;
  20. if (value && typeof value.then == 'function') {
  21. value = await value;
  22. }
  23. Chain.sanitize(value, stream);
  24. }
  25. };
  26. const wrapFunction = fn =>
  27. new Transform({
  28. writableObjectMode: true,
  29. readableObjectMode: true,
  30. transform(chunk, encoding, callback) {
  31. try {
  32. const result = fn.call(this, chunk, encoding);
  33. if (result && typeof result.then == 'function') {
  34. // thenable
  35. result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
  36. return;
  37. }
  38. if (result && typeof result.next == 'function') {
  39. // generator
  40. runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
  41. return;
  42. }
  43. Chain.sanitize(result, this);
  44. callback(null);
  45. } catch (error) {
  46. callback(error);
  47. }
  48. }
  49. });
  50. const wrapArray = fns =>
  51. new Transform({
  52. writableObjectMode: true,
  53. readableObjectMode: true,
  54. transform(chunk, encoding, callback) {
  55. try {
  56. let value = chunk;
  57. for (let i = 0; i < fns.length; ++i) {
  58. const result = fns[i].call(this, value, encoding);
  59. if (result === Chain.none) {
  60. callback(null);
  61. return;
  62. }
  63. if (Chain.isFinal(result)) {
  64. value = Chain.getFinalValue(result);
  65. break;
  66. }
  67. value = result;
  68. }
  69. Chain.sanitize(value, this);
  70. callback(null);
  71. } catch (error) {
  72. callback(error);
  73. }
  74. }
  75. });
  76. // is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
  77. const isReadableNodeStream = obj => !!(obj &&
  78. typeof obj.pipe === 'function' &&
  79. typeof obj.on === 'function' &&
  80. (!obj._writableState || (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
  81. (!obj._writableState || obj._readableState) // Writable has .pipe.
  82. );
  83. const isWritableNodeStream = obj => !!(
  84. obj &&
  85. typeof obj.write === 'function' &&
  86. typeof obj.on === 'function' &&
  87. (!obj._readableState || (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false) // Duplex
  88. );
  89. const isDuplexNodeStream = obj => !!(
  90. obj &&
  91. (typeof obj.pipe === 'function' && obj._readableState) &&
  92. typeof obj.on === 'function' &&
  93. typeof obj.write === 'function'
  94. );
  95. class Chain extends Duplex {
  96. constructor(fns, options) {
  97. super(options || {writableObjectMode: true, readableObjectMode: true});
  98. if (!(fns instanceof Array) || !fns.length) {
  99. throw Error("Chain's argument should be a non-empty array.");
  100. }
  101. this.streams = fns
  102. .filter(fn => fn)
  103. .map((fn, index, fns) => {
  104. if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
  105. if (
  106. isDuplexNodeStream(fn) ||
  107. (!index && isReadableNodeStream(fn)) ||
  108. (index === fns.length - 1 && isWritableNodeStream(fn))
  109. ) {
  110. return fn;
  111. }
  112. throw Error('Arguments should be functions, arrays or streams.');
  113. })
  114. .filter(s => s);
  115. this.input = this.streams[0];
  116. this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
  117. if (!isWritableNodeStream(this.input)) {
  118. this._write = (_1, _2, callback) => callback(null);
  119. this._final = callback => callback(null); // unavailable in Node 6
  120. this.input.on('end', () => this.end());
  121. }
  122. if (isReadableNodeStream(this.output)) {
  123. this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
  124. this.output.on('end', () => this.push(null));
  125. } else {
  126. this._read = () => {}; // nop
  127. this.resume();
  128. this.output.on('finish', () => this.push(null));
  129. }
  130. // connect events
  131. if (!options || !options.skipEvents) {
  132. this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
  133. }
  134. }
  135. _write(chunk, encoding, callback) {
  136. let error = null;
  137. try {
  138. this.input.write(chunk, encoding, e => callback(e || error));
  139. } catch (e) {
  140. error = e;
  141. }
  142. }
  143. _final(callback) {
  144. let error = null;
  145. try {
  146. this.input.end(null, null, e => callback(e || error));
  147. } catch (e) {
  148. error = e;
  149. }
  150. }
  151. _read() {
  152. this.output.resume();
  153. }
  154. static make(fns, options) {
  155. return new Chain(fns, options);
  156. }
  157. static sanitize(result, stream) {
  158. if (Chain.isFinal(result)) {
  159. result = Chain.getFinalValue(result);
  160. } else if (Chain.isMany(result)) {
  161. result = Chain.getManyValues(result);
  162. }
  163. if (result !== undefined && result !== null && result !== Chain.none) {
  164. if (result instanceof Array) {
  165. result.forEach(value => value !== undefined && value !== null && stream.push(value));
  166. } else {
  167. stream.push(result);
  168. }
  169. }
  170. }
  171. static convertToTransform(fn) {
  172. if (typeof fn === 'function') return wrapFunction(fn);
  173. if (fn instanceof Array) return fn.length ? wrapArray(fn) : null;
  174. return null;
  175. }
  176. }
  177. Chain.none = none;
  178. Chain.final = final;
  179. Chain.isFinal = isFinal;
  180. Chain.getFinalValue = getFinalValue;
  181. Chain.many = many;
  182. Chain.isMany = isMany;
  183. Chain.getManyValues = getManyValues;
  184. Chain.chain = Chain.make;
  185. Chain.make.Constructor = Chain;
  186. module.exports = Chain;