diff --git a/stream/stream.mjs b/stream/stream.mjs index f67e8438..37a0a8ba 100644 --- a/stream/stream.mjs +++ b/stream/stream.mjs @@ -21,11 +21,13 @@ function Stream(value) { var dependentFns = [] function stream(v) { - if (arguments.length && v !== Stream.SKIP && open(stream)) { + if (arguments.length && v !== Stream.SKIP) { value = v - stream.changing() - stream.state = "active" - dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) + if (open(stream)) { + stream.changing() + stream.state = "active" + dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) + } } return value @@ -33,11 +35,11 @@ function Stream(value) { stream.constructor = Stream stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending" + stream.parents = [] stream.changing = function() { open(stream) && (stream.state = "changing") dependentStreams.forEach(function(s) { - s.dependent && s.dependent.changing() s.changing() }) } @@ -46,6 +48,7 @@ function Stream(value) { var target = stream.state === "active" && ignoreInitial !== Stream.SKIP ? Stream(fn(value)) : Stream() + target.parents.push(stream) dependentStreams.push(target) dependentFns.push(fn) @@ -57,8 +60,9 @@ function Stream(value) { end = Stream() end.map(function(value) { if (value === true) { + stream.parents.forEach(function (p) {p.unregisterChild(stream)}) stream.state = "ended" - dependentStreams.length = dependentFns.length = 0 + stream.parents.length = dependentStreams.length = dependentFns.length = 0 } return value }) @@ -70,6 +74,14 @@ function Stream(value) { stream["fantasy-land/map"] = stream.map stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) } + stream.unregisterChild = function(child) { + var childIndex = dependentStreams.indexOf(child) + if (childIndex !== -1) { + dependentStreams.splice(childIndex, 1) + dependentFns.splice(childIndex, 1) + } + } + Object.defineProperty(stream, "end", { get: function() { return end || createEnd() } }) @@ -89,8 +101,8 @@ function combine(fn, streams) { var changed = [] - streams.forEach(function(s) { - s.map(function(value) { + var mappers = streams.map(function(s) { + return s.map(function(value) { changed.push(s) if (ready || streams.every(function(s) { return s.state !== "pending" })) { ready = true @@ -98,7 +110,15 @@ function combine(fn, streams) { changed = [] } return value - }, Stream.SKIP).parent = stream + }, Stream.SKIP) + }) + + var endStream = stream.end.map(function(value) { + if (value === true) { + mappers.forEach(function(mapper) { mapper.end(true) }) + endStream.end(true) + } + return undefined }) return stream