fun.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. 'use strict';
  2. const defs = require('./defs');
  /**
   * Pushes `value` through the functions in `fns`, starting at position
   * `index`, and hands every resulting value to `collect`.
   *
   * Before each function is applied the current value is inspected:
   *  - a thenable is awaited first;
   *  - `defs.none` ends the run without collecting anything;
   *  - `defs.stop` flushes the pipeline and throws `defs.Stop`;
   *  - an object tagged with `defs.finalSymbol` sends its `value` straight to
   *    `collect`, skipping the remaining functions;
   *  - an object tagged with `defs.manySymbol` has each entry of its `values`
   *    processed separately from the current position;
   *  - an object with a `next` method is drained like a generator (its
   *    `next()` results may themselves be thenables), each produced value
   *    being processed separately from the current position.
   *
   * @param {*} value - the value to process.
   * @param {Function[]} fns - the pipeline functions.
   * @param {number} index - position in `fns` to start from.
   * @param {Function} collect - callback receiving every output value.
   * @returns {Promise<void>}
   * @throws {defs.Stop} when a stop signal is met; any other error raised by
   *   a pipeline function propagates unchanged.
   */
  const next = async (value, fns, index, collect) => {
    // Position `flush` starts from when the run is aborted by a stop signal.
    let cleanIndex;
    try {
      // `i` deliberately reaches `fns.length`: the extra pass inspects and
      // collects whatever the last function returned.
      for (let i = index; i <= fns.length; ++i) {
        if (value && typeof value.then === 'function') {
          // thenable
          value = await value;
        }
        if (value === defs.none) break;
        if (value === defs.stop) {
          // Never go below 0: a stop signal received as the very first input
          // would otherwise make `flush` read `fns[-1]` and fail with a
          // TypeError that hides the `Stop` error.
          cleanIndex = i > 0 ? i - 1 : 0;
          throw new defs.Stop();
        }
        if (value && value[defs.finalSymbol] === 1) {
          collect(value.value);
          break;
        }
        if (value && value[defs.manySymbol] === 1) {
          const values = value.values;
          if (i === fns.length) {
            values.forEach(val => collect(val));
          } else {
            // Sequential on purpose: each value finishes before the next starts.
            for (let j = 0; j < values.length; ++j) {
              await next(values[j], fns, i, collect);
            }
          }
          break;
        }
        if (value && typeof value.next === 'function') {
          // generator
          for (;;) {
            let data = value.next();
            if (data && typeof data.then === 'function') {
              data = await data;
            }
            if (data.done) break;
            if (i === fns.length) {
              collect(data.value);
            } else {
              await next(data.value, fns, i, collect);
            }
          }
          break;
        }
        if (i === fns.length) {
          collect(value);
          break;
        }
        // Recorded before calling the function so that a `Stop` thrown from
        // inside it flushes only the functions after it.
        cleanIndex = i + 1;
        const f = fns[i];
        value = f(value);
      }
    } catch (error) {
      if (error instanceof defs.Stop) {
        await flush(fns, cleanIndex, collect);
      }
      throw error;
    }
  };
  /**
   * Flushes the pipeline: every function at or after `index` whose
   * `defs.flushSymbol` property equals 1 is called with `defs.none`, and its
   * result is passed through the functions that follow it via `next`.
   *
   * @param {Function[]} fns - the pipeline functions.
   * @param {number} index - position in `fns` to start flushing from.
   *   Negative values are treated as 0; with an `undefined` index nothing is
   *   flushed.
   * @param {Function} collect - callback receiving every output value.
   * @returns {Promise<void>}
   */
  const flush = async (fns, index, collect) => {
    // A negative start would read `fns[-1]` (undefined) and throw on the
    // property access below, so clamp it. `undefined < 0` is false, which
    // keeps `undefined` as is and the loop condition then fails immediately.
    for (let i = index < 0 ? 0 : index; i < fns.length; ++i) {
      const f = fns[i];
      if (f[defs.flushSymbol] === 1) {
        await next(f(defs.none), fns, i + 1, collect);
      }
    }
  };
  /**
   * Builds an async function that runs a value through the pipeline described
   * by `fns`, reporting every output value to the `collect` callback.
   *
   * `fns` may contain nested arrays and falsy placeholders at any depth: the
   * list is flattened, falsy entries are dropped, and entries recognised by
   * `defs.isFunctionList` are replaced with their underlying function lists.
   * An empty pipeline behaves as a single identity function.
   *
   * Calling the returned function with `defs.none` flushes the pipeline; any
   * later call throws.
   *
   * @param {Function} collect - callback receiving every output value.
   * @param {Array} fns - functions, function lists, nested arrays of them,
   *   or falsy placeholders.
   * @returns {Function} async function taking one value; it is passed through
   *   `defs.flushable` when any pipeline function is flushable, and is
   *   registered with the final function list through `defs.setFunctionList`.
   */
  const collect = (collect, fns) => {
    // Flatten BEFORE filtering so falsy placeholders nested inside arrays are
    // dropped too, instead of ending up in the pipeline and failing when called.
    const flattened = fns
      .flat(Infinity)
      .filter(fn => fn)
      .map(fn => (defs.isFunctionList(fn) ? defs.getFunctionList(fn) : fn))
      .flat(Infinity);
    const pipeline = flattened.length ? flattened : [x => x];

    let flushed = false;
    let g = async value => {
      if (flushed) throw new Error('Call to a flushed pipe.');
      if (value !== defs.none) {
        await next(value, pipeline, 0, collect);
      } else {
        // Mark first so that any call made after flushing started is rejected.
        flushed = true;
        await flush(pipeline, 0, collect);
      }
    };

    const needToFlush = pipeline.some(fn => defs.isFlushable(fn));
    if (needToFlush) g = defs.flushable(g);
    return defs.setFunctionList(g, pipeline);
  };
  /**
   * Wraps a pipeline so that each call resolves to an array holding every
   * value the pipeline produced for that input.
   *
   * @param {...*} fns - pipeline description, passed to `collect` as is.
   * @returns {Function} async function taking one value and resolving to an
   *   array of outputs; it is passed through `defs.flushable` when the
   *   underlying pipe is flushable and carries the pipe's function list.
   */
  const asArray = (...fns) => {
    // Output buffer of the call in progress; null while no call is running.
    let bucket = null;
    const pipe = collect(item => bucket.push(item), fns);

    const run = async value => {
      bucket = [];
      await pipe(value);
      // Detach the buffer before returning so the next call starts clean.
      const gathered = bucket;
      bucket = null;
      return gathered;
    };

    const wrapped = defs.isFlushable(pipe) ? defs.flushable(run) : run;
    return defs.setFunctionList(wrapped, defs.getFunctionList(pipe));
  };
  /**
   * Combines functions into a single function returning a promise. The
   * promise resolves to `defs.none` when the pipeline produced nothing, to
   * the value itself when it produced exactly one, and otherwise to an object
   * tagged with `defs.manySymbol` whose `values` array holds all outputs.
   *
   * @param {...*} fns - pipeline description, passed to `asArray` as is.
   * @returns {Function} function taking one value and returning a promise; it
   *   is passed through `defs.flushable` when the underlying pipe is flushable
   *   and carries the pipe's function list.
   */
  const fun = (...fns) => {
    const toArray = asArray(...fns);

    // Shapes the array of outputs into the single value the caller receives.
    const pack = results => {
      if (results.length === 0) return defs.none;
      if (results.length === 1) return results[0];
      return {[defs.manySymbol]: 1, values: results};
    };

    const piped = value => toArray(value).then(results => pack(results));
    const wrapped = defs.isFlushable(toArray) ? defs.flushable(piped) : piped;
    return defs.setFunctionList(wrapped, defs.getFunctionList(toArray));
  };
  // Public API: `fun` is the module itself; the lower-level building blocks
  // are exposed as properties on it.
  fun.next = next;
  fun.collect = collect;
  fun.asArray = asArray;
  module.exports = fun;