Rewrite stream (#2207)

* Rewrite stream

* Rename HALT to SKIP

* Rename HALT to SKIP

* Remove valueOf and toString

* Update docs for HALT to SKIP

* Rename halt to skip in test

* Add test for combining nested streams atomically

* Update change-log.md

* Test basic SKIP

* Add deprecated HALT

* Combine continues with ended streams

* Fix fantasy-land/of to match spec

* Don't use arrow function

* Improve scan description

* Fix merge artifact
This commit is contained in:
Rasmus Porsager 2018-11-27 21:11:24 +01:00 committed by Isiah Meadows
parent 5e7528fefd
commit 58c86f7546
5 changed files with 214 additions and 159 deletions

View file

@ -8,7 +8,7 @@
- [Stream.scan](#streamscan)
- [Stream.scanMerge](#streamscanmerge)
- [Stream.lift](#streamlift)
- [Stream.HALT](#streamhalt)
- [Stream.SKIP](#streamskip)
- [Stream["fantasy-land/of"]](#streamfantasy-landof)
- [Instance members](#instance-members)
- [stream.map](#streammap)
@ -121,13 +121,13 @@ Argument | Type | Required | Description
Creates a new stream with the results of calling the function on every value in the stream with an accumulator and the incoming value.
Note that you can prevent dependent streams from being updated by returning the special value `stream.HALT` inside the accumulator function.
Note that you can prevent dependent streams from being updated by returning the special value `stream.SKIP` inside the accumulator function.
`stream = Stream.scan(fn, accumulator, stream)`
Argument | Type | Required | Description
------------- | -------------------------------- | -------- | ---
`fn` | `(accumulator, value) -> result \| HALT` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value
`fn` | `(accumulator, value) -> result \| SKIP` | Yes | A function that takes an accumulator and value parameter and returns a new accumulator value of the same type
`accumulator` | `any` | Yes | The starting value for the accumulator
`stream` | `Stream` | Yes | Stream containing the values
**returns** | `Stream` | | Returns a new stream containing the result
@ -183,9 +183,9 @@ Argument | Type | Required | Description
---
##### Stream.HALT
##### Stream.SKIP
A special value that can be returned to stream callbacks to halt execution of downstreams
A special value that can be returned to stream callbacks to skip execution of downstreams
---
@ -375,14 +375,14 @@ console.log(doubled()) // logs 2
Dependent streams are *reactive*: their values are updated any time the value of their parent stream is updated. This happens regardless of whether the dependent stream was created before or after the value of the parent stream was set.
You can prevent dependent streams from being updated by returning the special value `stream.HALT`
You can prevent dependent streams from being updated by returning the special value `stream.SKIP`
```javascript
var halted = stream(1).map(function(value) {
return stream.HALT
var skipped = stream(1).map(function(value) {
return stream.SKIP
})
halted.map(function() {
skipped.map(function() {
// never runs
})
```
@ -432,14 +432,14 @@ console.log(added()) // logs 12
A stream can depend on any number of streams and it's guaranteed to update atomically. For example, if a stream A has two dependent streams B and C, and a fourth stream D is dependent on both B and C, the stream D will only update once if the value of A changes. This guarantees that the callback for stream D is never called with unstable values such as when B has a new value but C has the old value. Atomicity also brings the performance benefits of not recomputing downstreams unnecessarily.
You can prevent dependent streams from being updated by returning the special value `stream.HALT`
You can prevent dependent streams from being updated by returning the special value `stream.SKIP`
```javascript
var halted = stream.combine(function(stream) {
return stream.HALT
var skipped = stream.combine(function(stream) {
return stream.SKIP
}, [stream(1)])
halted.map(function() {
skipped.map(function() {
// never runs
})
```

View file

@ -1,6 +1,8 @@
# Change log for stream
## 2.0.0
- renamed HALT to SKIP [#2207](https://github.com/MithrilJS/mithril.js/pull/2207)
- rewrote implementation [#2207](https://github.com/MithrilJS/mithril.js/pull/2207)
- stream: Removed `valueOf` & `toString` methods ([#2150](https://github.com/MithrilJS/mithril.js/pull/2150)
## 1.1.0

View file

@ -2,150 +2,140 @@
;(function() {
"use strict"
/* eslint-enable */
Stream.SKIP = {}
Stream.lift = lift
Stream.scan = scan
Stream.merge = merge
Stream.combine = combine
Stream.scanMerge = scanMerge
Stream["fantasy-land/of"] = Stream
var guid = 0, HALT = {}
function createStream() {
function stream() {
if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0])
return stream._state.value
let warnedHalt = false
Object.defineProperty(Stream, "HALT", {
get: function() {
warnedHalt && console.log("HALT is deprecated and has been renamed to SKIP");
warnedHalt = true
return Stream.SKIP
}
initStream(stream)
})
if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0])
function Stream(value) {
var dependentStreams = []
var dependentFns = []
function stream(v) {
if (arguments.length && v !== Stream.SKIP && open(stream)) {
value = v
stream.changing()
stream.state = "active"
dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
}
return value
}
stream.constructor = Stream
stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending"
stream.changing = function() {
open(stream) && (stream.state = "changing")
dependentStreams.forEach(function(s) {
s.dependent && s.dependent.changing()
s.changing()
})
}
stream.map = function(fn, ignoreInitial) {
var target = stream.state === "active" && ignoreInitial !== Stream.SKIP
? Stream(fn(value))
: Stream()
dependentStreams.push(target)
dependentFns.push(fn)
return target
}
let end
function createEnd() {
end = Stream()
end.map(function(value) {
if (value === true) {
stream.state = "ended"
dependentStreams.length = dependentFns.length = 0
}
return value
})
return end
}
stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value }
stream["fantasy-land/map"] = stream.map
stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }
Object.defineProperty(stream, "end", {
get: function() { return end || createEnd() }
})
return stream
}
function initStream(stream) {
stream.constructor = createStream
stream._state = {id: guid++, value: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], endStream: undefined, unregister: undefined}
stream.map = stream["fantasy-land/map"] = map, stream["fantasy-land/ap"] = ap, stream["fantasy-land/of"] = createStream
stream.toJSON = toJSON
Object.defineProperties(stream, {
end: {get: function() {
if (!stream._state.endStream) {
var endStream = createStream()
endStream.map(function(value) {
if (value === true) {
unregisterStream(stream)
endStream._state.unregister = function(){unregisterStream(endStream)}
}
return value
})
stream._state.endStream = endStream
}
return stream._state.endStream
}}
})
}
function updateStream(stream, value) {
updateState(stream, value)
for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false)
if (stream._state.unregister != null) stream._state.unregister()
finalize(stream)
}
function updateState(stream, value) {
stream._state.value = value
stream._state.changed = true
if (stream._state.state !== 2) stream._state.state = 1
}
function updateDependency(stream, mustSync) {
var state = stream._state, parents = state.parents
if (parents.length > 0 && parents.every(active) && (mustSync || parents.some(changed))) {
var value = stream._state.derive()
if (value === HALT) return unregisterStream(stream)
updateState(stream, value)
}
}
function finalize(stream) {
stream._state.changed = false
for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false
}
function combine(fn, streams) {
if (!streams.every(valid)) throw new Error("Ensure that each item passed to stream.combine/merge/lift is a stream")
return initDependency(createStream(), streams, function() {
return fn.apply(this, streams.concat([streams.filter(changed)]))
var ready = streams.every(function(s) {
if (s.constructor !== Stream)
throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream")
return s.state === "active"
})
var stream = ready
? Stream(fn.apply(null, streams.concat([streams])))
: Stream()
let changed = []
streams.forEach(function(s) {
s.map(function(value) {
changed.push(s)
if (ready || streams.every(function(s) { return s.state !== "pending" })) {
ready = true
stream(fn.apply(null, streams.concat([changed])))
changed = []
}
return value
}, Stream.SKIP).parent = stream
})
return stream
}
function initDependency(dep, streams, derive) {
var state = dep._state
state.derive = derive
state.parents = streams.filter(notEnded)
registerDependency(dep, state.parents)
updateDependency(dep, true)
return dep
}
function registerDependency(stream, parents) {
for (var i = 0; i < parents.length; i++) {
parents[i]._state.deps[stream._state.id] = stream
registerDependency(stream, parents[i]._state.parents)
}
}
function unregisterStream(stream) {
for (var i = 0; i < stream._state.parents.length; i++) {
var parent = stream._state.parents[i]
delete parent._state.deps[stream._state.id]
}
for (var id in stream._state.deps) {
var dependent = stream._state.deps[id]
var index = dependent._state.parents.indexOf(stream)
if (index > -1) dependent._state.parents.splice(index, 1)
}
stream._state.state = 2 //ended
stream._state.deps = {}
}
function map(fn) {return combine(function(stream) {return fn(stream())}, [this])}
function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [stream, this])}
function toJSON() {return this._state.value != null && typeof this._state.value.toJSON === "function" ? this._state.value.toJSON() : this._state.value}
function valid(stream) {return stream._state }
function active(stream) {return stream._state.state === 1}
function changed(stream) {return stream._state.changed}
function notEnded(stream) {return stream._state.state !== 2}
function merge(streams) {
return combine(function() {
return streams.map(function(s) {return s()})
}, streams)
return combine(function() { return streams.map(function(s) { return s() }) }, streams)
}
function scan(reducer, seed, stream) {
var newStream = combine(function (s) {
var next = reducer(seed, s._state.value)
if (next !== HALT) return seed = next
return HALT
}, [stream])
if (newStream._state.state === 0) newStream(seed)
return newStream
function scan(fn, acc, origin) {
var stream = origin.map(function(v) {
acc = fn(acc, v)
return acc
})
stream(acc)
return stream
}
function scanMerge(tuples, seed) {
var streams = tuples.map(function(tuple) {
var stream = tuple[0]
if (stream._state.state === 0) stream(undefined)
return stream
})
var streams = tuples.map(function(tuple) { return tuple[0] })
var newStream = combine(function() {
var stream = combine(function() {
var changed = arguments[arguments.length - 1]
streams.forEach(function(stream, idx) {
if (changed.indexOf(stream) > -1) {
seed = tuples[idx][1](seed, stream._state.value)
}
streams.forEach(function(stream, i) {
if (changed.indexOf(stream) > -1)
seed = tuples[i][1](seed, stream())
})
return seed
}, streams)
return newStream
stream(seed)
return stream
}
function lift() {
@ -156,16 +146,12 @@ function lift() {
})
}
createStream["fantasy-land/of"] = createStream
createStream.merge = merge
createStream.combine = combine
createStream.scan = scan
createStream.scanMerge = scanMerge
createStream.lift = lift
createStream.HALT = HALT
function open(s) {
return s.state === "pending" || s.state === "active" || s.state === "changing"
}
if (typeof module !== "undefined") module["exports"] = createStream
else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = createStream
else window.m = {stream : createStream}
if (typeof module !== "undefined") module["exports"] = Stream
else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = Stream
else window.m = {stream : Stream}
}());

View file

@ -31,7 +31,7 @@ o.spec("scan", function() {
o(result[3]).deepEquals({a: 1})
})
o("reducer can return HALT to prevent child updates", function() {
o("reducer can return SKIP to prevent child updates", function() {
var count = 0
var action = stream()
var store = stream.scan(function (arr, value) {
@ -39,7 +39,7 @@ o.spec("scan", function() {
case "number":
return arr.concat(value)
default:
return stream.HALT
return stream.SKIP
}
}, [], action)
var child = store.map(function (p) {

View file

@ -30,6 +30,43 @@ o.spec("stream", function() {
o(stream()()).equals(1)
})
o("can SKIP", function() {
var a = Stream(2)
var b = a.map(function(value) {
return value === 5
? Stream.SKIP
: value
})
a(5)
o(b()).equals(2)
})
o("can HALT", function() {
var a = Stream(2)
var b = a.map(function(value) {
return value === 5
? Stream.HALT
: value
})
a(5)
o(b()).equals(2)
})
o("warns HALT deprecated", function() {
var log = console.log
var warning = ""
console.log = function(a) {
warning = a
}
Stream.HALT
console.log = log
o(warning).equals("HALT is deprecated and has been renamed to SKIP")
})
})
o.spec("combine", function() {
o("transforms value", function() {
@ -87,6 +124,7 @@ o.spec("stream", function() {
o(d()).equals(15)
o(count).equals(1)
})
o("combines default value atomically", function() {
var count = 0
var a = Stream(3)
@ -100,6 +138,21 @@ o.spec("stream", function() {
o(d()).equals(15)
o(count).equals(1)
})
o("combines and maps nested streams atomically", function() {
var count = 0
var a = Stream(3)
var b = Stream.combine(function(a) {return a() * 2}, [a])
var c = Stream.combine(function(a) {return a() * a()}, [a])
var d = c.map(function(x){return x})
var e = Stream.combine(function(x) {return x()}, [d])
var f = Stream.combine(function(b, e) {
count++
return b() + e()
}, [b, e])
o(f()).equals(15)
o(count).equals(1)
})
o("combine lists only changed upstreams in last arg", function() {
var streams = []
var a = Stream()
@ -111,8 +164,22 @@ o.spec("stream", function() {
a(3)
b(5)
o(streams.length).equals(1)
o(streams[0]).equals(b)
o(streams.length).equals(2)
o(streams[0]).equals(a)
o(streams[1]).equals(b)
})
o("combine continues with ended streams", function() {
var a = Stream()
var b = Stream()
var combined = Stream.combine(function(a, b) {
return a() + b()
}, [a, b])
a(3)
a.end(true)
b(5)
o(combined()).equals(8)
})
o("combine lists only changed upstreams in last arg with default value", function() {
var streams = []
@ -151,11 +218,11 @@ o.spec("stream", function() {
o(b()()).equals(undefined)
})
o("combine can halt", function() {
o("combine can skip", function() {
var count = 0
var a = Stream(1)
var b = Stream.combine(function() {
return Stream.HALT
return Stream.SKIP
}, [a])["fantasy-land/map"](function() {
count++
return 1
@ -164,13 +231,13 @@ o.spec("stream", function() {
o(b()).equals(undefined)
o(count).equals(0)
})
o("combine can conditionaly halt", function() {
o("combine can conditionaly skip", function() {
var count = 0
var halt = false
var skip = false
var a = Stream(1)
var b = Stream.combine(function(a) {
if (halt) {
return Stream.HALT
if (skip) {
return Stream.SKIP
}
return a()
}, [a])["fantasy-land/map"](function(a) {
@ -179,7 +246,7 @@ o.spec("stream", function() {
})
o(b()).equals(1)
o(count).equals(1)
halt = true
skip = true
count = 0
a(2)
o(b()).equals(1)
@ -554,7 +621,7 @@ o.spec("stream", function() {
})
o.spec("applicative", function() {
o("identity", function() {
var a = Stream()["fantasy-land/of"](function(value) {return value})
var a = Stream["fantasy-land/of"](function(value) {return value})
var v = Stream(5)
o(v["fantasy-land/ap"](a)()).equals(5)
@ -565,16 +632,16 @@ o.spec("stream", function() {
var f = function(value) {return value * 2}
var x = 3
o(a["fantasy-land/of"](x)["fantasy-land/ap"](a["fantasy-land/of"](f))()).equals(6)
o(a["fantasy-land/of"](x)["fantasy-land/ap"](a["fantasy-land/of"](f))()).equals(a["fantasy-land/of"](f(x))())
o(a.constructor["fantasy-land/of"](x)["fantasy-land/ap"](a.constructor["fantasy-land/of"](f))()).equals(6)
o(a.constructor["fantasy-land/of"](x)["fantasy-land/ap"](a.constructor["fantasy-land/of"](f))()).equals(a.constructor["fantasy-land/of"](f(x))())
})
o("interchange", function() {
var u = Stream(function(value) {return value * 2})
var a = Stream()
var y = 3
o(a["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(6)
o(a["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(u["fantasy-land/ap"](a["fantasy-land/of"](function(f) {return f(y)}))())
o(a.constructor["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(6)
o(a.constructor["fantasy-land/of"](y)["fantasy-land/ap"](u)()).equals(u["fantasy-land/ap"](a.constructor["fantasy-land/of"](function(f) {return f(y)}))())
})
})
})