AsStream.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('./defs');
  /**
   * Object-mode Duplex that runs every written chunk through a user-supplied
   * function and pushes whatever that function produces to the readable side.
   *
   * The function may return a plain value, a thenable, or a (sync or async)
   * iterator/generator. Special values from `./defs` are honoured:
   * `defs.none` is skipped, `defs.stop` ends the stream, "many" wrappers are
   * expanded, and "final" wrappers are unwrapped and treated as regular values.
   */
  class AsStream extends Duplex {
    // Shared, already-settled promise that represents the "not paused" state.
    static resolved = Promise.resolve();

    /**
     * @param {Function|Object} fn - a function called as `fn(chunk, encoding)`,
     *   or an object exposing `Symbol.asyncIterator` / `Symbol.iterator`
     *   (the async one wins when both are present).
     * @param {Object} [options] - Duplex options; both sides are always forced
     *   into object mode.
     * @throws {TypeError} when `fn` is neither a function nor an iterable.
     */
    constructor(fn, options) {
      super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));

      if (typeof fn === 'function') {
        this._fn = fn;
      } else if (fn) {
        if (typeof fn[Symbol.asyncIterator] === 'function') {
          this._fn = fn[Symbol.asyncIterator].bind(fn);
        } else if (typeof fn[Symbol.iterator] === 'function') {
          this._fn = fn[Symbol.iterator].bind(fn);
        }
      }
      if (!this._fn) {
        throw new TypeError(
          'Only a function or an object with an iterator is accepted as the first argument'
        );
      }

      // pump variables
      this._paused = AsStream.resolved; // awaited by _pump() before each step
      this._resolvePaused = null; // resolver of the pending pause, if any
      this._queue = []; // stack of iterators being drained by _pump()
    }

    /**
     * Writable side: process one chunk and report completion or failure.
     */
    _write(chunk, encoding, callback) {
      this._processChunk(chunk, encoding).then(
        () => callback(null),
        error => callback(error)
      );
    }

    /**
     * Writable side finished. If the function is flushable (per
     * `defs.isFlushable`) it is called one last time with `defs.none`;
     * in both cases the readable side is then ended.
     */
    _final(callback) {
      if (!defs.isFlushable(this._fn)) {
        this.push(null);
        callback(null);
        return;
      }
      this._processChunk(defs.none, null).then(
        () => {
          // End the readable side only after the flushed values were pushed,
          // otherwise consumers would never see 'end'.
          this.push(null);
          callback(null);
        },
        error => callback(error)
      );
    }

    /**
     * Readable side wants more data: release a producer that was paused
     * after `push()` signalled back-pressure.
     */
    _read() {
      this._resume();
    }

    // pause/resume

    /** Resolve the pending pause promise (if any) and reset to "not paused". */
    _resume() {
      if (!this._resolvePaused) return;
      this._resolvePaused();
      this._resolvePaused = null;
      this._paused = AsStream.resolved;
    }

    /** Install a pending pause promise unless one is already in place. */
    _pause() {
      if (this._resolvePaused) return;
      this._paused = new Promise(resolve => (this._resolvePaused = resolve));
    }

    // data processing

    /**
     * Put a source of values on top of the iterator stack.
     * @param {Object} values - an iterator/generator (has `next()`), or an
     *   iterable such as an array, whose iterator is taken.
     */
    _pushResults(values) {
      if (values && typeof values.next === 'function') {
        // generator
        this._queue.push(values);
        return;
      }
      // array
      this._queue.push(values[Symbol.iterator]());
    }

    /**
     * Drain the iterator stack: always advance the most recently pushed
     * iterator, waiting on the pause promise before every step. Both the
     * iteration result and the yielded value may be thenables.
     */
    async _pump() {
      const queue = this._queue;
      while (queue.length) {
        await this._paused;
        const gen = queue[queue.length - 1];
        let result = gen.next();
        if (result && typeof result.then === 'function') {
          result = await result;
        }
        if (result.done) {
          queue.pop();
          continue;
        }
        // `let`: a yielded thenable is replaced by its resolved value.
        let value = result.value;
        if (value && typeof value.then === 'function') {
          value = await value;
        }
        await this._sanitize(value);
      }
    }

    /**
     * Interpret one produced value: skip empty ones, stop on `defs.stop`,
     * expand "many" wrappers, unwrap "final" wrappers, otherwise push it
     * downstream and pause when `push()` reports back-pressure.
     * @throws {defs.Stop} when the value is `defs.stop`.
     */
    async _sanitize(value) {
      if (value === undefined || value === null || value === defs.none) return;
      if (value === defs.stop) throw new defs.Stop();

      if (defs.isMany(value)) {
        this._pushResults(defs.getManyValues(value));
        return this._pump();
      }

      if (defs.isFinalValue(value)) {
        // a final value is not supported, it is treated as a regular value
        return this._processValue(defs.getFinalValue(value));
      }

      if (!this.push(value)) {
        this._pause();
      }
    }

    /**
     * Call the user function for a chunk and process its result.
     * A `defs.Stop` raised during processing ends the readable side and
     * destroys the stream; any other error is rethrown to the caller.
     */
    async _processChunk(chunk, encoding) {
      try {
        const value = this._fn(chunk, encoding);
        await this._processValue(value);
      } catch (error) {
        if (error instanceof defs.Stop) {
          this.push(null);
          this.destroy();
          return;
        }
        throw error;
      }
    }

    /**
     * Dispatch on the shape of a produced value: thenables are awaited and
     * re-dispatched, iterators are pumped, everything else is sanitized.
     */
    async _processValue(value) {
      if (value && typeof value.then === 'function') {
        // thenable
        return value.then(resolved => this._processValue(resolved));
      }
      if (value && typeof value.next === 'function') {
        // generator
        this._pushResults(value);
        return this._pump();
      }
      return this._sanitize(value);
    }

    /**
     * Factory equivalent of `new AsStream(fn, options)`.
     * @returns {AsStream}
     */
    static make(fn, options) {
      return new AsStream(fn, options);
    }
  }
  // Let callers reach the class from the factory function itself.
  AsStream.make.Constructor = AsStream;
  // `stream` is an alternative name for the `make` factory.
  AsStream.stream = AsStream.make;

  module.exports = AsStream;
  // to keep ESM happy:
  // NOTE(review): presumably these are written as individual
  // `module.exports.<name>` assignments so that named imports from ESM can be
  // detected statically - confirm before collapsing them into another form.
  module.exports.make = AsStream.make;
  module.exports.asStream = AsStream.make;
  137. module.exports.make = AsStream.make;