Conform stream.map to FL spec and clarify stream internal properties (#2481)

* Conform stream.map to FL spec. Clarify stream internal properties.

* Streamline ignoreInitial logic, code style edit
This commit is contained in:
spacejack 2019-07-23 02:35:50 -04:00 committed by Isiah Meadows
parent 84baff8def
commit 61b087ea20

View file

@ -27,8 +27,8 @@ function Stream(value) {
if (arguments.length && v !== Stream.SKIP) { if (arguments.length && v !== Stream.SKIP) {
value = v value = v
if (open(stream)) { if (open(stream)) {
stream.changing() stream._changing()
stream.state = "active" stream._state = "active"
dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
} }
} }
@ -37,35 +37,36 @@ function Stream(value) {
} }
stream.constructor = Stream stream.constructor = Stream
stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending" stream._state = arguments.length && value !== Stream.SKIP ? "active" : "pending"
stream.parents = [] stream._parents = []
stream.changing = function() { stream._changing = function() {
open(stream) && (stream.state = "changing") if (open(stream)) stream._state = "changing"
dependentStreams.forEach(function(s) { dependentStreams.forEach(function(s) {
s.changing() s._changing()
}) })
} }
stream.map = function(fn, ignoreInitial) { stream._map = function(fn, ignoreInitial) {
var target = stream.state === "active" && ignoreInitial !== Stream.SKIP var target = ignoreInitial ? Stream() : Stream(fn(value))
? Stream(fn(value)) target._parents.push(stream)
: Stream()
target.parents.push(stream)
dependentStreams.push(target) dependentStreams.push(target)
dependentFns.push(fn) dependentFns.push(fn)
return target return target
} }
stream.map = function(fn) {
return stream._map(fn, stream._state !== "active")
}
var end var end
function createEnd() { function createEnd() {
end = Stream() end = Stream()
end.map(function(value) { end.map(function(value) {
if (value === true) { if (value === true) {
stream.parents.forEach(function (p) {p.unregisterChild(stream)}) stream._parents.forEach(function (p) {p._unregisterChild(stream)})
stream.state = "ended" stream._state = "ended"
stream.parents.length = dependentStreams.length = dependentFns.length = 0 stream._parents.length = dependentStreams.length = dependentFns.length = 0
} }
return value return value
}) })
@ -77,7 +78,7 @@ function Stream(value) {
stream["fantasy-land/map"] = stream.map stream["fantasy-land/map"] = stream.map
stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) } stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }
stream.unregisterChild = function(child) { stream._unregisterChild = function(child) {
var childIndex = dependentStreams.indexOf(child) var childIndex = dependentStreams.indexOf(child)
if (childIndex !== -1) { if (childIndex !== -1) {
dependentStreams.splice(childIndex, 1) dependentStreams.splice(childIndex, 1)
@ -96,7 +97,7 @@ function combine(fn, streams) {
var ready = streams.every(function(s) { var ready = streams.every(function(s) {
if (s.constructor !== Stream) if (s.constructor !== Stream)
throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream") throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream")
return s.state === "active" return s._state === "active"
}) })
var stream = ready var stream = ready
? Stream(fn.apply(null, streams.concat([streams]))) ? Stream(fn.apply(null, streams.concat([streams])))
@ -105,15 +106,15 @@ function combine(fn, streams) {
var changed = [] var changed = []
var mappers = streams.map(function(s) { var mappers = streams.map(function(s) {
return s.map(function(value) { return s._map(function(value) {
changed.push(s) changed.push(s)
if (ready || streams.every(function(s) { return s.state !== "pending" })) { if (ready || streams.every(function(s) { return s._state !== "pending" })) {
ready = true ready = true
stream(fn.apply(null, streams.concat([changed]))) stream(fn.apply(null, streams.concat([changed])))
changed = [] changed = []
} }
return value return value
}, Stream.SKIP) }, true)
}) })
var endStream = stream.end.map(function(value) { var endStream = stream.end.map(function(value) {
@ -168,7 +169,7 @@ function lift() {
} }
function open(s) { function open(s) {
return s.state === "pending" || s.state === "active" || s.state === "changing" return s._state === "pending" || s._state === "active" || s._state === "changing"
} }
if (typeof module !== "undefined") module["exports"] = Stream if (typeof module !== "undefined") module["exports"] = Stream