diff --git a/stream/stream.js b/stream/stream.js index bc5d58f5..bc914723 100644 --- a/stream/stream.js +++ b/stream/stream.js @@ -24,11 +24,13 @@ function Stream(value) { var dependentFns = [] function stream(v) { - if (arguments.length && v !== Stream.SKIP && open(stream)) { + if (arguments.length && v !== Stream.SKIP) { value = v - stream.changing() - stream.state = "active" - dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) + if (open(stream)) { + stream.changing() + stream.state = "active" + dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) + } } return value @@ -36,11 +38,11 @@ function Stream(value) { stream.constructor = Stream stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending" + stream.parents = [] stream.changing = function() { open(stream) && (stream.state = "changing") dependentStreams.forEach(function(s) { - s.dependent && s.dependent.changing() s.changing() }) } @@ -49,6 +51,7 @@ function Stream(value) { var target = stream.state === "active" && ignoreInitial !== Stream.SKIP ? Stream(fn(value)) : Stream() + target.parents.push(stream) dependentStreams.push(target) dependentFns.push(fn) @@ -60,8 +63,9 @@ function Stream(value) { end = Stream() end.map(function(value) { if (value === true) { + stream.parents.forEach(function (p) {p.unregisterChild(stream)}) stream.state = "ended" - dependentStreams.length = dependentFns.length = 0 + stream.parents.length = dependentStreams.length = dependentFns.length = 0 } return value }) @@ -73,6 +77,14 @@ function Stream(value) { stream["fantasy-land/map"] = stream.map stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) } + stream.unregisterChild = function(child) { + var childIndex = dependentStreams.indexOf(child) + if (childIndex !== -1) { + dependentStreams.splice(childIndex, 1) + dependentFns.splice(childIndex, 1) + } + } + Object.defineProperty(stream, "end", { get: function() { return end || createEnd() } }) @@ -92,8 +104,8 @@ function combine(fn, streams) { var changed = [] - streams.forEach(function(s) { - s.map(function(value) { + var mappers = streams.map(function(s) { + return s.map(function(value) { changed.push(s) if (ready || streams.every(function(s) { return s.state !== "pending" })) { ready = true @@ -101,7 +113,15 @@ function combine(fn, streams) { changed = [] } return value - }, Stream.SKIP).parent = stream + }, Stream.SKIP) + }) + + var endStream = stream.end.map(function(value) { + if (value === true) { + mappers.forEach(function(mapper) { mapper.end(true) }) + endStream.end(true) + } + return undefined }) return stream diff --git a/stream/tests/test-stream.js b/stream/tests/test-stream.js index 19295183..30f3a1f3 100644 --- a/stream/tests/test-stream.js +++ b/stream/tests/test-stream.js @@ -261,6 +261,15 @@ o.spec("stream", function() { o(thrown.constructor === TypeError).equals(false) o(spy.callCount).equals(0) }) + o("combine callback not called when child stream was ended", function () { + var spy = o.spy() + var a = Stream(1) + var b = Stream(2) + var mapped = Stream.combine(spy, [a, b]) + mapped.end(true) + a(11) + o(spy.callCount).equals(1) + }) }) o.spec("lift", function() { o("transforms value", function() { @@ -479,6 +488,12 @@ o.spec("stream", function() { o(spy.callCount).equals(1) }) + o("ended stream works like a container", function() { + var stream = Stream(1) + stream.end(true) + stream(2) + o(stream()).equals(2) + }) }) o.spec("toJSON", function() { o("works", function() { @@ -544,6 +559,14 @@ o.spec("stream", function() { o(stream["fantasy-land/map"]).equals(stream.map) }) + o("mapping function is not invoked after ending", function () { + var stream = Stream(undefined) + var fn = o.spy() + var mapped = stream.map(fn) + mapped.end(true) + stream(undefined) + o(fn.callCount).equals(1) + }) }) o.spec("ap", function() { o("works", function() {