fun.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. 'use strict';
  2. const defs = require('./defs');
  3. const next = async (value, fns, index, collect) => {
  4. let cleanIndex;
  5. try {
  6. for (let i = index; i <= fns.length; ++i) {
  7. if (value && typeof value.then == 'function') {
  8. // thenable
  9. value = await value;
  10. }
  11. if (value === defs.none) break;
  12. if (value === defs.stop) {
  13. cleanIndex = i - 1;
  14. throw new defs.Stop();
  15. }
  16. if (value && value[defs.finalSymbol] === 1) {
  17. collect(value.value);
  18. break;
  19. }
  20. if (value && value[defs.manySymbol] === 1) {
  21. const values = value.values;
  22. if (i == fns.length) {
  23. values.forEach(val => collect(val));
  24. } else {
  25. for (let j = 0; j < values.length; ++j) {
  26. await next(values[j], fns, i, collect);
  27. }
  28. }
  29. break;
  30. }
  31. if (value && typeof value.next == 'function') {
  32. // generator
  33. for (;;) {
  34. let data = value.next();
  35. if (data && typeof data.then == 'function') {
  36. data = await data;
  37. }
  38. if (data.done) break;
  39. if (i == fns.length) {
  40. collect(data.value);
  41. } else {
  42. await next(data.value, fns, i, collect);
  43. }
  44. }
  45. break;
  46. }
  47. if (i == fns.length) {
  48. collect(value);
  49. break;
  50. }
  51. cleanIndex = i + 1;
  52. const f = fns[i];
  53. value = f(value);
  54. }
  55. } catch (error) {
  56. if (error instanceof defs.Stop) {
  57. await flush(fns, cleanIndex, collect);
  58. }
  59. throw error;
  60. }
  61. };
  62. const flush = async (fns, index, collect) => {
  63. for (let i = index; i < fns.length; ++i) {
  64. const f = fns[i];
  65. if (f[defs.flushSymbol] === 1) {
  66. await next(f(defs.none), fns, i + 1, collect);
  67. }
  68. }
  69. };
  70. const collect = (collect, fns) => {
  71. fns = fns.filter(fn => fn);
  72. if (fns.length) {
  73. if (Symbol.asyncIterator && fns[0][Symbol.asyncIterator]) {
  74. fns[0] = fns[0][Symbol.asyncIterator].bind(fns[0]);
  75. } else if (Symbol.iterator && fns[0][Symbol.iterator]) {
  76. fns[0] = fns[0][Symbol.iterator].bind(fns[0]);
  77. }
  78. } else {
  79. fns = [x => x];
  80. }
  81. let flushed = false;
  82. const g = async value => {
  83. if (flushed) throw Error('Call to a flushed pipe.');
  84. if (value !== defs.none) {
  85. await next(value, fns, 0, collect);
  86. } else {
  87. flushed = true;
  88. await flush(fns, 0, collect);
  89. }
  90. };
  91. const needToFlush = fns.some(fn => fn[defs.flushSymbol] === 1);
  92. return needToFlush ? defs.flushable(g) : g;
  93. };
  94. const asArray = (...fns) => {
  95. let results = null;
  96. const f = collect(value => results.push(value), fns);
  97. let g = async value => {
  98. results = [];
  99. await f(value);
  100. const r = results;
  101. results = null;
  102. return r;
  103. };
  104. if (f[defs.flushSymbol] === 1) g = defs.flushable(g);
  105. return g;
  106. };
  107. const fun = (...fns) => {
  108. const f = asArray(...fns);
  109. let g = async value =>
  110. f(value).then(results => {
  111. switch (results.length) {
  112. case 0:
  113. return defs.none;
  114. case 1:
  115. return results[0];
  116. }
  117. return {[defs.manySymbol]: 1, values: results};
  118. });
  119. if (f[defs.flushSymbol] === 1) g = defs.flushable(g);
  120. return g;
  121. };
  122. fun.next = next;
  123. fun.collect = collect;
  124. fun.asArray = asArray;
  125. Object.assign(fun, defs);
  126. module.exports = fun;