index.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('./defs');
  4. const gen = require('./gen');
  5. const {asStream} = require('./AsStream');
  6. // is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
  7. const isReadableNodeStream = obj =>
  8. obj &&
  9. typeof obj.pipe === 'function' &&
  10. typeof obj.on === 'function' &&
  11. (!obj._writableState ||
  12. (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
  13. (!obj._writableState || obj._readableState); // Writable has .pipe.
  14. const isWritableNodeStream = obj =>
  15. obj &&
  16. typeof obj.write === 'function' &&
  17. typeof obj.on === 'function' &&
  18. (!obj._readableState ||
  19. (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
  20. const isDuplexNodeStream = obj =>
  21. obj &&
  22. typeof obj.pipe === 'function' &&
  23. obj._readableState &&
  24. typeof obj.on === 'function' &&
  25. typeof obj.write === 'function';
  26. const getIterator = x => {
  27. if (!x) return null;
  28. if (typeof x[Symbol.asyncIterator] == 'function') return x[Symbol.asyncIterator].bind(x);
  29. if (typeof x[Symbol.iterator] == 'function') return x[Symbol.iterator].bind(x);
  30. return null;
  31. }
  32. class Chain extends Duplex {
  33. constructor(fns, options) {
  34. super(options || {writableObjectMode: true, readableObjectMode: true});
  35. if (!(fns instanceof Array) || !fns.length) {
  36. throw TypeError("Chain's argument should be a non-empty array.");
  37. }
  38. this.streams = fns
  39. .filter(fn => fn)
  40. .reduce((output, fn, index, fns) => {
  41. if (
  42. isDuplexNodeStream(fn) ||
  43. (!index && isReadableNodeStream(fn)) ||
  44. (index === fns.length - 1 && isWritableNodeStream(fn))
  45. ) {
  46. output.push(fn);
  47. return output;
  48. }
  49. if (typeof fn != 'function') {
  50. const iterator = getIterator(fn);
  51. if (!iterator) throw TypeError('Item #' + index + ' is not a proper stream, function, nor iterable.');
  52. fn = iterator;
  53. }
  54. if (!output.length) output.push([]);
  55. const last = output[output.length - 1];
  56. if (Array.isArray(last)) {
  57. last.push(fn);
  58. } else {
  59. output.push([fn]);
  60. }
  61. return output;
  62. }, [])
  63. .map(item => {
  64. if (Array.isArray(item)) {
  65. if (!item.length) return null;
  66. if (item.length == 1) return item[0] && Chain.asStream(item[0]);
  67. return Chain.asStream(Chain.gen(...item));
  68. }
  69. return item;
  70. })
  71. .filter(s => s);
  72. this.input = this.streams[0];
  73. this.output = this.streams.reduce(
  74. (output, stream) => (output && output.pipe(stream)) || stream
  75. );
  76. if (!isWritableNodeStream(this.input)) {
  77. this._write = (_1, _2, callback) => callback(null);
  78. this._final = callback => callback(null); // unavailable in Node 6
  79. this.input.on('end', () => this.end());
  80. }
  81. if (isReadableNodeStream(this.output)) {
  82. this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
  83. this.output.on('end', () => this.push(null));
  84. } else {
  85. this._read = () => {}; // nop
  86. this.resume();
  87. this.output.on('finish', () => this.push(null));
  88. }
  89. // connect events
  90. if (!options || !options.skipEvents) {
  91. this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
  92. }
  93. }
  94. _write(chunk, encoding, callback) {
  95. let error = null;
  96. try {
  97. this.input.write(chunk, encoding, e => callback(e || error));
  98. } catch (e) {
  99. error = e;
  100. }
  101. }
  102. _final(callback) {
  103. let error = null;
  104. try {
  105. this.input.end(null, null, e => callback(e || error));
  106. } catch (e) {
  107. error = e;
  108. }
  109. }
  110. _read() {
  111. this.output.resume();
  112. }
  113. static make(fns, options) {
  114. return new Chain(fns, options);
  115. }
  116. }
  117. Chain.chain = Chain.make;
  118. Chain.make.Constructor = Chain;
  119. Chain.gen = gen;
  120. Chain.asStream = asStream;
  121. module.exports = Chain;
  122. // to keep ESM happy:
  123. module.exports.none = defs.none;
  124. module.exports.stop = defs.stop;
  125. module.exports.Stop = defs.Stop;
  126. module.exports.finalSymbol = defs.finalSymbol;
  127. module.exports.manySymbol = defs.manySymbol;
  128. module.exports.flushSymbol = defs.flushSymbol;
  129. module.exports.finalValue = defs.finalValue;
  130. module.exports.many = defs.many;
  131. module.exports.flushable = defs.flushable;
  132. module.exports.isFinalValue = defs.isFinalValue;
  133. module.exports.isMany = defs.isMany;
  134. module.exports.isFlushable = defs.isFlushable;
  135. module.exports.getFinalValue = defs.getFinalValue;
  136. module.exports.getManyValues = defs.getManyValues;
  137. module.exports.final = defs.final;
  138. module.exports.chain = Chain.make;
  139. module.exports.gen = gen;
  140. module.exports.asStream = asStream;