// Stream.js (3.3 KB)
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('../defs');
  4. class Stream extends Duplex {
  5. static resolved = Promise.resolve();
  6. constructor(fn, options) {
  7. super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
  8. if (typeof fn != 'function') throw Error('Only function is accepted as the first argument');
  9. this._fn = fn;
  10. // pump variables
  11. this._paused = Stream.resolved;
  12. this._resolvePaused = null;
  13. this._queue = [];
  14. }
  15. _write(chunk, encoding, callback) {
  16. this._processChunk(chunk, encoding).then(
  17. () => callback(null),
  18. error => callback(error)
  19. );
  20. }
  21. _final(callback) {
  22. if (!defs.isFlushable(this._fn)) {
  23. this.push(null);
  24. callback(null);
  25. return;
  26. }
  27. this._processChunk(defs.none, null).then(
  28. () => callback(null),
  29. error => callback(error)
  30. );
  31. }
  32. _read() {
  33. this._resume();
  34. }
  35. // pause/resume
  36. _resume() {
  37. if (!this._resolvePaused) return;
  38. this._resolvePaused();
  39. this._resolvePaused = null;
  40. this._paused = Stream.resolved;
  41. }
  42. _pause() {
  43. if (this._resolvePaused) return;
  44. this._paused = new Promise(resolve => (this._resolvePaused = resolve));
  45. }
  46. // data processing
  47. _pushResults(values) {
  48. if (values && typeof values.next == 'function') {
  49. // generator
  50. this._queue.push(values);
  51. return;
  52. }
  53. // array
  54. this._queue.push(values[Symbol.iterator]());
  55. }
  56. async _pump() {
  57. const queue = this._queue;
  58. while (queue.length) {
  59. await this._paused;
  60. const gen = queue[queue.length - 1];
  61. let result = gen.next();
  62. if (result && typeof result.then == 'function') {
  63. result = await result;
  64. }
  65. if (result.done) {
  66. queue.pop();
  67. continue;
  68. }
  69. const value = result.value;
  70. if (value && typeof value.then == 'function') {
  71. value = await value;
  72. }
  73. await this._sanitize(value);
  74. }
  75. }
  76. async _sanitize(value) {
  77. if (value === undefined || value === null || value === defs.none) return;
  78. if (value === defs.stop) throw new defs.Stop();
  79. if (defs.isMany(value)) {
  80. this._pushResults(defs.getManyValues(value));
  81. return this._pump();
  82. }
  83. if (defs.isFinalValue(value)) {
  84. // a final value is not supported, it is treated as a regular value
  85. value = defs.getFinalValue(value);
  86. return this._processValue(value);
  87. }
  88. if (!this.push(value)) {
  89. this._pause();
  90. }
  91. }
  92. async _processChunk(chunk, encoding) {
  93. try {
  94. const value = this._fn(chunk, encoding);
  95. await this._processValue(value);
  96. } catch (error) {
  97. if (error instanceof defs.Stop) {
  98. this.push(null);
  99. this.destroy();
  100. return;
  101. }
  102. throw error;
  103. }
  104. }
  105. async _processValue(value) {
  106. if (value && typeof value.then == 'function') {
  107. // thenable
  108. return value.then(value => this._processValue(value));
  109. }
  110. if (value && typeof value.next == 'function') {
  111. // generator
  112. this._pushResults(value);
  113. return this._pump();
  114. }
  115. return this._sanitize(value);
  116. }
  117. static make(fn, options) {
  118. return new Stream(fn, options);
  119. }
  120. }
  121. Stream.stream = Stream.make;
  122. Stream.make.Constructor = Stream;
  123. module.exports = Stream;