fromIterable.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. 'use strict';
  2. const {Readable} = require('stream');
  3. const defs = require('../defs');
  /**
   * Creates a Readable stream fed by a function or an iterable object.
   *
   * The argument is either the data source itself, or an options object whose
   * `iterable` property holds the data source. In the latter case the whole
   * options object is also forwarded to the `Readable` constructor:
   * `objectMode` defaults to `true` (options may override it) and `read()` is
   * always supplied by this function.
   *
   * Accepted data sources:
   *  - a function: it is called once without arguments,
   *  - an object exposing `Symbol.asyncIterator` or `Symbol.iterator`
   *    (the async iterator wins when both are present).
   *
   * The value produced by the source is awaited when it is a thenable and
   * iterated when it has a `next()` method. Every individual value is then
   * handled as follows:
   *  - `undefined`, `null` and `defs.none` are skipped,
   *  - `defs.stop` ends and destroys the stream,
   *  - values recognized by `defs.isMany()` are expanded with `defs.getManyValues()`,
   *  - values recognized by `defs.isFinalValue()` are unwrapped with
   *    `defs.getFinalValue()` and processed like a regular value,
   *  - anything else is pushed to the stream.
   *
   * Errors raised while producing values destroy the stream with that error,
   * so consumers observe them as an `'error'` event.
   *
   * @param {Function|Object} options - the data source, or an options object with `iterable`.
   * @returns {Readable} the stream producing the values.
   * @throws {TypeError} when no function or iterable object was supplied.
   */
  const fromIterable = options => {
    // Normalize "bare data source" and "options object" call styles.
    const settings = options && options.iterable ? options : {iterable: options};

    let fn = settings.iterable;
    if (fn && typeof fn !== 'function') {
      if (typeof fn[Symbol.asyncIterator] === 'function') {
        fn = fn[Symbol.asyncIterator].bind(fn);
      } else if (typeof fn[Symbol.iterator] === 'function') {
        fn = fn[Symbol.iterator].bind(fn);
      } else {
        fn = null;
      }
    }
    if (!fn) {
      throw new TypeError(
        'Only a function or an object with an iterator is accepted as the first argument'
      );
    }

    // Pump state: `paused` is awaited before every pull from an iterator.
    // While `resolvePaused` is set, `paused` is a pending promise.
    let paused = Promise.resolve();
    let resolvePaused = null;
    // Stack of active iterators: the last one is drained first.
    const queue = [];

    // Lets the pump continue; called whenever the stream asks for data.
    const resume = () => {
      if (!resolvePaused) return;
      resolvePaused();
      resolvePaused = null;
      paused = Promise.resolve();
    };

    // Makes the pump wait until the next resume().
    const pause = () => {
      if (resolvePaused) return;
      paused = new Promise(resolve => (resolvePaused = resolve));
    };

    const stream = new Readable(
      Object.assign({objectMode: true}, settings, {
        read() {
          resume();
        }
      })
    );

    // Schedules a source of values for the pump.
    const pushResults = values => {
      if (values && typeof values.next === 'function') {
        // already an iterator (e.g., a generator object)
        queue.push(values);
        return;
      }
      // otherwise it has to be iterable (presumably an array of values)
      queue.push(values[Symbol.iterator]());
    };

    // Drains the iterator stack, honoring pause/resume.
    const pump = async () => {
      while (queue.length) {
        await paused;
        const gen = queue[queue.length - 1];
        let result = gen.next();
        if (result && typeof result.then === 'function') {
          // async iterators return a promise from next()
          result = await result;
        }
        if (result.done) {
          queue.pop();
          continue;
        }
        let value = result.value;
        if (value && typeof value.then === 'function') {
          value = await value;
        }
        await sanitize(value);
      }
    };

    // Interprets a single value: skips, stops, expands or pushes it.
    const sanitize = async value => {
      if (value === undefined || value === null || value === defs.none) return;
      if (value === defs.stop) throw new defs.Stop();
      if (defs.isMany(value)) {
        pushResults(defs.getManyValues(value));
        return pump();
      }
      if (defs.isFinalValue(value)) {
        // a final value is not supported, it is treated as a regular value
        return processValue(defs.getFinalValue(value));
      }
      if (!stream.push(value)) {
        // push() returned false: wait for the next read() before producing more
        pause();
      }
    };

    // Resolves thenables, iterates iterators, and sanitizes everything else.
    const processValue = async value => {
      if (value && typeof value.then === 'function') {
        // thenable
        return processValue(await value);
      }
      if (value && typeof value.next === 'function') {
        // iterator (e.g., a generator object)
        pushResults(value);
        return pump();
      }
      return sanitize(value);
    };

    // Runs the source to completion. It never rejects: nothing awaits it,
    // so failures are reported through the stream instead.
    const startPump = async () => {
      try {
        await processValue(fn());
        stream.push(null);
      } catch (error) {
        if (error instanceof defs.Stop) {
          stream.push(null);
          stream.destroy();
          return;
        }
        // Surface the failure as an 'error' event rather than an
        // unhandled promise rejection.
        stream.destroy(error);
      }
    };

    startPump();
    return stream;
  };
  119. module.exports = fromIterable;