fix stream absorption #1196

This commit is contained in:
Leo Horie 2016-08-10 09:12:36 -04:00
parent 3a19dddb22
commit c077cd80de
2 changed files with 74 additions and 50 deletions

View file

@ -1,4 +1,4 @@
"use strict"
//"use strict"
var guid = 0, noop = function() {}, HALT = {}
function createStream() {
@ -53,15 +53,7 @@ function updateStream(stream, value, error) {
function updateState(stream, value, error) {
error = unwrapError(value, error)
if (error !== undefined && typeof stream._state.recover === "function") {
try {
var recovered = stream._state.recover()
if (recovered === HALT) return
updateValues(stream, recovered, undefined)
}
catch (e) {
updateValues(stream, undefined, e)
reportUncaughtError(stream, e)
}
if (!resolve(stream, updateValues, true)) return
}
else updateValues(stream, value, error)
stream._state.changed = true
@ -76,19 +68,21 @@ function updateDependency(stream, mustSync) {
if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) {
var failed = parents.filter(errored)
if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error)
else {
try {
var value = state.derive()
if (value === HALT) return
updateState(stream, value, undefined)
}
catch (e) {
updateState(stream, undefined, e)
reportUncaughtError(stream, e)
}
}
else resolve(stream, updateState, false)
}
}
function resolve(stream, update, shouldRecover) {
try {
var value = shouldRecover ? stream._state.recover() : stream._state.derive()
if (value === HALT) return false
update(stream, value, undefined)
}
catch (e) {
update(stream, undefined, e)
reportUncaughtError(stream, e)
}
return true
}
function unwrapError(value, error) {
if (value != null && value.constructor === createStream) {
if (value._state.error !== undefined) error = value._state.error
@ -101,7 +95,7 @@ function finalize(stream) {
for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false
}
function reportUncaughtError(stream, e) {
if (Object.keys(stream._state.deps).length === 0) {
if (Object.keys(stream._state.deps).length === 0 && stream._state.derive == null) {
setTimeout(function() {
if (Object.keys(stream._state.deps).length === 0) console.error(e)
}, 0)
@ -129,11 +123,19 @@ function combine(fn, streams) {
}
function absorb(stream, value) {
if (value != null && value.constructor === createStream) {
value.error.map(stream.error)
value.map(stream)
if (value._state.state === 0) return HALT
if (value._state.error) throw value._state.error
value = value._state.value
var absorbable = value
var update = function() {
updateState(stream, absorbable._state.value, absorbable._state.error)
for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false)
}
absorbable.map(update).catch(function(e) {
update()
throw e
})
if (absorbable._state.state === 0) return HALT
if (absorbable._state.error) throw absorbable._state.error
value = absorbable._state.value
}
return value
}
@ -187,7 +189,7 @@ function reject(e) {
function merge(streams) {
return combine(function () {
return streams.map(function (s) {return s()})
return streams.map(function(s) {return s()})
}, streams)
}