aasStream.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('./defs');
  4. const asStream = (fn, options) => {
  5. if (fn && typeof fn != 'function') {
  6. if (typeof fn[Symbol.asyncIterator] == 'function') {
  7. fn = fn[Symbol.asyncIterator].bind(fn);
  8. } else if (typeof fn[Symbol.iterator] == 'function') {
  9. fn = fn[Symbol.iterator].bind(fn);
  10. } else {
  11. fn = null;
  12. }
  13. }
  14. if (!fn)
  15. throw TypeError(
  16. 'Only a function or an object with an iterator is accepted as the first argument'
  17. );
  18. // pump variables
  19. let paused = Promise.resolve(),
  20. resolvePaused = null;
  21. const queue = [];
  22. // pause/resume
  23. const resume = () => {
  24. if (!resolvePaused) return;
  25. resolvePaused();
  26. resolvePaused = null;
  27. paused = Promise.resolve();
  28. };
  29. const pause = () => {
  30. if (resolvePaused) return;
  31. paused = new Promise(resolve => (resolvePaused = resolve));
  32. };
  33. let stream = null; // will be assigned later
  34. // data processing
  35. const pushResults = values => {
  36. if (values && typeof values.next == 'function') {
  37. // generator
  38. queue.push(values);
  39. return;
  40. }
  41. // array
  42. queue.push(values[Symbol.iterator]());
  43. };
  44. const pump = async () => {
  45. while (queue.length) {
  46. await paused;
  47. const gen = queue[queue.length - 1];
  48. let result = gen.next();
  49. if (result && typeof result.then == 'function') {
  50. result = await result;
  51. }
  52. if (result.done) {
  53. queue.pop();
  54. continue;
  55. }
  56. const value = result.value;
  57. if (value && typeof value.then == 'function') {
  58. value = await value;
  59. }
  60. await sanitize(value);
  61. }
  62. };
  63. const sanitize = async value => {
  64. if (value === undefined || value === null || value === defs.none) return;
  65. if (value === defs.stop) throw new defs.Stop();
  66. if (defs.isMany(value)) {
  67. pushResults(defs.getManyValues(value));
  68. return pump();
  69. }
  70. if (defs.isFinalValue(value)) {
  71. // a final value is not supported, it is treated as a regular value
  72. value = defs.getFinalValue(value);
  73. return processValue(value);
  74. }
  75. if (!stream.push(value)) {
  76. pause();
  77. }
  78. };
  79. const processChunk = async (chunk, encoding) => {
  80. try {
  81. const value = fn(chunk, encoding);
  82. await processValue(value);
  83. } catch (error) {
  84. if (error instanceof defs.Stop) {
  85. stream.push(null);
  86. stream.destroy();
  87. return;
  88. }
  89. throw error;
  90. }
  91. };
  92. const processValue = async value => {
  93. if (value && typeof value.then == 'function') {
  94. // thenable
  95. return value.then(value => processValue(value));
  96. }
  97. if (value && typeof value.next == 'function') {
  98. // generator
  99. pushResults(value);
  100. return pump();
  101. }
  102. return sanitize(value);
  103. };
  104. stream = new Duplex(
  105. Object.assign({}, {writableObjectMode: true, readableObjectMode: true}, options, {
  106. write(chunk, encoding, callback) {
  107. processChunk(chunk, encoding).then(
  108. () => callback(null),
  109. error => callback(error)
  110. );
  111. },
  112. final(callback) {
  113. if (!defs.isFlushable(fn)) {
  114. stream.push(null);
  115. callback(null);
  116. return;
  117. }
  118. processChunk(defs.none, null).then(
  119. () => (stream.push(null), callback(null)),
  120. error => callback(error)
  121. );
  122. },
  123. read() {
  124. resume();
  125. }
  126. })
  127. );
  128. return stream;
  129. };
  130. module.exports = asStream;