// Stream.js
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('../defs');
  4. class Stream extends Duplex {
  5. constructor(fn, options) {
  6. super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
  7. if (typeof fn != 'function') throw Error('Only function is accepted as the first argument');
  8. this._fn = fn;
  9. // pump variables
  10. this._pause = true;
  11. this._queue = [];
  12. this._pending = false;
  13. this._chunk = this._encoding = this._callback = null;
  14. }
  15. _write(chunk, encoding, callback) {
  16. if (this._pause || this._queue.length) {
  17. this._pending = true;
  18. this._chunk = chunk;
  19. this._encoding = encoding;
  20. this._callback = callback;
  21. return;
  22. }
  23. this._process(chunk, encoding).then(
  24. () => callback(null),
  25. error => callback(error)
  26. );
  27. }
  28. _final(callback) {
  29. if (this._pause || this._queue.length) {
  30. this._pending = true;
  31. this._chunk = this._encoding = null;
  32. this._callback = callback;
  33. return;
  34. }
  35. this.push(null);
  36. callback(null);
  37. }
  38. _read() {
  39. this._pause = false;
  40. this._pump(this).then(() => this._pushPending());
  41. }
  42. async _process(chunk, encoding) {
  43. try {
  44. const result = this._fn(chunk, encoding);
  45. if (result && typeof result.then == 'function') {
  46. // thenable
  47. return await result.then(result => this._sanitize(result));
  48. }
  49. if (result && typeof result.next == 'function') {
  50. // generator
  51. this._pushResults(result);
  52. return await this._pump();
  53. }
  54. await this._sanitize(result);
  55. } catch (error) {
  56. if (error instanceof defs.Stop) {
  57. this.push(null);
  58. this.destroy();
  59. this._queue = [];
  60. this._chunk = this._encoding = this._callback = null;
  61. return;
  62. }
  63. throw error;
  64. }
  65. }
  66. async _pump() {
  67. const queue = this._queue;
  68. while (!this._pause && queue.length) {
  69. const gen = queue[queue.length - 1];
  70. let result = gen.next();
  71. if (result && typeof result.then == 'function') {
  72. result = await result;
  73. }
  74. if (result.done) {
  75. queue.pop();
  76. continue;
  77. }
  78. const value = result.value;
  79. if (value && typeof value.then == 'function') {
  80. value = await value;
  81. }
  82. await this._sanitize(value);
  83. }
  84. }
  85. _pushResults(results) {
  86. if (results && typeof results.next == 'function') {
  87. // generator
  88. this._queue.push(results);
  89. } else {
  90. // array
  91. this._queue.push(results[Symbol.iterator]());
  92. }
  93. }
  94. async _sanitize(result) {
  95. if (result !== undefined && result !== null && result === defs.none) return;
  96. if (result === defs.stop) throw new defs.Stop();
  97. if (defs.isMany(result)) {
  98. result = defs.getManyValues(result);
  99. this._pushResults(result);
  100. return this._pump();
  101. }
  102. if (defs.isFinalValue(result)) {
  103. result = defs.getFinalValue(result);
  104. }
  105. this._pause = !this.push(result);
  106. }
  107. async _pushPending() {
  108. if (this._pause || !this._pending) return;
  109. const chunk = this._chunk,
  110. encoding = this._encoding,
  111. callback = this._callback;
  112. this._pending = false;
  113. this._chunk = this._encoding = this._callback = null;
  114. return this._process(chunk, encoding).then(
  115. () => callback(null),
  116. error => callback(error)
  117. );
  118. }
  119. static make(fn, options) {
  120. return new Stream(fn, options);
  121. }
  122. }
  123. Stream.stream = Stream.make;
  124. Stream.make.Constructor = Stream;
  125. module.exports = Stream;