mithril-vndb/stream/stream.js
Rasmus Porsager 58c86f7546 Rewrite stream (#2207)
* Rewrite stream

* Rename HALT to SKIP

* Rename HALT to SKIP

* Remove valueOf and toString

* Update docs for HALT to SKIP

* Rename halt to skip in test

* Add test for combining nested streams atomically

* Update change-log.md

* Test basic SKIP

* Add deprecated HALT

* Combine continues with ended streams

* Fix fantasy-land/of to match spec

* Don't use arrow function

* Improve scan description

* Fix merge artifact
2018-11-27 15:11:24 -05:00

157 lines
3.6 KiB
JavaScript

/* eslint-disable */
;(function() {
"use strict"
/* eslint-enable */
Stream.SKIP = {}
Stream.lift = lift
Stream.scan = scan
Stream.merge = merge
Stream.combine = combine
Stream.scanMerge = scanMerge
Stream["fantasy-land/of"] = Stream
let warnedHalt = false
Object.defineProperty(Stream, "HALT", {
get: function() {
warnedHalt && console.log("HALT is deprecated and has been renamed to SKIP");
warnedHalt = true
return Stream.SKIP
}
})
function Stream(value) {
var dependentStreams = []
var dependentFns = []
function stream(v) {
if (arguments.length && v !== Stream.SKIP && open(stream)) {
value = v
stream.changing()
stream.state = "active"
dependentStreams.forEach(function(s, i) { s(dependentFns[i](value)) })
}
return value
}
stream.constructor = Stream
stream.state = arguments.length && value !== Stream.SKIP ? "active" : "pending"
stream.changing = function() {
open(stream) && (stream.state = "changing")
dependentStreams.forEach(function(s) {
s.dependent && s.dependent.changing()
s.changing()
})
}
stream.map = function(fn, ignoreInitial) {
var target = stream.state === "active" && ignoreInitial !== Stream.SKIP
? Stream(fn(value))
: Stream()
dependentStreams.push(target)
dependentFns.push(fn)
return target
}
let end
function createEnd() {
end = Stream()
end.map(function(value) {
if (value === true) {
stream.state = "ended"
dependentStreams.length = dependentFns.length = 0
}
return value
})
return end
}
stream.toJSON = function() { return value != null && typeof value.toJSON === "function" ? value.toJSON() : value }
stream["fantasy-land/map"] = stream.map
stream["fantasy-land/ap"] = function(x) { return combine(function(s1, s2) { return s1()(s2()) }, [x, stream]) }
Object.defineProperty(stream, "end", {
get: function() { return end || createEnd() }
})
return stream
}
function combine(fn, streams) {
var ready = streams.every(function(s) {
if (s.constructor !== Stream)
throw new Error("Ensure that each item passed to stream.combine/stream.merge/lift is a stream")
return s.state === "active"
})
var stream = ready
? Stream(fn.apply(null, streams.concat([streams])))
: Stream()
let changed = []
streams.forEach(function(s) {
s.map(function(value) {
changed.push(s)
if (ready || streams.every(function(s) { return s.state !== "pending" })) {
ready = true
stream(fn.apply(null, streams.concat([changed])))
changed = []
}
return value
}, Stream.SKIP).parent = stream
})
return stream
}
function merge(streams) {
return combine(function() { return streams.map(function(s) { return s() }) }, streams)
}
function scan(fn, acc, origin) {
var stream = origin.map(function(v) {
acc = fn(acc, v)
return acc
})
stream(acc)
return stream
}
function scanMerge(tuples, seed) {
var streams = tuples.map(function(tuple) { return tuple[0] })
var stream = combine(function() {
var changed = arguments[arguments.length - 1]
streams.forEach(function(stream, i) {
if (changed.indexOf(stream) > -1)
seed = tuples[i][1](seed, stream())
})
return seed
}, streams)
stream(seed)
return stream
}
function lift() {
var fn = arguments[0]
var streams = Array.prototype.slice.call(arguments, 1)
return merge(streams).map(function(streams) {
return fn.apply(undefined, streams)
})
}
function open(s) {
return s.state === "pending" || s.state === "active" || s.state === "changing"
}
if (typeof module !== "undefined") module["exports"] = Stream
else if (typeof window.m === "function" && !("stream" in window.m)) window.m.stream = Stream
else window.m = {stream : Stream}
}());