|
|
@@ -23,9 +23,15 @@ class Stream extends Duplex {
|
|
|
);
|
|
|
}
|
|
|
_final(callback) {
|
|
|
- // TODO: add isFlushable()
|
|
|
- this.push(null);
|
|
|
- callback(null);
|
|
|
+ if (!defs.isFlushable(this._fn)) {
|
|
|
+ this.push(null);
|
|
|
+ callback(null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ this._processChunk(defs.none, null).then(
|
|
|
+ () => callback(null),
|
|
|
+ error => callback(error)
|
|
|
+ );
|
|
|
}
|
|
|
_read() {
|
|
|
this._resume();
|
|
|
@@ -83,9 +89,9 @@ class Stream extends Duplex {
|
|
|
}
|
|
|
|
|
|
if (defs.isFinalValue(value)) {
|
|
|
+ // a final value is not supported, it is treated as a regular value
|
|
|
value = defs.getFinalValue(value);
|
|
|
- await this._processValue(value);
|
|
|
- throw new defs.Stop(); // is it the correct handling of the final value?
|
|
|
+ return this._processValue(value);
|
|
|
}
|
|
|
|
|
|
if (!this.push(value)) {
|
|
|
@@ -100,7 +106,6 @@ class Stream extends Duplex {
|
|
|
if (error instanceof defs.Stop) {
|
|
|
this.push(null);
|
|
|
this.destroy();
|
|
|
- // clean up
|
|
|
return;
|
|
|
}
|
|
|
throw error;
|