From 58c86f75468438b26915f51d9f8df0da321fe27e Mon Sep 17 00:00:00 2001 From: Rasmus Porsager Date: Tue, 27 Nov 2018 21:11:24 +0100 Subject: [PATCH] Rewrite stream (#2207) * Rewrite stream * Rename HALT to SKIP * Rename HALT to SKIP * Remove valueOf and toString * Update docs for HALT to SKIP * Rename halt to skip in test * Add test for combining nested streams atomically * Update change-log.md * Test basic SKIP * Add deprecated HALT * Combine continues with ended streams * Fix fantasy-land/of to match spec * Don't use arrow function * Improve scan description * Fix merge artifact --- docs/stream.md | 26 ++-- stream/change-log.md | 2 + stream/stream.js | 246 +++++++++++++++++------------------- stream/tests/test-scan.js | 4 +- stream/tests/test-stream.js | 95 ++++++++++++-- 5 files changed, 214 insertions(+), 159 deletions(-) diff --git a/docs/stream.md b/docs/stream.md index be78d0a8..d49b1fd2 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -8,7 +8,7 @@ - [Stream.scan](#streamscan) - [Stream.scanMerge](#streamscanmerge) - [Stream.lift](#streamlift) - - [Stream.HALT](#streamhalt) + - [Stream.SKIP](#streamskip) - [Stream["fantasy-land/of"]](#streamfantasy-landof) - [Instance members](#instance-members) - [stream.map](#streammap) @@ -121,13 +121,13 @@ Argument | Type | Required | Description Creates a new stream with the results of calling the function on every value in the stream with an accumulator and the incoming value. -Note that you can prevent dependent streams from being updated by returning the special value `stream.HALT` inside the accumulator function. +Note that you can prevent dependent streams from being updated by returning the special value `stream.SKIP` inside the accumulator function. `stream = Stream.scan(fn, accumulator, stream)` Argument | Type | Required | Description ------------- | -------------------------------- | -------- | --- -`fn` | `(accumulator, value) -> result \| HALT` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value +`fn` | `(accumulator, value) -> result \| SKIP` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value of the same type `accumulator` | `any` | Yes | The starting value for the accumulator `stream` | `Stream` | Yes | Stream containing the values **returns** | `Stream` | | Returns a new stream containing the result @@ -183,9 +183,9 @@ Argument | Type | Required | Description --- -##### Stream.HALT +##### Stream.SKIP -A special value that can be returned to stream callbacks to halt execution of downstreams +A special value that can be returned to stream callbacks to skip execution of downstreams --- @@ -375,14 +375,14 @@ console.log(doubled()) // logs 2 Dependent streams are *reactive*: their values are updated any time the value of their parent stream is updated. This happens regardless of whether the dependent stream was created before or after the value of the parent stream was set. -You can prevent dependent streams from being updated by returning the special value `stream.HALT` +You can prevent dependent streams from being updated by returning the special value `stream.SKIP` ```javascript -var halted = stream(1).map(function(value) { - return stream.HALT +var skipped = stream(1).map(function(value) { + return stream.SKIP }) -halted.map(function() { +skipped.map(function() { // never runs }) ``` @@ -432,14 +432,14 @@ console.log(added()) // logs 12 A stream can depend on any number of streams and it's guaranteed to update atomically. For example, if a stream A has two dependent streams B and C, and a fourth stream D is dependent on both B and C, the stream D will only update once if the value of A changes. This guarantees that the callback for stream D is never called with unstable values such as when B has a new value but C has the old value. Atomicity also brings the performance benefits of not recomputing downstreams unnecessarily. -You can prevent dependent streams from being updated by returning the special value `stream.HALT` +You can prevent dependent streams from being updated by returning the special value `stream.SKIP` ```javascript -var halted = stream.combine(function(stream) { - return stream.HALT +var skipped = stream.combine(function(stream) { + return stream.SKIP }, [stream(1)]) -halted.map(function() { +skipped.map(function() { // never runs }) ``` diff --git a/stream/change-log.md b/stream/change-log.md index 45ccd0da..a7c5bf7b 100644 --- a/stream/change-log.md +++ b/stream/change-log.md @@ -1,6 +1,8 @@ # Change log for stream ## 2.0.0 +- renamed HALT to SKIP [#2207](https://github.com/MithrilJS/mithril.js/pull/2207) +- rewrote implementation [#2207](https://github.com/MithrilJS/mithril.js/pull/2207) - stream: Removed `valueOf` & `toString` methods ([#2150](https://github.com/MithrilJS/mithril.js/pull/2150) ## 1.1.0 diff --git a/stream/stream.js b/stream/stream.js index d51520cb..4a703ce9 100644 --- a/stream/stream.js +++ b/stream/stream.js @@ -2,150 +2,140 @@ ;(function() { "use strict" /* eslint-enable */ +Stream.SKIP = {} +Stream.lift = lift +Stream.scan = scan +Stream.merge = merge +Stream.combine = combine +Stream.scanMerge = scanMerge +Stream["fantasy-land/of"] = Stream -var guid = 0, HALT = {} -function createStream() { - function stream() { - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0]) - return stream._state.value +let warnedHalt = false +Object.defineProperty(Stream, "HALT", { + get: function() { + warnedHalt && console.log("HALT is deprecated and has been renamed to SKIP"); + warnedHalt = true + return Stream.SKIP } - initStream(stream) +}) - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0]) +function Stream(value) { + var dependentStreams = [] + var dependentFns = [] + + function stream(v) { + if (arguments.length && v !== Stream.SKIP && open(stream)) { + value = v + stream.changing() + stream.state = "active" + dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) }) + } + + return value + } + + stream.constructor = Stream + stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending" + + stream.changing = function() { + open(stream) && (stream.state = "changing") + dependentStreams.forEach(function(s) { + s.dependent && s.dependent.changing() + s.changing() + }) + } + + stream.map = function(fn, ignoreInitial) { + var target = stream.state === "active" && ignoreInitial !== Stream.SKIP + ? Stream(fn(value)) + : Stream() + + dependentStreams.push(target) + dependentFns.push(fn) + return target + } + + let end + function createEnd() { + end = Stream() + end.map(function(value) { + if (value === true) { + stream.state = "ended" + dependentStreams.length = dependentFns.length = 0 + } + return value + }) + return end + } + + stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value } + + stream["fantasy-land/map"] = stream.map + stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) } + + Object.defineProperty(stream, "end", { + get: function() { return end || createEnd() } + }) return stream } -function initStream(stream) { - stream.constructor = createStream - stream._state = {id: guid++, value: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], endStream: undefined, unregister: undefined} - stream.map = stream["fantasy-land/map"] = map, stream["fantasy-land/ap"] = ap, stream["fantasy-land/of"] = createStream - stream.toJSON = toJSON - - Object.defineProperties(stream, { - end: {get: function() { - if (!stream._state.endStream) { - var endStream = createStream() - endStream.map(function(value) { - if (value === true) { - unregisterStream(stream) - endStream._state.unregister = function(){unregisterStream(endStream)} - } - return value - }) - stream._state.endStream = endStream - } - return stream._state.endStream - }} - }) -} -function updateStream(stream, value) { - updateState(stream, value) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) - if (stream._state.unregister != null) stream._state.unregister() - finalize(stream) -} -function updateState(stream, value) { - stream._state.value = value - stream._state.changed = true - if (stream._state.state !== 2) stream._state.state = 1 -} -function updateDependency(stream, mustSync) { - var state = stream._state, parents = state.parents - if (parents.length > 0 && parents.every(active) && (mustSync || parents.some(changed))) { - var value = stream._state.derive() - if (value === HALT) return unregisterStream(stream) - updateState(stream, value) - } -} -function finalize(stream) { - stream._state.changed = false - for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false -} function combine(fn, streams) { - if (!streams.every(valid)) throw new Error("Ensure that each item passed to stream.combine/merge/lift is a stream") - return initDependency(createStream(), streams, function() { - return fn.apply(this, streams.concat([streams.filter(changed)])) + var ready = streams.every(function(s) { + if (s.constructor !== Stream) + throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream") + return s.state === "active" }) + var stream = ready + ? Stream(fn.apply(null, streams.concat([streams]))) + : Stream() + + let changed = [] + + streams.forEach(function(s) { + s.map(function(value) { + changed.push(s) + if (ready || streams.every(function(s) { return s.state !== "pending" })) { + ready = true + stream(fn.apply(null, streams.concat([changed]))) + changed = [] + } + return value + }, Stream.SKIP).parent = stream + }) + + return stream } -function initDependency(dep, streams, derive) { - var state = dep._state - state.derive = derive - state.parents = streams.filter(notEnded) - - registerDependency(dep, state.parents) - updateDependency(dep, true) - - return dep -} -function registerDependency(stream, parents) { - for (var i = 0; i < parents.length; i++) { - parents[i]._state.deps[stream._state.id] = stream - registerDependency(stream, parents[i]._state.parents) - } -} -function unregisterStream(stream) { - for (var i = 0; i < stream._state.parents.length; i++) { - var parent = stream._state.parents[i] - delete parent._state.deps[stream._state.id] - } - for (var id in stream._state.deps) { - var dependent = stream._state.deps[id] - var index = dependent._state.parents.indexOf(stream) - if (index > -1) dependent._state.parents.splice(index, 1) - } - stream._state.state = 2 //ended - stream._state.deps = {} -} - -function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} -function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [stream, this])} -function toJSON() {return this._state.value != null && typeof this._state.value.toJSON === "function" ? this._state.value.toJSON() : this._state.value} - -function valid(stream) {return stream._state } -function active(stream) {return stream._state.state === 1} -function changed(stream) {return stream._state.changed} -function notEnded(stream) {return stream._state.state !== 2} - function merge(streams) { - return combine(function() { - return streams.map(function(s) {return s()}) - }, streams) + return combine(function() { return streams.map(function(s) { return s() }) }, streams) } -function scan(reducer, seed, stream) { - var newStream = combine(function (s) { - var next = reducer(seed, s._state.value) - if (next !== HALT) return seed = next - return HALT - }, [stream]) - - if (newStream._state.state === 0) newStream(seed) - - return newStream +function scan(fn, acc, origin) { + var stream = origin.map(function(v) { + acc = fn(acc, v) + return acc + }) + stream(acc) + return stream } function scanMerge(tuples, seed) { - var streams = tuples.map(function(tuple) { - var stream = tuple[0] - if (stream._state.state === 0) stream(undefined) - return stream - }) + var streams = tuples.map(function(tuple) { return tuple[0] }) - var newStream = combine(function() { + var stream = combine(function() { var changed = arguments[arguments.length - 1] - - streams.forEach(function(stream, idx) { - if (changed.indexOf(stream) > -1) { - seed = tuples[idx][1](seed, stream._state.value) - } + streams.forEach(function(stream, i) { + if (changed.indexOf(stream) > -1) + seed = tuples[i][1](seed, stream()) }) return seed }, streams) - return newStream + stream(seed) + + return stream } function lift() { @@ -156,16 +146,12 @@ function lift() { }) } -createStream["fantasy-land/of"] = createStream -createStream.merge = merge -createStream.combine = combine -createStream.scan = scan -createStream.scanMerge = scanMerge -createStream.lift = lift -createStream.HALT = HALT +function open(s) { + return s.state === "pending" || s.state === "active" || s.state === "changing" +} -if (typeof module !== "undefined") module["exports"] = createStream -else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = createStream -else window.m = {stream : createStream} +if (typeof module !== "undefined") module["exports"] = Stream +else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = Stream +else window.m = {stream : Stream} }()); diff --git a/stream/tests/test-scan.js b/stream/tests/test-scan.js index 2f2c2bb2..04423729 100644 --- a/stream/tests/test-scan.js +++ b/stream/tests/test-scan.js @@ -31,7 +31,7 @@ o.spec("scan", function() { o(result[3]).deepEquals({a: 1}) }) - o("reducer can return HALT to prevent child updates", function() { + o("reducer can return SKIP to prevent child updates", function() { var count = 0 var action = stream() var store = stream.scan(function (arr, value) { @@ -39,7 +39,7 @@ o.spec("scan", function() { case "number": return arr.concat(value) default: - return stream.HALT + return stream.SKIP } }, [], action) var child = store.map(function (p) { diff --git a/stream/tests/test-stream.js b/stream/tests/test-stream.js index 2ef0ef94..99e59b5c 100644 --- a/stream/tests/test-stream.js +++ b/stream/tests/test-stream.js @@ -30,6 +30,43 @@ o.spec("stream", function() { o(stream()()).equals(1) }) + o("can SKIP", function() { + var a = Stream(2) + var b = a.map(function(value) { + return value === 5 + ? Stream.SKIP + : value + }) + + a(5) + + o(b()).equals(2) + }) + o("can HALT", function() { + var a = Stream(2) + var b = a.map(function(value) { + return value === 5 + ? Stream.HALT + : value + }) + + a(5) + + o(b()).equals(2) + }) + o("warns HALT deprecated", function() { + var log = console.log + var warning = "" + console.log = function(a) { + warning = a + } + + Stream.HALT + + console.log = log + + o(warning).equals("HALT is deprecated and has been renamed to SKIP") + }) }) o.spec("combine", function() { o("transforms value", function() { @@ -87,6 +124,7 @@ o.spec("stream", function() { o(d()).equals(15) o(count).equals(1) }) + o("combines default value atomically", function() { var count = 0 var a = Stream(3) @@ -100,6 +138,21 @@ o.spec("stream", function() { o(d()).equals(15) o(count).equals(1) }) + o("combines and maps nested streams atomically", function() { + var count = 0 + var a = Stream(3) + var b = Stream.combine(function(a) {return a() * 2}, [a]) + var c = Stream.combine(function(a) {return a() * a()}, [a]) + var d = c.map(function(x){return x}) + var e = Stream.combine(function(x) {return x()}, [d]) + var f = Stream.combine(function(b, e) { + count++ + return b() + e() + }, [b, e]) + + o(f()).equals(15) + o(count).equals(1) + }) o("combine lists only changed upstreams in last arg", function() { var streams = [] var a = Stream() @@ -111,8 +164,22 @@ o.spec("stream", function() { a(3) b(5) - o(streams.length).equals(1) - o(streams[0]).equals(b) + o(streams.length).equals(2) + o(streams[0]).equals(a) + o(streams[1]).equals(b) + }) + o("combine continues with ended streams", function() { + var a = Stream() + var b = Stream() + var combined = Stream.combine(function(a, b) { + return a() + b() + }, [a, b]) + + a(3) + a.end(true) + b(5) + + o(combined()).equals(8) }) o("combine lists only changed upstreams in last arg with default value", function() { var streams = [] @@ -151,11 +218,11 @@ o.spec("stream", function() { o(b()()).equals(undefined) }) - o("combine can halt", function() { + o("combine can skip", function() { var count = 0 var a = Stream(1) var b = Stream.combine(function() { - return Stream.HALT + return Stream.SKIP }, [a])["fantasy-land/map"](function() { count++ return 1 @@ -164,13 +231,13 @@ o.spec("stream", function() { o(b()).equals(undefined) o(count).equals(0) }) - o("combine can conditionaly halt", function() { + o("combine can conditionaly skip", function() { var count = 0 - var halt = false + var skip = false var a = Stream(1) var b = Stream.combine(function(a) { - if (halt) { - return Stream.HALT + if (skip) { + return Stream.SKIP } return a() }, [a])["fantasy-land/map"](function(a) { @@ -179,7 +246,7 @@ o.spec("stream", function() { }) o(b()).equals(1) o(count).equals(1) - halt = true + skip = true count = 0 a(2) o(b()).equals(1) @@ -554,7 +621,7 @@ o.spec("stream", function() { }) o.spec("applicative", function() { o("identity", function() { - var a = Stream()["fantasy-land/of"](function(value) {return value}) + var a = Stream["fantasy-land/of"](function(value) {return value}) var v = Stream(5) o(v["fantasy-land/ap"](a)()).equals(5) @@ -565,16 +632,16 @@ o.spec("stream", function() { var f = function(value) {return value * 2} var x = 3 - o(a["fantasy-land/of"](x)["fantasy-land/ap"](a["fantasy-land/of"](f))()).equals(6) - o(a["fantasy-land/of"](x)["fantasy-land/ap"](a["fantasy-land/of"](f))()).equals(a["fantasy-land/of"](f(x))()) + o(a.constructor["fantasy-land/of"](x)["fantasy-land/ap"](a.constructor["fantasy-land/of"](f))()).equals(6) + o(a.constructor["fantasy-land/of"](x)["fantasy-land/ap"](a.constructor["fantasy-land/of"](f))()).equals(a.constructor["fantasy-land/of"](f(x))()) }) o("interchange", function() { var u = Stream(function(value) {return value * 2}) var a = Stream() var y = 3 - o(a["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(6) - o(a["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(u["fantasy-land/ap"](a["fantasy-land/of"](function(f) {return f(y)}))()) + o(a.constructor["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(6) + o(a.constructor["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(u["fantasy-land/ap"](a.constructor["fantasy-land/of"](function(f) {return f(y)}))()) }) }) })