diff --git a/stream/stream.js b/stream/stream.js index bc914723..359396dd 100644 --- a/stream/stream.js +++ b/stream/stream.js @@ -27,8 +27,8 @@ function Stream(value) { if (arguments.length && v !== Stream.SKIP) { value = v if (open(stream)) { - stream.changing() - stream.state = "active" + stream._changing() + stream._state = "active" dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) } } @@ -37,35 +37,36 @@ function Stream(value) { } stream.constructor = Stream - stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending" - stream.parents = [] + stream._state = arguments.length && value !== Stream.SKIP ? "active" : "pending" + stream._parents = [] - stream.changing = function() { - open(stream) && (stream.state = "changing") + stream._changing = function() { + if (open(stream)) stream._state = "changing" dependentStreams.forEach(function(s) { - s.changing() + s._changing() }) } - stream.map = function(fn, ignoreInitial) { - var target = stream.state === "active" && ignoreInitial !== Stream.SKIP - ? Stream(fn(value)) - : Stream() - target.parents.push(stream) - + stream._map = function(fn, ignoreInitial) { + var target = ignoreInitial ? Stream() : Stream(fn(value)) + target._parents.push(stream) dependentStreams.push(target) dependentFns.push(fn) return target } + stream.map = function(fn) { + return stream._map(fn, stream._state !== "active") + } + var end function createEnd() { end = Stream() end.map(function(value) { if (value === true) { - stream.parents.forEach(function (p) {p.unregisterChild(stream)}) - stream.state = "ended" - stream.parents.length = dependentStreams.length = dependentFns.length = 0 + stream._parents.forEach(function (p) {p._unregisterChild(stream)}) + stream._state = "ended" + stream._parents.length = dependentStreams.length = dependentFns.length = 0 } return value }) @@ -77,7 +78,7 @@ function Stream(value) { stream["fantasy-land/map"] = stream.map stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) } - stream.unregisterChild = function(child) { + stream._unregisterChild = function(child) { var childIndex = dependentStreams.indexOf(child) if (childIndex !== -1) { dependentStreams.splice(childIndex, 1) @@ -96,7 +97,7 @@ function combine(fn, streams) { var ready = streams.every(function(s) { if (s.constructor !== Stream) throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream") - return s.state === "active" + return s._state === "active" }) var stream = ready ? Stream(fn.apply(null, streams.concat([streams]))) @@ -105,15 +106,15 @@ function combine(fn, streams) { var changed = [] var mappers = streams.map(function(s) { - return s.map(function(value) { + return s._map(function(value) { changed.push(s) - if (ready || streams.every(function(s) { return s.state !== "pending" })) { + if (ready || streams.every(function(s) { return s._state !== "pending" })) { ready = true stream(fn.apply(null, streams.concat([changed]))) changed = [] } return value - }, Stream.SKIP) + }, true) }) var endStream = stream.end.map(function(value) { @@ -168,7 +169,7 @@ function lift() { } function open(s) { - return s.state === "pending" || s.state === "active" || s.state === "changing" + return s._state === "pending" || s._state === "active" || s._state === "changing" } if (typeof module !== "undefined") module["exports"] = Stream