瀏覽代碼

Imported files from object-stream.

Eugene Lazutkin 5 年之前
父節點
當前提交
343f96263f
共有 3 個文件被更改,包括 357 次插入0 次删除
  1. 135 0
      fun.js
  2. 87 0
      gen.js
  3. 135 0
      utils/Stream.js

+ 135 - 0
fun.js

@@ -0,0 +1,135 @@
+'use strict';
+
+const defs = require('./defs');
+
+const next = async (value, fns, index, collect) => {
+  let cleanIndex;
+  try {
+    for (let i = index; i <= fns.length; ++i) {
+      if (value && typeof value.then == 'function') {
+        // thenable
+        value = await value;
+      }
+      if (value === defs.none) break;
+      if (value === defs.stop) {
+        cleanIndex = i - 1;
+        throw new defs.Stop();
+      }
+      if (value && value[defs.finalSymbol] === 1) {
+        collect(value.value);
+        break;
+      }
+      if (value && value[defs.manySymbol] === 1) {
+        const values = value.values;
+        if (i == fns.length) {
+          values.forEach(val => collect(val));
+        } else {
+          for (let j = 0; j < values.length; ++j) {
+            await next(values[j], fns, i, collect);
+          }
+        }
+        break;
+      }
+      if (value && typeof value.next == 'function') {
+        // generator
+        for (;;) {
+          let data = value.next();
+          if (data && typeof data.then == 'function') {
+            data = await data;
+          }
+          if (data.done) break;
+          if (i == fns.length) {
+            collect(data.value);
+          } else {
+            await next(data.value, fns, i, collect);
+          }
+        }
+        break;
+      }
+      if (i == fns.length) {
+        collect(value);
+        break;
+      }
+      cleanIndex = i + 1;
+      const f = fns[i];
+      value = f(value);
+    }
+  } catch (error) {
+    if (error instanceof defs.Stop) {
+      await flush(fns, cleanIndex, collect);
+    }
+    throw error;
+  }
+};
+
+const flush = async (fns, index, collect) => {
+  for (let i = index; i < fns.length; ++i) {
+    const f = fns[i];
+    if (f[defs.flushSymbol] === 1) {
+      await next(f(defs.none), fns, i + 1, collect);
+    }
+  }
+};
+
+const collect = (collect, fns) => {
+  fns = fns.filter(fn => fn);
+  if (fns.length) {
+    if (Symbol.asyncIterator && fns[0][Symbol.asyncIterator]) {
+      fns[0] = fns[0][Symbol.asyncIterator].bind(fns[0]);
+    } else if (Symbol.iterator && fns[0][Symbol.iterator]) {
+      fns[0] = fns[0][Symbol.iterator].bind(fns[0]);
+    }
+  } else {
+    fns = [x => x];
+  }
+  let flushed = false;
+  const g = async value => {
+    if (flushed) throw Error('Call to a flushed pipe.');
+    if (value !== defs.none) {
+      await next(value, fns, 0, collect);
+    } else {
+      flushed = true;
+      await flush(fns, 0, collect);
+    }
+  };
+  const needToFlush = fns.some(fn => fn[defs.flushSymbol] === 1);
+  return needToFlush ? defs.flushable(g) : g;
+};
+
+const asArray = (...fns) => {
+  let results = null;
+  const f = collect(value => results.push(value), fns);
+  let g = async value => {
+    results = [];
+    await f(value);
+    const r = results;
+    results = null;
+    return r;
+  };
+  if (f[defs.flushSymbol] === 1) g = defs.flushable(g);
+  return g;
+};
+
+const fun = (...fns) => {
+  const f = asArray(...fns);
+  let g = async value =>
+    f(value).then(results => {
+      switch (results.length) {
+        case 0:
+          return defs.none;
+        case 1:
+          return results[0];
+      }
+      return {[defs.manySymbol]: 1, values: results};
+    });
+  if (f[defs.flushSymbol] === 1) g = defs.flushable(g);
+  return g;
+};
+
+fun.next = next;
+fun.collect = collect;
+fun.asArray = asArray;
+
+Object.assign(fun, defs);
+
+module.exports = fun;

+ 87 - 0
gen.js

@@ -0,0 +1,87 @@
+'use strict';
+
+const defs = require('./defs');
+
+const next = async function* (value, fns, index) {
+  for (let i = index; i <= fns.length; ++i) {
+    if (value && typeof value.then == 'function') {
+      // thenable
+      value = await value;
+    }
+    if (value === defs.none) break;
+    if (value === defs.stop) throw new defs.Stop();
+    if (value && value[defs.finalSymbol] === 1) {
+      yield value.value;
+      break;
+    }
+    if (value && value[defs.manySymbol] === 1) {
+      const values = value.values;
+      if (i == fns.length) {
+        yield* values;
+      } else {
+        for (let j = 0; j < values.length; ++j) {
+          yield* next(values[j], fns, i);
+        }
+      }
+      break;
+    }
+    if (value && typeof value.next == 'function') {
+      // generator
+      for (;;) {
+        let data = value.next();
+        if (data && typeof data.then == 'function') {
+          data = await data;
+        }
+        if (data.done) break;
+        if (i == fns.length) {
+          yield data.value;
+        } else {
+          yield* next(data.value, fns, i);
+        }
+      }
+      break;
+    }
+    if (i == fns.length) {
+      yield value;
+      break;
+    }
+    const f = fns[i];
+    value = f(value);
+  }
+};
+
+const gen = (...fns) => {
+  fns = fns.filter(fn => fn);
+  if (fns.length) {
+    if (Symbol.asyncIterator && fns[0][Symbol.asyncIterator]) {
+      fns[0] = fns[0][Symbol.asyncIterator].bind(fns[0]);
+    } else if (Symbol.iterator && fns[0][Symbol.iterator]) {
+      fns[0] = fns[0][Symbol.iterator].bind(fns[0]);
+    }
+  } else {
+    fns = [x => x];
+  }
+  let flushed = false;
+  const g = async function* (value) {
+    if (flushed) throw Error('Call to a flushed pipe.');
+    if (value !== defs.none) {
+      yield* next(value, fns, 0);
+    } else {
+      flushed = true;
+      for (let i = 0; i < fns.length; ++i) {
+        const f = fns[i];
+        if (f[defs.flushSymbol] === 1) {
+          yield* next(f(defs.none), fns, i + 1);
+        }
+      }
+    }
+  };
+  const needToFlush = fns.some(fn => fn[defs.flushSymbol] === 1);
+  return needToFlush ? defs.flushable(g) : g;
+};
+
+gen.next = next;
+
+Object.assign(gen, defs);
+
+module.exports = gen;

+ 135 - 0
utils/Stream.js

@@ -0,0 +1,135 @@
+'use strict';
+
+const {Duplex} = require('stream');
+const defs = require('../defs');
+
+class Stream extends Duplex {
+  constructor(fn, options) {
+    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
+    if (typeof fn != 'function') throw Error('Only function is accepted as the first argument');
+    this._fn = fn;
+    // pump variables
+    this._pause = true;
+    this._queue = [];
+    this._pending = false;
+    this._chunk = this._encoding = this._callback = null;
+  }
+
+  _write(chunk, encoding, callback) {
+    if (this._pause || this._queue.length) {
+      this._pending = true;
+      this._chunk = chunk;
+      this._encoding = encoding;
+      this._callback = callback;
+      return;
+    }
+    this._process(chunk, encoding).then(
+      () => callback(null),
+      error => callback(error)
+    );
+  }
+  _final(callback) {
+    if (this._pause || this._queue.length) {
+      this._pending = true;
+      this._chunk = this._encoding = null;
+      this._callback = callback;
+      return;
+    }
+    this.push(null);
+    callback(null);
+  }
+  _read() {
+    this._pause = false;
+    this._pump(this).then(() => this._pushPending());
+  }
+
+  async _process(chunk, encoding) {
+    try {
+      const result = this._fn(chunk, encoding);
+      if (result && typeof result.then == 'function') {
+        // thenable
+        return await result.then(result => this._sanitize(result));
+      }
+      if (result && typeof result.next == 'function') {
+        // generator
+        this._pushResults(result);
+        return await this._pump();
+      }
+      await this._sanitize(result);
+    } catch (error) {
+      if (error instanceof defs.Stop) {
+        this.push(null);
+        this.destroy();
+        this._queue = [];
+        this._chunk = this._encoding = this._callback = null;
+        return;
+      }
+      throw error;
+    }
+  }
+  async _pump() {
+    const queue = this._queue;
+    while (!this._pause && queue.length) {
+      const gen = queue[queue.length - 1];
+      let result = gen.next();
+      if (result && typeof result.then == 'function') {
+        result = await result;
+      }
+      if (result.done) {
+        queue.pop();
+        continue;
+      }
+      const value = result.value;
+      if (value && typeof value.then == 'function') {
+        value = await value;
+      }
+      await this._sanitize(value);
+    }
+  }
+  _pushResults(results) {
+    if (results && typeof results.next == 'function') {
+      // generator
+      this._queue.push(results);
+    } else {
+      // array
+      this._queue.push(results[Symbol.iterator]());
+    }
+  }
+  async _sanitize(result) {
+    if (result !== undefined && result !== null && result === defs.none) return;
+    if (result === defs.stop) throw new defs.Stop();
+
+    if (defs.isMany(result)) {
+      result = defs.getManyValues(result);
+      this._pushResults(result);
+      return this._pump();
+    }
+
+    if (defs.isFinalValue(result)) {
+      result = defs.getFinalValue(result);
+    }
+
+    this._pause = !this.push(result);
+  }
+  async _pushPending() {
+    if (this._pause || !this._pending) return;
+    const chunk = this._chunk,
+      encoding = this._encoding,
+      callback = this._callback;
+    this._pending = false;
+    this._chunk = this._encoding = this._callback = null;
+    return this._process(chunk, encoding).then(
+      () => callback(null),
+      error => callback(error)
+    );
+  }
+
+  static make(fn, options) {
+    return new Stream(fn, options);
+  }
+}
+
+Stream.stream = Stream.make;
+Stream.make.Constructor = Stream;
+
+module.exports = Stream;