fantasy-land methods no longer absorb streams

This commit is contained in:
Leo Horie 2016-06-30 01:28:50 -04:00
parent 7fa3fee9ce
commit 015a812610
4 changed files with 524 additions and 290 deletions

View file

@ -16,8 +16,8 @@ function initStream(stream, args) {
stream.constructor = createStream
stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined}
stream.map = map, stream.ap = ap, stream.of = createStream
stream.valueOf = valueOf
stream.catch = doCatch
stream.valueOf = valueOf, stream.toJSON = toJSON
stream.run = run, stream.catch = doCatch
Object.defineProperties(stream, {
error: {get: function() {
@ -46,15 +46,18 @@ function initStream(stream, args) {
})
}
function updateStream(stream, value, error) {
if (!absorbStream(stream, value, false) && !absorbStream(stream, error, true)) {
updateState(stream, value, error)
for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false)
finalize(stream)
}
updateState(stream, value, error)
for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false)
finalize(stream)
}
function updateState(stream, value, error) {
error = unwrapError(value, error)
if (error !== undefined && typeof stream._state.recover === "function") {
try {updateValues(stream, stream._state.recover(), undefined)}
try {
var recovered = stream._state.recover()
if (recovered === HALT) return
updateValues(stream, recovered, undefined)
}
catch (e) {updateValues(stream, undefined, e)}
}
else updateValues(stream, value, error)
@ -73,7 +76,8 @@ function updateDependency(stream, mustSync) {
else {
try {
var value = state.derive()
if (!absorbStream(stream, value)) updateState(stream, value, undefined)
if (value === HALT) return
updateState(stream, value, undefined)
}
catch (e) {
updateState(stream, undefined, e)
@ -81,30 +85,29 @@ function updateDependency(stream, mustSync) {
}
}
}
function absorbStream(stream, value, isError) {
function unwrapError(value, error) {
if (value != null && value.constructor === createStream) {
if (value._state.state === 2) {
stream.end(true)
stream(value())
}
else if (value._state.error) stream.error(value.error())
else if (value._state.state === 0) return true
else if (!isError) stream(value())
else stream.error(value())
return true
if (value._state.error !== undefined) error = value._state.error
else error = unwrapError(value._state.value, value._state.error)
}
return false
return error
}
function finalize(stream) {
stream._state.changed = false
for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false
}
function run(fn) {
var self = createStream(), stream = this
return initDependency(self, [stream], function() {
return absorb(self, fn(stream()))
}, undefined)
}
function doCatch(fn) {
var stream = this
var self = createStream(), stream = this
var derive = function() {return stream._state.value}
var recover = function() {return fn(stream._state.error)}
return initDependency(createStream(), [stream], derive, recover)
var recover = function() {return absorb(self, fn(stream._state.error))}
return initDependency(self, [stream], derive, recover)
}
function combine(fn, streams) {
return initDependency(createStream(), streams, function() {
@ -113,6 +116,16 @@ function combine(fn, streams) {
return fn.apply(this, streams.concat([streams.filter(changed)]))
}, undefined)
}
function absorb(stream, value) {
if (value != null && value.constructor === createStream) {
value.error.map(stream.error)
value.map(stream)
if (value._state.state === 0) return HALT
if (value._state.error) throw value._state.error
value = value._state.value
}
return value
}
function initDependency(dep, streams, derive, recover) {
var state = dep._state
@ -148,6 +161,7 @@ function unregisterStream(stream) {
function map(fn) {return combine(function(stream) {return fn(stream())}, [this])}
function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])}
function valueOf() {return this._state.value}
function toJSON() {return JSON.stringify(this._state.value)}
function active(stream) {return stream._state.state === 1}
function changed(stream) {return stream._state.changed}
@ -160,4 +174,4 @@ function reject(e) {
return stream
}
module.exports = {stream: createStream, combine: combine, reject: reject}
module.exports = {stream: createStream, combine: combine, reject: reject, HALT: HALT}

View file

@ -26,6 +26,11 @@ o.spec("stream", function() {
o(stream()).equals(undefined)
})
o("can be stream of streams", function() {
var stream = Stream.stream(Stream.stream(1))
o(stream()()).equals(1)
})
})
o.spec("combine", function() {
o("transforms value", function() {
@ -131,36 +136,34 @@ o.spec("stream", function() {
o(b()).equals(undefined)
})
o("combine absorbs streams", function() {
o("combine can return stream", function() {
var a = Stream.stream(1)
var b = Stream.combine(function(a) {
return Stream.stream(2)
}, [a])
o(b()).equals(2)
o(b()()).equals(2)
})
o("combine absorbs errored streams", function() {
var a = Stream.stream(1)
var b = Stream.combine(function(a) {
return Stream.reject(new Error("error"))
}, [a])
o(b()).equals(undefined)
o(b.error().message).equals("error")
})
o("combine absorbs ready streams", function() {
var count = 0
o("combine can return pending stream", function() {
var a = Stream.stream(1)
var b = Stream.combine(function(a) {
return Stream.stream()
}, [a])
var c = Stream.combine(function(b) {
count++
return 2
}, [b])
o(c()).equals(undefined)
o(count).equals(0)
o(b()()).equals(undefined)
})
o("combine can halt", function() {
var count = 0
var a = Stream.stream(1)
var b = Stream.combine(function(a) {
return Stream.HALT
}, [a])
.map(function() {
count++
return 1
})
o(b()).equals(undefined)
})
})
o.spec("end", function() {
@ -297,7 +300,8 @@ o.spec("stream", function() {
var stream = Stream.stream(1)
var mappedFromError = stream.error.map(function(value) {
return "from" + value.message
}).map(function(value) {
})
.map(function(value) {
return "a" + value
})
@ -328,13 +332,35 @@ o.spec("stream", function() {
o(downstream.error().message).equals("b")
o(count).equals(0)
})
o("error.map absorbs streams", function() {
o("error can halt", function() {
var count = 0
var stream = Stream.reject(1).error.map(function() {
return Stream.HALT
})
.map(function() {
count++
return 1
})
o(stream()).equals(undefined)
o(count).equals(0)
})
o("error.map can return streams", function() {
var stream = Stream.reject(new Error("error"))
var error = stream.error.map(function(value) {
return Stream.stream(1)
})
o(error()).equals(1)
o(error()()).equals(1)
})
o("combined stream of two errored streams adopts error from first", function() {
var a = Stream.stream(1)
var b = Stream.combine(function(a) {throw new Error("error from b")}, [a])
var c = Stream.combine(function(a) {throw new Error("error from c")}, [a])
var d = Stream.combine(function(a, b) {return 2}, [a, b])
o(d()).equals(undefined)
o(d.error().message).equals("error from b")
})
})
o.spec("reject", function() {
@ -384,13 +410,158 @@ o.spec("stream", function() {
o(count).equals(0)
})
})
o.spec("run", function() {
o("works", function() {
var stream = Stream.stream()
var doubled = stream.run(function(value) {return value * 2})
stream(3)
o(doubled()).equals(6)
})
o("works with default value", function() {
var stream = Stream.stream(3)
var doubled = stream.run(function(value) {return value * 2})
o(doubled()).equals(6)
})
o("works with undefined value", function() {
var stream = Stream.stream()
var mapped = stream.run(function(value) {return String(value)})
stream(undefined)
o(mapped()).equals("undefined")
})
o("works with default undefined value", function() {
var stream = Stream.stream(undefined)
var mapped = stream.run(function(value) {return String(value)})
o(mapped()).equals("undefined")
})
o("works with stream that throws", function() {
var count = 0
var stream = Stream.stream(undefined)
var errored = stream.run(function(value) {throw new Error("error")})
var mapped = errored.map(function(value) {
count++
return value
})
o(errored()).equals(undefined)
o(errored.error().message).equals("error")
o(mapped()).equals(undefined)
o(mapped.error().message).equals("error")
o(count).equals(0)
})
o("works with pending stream", function() {
var count = 0
var stream = Stream.stream(undefined)
var absorber = stream.run(function(value) {return Stream.stream()})
var mapped = absorber.map(function(value) {
count++
return value
})
o(mapped()).equals(undefined)
o(count).equals(0)
})
o("works with active stream", function() {
var stream = Stream.stream(undefined)
var mapped = stream.run(function(value) {return Stream.stream(1)})
o(mapped()).equals(1)
})
o("works with errored stream", function() {
var stream = Stream.stream(undefined)
var mapped = stream.run(function(value) {return Stream.reject(new Error("error"))})
o(mapped()).equals(undefined)
o(mapped.error().message).equals("error")
})
o("works with ended stream", function() {
var stream = Stream.stream(1)
var mapped = stream.run(function(value) {
var ended = Stream.stream(2)
ended.end(true)
return ended
})
stream(3)
o(mapped()).equals(2)
})
o("works when active stream updates", function() {
var stream = Stream.stream(undefined)
var absorbed = Stream.stream(1)
var mapped = stream.run(function(value) {return absorbed})
absorbed(2)
o(mapped()).equals(2)
absorbed(3)
o(mapped()).equals(3)
})
o("works when updating stream to errored state", function() {
var stream = Stream.stream(undefined)
var absorbed = Stream.stream(1)
var mapped = stream.run(function(value) {return absorbed})
absorbed.error(new Error("error"))
o(mapped()).equals(undefined)
o(mapped.error().message).equals("error")
absorbed.error(new Error("another error"))
o(mapped()).equals(undefined)
o(mapped.error().message).equals("another error")
})
o("works when pending stream updates", function() {
var stream = Stream.stream(undefined)
var absorbed = Stream.stream()
var mapped = stream.run(function(value) {return absorbed})
absorbed(2)
o(mapped()).equals(2)
})
o("works when updating pending stream to errored state", function() {
var stream = Stream.stream(undefined)
var absorbed = Stream.stream()
var mapped = stream.run(function(value) {return absorbed})
absorbed.error(new Error("error"))
o(mapped()).equals(undefined)
o(mapped.error().message).equals("error")
})
o("works when updating stream to active state", function() {
var stream = Stream.stream(undefined)
var absorbed = Stream.stream(1)
var mapped = stream.run(function(value) {return absorbed})
absorbed.error(new Error("error"))
o(mapped()).equals(undefined)
o(mapped.error().message).equals("error")
absorbed(2)
o(mapped()).equals(2)
o(mapped.error()).equals(undefined)
})
})
o.spec("catch", function() {
o("catch works from reject", function() {
var count = 0
var stream = Stream.reject(new Error("error")).catch(function(e) {
count++
return "no" + e.message
}).map(function(value) {
})
.map(function(value) {
return value + "mapped"
})
@ -403,7 +574,8 @@ o.spec("stream", function() {
var stream = Stream.combine(function() {throw new Error("error")}, [Stream.stream(1)]).catch(function(e) {
count++
return "no" + e.message
}).map(function(value) {
})
.map(function(value) {
return value + "mapped"
})
@ -417,7 +589,8 @@ o.spec("stream", function() {
var handled = stream.map(function(value) {return value + value}).catch(function(e) {
count++
return "no" + e.message
}).map(function(value) {
})
.map(function(value) {
return value + "mapped"
})
@ -432,7 +605,8 @@ o.spec("stream", function() {
var stream = Stream.stream("a").map(function(value) {return value + value}).catch(function(e) {
count++
return "no" + e.message
}).map(function(value) {
})
.map(function(value) {
return value + "mapped"
})
@ -457,6 +631,39 @@ o.spec("stream", function() {
o(stream()).equals("undefined")
o(stream.error()).equals(undefined)
})
o("catch absorbs pending stream", function() {
var count = 0
var stream = Stream.stream()
var mapped = Stream.reject(new Error("b")).catch(function(e) {
return stream
})
.map(function(value) {
count++
return String(value)
})
o(mapped()).equals(undefined)
o(count).equals(0)
})
o("catch absorbs active stream", function() {
var stream = Stream.stream(1)
var mapped = Stream.reject(new Error("b")).catch(function(e) {
return stream
})
.map(function(value) {return String(value)})
o(mapped()).equals("1")
})
o("catch absorbs errored stream", function() {
var stream = Stream.reject(new Error("a"))
var mapped = Stream.reject(new Error("b")).catch(function(e) {
return stream
})
.map(function(value) {return String(value)})
o(mapped()).equals(undefined)
o(mapped.error().message).equals("a")
})
o("catch does not prevent sibling error propagation", function() {
var a = Stream.reject(new Error("a"))
var b = a.map(function(value) {return value + "b"}).catch(function(e) {})
@ -466,19 +673,62 @@ o.spec("stream", function() {
o(d()).equals(undefined)
o(d.error().message).equals("a")
})
o("catches absorbed rejected stream", function() {
o("catches wrapped rejected stream", function() {
var caught
var stream = Stream.stream(1).map(function() {
return Stream.reject(new Error("error"))
}).catch(function(value) {
})
.catch(function(value) {
caught = value
return "no" + value.message
}).map(function(value) {
})
.map(function(value) {
return value + "mapped"
})
o(stream()).equals("noerrormapped")
})
o("catches nested wrapped rejected stream", function() {
var caught
var stream = Stream.stream(1).map(function() {
return Stream.stream(2).map(function() {
return Stream.reject(new Error("error"))
})
})
.catch(function(value) {
caught = value
return "no" + value.message
})
.map(function(value) {
return value + "mapped"
})
o(stream()).equals("noerrormapped")
})
})
o.spec("valueOf", function() {
o("works", function() {
o(Stream.stream(1).valueOf()).equals(1)
o(Stream.stream("a").valueOf()).equals("a")
o(Stream.stream(true).valueOf()).equals(true)
o(Stream.stream(null).valueOf()).equals(null)
o(Stream.stream(undefined).valueOf()).equals(undefined)
o(Stream.stream({a: 1}).valueOf()).deepEquals({a: 1})
o(Stream.stream([1, 2, 3]).valueOf()).deepEquals([1, 2, 3])
o(Stream.stream().valueOf()).equals(undefined)
})
})
o.spec("toJSON", function() {
o("works", function() {
o(Stream.stream(1).toJSON()).equals("1")
o(Stream.stream("a").toJSON()).equals("\"a\"")
o(Stream.stream(true).toJSON()).equals("true")
o(Stream.stream(null).toJSON()).equals("null")
o(Stream.stream(undefined).toJSON()).equals(undefined)
o(Stream.stream({a: 1}).toJSON()).deepEquals("{\"a\":1}")
o(Stream.stream([1, 2, 3]).toJSON()).deepEquals("[1,2,3]")
o(Stream.stream().toJSON()).equals(undefined)
})
})
o.spec("map", function() {
o("works", function() {
@ -509,6 +759,12 @@ o.spec("stream", function() {
o(mapped()).equals("undefined")
})
o("works with pending stream", function() {
var stream = Stream.stream(undefined)
var mapped = stream.map(function(value) {return Stream.stream()})
o(mapped()()).equals(undefined)
})
})
o.spec("ap", function() {
o("works", function() {
@ -538,187 +794,6 @@ o.spec("stream", function() {
o(applied()).equals("undefineda")
})
})
o.spec("absorbing semantics", function() {
o("absorbs stream", function() {
var stream = Stream.stream(Stream.stream(1))
var doubled = stream.map(function(value) {return value * 2})
o(stream()).equals(1)
o(doubled()).equals(2)
})
o("absorbs stream on update", function() {
var stream = Stream.stream()
var doubled = stream.map(function(value) {return value * 2})
stream(Stream.stream(1))
o(stream()).equals(1)
o(doubled()).equals(2)
})
o("absorbs errored stream", function() {
var stream = Stream.stream(Stream.reject(1))
var doubled = stream.error.map(function(value) {return value * 2})
o(stream()).equals(undefined)
o(stream.error()).equals(1)
o(doubled()).equals(2)
})
o("absorbs pending stream", function() {
var count = 0
var stream = Stream.stream(Stream.stream())
var mapped = stream.map(function() {
count++
})
o(stream()).equals(undefined)
o(count).equals(0)
})
o("absorbs ended stream", function() {
var count = 0
var ended = Stream.stream()
ended.end(true)
var stream = Stream.stream(ended)
var mapped = stream.map(function() {
count++
})
stream(1)
o(count).equals(0)
})
o("combine absorbs stream", function() {
var stream = Stream.stream(1)
var combined = Stream.combine(function(stream) {
return Stream.stream(2)
}, [stream])
var doubled = combined.map(function(value) {return value * 2})
o(combined()).equals(2)
o(doubled()).equals(4)
})
o("combine absorbs errored stream", function() {
var stream = Stream.stream(1)
var combined = Stream.combine(function(stream) {
return Stream.reject(2)
}, [stream])
var doubled = combined.error.map(function(value) {return value * 2})
o(combined()).equals(undefined)
o(combined.error()).equals(2)
o(doubled()).equals(4)
})
o("combine absorbs pending stream", function() {
var count = 0
var stream = Stream.stream(1)
var combined = Stream.combine(function(stream) {
return Stream.stream()
}, [stream])
var mapped = combined.map(function() {
count++
})
o(combined()).equals(undefined)
o(count).equals(0)
})
o("combine absorbs ended stream", function() {
var count = 0
var stream = Stream.stream(1)
var combined = Stream.combine(function(stream) {
var ended = Stream.stream(2)
ended.end(true)
return ended
}, [stream])
var mapped = combined.map(function() {
count++
})
o(combined()).equals(2)
o(count).equals(0)
})
o("reject absorbs stream", function() {
var stream = Stream.reject(Stream.stream(1))
var doubled = stream.error.map(function(value) {return value * 2})
o(stream.error()).equals(1)
o(doubled()).equals(2)
})
o("reject absorbs errored stream", function() {
var count = 0
var stream = Stream.reject(Stream.reject(new Error("error")))
var mapped = stream.error.map(function() {
count++
})
o(stream.error().message).equals("error")
o(count).equals(1)
})
o("reject absorbs pending stream", function() {
var count = 0
var stream = Stream.reject(Stream.stream())
var mapped = stream.error.map(function() {
count++
})
o(stream.error()).equals(undefined)
o(count).equals(0)
})
o("reject absorbs ended stream", function() {
var count = 0
var ended = Stream.stream(1)
ended.end(true)
var stream = Stream.reject(ended)
var mapped = stream.error.map(function() {
count++
})
o(count).equals(0)
})
o("ended stream absorbs stream", function() {
var stream = Stream.stream()
stream.end(true)
var doubled = stream.map(function(value) {return value * 2})
stream(Stream.stream(1))
o(stream()).equals(1)
o(doubled()).equals(undefined)
})
o("ended stream absorbs pending stream", function() {
var stream = Stream.stream()
stream.end(true)
var doubled = stream.map(function(value) {return value * 2})
stream(Stream.stream())
o(stream()).equals(undefined)
o(doubled()).equals(undefined)
})
o("ended stream absorbs errored stream", function() {
var stream = Stream.stream()
stream.end(true)
var doubled = stream.map(function(value) {return value * 2})
stream(Stream.reject(new Error("error")))
o(stream.error().message).equals("error")
o(doubled()).equals(undefined)
})
o("ended stream absorbs ended stream", function() {
var stream = Stream.stream(1)
stream.end(true)
var ended = Stream.stream(2)
ended.end(true)
var doubled = stream.map(function(value) {return value * 2})
stream(ended)
o(stream()).equals(2)
o(doubled()).equals(undefined)
})
})
o.spec("fantasy-land", function() {
o.spec("functor", function() {
o("identity", function() {