| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- 'use strict';
- const defs = require('./defs');
- const next = async (value, fns, index, collect) => {
- let cleanIndex;
- try {
- for (let i = index; i <= fns.length; ++i) {
- if (value && typeof value.then == 'function') {
- // thenable
- value = await value;
- }
- if (value === defs.none) break;
- if (value === defs.stop) {
- cleanIndex = i - 1;
- throw new defs.Stop();
- }
- if (value && value[defs.finalSymbol] === 1) {
- collect(value.value);
- break;
- }
- if (value && value[defs.manySymbol] === 1) {
- const values = value.values;
- if (i == fns.length) {
- values.forEach(val => collect(val));
- } else {
- for (let j = 0; j < values.length; ++j) {
- await next(values[j], fns, i, collect);
- }
- }
- break;
- }
- if (value && typeof value.next == 'function') {
- // generator
- for (;;) {
- let data = value.next();
- if (data && typeof data.then == 'function') {
- data = await data;
- }
- if (data.done) break;
- if (i == fns.length) {
- collect(data.value);
- } else {
- await next(data.value, fns, i, collect);
- }
- }
- break;
- }
- if (i == fns.length) {
- collect(value);
- break;
- }
- cleanIndex = i + 1;
- const f = fns[i];
- value = f(value);
- }
- } catch (error) {
- if (error instanceof defs.Stop) {
- await flush(fns, cleanIndex, collect);
- }
- throw error;
- }
- };
- const flush = async (fns, index, collect) => {
- for (let i = index; i < fns.length; ++i) {
- const f = fns[i];
- if (f[defs.flushSymbol] === 1) {
- await next(f(defs.none), fns, i + 1, collect);
- }
- }
- };
- const collect = (collect, fns) => {
- fns = fns.filter(fn => fn);
- if (fns.length) {
- if (Symbol.asyncIterator && fns[0][Symbol.asyncIterator]) {
- fns[0] = fns[0][Symbol.asyncIterator].bind(fns[0]);
- } else if (Symbol.iterator && fns[0][Symbol.iterator]) {
- fns[0] = fns[0][Symbol.iterator].bind(fns[0]);
- }
- } else {
- fns = [x => x];
- }
- let flushed = false;
- const g = async value => {
- if (flushed) throw Error('Call to a flushed pipe.');
- if (value !== defs.none) {
- await next(value, fns, 0, collect);
- } else {
- flushed = true;
- await flush(fns, 0, collect);
- }
- };
- const needToFlush = fns.some(fn => fn[defs.flushSymbol] === 1);
- return needToFlush ? defs.flushable(g) : g;
- };
- const asArray = (...fns) => {
- let results = null;
- const f = collect(value => results.push(value), fns);
- let g = async value => {
- results = [];
- await f(value);
- const r = results;
- results = null;
- return r;
- };
- if (f[defs.flushSymbol] === 1) g = defs.flushable(g);
- return g;
- };
- const fun = (...fns) => {
- const f = asArray(...fns);
- let g = async value =>
- f(value).then(results => {
- switch (results.length) {
- case 0:
- return defs.none;
- case 1:
- return results[0];
- }
- return {[defs.manySymbol]: 1, values: results};
- });
- if (f[defs.flushSymbol] === 1) g = defs.flushable(g);
- return g;
- };
- fun.next = next;
- fun.collect = collect;
- fun.asArray = asArray;
- Object.assign(fun, defs);
- module.exports = fun;
|