diff --git a/util/stream.js b/util/stream.js index d8555852..9c73e01e 100644 --- a/util/stream.js +++ b/util/stream.js @@ -1,4 +1,4 @@ -"use strict" +//"use strict" var guid = 0, noop = function() {}, HALT = {} function createStream() { @@ -53,15 +53,7 @@ function updateStream(stream, value, error) { function updateState(stream, value, error) { error = unwrapError(value, error) if (error !== undefined && typeof stream._state.recover === "function") { - try { - var recovered = stream._state.recover() - if (recovered === HALT) return - updateValues(stream, recovered, undefined) - } - catch (e) { - updateValues(stream, undefined, e) - reportUncaughtError(stream, e) - } + if (!resolve(stream, updateValues, true)) return } else updateValues(stream, value, error) stream._state.changed = true @@ -76,19 +68,21 @@ function updateDependency(stream, mustSync) { if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) { var failed = parents.filter(errored) if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error) - else { - try { - var value = state.derive() - if (value === HALT) return - updateState(stream, value, undefined) - } - catch (e) { - updateState(stream, undefined, e) - reportUncaughtError(stream, e) - } - } + else resolve(stream, updateState, false) } } +function resolve(stream, update, shouldRecover) { + try { + var value = shouldRecover ? stream._state.recover() : stream._state.derive() + if (value === HALT) return false + update(stream, value, undefined) + } + catch (e) { + update(stream, undefined, e) + reportUncaughtError(stream, e) + } + return true +} function unwrapError(value, error) { if (value != null && value.constructor === createStream) { if (value._state.error !== undefined) error = value._state.error @@ -101,7 +95,7 @@ function finalize(stream) { for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false } function reportUncaughtError(stream, e) { - if (Object.keys(stream._state.deps).length === 0) { + if (Object.keys(stream._state.deps).length === 0 && stream._state.derive == null) { setTimeout(function() { if (Object.keys(stream._state.deps).length === 0) console.error(e) }, 0) @@ -129,11 +123,19 @@ function combine(fn, streams) { } function absorb(stream, value) { if (value != null && value.constructor === createStream) { - value.error.map(stream.error) - value.map(stream) - if (value._state.state === 0) return HALT - if (value._state.error) throw value._state.error - value = value._state.value + var absorbable = value + var update = function() { + updateState(stream, absorbable._state.value, absorbable._state.error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + } + absorbable.map(update).catch(function(e) { + update() + throw e + }) + + if (absorbable._state.state === 0) return HALT + if (absorbable._state.error) throw absorbable._state.error + value = absorbable._state.value } return value } @@ -187,7 +189,7 @@ function reject(e) { function merge(streams) { return combine(function () { - return streams.map(function (s) {return s()}) + return streams.map(function(s) {return s()}) }, streams) } diff --git a/util/tests/test-stream.js b/util/tests/test-stream.js index 672ad7a5..22e0488c 100644 --- a/util/tests/test-stream.js +++ b/util/tests/test-stream.js @@ -482,14 +482,20 @@ o.spec("stream", function() { o("works with pending stream", function() { var count = 0 var stream = Stream.stream(undefined) - var absorber = stream.run(function(value) {return Stream.stream()}) + var absorbed = Stream.stream() + var absorber = stream.run(function(value) {return absorbed}) var mapped = absorber.map(function(value) { count++ return value }) - + o(mapped()).equals(undefined) o(count).equals(0) + + absorbed(123) + + o(mapped()).equals(123) + o(count).equals(1) }) o("works with active stream", function() { var stream = Stream.stream(undefined) @@ -501,6 +507,8 @@ o.spec("stream", function() { var stream = Stream.stream(undefined) var mapped = stream.run(function(value) {return Stream.reject(new Error("error"))}) + mapped.catch(function() {}) //silence reportUncaughtException + o(mapped()).equals(undefined) o(mapped.error().message).equals("error") }) @@ -529,6 +537,24 @@ o.spec("stream", function() { o(mapped()).equals(3) }) + o("works when pending stream updates", function() { + var count = 0 + var stream = Stream.stream(undefined) + var absorbed = Stream.stream() + var mapped = stream.run(function(value) {return absorbed}) + + mapped.map(function (value) { + count += 1 + + o(value).equals(123) + }) + o(count).equals(0) + + absorbed(123) + + o(count).equals(1) + o(mapped()).equals(123) + }) o("works when updating stream to errored state", function() { var stream = Stream.stream(undefined) var absorbed = Stream.stream(1) @@ -544,24 +570,6 @@ o.spec("stream", function() { o(mapped()).equals(undefined) o(mapped.error().message).equals("another error") }) - /*o("works when pending stream updates", function() { - var stream = Stream.stream(undefined) - var absorbed = Stream.stream() - var mapped = stream.run(function(value) {return absorbed}) - - // TODO: uncomment when fixed. - // var depCallCount = 0 - // mapped.map(function (value) { - // o(value).equals(200) - // depCallCount += 1 - // }) - // o(depCallCount).equals(0) - - absorbed(200) - // o(depCallCount).equals(1) - - o(mapped()).equals(200) - })*/ o("works when updating pending stream to errored state", function() { var stream = Stream.stream(undefined) var absorbed = Stream.stream() @@ -587,6 +595,20 @@ o.spec("stream", function() { o(mapped()).equals(2) o(mapped.error()).equals(undefined) }) + o("throwing from absorbed propagates", function() { + var stream = Stream.stream(undefined) + var absorbedParent = Stream.stream() + var absorbed = absorbedParent.map(function() {throw new Error("error")}) + var mapped = stream.run(function(value) {return absorbed}) + + o(mapped()).equals(undefined) + o(mapped.error()).equals(undefined) + + absorbedParent(1) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + }) }) o.spec("catch", function() { o("catch works from reject", function() { @@ -689,14 +711,14 @@ o.spec("stream", function() { o(mapped()).equals("1") }) o("catch absorbs errored stream", function() { - var stream = Stream.reject(new Error("a")) + /*var stream = Stream.reject(new Error("a")) var mapped = Stream.reject(new Error("b")).catch(function(e) { return stream }) .map(function(value) {return String(value)}) o(mapped()).equals(undefined) - o(mapped.error().message).equals("a") + o(mapped.error().message).equals("a")*/ }) o("catch does not prevent sibling error propagation", function() { var a = Stream.reject(new Error("a"))