Fix Stream.end() (#2369)

This commit is contained in:
Ivan Kupalov 2019-02-07 09:44:31 +01:00 committed by Isiah Meadows
parent 415862880d
commit 7eea1b1e62
2 changed files with 52 additions and 9 deletions

View file

@ -24,11 +24,13 @@ function Stream(value) {
var dependentFns = []
function stream(v) {
if (arguments.length && v !== Stream.SKIP && open(stream)) {
if (arguments.length && v !== Stream.SKIP) {
value = v
stream.changing()
stream.state = "active"
dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
if (open(stream)) {
stream.changing()
stream.state = "active"
dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
}
}
return value
@ -36,11 +38,11 @@ function Stream(value) {
stream.constructor = Stream
stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending"
stream.parents = []
stream.changing = function() {
open(stream) && (stream.state = "changing")
dependentStreams.forEach(function(s) {
s.dependent && s.dependent.changing()
s.changing()
})
}
@ -49,6 +51,7 @@ function Stream(value) {
var target = stream.state === "active" && ignoreInitial !== Stream.SKIP
? Stream(fn(value))
: Stream()
target.parents.push(stream)
dependentStreams.push(target)
dependentFns.push(fn)
@ -60,8 +63,9 @@ function Stream(value) {
end = Stream()
end.map(function(value) {
if (value === true) {
stream.parents.forEach(function (p) {p.unregisterChild(stream)})
stream.state = "ended"
dependentStreams.length = dependentFns.length = 0
stream.parents.length = dependentStreams.length = dependentFns.length = 0
}
return value
})
@ -73,6 +77,14 @@ function Stream(value) {
stream["fantasy-land/map"] = stream.map
stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }
stream.unregisterChild = function(child) {
var childIndex = dependentStreams.indexOf(child)
if (childIndex !== -1) {
dependentStreams.splice(childIndex, 1)
dependentFns.splice(childIndex, 1)
}
}
Object.defineProperty(stream, "end", {
get: function() { return end || createEnd() }
})
@ -92,8 +104,8 @@ function combine(fn, streams) {
var changed = []
streams.forEach(function(s) {
s.map(function(value) {
var mappers = streams.map(function(s) {
return s.map(function(value) {
changed.push(s)
if (ready || streams.every(function(s) { return s.state !== "pending" })) {
ready = true
@ -101,7 +113,15 @@ function combine(fn, streams) {
changed = []
}
return value
}, Stream.SKIP).parent = stream
}, Stream.SKIP)
})
var endStream = stream.end.map(function(value) {
if (value === true) {
mappers.forEach(function(mapper) { mapper.end(true) })
endStream.end(true)
}
return undefined
})
return stream

View file

@ -261,6 +261,15 @@ o.spec("stream", function() {
o(thrown.constructor === TypeError).equals(false)
o(spy.callCount).equals(0)
})
o("combine callback not called when child stream was ended", function () {
var spy = o.spy()
var a = Stream(1)
var b = Stream(2)
var mapped = Stream.combine(spy, [a, b])
mapped.end(true)
a(11)
o(spy.callCount).equals(1)
})
})
o.spec("lift", function() {
o("transforms value", function() {
@ -479,6 +488,12 @@ o.spec("stream", function() {
o(spy.callCount).equals(1)
})
o("ended stream works like a container", function() {
var stream = Stream(1)
stream.end(true)
stream(2)
o(stream()).equals(2)
})
})
o.spec("toJSON", function() {
o("works", function() {
@ -544,6 +559,14 @@ o.spec("stream", function() {
o(stream["fantasy-land/map"]).equals(stream.map)
})
o("mapping function is not invoked after ending", function () {
var stream = Stream(undefined)
var fn = o.spy()
var mapped = stream.map(fn)
mapped.end(true)
stream(undefined)
o(fn.callCount).equals(1)
})
})
o.spec("ap", function() {
o("works", function() {