Stream.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('../defs');
  /**
   * Object-mode Duplex stream that feeds every written chunk to a user-supplied
   * function and pushes whatever that function produces to the readable side.
   *
   * The function may return a plain value, a thenable, or a (sync or async)
   * generator/iterator. Special values are recognized through the helpers of
   * the `defs` module (none/stop/many/final); their exact semantics live in
   * `../defs` — NOTE(review): confirm against that module.
   */
  class Stream extends Duplex {
    // Shared, already-settled promise that represents the "not paused" state.
    static resolved = Promise.resolve();

    /**
     * @param {Function} fn - Called as `fn(chunk, encoding)` for every written chunk.
     * @param {object} [options] - Duplex options; object mode is always forced
     *   on both the writable and the readable side.
     * @throws {Error} When `fn` is not a function.
     */
    constructor(fn, options) {
      super({...options, writableObjectMode: true, readableObjectMode: true});
      if (typeof fn !== 'function') throw new Error('Only function is accepted as the first argument');
      this._fn = fn;
      // pump variables
      this._paused = Stream.resolved; // awaited by _pump() before every pull
      this._resolvePaused = null; // resolver of the pending pause promise, if any
      this._queue = []; // stack of active iterators; the last one is pulled first
    }

    /**
     * Writable side: processes one chunk and reports completion (or failure)
     * through the callback.
     */
    _write(chunk, encoding, callback) {
      this._processChunk(chunk, encoding).then(
        () => callback(null),
        error => callback(error)
      );
    }

    /** Writable side finished: end the readable side as well. */
    _final(callback) {
      // TODO: add isFlushable()
      this.push(null);
      callback(null);
    }

    /** Readable side asks for more data: lift the pause, if there is one. */
    _read() {
      this._resume();
    }

    // pause/resume

    /** Resolves the pending pause promise (no-op when not paused). */
    _resume() {
      if (!this._resolvePaused) return;
      this._resolvePaused();
      this._resolvePaused = null;
      this._paused = Stream.resolved;
    }

    /** Installs a pending pause promise (no-op when already paused). */
    _pause() {
      if (this._resolvePaused) return;
      this._paused = new Promise(resolve => (this._resolvePaused = resolve));
    }

    // data processing

    /**
     * Puts a source of values on top of the iterator stack.
     * @param {object} values - Either an object with a `next()` method
     *   (used as is) or an iterable (its `Symbol.iterator` is invoked).
     */
    _pushResults(values) {
      if (values && typeof values.next === 'function') {
        // generator
        this._queue.push(values);
        return;
      }
      // array
      this._queue.push(values[Symbol.iterator]());
    }

    /**
     * Drains the iterator stack: repeatedly pulls from the top-most iterator,
     * waiting on the pause promise before each pull, and hands every produced
     * value to `_sanitize()`. Exhausted iterators are popped.
     * @returns {Promise<void>}
     */
    async _pump() {
      const queue = this._queue;
      while (queue.length) {
        await this._paused;
        const gen = queue[queue.length - 1];
        let result = gen.next();
        if (result && typeof result.then === 'function') {
          // next() returned a thenable (e.g. an async generator)
          result = await result;
        }
        if (result.done) {
          queue.pop();
          continue;
        }
        // Must be `let`: a yielded thenable is replaced by its settled value.
        let value = result.value;
        if (value && typeof value.then === 'function') {
          value = await value;
        }
        await this._sanitize(value);
      }
    }

    /**
     * Interprets a single produced value and pushes it downstream if needed.
     * `undefined`, `null` and `defs.none` are dropped; `defs.stop` and "final"
     * values raise `defs.Stop`; "many" values are expanded through the pump.
     * When `push()` reports back-pressure the stream is paused.
     * @throws {defs.Stop} To signal that processing must stop.
     */
    async _sanitize(value) {
      if (value === undefined || value === null || value === defs.none) return;
      if (value === defs.stop) throw new defs.Stop();
      if (defs.isMany(value)) {
        this._pushResults(defs.getManyValues(value));
        return this._pump();
      }
      if (defs.isFinalValue(value)) {
        const finalValue = defs.getFinalValue(value);
        await this._processValue(finalValue);
        throw new defs.Stop(); // is it the correct handling of the final value?
      }
      if (!this.push(value)) {
        this._pause();
      }
    }

    /**
     * Runs the user function on a chunk and processes its result.
     * A `defs.Stop` error ends the readable side and destroys the stream;
     * any other error is re-thrown to the caller.
     * @returns {Promise<void>}
     */
    async _processChunk(chunk, encoding) {
      try {
        const value = this._fn(chunk, encoding);
        await this._processValue(value);
      } catch (error) {
        if (error instanceof defs.Stop) {
          this.push(null);
          this.destroy();
          // clean up
          return;
        }
        throw error;
      }
    }

    /**
     * Dispatches on the shape of a value: thenables are awaited and
     * re-dispatched, objects with `next()` are pumped as iterators, and
     * everything else goes straight to `_sanitize()`.
     * @returns {Promise<void>}
     */
    async _processValue(value) {
      if (value && typeof value.then === 'function') {
        // thenable
        return this._processValue(await value);
      }
      if (value && typeof value.next === 'function') {
        // generator
        this._pushResults(value);
        return this._pump();
      }
      return this._sanitize(value);
    }

    /** Factory equivalent of `new Stream(fn, options)`. */
    static make(fn, options) {
      return new Stream(fn, options);
    }
  }
  // Let the factory point back at its class, expose it under the `stream`
  // alias, and export the class itself as the module value.
  Object.assign(Stream.make, {Constructor: Stream});
  Stream.stream = Stream.make;
  module.exports = Stream;