index.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('./defs');
  4. const gen = require('./gen');
  5. const asStream = require('./asStream');
  6. // is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
  7. const isReadableNodeStream = obj =>
  8. obj &&
  9. typeof obj.pipe === 'function' &&
  10. typeof obj.on === 'function' &&
  11. (!obj._writableState ||
  12. (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
  13. (!obj._writableState || obj._readableState); // Writable has .pipe.
  14. const isWritableNodeStream = obj =>
  15. obj &&
  16. typeof obj.write === 'function' &&
  17. typeof obj.on === 'function' &&
  18. (!obj._readableState ||
  19. (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
  20. const isDuplexNodeStream = obj =>
  21. obj &&
  22. typeof obj.pipe === 'function' &&
  23. obj._readableState &&
  24. typeof obj.on === 'function' &&
  25. typeof obj.write === 'function';
  26. const groupFunctions = (output, fn, index, fns) => {
  27. if (
  28. isDuplexNodeStream(fn) ||
  29. (!index && isReadableNodeStream(fn)) ||
  30. (index === fns.length - 1 && isWritableNodeStream(fn))
  31. ) {
  32. output.push(fn);
  33. return output;
  34. }
  35. if (typeof fn != 'function') throw TypeError('Item #' + index + ' is not a proper stream, nor a function.');
  36. if (!output.length) output.push([]);
  37. const last = output[output.length - 1];
  38. if (Array.isArray(last)) {
  39. last.push(fn);
  40. } else {
  41. output.push([fn]);
  42. }
  43. return output;
  44. };
  45. const produceStreams = item => {
  46. if (Array.isArray(item)) {
  47. if (!item.length) return null;
  48. if (item.length == 1) return item[0] && chain.asStream(item[0]);
  49. return chain.asStream(chain.gen(...item));
  50. }
  51. return item;
  52. };
  53. const wrapFunctions = (fn, index, fns) => {
  54. if (
  55. isDuplexNodeStream(fn) ||
  56. (!index && isReadableNodeStream(fn)) ||
  57. (index === fns.length - 1 && isWritableNodeStream(fn))
  58. ) {
  59. return fn; // an acceptable stream
  60. }
  61. if (typeof fn == 'function') return chain.asStream(fn); // a function
  62. throw TypeError('Item #' + index + ' is not a proper stream, nor a function.');
  63. };
  64. // default implementation of required stream methods
  65. const write = (input, chunk, encoding, callback) => {
  66. let error = null;
  67. try {
  68. input.write(chunk, encoding, e => callback(e || error));
  69. } catch (e) {
  70. error = e;
  71. }
  72. };
  73. const final = (input, callback) => {
  74. let error = null;
  75. try {
  76. input.end(null, null, e => callback(e || error));
  77. } catch (e) {
  78. error = e;
  79. }
  80. };
  81. const read = output => {
  82. output.resume();
  83. };
  84. // the chain creator
  85. const chain = (fns, options) => {
  86. if (!Array.isArray(fns) || !fns.length) {
  87. throw TypeError("Chain's first argument should be a non-empty array.");
  88. }
  89. fns = fns.filter(fn => fn).flat(Infinity); // remove nulls and flatten
  90. const streams = (
  91. options && options.noGrouping
  92. ? fns.map(wrapFunctions)
  93. : fns.reduce(groupFunctions, []).map(produceStreams)
  94. ).filter(s => s),
  95. input = streams[0],
  96. output = streams.reduce((output, item) => (output && output.pipe(item)) || item);
  97. let stream = null; // will be assigned later
  98. let writeMethod = (chunk, encoding, callback) => write(input, chunk, encoding, callback),
  99. finalMethod = callback => final(input, callback),
  100. readMethod = () => read(output);
  101. if (!isWritableNodeStream(input)) {
  102. writeMethod = (_1, _2, callback) => callback(null);
  103. finalMethod = callback => callback(null); // unavailable in Node 6
  104. input.on('end', () => stream.end());
  105. }
  106. if (isReadableNodeStream(output)) {
  107. output.on('data', chunk => !stream.push(chunk) && output.pause());
  108. output.on('end', () => stream.push(null));
  109. } else {
  110. readMethod = () => {}; // nop
  111. output.on('finish', () => stream.push(null));
  112. }
  113. stream = new Duplex(
  114. Object.assign({}, {writableObjectMode: true, readableObjectMode: true}, options, {
  115. write: writeMethod,
  116. final: finalMethod,
  117. read: readMethod
  118. })
  119. );
  120. stream.streams = streams;
  121. stream.input = input;
  122. stream.output = output;
  123. if (!isReadableNodeStream(output)) {
  124. stream.resume();
  125. }
  126. // connect events
  127. if (!options || !options.skipEvents) {
  128. streams.forEach(item => item.on('error', error => stream.emit('error', error)));
  129. }
  130. return stream;
  131. };
// The module's export is the `chain` function itself. Every helper below is attached to it
// as a property, so it is reachable both as `chain.x` and as a property of the module export.
module.exports = chain;
// to keep ESM happy
// NOTE(review): every name is assigned through an explicit `module.exports.x = ...` statement,
// presumably so that tools detecting CommonJS named exports statically can see them —
// confirm before replacing these lines with a loop or `Object.assign()`.
// Since `module.exports` is `chain`, both targets of each assignment are the same object.
module.exports.none = chain.none = defs.none;
module.exports.stop = chain.stop = defs.stop;
module.exports.Stop = chain.Stop = defs.Stop;
module.exports.finalSymbol = chain.finalSymbol = defs.finalSymbol;
module.exports.manySymbol = chain.manySymbol = defs.manySymbol;
module.exports.flushSymbol = chain.flushSymbol = defs.flushSymbol;
module.exports.finalValue = chain.finalValue = defs.finalValue;
module.exports.many = chain.many = defs.many;
module.exports.flushable = chain.flushable = defs.flushable;
module.exports.isFinalValue = chain.isFinalValue = defs.isFinalValue;
module.exports.isMany = chain.isMany = defs.isMany;
module.exports.isFlushable = chain.isFlushable = defs.isFlushable;
module.exports.getFinalValue = chain.getFinalValue = defs.getFinalValue;
module.exports.getManyValues = chain.getManyValues = defs.getManyValues;
module.exports.final = chain.final = defs.final;
// self-reference, so that `chain.chain` resolves to the function as well
module.exports.chain = chain.chain = chain; // for compatibility with 2.x
module.exports.gen = chain.gen = gen;
module.exports.asStream = chain.asStream = asStream;