asStream.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. 'use strict';
  2. const {Duplex} = require('stream');
  3. const defs = require('./defs');
  4. const asStream = (fn, options) => {
  5. if (typeof fn != 'function')
  6. throw TypeError(
  7. 'Only a function is accepted as the first argument'
  8. );
  9. // pump variables
  10. let paused = Promise.resolve(),
  11. resolvePaused = null;
  12. const queue = [];
  13. // pause/resume
  14. const resume = () => {
  15. if (!resolvePaused) return;
  16. resolvePaused();
  17. resolvePaused = null;
  18. paused = Promise.resolve();
  19. };
  20. const pause = () => {
  21. if (resolvePaused) return;
  22. paused = new Promise(resolve => (resolvePaused = resolve));
  23. };
  24. let stream = null; // will be assigned later
  25. // data processing
  26. const pushResults = values => {
  27. if (values && typeof values.next == 'function') {
  28. // generator
  29. queue.push(values);
  30. return;
  31. }
  32. // array
  33. queue.push(values[Symbol.iterator]());
  34. };
  35. const pump = async () => {
  36. while (queue.length) {
  37. await paused;
  38. const gen = queue[queue.length - 1];
  39. let result = gen.next();
  40. if (result && typeof result.then == 'function') {
  41. result = await result;
  42. }
  43. if (result.done) {
  44. queue.pop();
  45. continue;
  46. }
  47. let value = result.value;
  48. if (value && typeof value.then == 'function') {
  49. value = await value;
  50. }
  51. await sanitize(value);
  52. }
  53. };
  54. const sanitize = async value => {
  55. if (value === undefined || value === null || value === defs.none) return;
  56. if (value === defs.stop) throw new defs.Stop();
  57. if (defs.isMany(value)) {
  58. pushResults(defs.getManyValues(value));
  59. return pump();
  60. }
  61. if (defs.isFinalValue(value)) {
  62. // a final value is not supported, it is treated as a regular value
  63. value = defs.getFinalValue(value);
  64. return processValue(value);
  65. }
  66. if (!stream.push(value)) {
  67. pause();
  68. }
  69. };
  70. const processChunk = async (chunk, encoding) => {
  71. try {
  72. const value = fn(chunk, encoding);
  73. await processValue(value);
  74. } catch (error) {
  75. if (error instanceof defs.Stop) {
  76. stream.push(null);
  77. stream.destroy();
  78. return;
  79. }
  80. throw error;
  81. }
  82. };
  83. const processValue = async value => {
  84. if (value && typeof value.then == 'function') {
  85. // thenable
  86. return value.then(value => processValue(value));
  87. }
  88. if (value && typeof value.next == 'function') {
  89. // generator
  90. pushResults(value);
  91. return pump();
  92. }
  93. return sanitize(value);
  94. };
  95. stream = new Duplex(
  96. Object.assign({writableObjectMode: true, readableObjectMode: true}, options, {
  97. write(chunk, encoding, callback) {
  98. processChunk(chunk, encoding).then(
  99. () => callback(null),
  100. error => callback(error)
  101. );
  102. },
  103. final(callback) {
  104. if (!defs.isFlushable(fn)) {
  105. stream.push(null);
  106. callback(null);
  107. return;
  108. }
  109. processChunk(defs.none, null).then(
  110. () => (stream.push(null), callback(null)),
  111. error => callback(error)
  112. );
  113. },
  114. read() {
  115. resume();
  116. }
  117. })
  118. );
  119. return stream;
  120. };
  121. module.exports = asStream;