| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- 'use strict';
- const {Readable} = require('stream');
- const defs = require('../defs');
- const fromIterable = options => {
- if (!options || !options.iterable) {
- options = {iterable: options};
- }
- let fn = options && options.iterable;
- if (fn && typeof fn != 'function') {
- if (typeof fn[Symbol.asyncIterator] == 'function') {
- fn = fn[Symbol.asyncIterator].bind(fn);
- } else if (typeof fn[Symbol.iterator] == 'function') {
- fn = fn[Symbol.iterator].bind(fn);
- } else {
- fn = null;
- }
- }
- if (!fn)
- throw TypeError(
- 'Only a function or an object with an iterator is accepted as the first argument'
- );
- // pump variables
- let paused = Promise.resolve(),
- resolvePaused = null;
- const queue = [];
- // pause/resume
- const resume = () => {
- if (!resolvePaused) return;
- resolvePaused();
- resolvePaused = null;
- paused = Promise.resolve();
- };
- const pause = () => {
- if (resolvePaused) return;
- paused = new Promise(resolve => (resolvePaused = resolve));
- };
- let stream = null; // will be assigned later
- // data processing
- const pushResults = values => {
- if (values && typeof values.next == 'function') {
- // generator
- queue.push(values);
- return;
- }
- // array
- queue.push(values[Symbol.iterator]());
- };
- const pump = async () => {
- while (queue.length) {
- await paused;
- const gen = queue[queue.length - 1];
- let result = gen.next();
- if (result && typeof result.then == 'function') {
- result = await result;
- }
- if (result.done) {
- queue.pop();
- continue;
- }
- let value = result.value;
- if (value && typeof value.then == 'function') {
- value = await value;
- }
- await sanitize(value);
- }
- };
- const sanitize = async value => {
- if (value === undefined || value === null || value === defs.none) return;
- if (value === defs.stop) throw new defs.Stop();
- if (defs.isMany(value)) {
- pushResults(defs.getManyValues(value));
- return pump();
- }
- if (defs.isFinalValue(value)) {
- // a final value is not supported, it is treated as a regular value
- value = defs.getFinalValue(value);
- return processValue(value);
- }
- if (!stream.push(value)) {
- pause();
- }
- };
- const startPump = async () => {
- try {
- const value = fn();
- await processValue(value);
- stream.push(null);
- } catch (error) {
- if (error instanceof defs.Stop) {
- stream.push(null);
- stream.destroy();
- return;
- }
- throw error;
- }
- };
- const processValue = async value => {
- if (value && typeof value.then == 'function') {
- // thenable
- return value.then(value => processValue(value));
- }
- if (value && typeof value.next == 'function') {
- // generator
- pushResults(value);
- return pump();
- }
- return sanitize(value);
- };
- stream = new Readable(
- Object.assign({objectMode: true}, options, {
- read() {
- resume();
- }
- })
- );
- startPump();
- return stream;
- };
- module.exports = fromIterable;
|