|
|
@@ -3,9 +3,6 @@
|
|
|
const EventEmitter = require('events');
|
|
|
const {Duplex, Transform} = require('stream');
|
|
|
|
|
|
-const GeneratorFunction = Object.getPrototypeOf(function*() {}).constructor;
|
|
|
-const AsyncFunction = Object.getPrototypeOf(async function() {}).constructor;
|
|
|
-
|
|
|
function processData(result, stream, callback) {
|
|
|
if (result !== undefined && result !== null) {
|
|
|
if (result instanceof Array) {
|
|
|
@@ -27,35 +24,30 @@ class Chain extends EventEmitter {
|
|
|
|
|
|
this.streams = fns.map((fn, index) => {
|
|
|
let transform, stream;
|
|
|
- if (fn instanceof AsyncFunction) {
|
|
|
- transform = function(chunk, encoding, callback) {
|
|
|
- fn.call(this, chunk, encoding).then(result => processData(result, this, callback), error => callback(error));
|
|
|
- };
|
|
|
- } else if (fn instanceof GeneratorFunction) {
|
|
|
+ if (typeof fn === 'function') {
|
|
|
transform = function(chunk, encoding, callback) {
|
|
|
try {
|
|
|
- const generator = fn(chunk, encoding);
|
|
|
- while (true) {
|
|
|
- const result = generator.next();
|
|
|
- processData(result.value, this);
|
|
|
- if (result.done) break;
|
|
|
+ const result = fn.call(this, chunk, encoding);
|
|
|
+ if (result && typeof result.then == 'function') {
|
|
|
+ // Promise
|
|
|
+ result.then(result => processData(result, this, callback), error => callback(error));
|
|
|
+ } else if (result && typeof result.next == 'function') {
|
|
|
+ // generator
|
|
|
+ while (true) {
|
|
|
+ const data = result.next();
|
|
|
+ processData(data.value, this);
|
|
|
+ if (data.done) break;
|
|
|
+ }
|
|
|
+ callback(null);
|
|
|
+ } else {
|
|
|
+ processData(result, this, callback);
|
|
|
}
|
|
|
- callback(null);
|
|
|
} catch (error) {
|
|
|
callback(error);
|
|
|
}
|
|
|
};
|
|
|
} else if (fn instanceof Duplex || fn instanceof Transform) {
|
|
|
stream = fn;
|
|
|
- } else if (typeof fn === 'function') {
|
|
|
- transform = function(chunk, encoding, callback) {
|
|
|
- try {
|
|
|
- const result = fn.call(this, chunk, encoding);
|
|
|
- processData(result, this, callback);
|
|
|
- } catch (error) {
|
|
|
- callback(error);
|
|
|
- }
|
|
|
- };
|
|
|
} else {
|
|
|
throw Error('Arguments should be functions or streams.');
|
|
|
}
|