diff --git a/examples/threaditjs/app.js b/examples/threaditjs/app.js index 05085fe6..48384c99 100644 --- a/examples/threaditjs/app.js +++ b/examples/threaditjs/app.js @@ -1,11 +1,15 @@ T.time("Setup"); -var request = require("../../request/request")(window, Promise).ajax +var Stream = require("../../util/stream") +var requestService = require("../../request/request")(window, Stream) +var request = requestService.xhr var m = require("../../render/hyperscript") var trust = require("../../render/trust") var renderer = require("../../render/render")(window) var router = require("../../router/router")(window) +requestService.setCompletionCallback(run) + //API calls var api = { home : function() { @@ -14,7 +18,7 @@ var api = { }, thread : function(id) { T.timeEnd("Setup") - return request({method: "GET", url: T.apiUrl + "/comments/" + id}).then(T.transformResponse) + return request({method: "GET", url: T.apiUrl + "/comments/" + id}).map(T.transformResponse) }, newThread : function(text) { return request({method: "POST", url: T.apiUrl + "/threads/create",data: {text: text}}) @@ -27,29 +31,27 @@ var api = { var threads = [], current = null, loaded = false, error = false, notFound = false function loadThreads() { loaded = false - api.home().then(function(response) { + api.home().map(function(response) { document.title = "ThreaditJS: Mithril | Home" threads = response.data loaded = true - }, function() { + }).catch(function() { loaded = error = true }) - .then(run) } function loadThread(id) { loaded = false notFound = false - api.thread(id).then(function(response) { + api.thread(id).map(function(response) { document.title = "ThreaditJS: Mithril | " + T.trimTitle(response.root.text); loaded = true current = response - }, function(response) { + }).catch(function(response) { loaded = true if (response.status === 404) notFound = true else error = true }) - .then(run) } function unloadThread() { current = null @@ -57,11 +59,10 @@ function unloadThread() { function createThread() { var threadText = document.getElementById("threadText") - api.newThread(threadText.value).then(function(response) { + api.newThread(threadText.value).map(function(response) { threadText.value = ""; threads.push(response.data); }) - .then(run) return false } @@ -72,12 +73,11 @@ function showReplying(vnode) { } function submitComment(vnode) { - api.newComment(vnode.state.newComment, vnode.attrs.node.id).then(function(response) { + api.newComment(vnode.state.newComment, vnode.attrs.node.id).map(function(response) { vnode.state.newComment = "" vnode.state.replying = false vnode.attrs.node.children.push(response.data) }) - .then(run) return false } diff --git a/examples/threaditjs/index.html b/examples/threaditjs/index.html index b59ffea7..53249dfa 100644 --- a/examples/threaditjs/index.html +++ b/examples/threaditjs/index.html @@ -18,6 +18,7 @@ + diff --git a/index.js b/index.js index 432194ea..593f266f 100644 --- a/index.js +++ b/index.js @@ -1,21 +1,30 @@ "use strict" + ;(function () { - var Promise = require("./promise/promise") - var m = require("./render/hyperscript") - var renderService = require("./render/render")(window) - var redrawService = require("./api/pubsub")() - var requestService = require("./request/request")(window, Promise) - m.request = requestService.xhr - m.jsonp = requestService.jsonp - m.route = require("./api/router")(window, renderService, redrawService) - m.mount = require("./api/mount")(renderService, redrawService) - m.trust = require("./render/trust") - m.prop = require("./util/prop") - m.withAttr = require("./util/withAttr") - m.render = renderService.render - m.redraw = redrawService.publish +var Stream = require("./util/stream") +var m = require("./render/hyperscript") +var renderService = require("./render/render")(window) +var redrawService = require("./api/pubsub")() +var requestService = require("./request/request")(window) + +requestService.setCompletionCallback(redrawService.publish) + +m.version = "bleeding-edge" +m.request = requestService.xhr +m.jsonp = requestService.jsonp +m.route = require("./api/router")(window, renderService, redrawService) +m.mount = require("./api/mount")(renderService, redrawService) +m.trust = require("./render/trust") +m.prop = Stream.stream +m.prop.combine = Stream.combine +m.prop.reject = Stream.reject +m.prop.HALT = Stream.HALT +m.withAttr = require("./util/withAttr") +m.render = renderService.render +m.redraw = redrawService.publish + +if (typeof module === "object") module.exports = m +else window.m = m - if (typeof module === "object") module.exports = m - else window.m = m })() \ No newline at end of file diff --git a/request/request.js b/request/request.js index 071600d6..7773f6dc 100644 --- a/request/request.js +++ b/request/request.js @@ -1,83 +1,94 @@ "use strict" var buildQueryString = require("../querystring/build") +var Stream = require("../util/stream") -module.exports = function($window, Promise) { +module.exports = function($window) { var callbackCount = 0 + var oncompletion + function setCompletionCallback(callback) {oncompletion = callback} + function xhr(args) { - return new Promise(function(resolve, reject) { - var useBody = typeof args.useBody === "boolean" ? args.useBody : args.method !== "GET" && args.method !== "TRACE" - - if (typeof args.serialize !== "function") args.serialize = JSON.stringify - if (typeof args.deserialize !== "function") args.deserialize = deserialize - if (typeof args.extract !== "function") args.extract = extract - - args.url = interpolate(args.url, args.data) - if (useBody) args.data = args.serialize(args.data) - else args.url = assemble(args.url, args.data) - - var xhr = new $window.XMLHttpRequest() - xhr.open(args.method, args.url, typeof args.async === "boolean" ? args.async : true, typeof args.user === "string" ? args.user : undefined, typeof args.password === "string" ? args.password : undefined) - - if (args.serialize === JSON.stringify && useBody) { - xhr.setRequestHeader("Content-Type", "application/json; charset=utf-8") - } - if (args.deserialize === deserialize) { - xhr.setRequestHeader("Accept", "application/json, text/*") - } - - if (typeof args.config === "function") xhr = args.config(xhr, args) || xhr - - xhr.onreadystatechange = function() { - if (xhr.readyState === 4) { - try { - var response = args.deserialize(args.extract(xhr, args)) - if (xhr.status >= 200 && xhr.status < 300) { - if (typeof args.type === "function") { - if (response instanceof Array) { - for (var i = 0; i < response.length; i++) { - response[i] = new args.type(response[i]) - } + var stream = Stream.stream() + + var useBody = typeof args.useBody === "boolean" ? args.useBody : args.method !== "GET" && args.method !== "TRACE" + + if (typeof args.serialize !== "function") args.serialize = JSON.stringify + if (typeof args.deserialize !== "function") args.deserialize = deserialize + if (typeof args.extract !== "function") args.extract = extract + + args.url = interpolate(args.url, args.data) + if (useBody) args.data = args.serialize(args.data) + else args.url = assemble(args.url, args.data) + + var xhr = new $window.XMLHttpRequest() + xhr.open(args.method, args.url, typeof args.async === "boolean" ? args.async : true, typeof args.user === "string" ? args.user : undefined, typeof args.password === "string" ? args.password : undefined) + + if (args.serialize === JSON.stringify && useBody) { + xhr.setRequestHeader("Content-Type", "application/json; charset=utf-8") + } + if (args.deserialize === deserialize) { + xhr.setRequestHeader("Accept", "application/json, text/*") + } + + if (typeof args.config === "function") xhr = args.config(xhr, args) || xhr + + xhr.onreadystatechange = function() { + if (xhr.readyState === 4) { + try { + var response = args.deserialize(args.extract(xhr, args)) + if (xhr.status >= 200 && xhr.status < 300) { + if (typeof args.type === "function") { + if (response instanceof Array) { + for (var i = 0; i < response.length; i++) { + response[i] = new args.type(response[i]) } - else response = new args.type(response) } - - resolve(response) + else response = new args.type(response) } - else reject(new Error(xhr.responseText)) - } - catch (e) { - reject(e) + + stream(response) } + else stream.error(new Error(xhr.responseText)) } + catch (e) { + stream.error(e) + } + if (typeof oncompletion === "function") oncompletion() } - - if (useBody) xhr.send(args.data) - else xhr.send() - }) + } + + if (useBody) xhr.send(args.data) + else xhr.send() + + return stream } function jsonp(args) { - return new Promise(function(resolve, reject) { - var callbackName = args.callbackName || "_mithril_" + Math.round(Math.random() * 1e16) + "_" + callbackCount++ - var script = $window.document.createElement("script") - $window[callbackName] = function(data) { - script.parentNode.removeChild(script) - resolve(data) - delete $window[callbackName] - } - script.onerror = function() { - script.parentNode.removeChild(script) - reject(new Error("JSONP request failed")) - delete $window[callbackName] - } - if (args.data == null) args.data = {} - args.url = interpolate(args.url, args.data) - args.data[args.callbackKey || "callback"] = callbackName - script.src = assemble(args.url, args.data) - $window.document.documentElement.appendChild(script) - }) + var stream = Stream.stream() + + var callbackName = args.callbackName || "_mithril_" + Math.round(Math.random() * 1e16) + "_" + callbackCount++ + var script = $window.document.createElement("script") + $window[callbackName] = function(data) { + script.parentNode.removeChild(script) + stream(data) + if (typeof oncompletion === "function") oncompletion() + delete $window[callbackName] + } + script.onerror = function() { + script.parentNode.removeChild(script) + stream.error(new Error("JSONP request failed")) + if (typeof oncompletion === "function") oncompletion() + delete $window[callbackName] + } + if (args.data == null) args.data = {} + args.url = interpolate(args.url, args.data) + args.data[args.callbackKey || "callback"] = callbackName + script.src = assemble(args.url, args.data) + $window.document.documentElement.appendChild(script) + + return stream } function interpolate(url, data) { @@ -110,5 +121,5 @@ module.exports = function($window, Promise) { function extract(xhr) {return xhr.responseText} - return {xhr: xhr, jsonp: jsonp} + return {xhr: xhr, jsonp: jsonp, setCompletionCallback: setCompletionCallback} } diff --git a/request/tests/index.html b/request/tests/index.html index 2d5f3120..e9bf02e0 100644 --- a/request/tests/index.html +++ b/request/tests/index.html @@ -9,9 +9,10 @@ - + + diff --git a/request/tests/test-jsonp.js b/request/tests/test-jsonp.js index 0335bbe3..fcb7f2c1 100644 --- a/request/tests/test-jsonp.js +++ b/request/tests/test-jsonp.js @@ -9,7 +9,7 @@ o.spec("jsonp", function() { var mock, jsonp o.beforeEach(function() { mock = xhrMock() - jsonp = new Request(mock, Promise).jsonp + jsonp = new Request(mock).jsonp }) o("works", function(done) { @@ -19,9 +19,9 @@ o.spec("jsonp", function() { return {status: 200, responseText: queryData["callback"] + "(" + JSON.stringify({a: 1}) + ")"} } }) - jsonp({url: "/item"}).then(function(data) { + jsonp({url: "/item"}).map(function(data) { o(data).deepEquals({a: 1}) - }).then(done) + }).map(done) }) o("works w/ other querystring params", function(done) { mock.$defineRoutes({ @@ -30,10 +30,10 @@ o.spec("jsonp", function() { return {status: 200, responseText: queryData["callback"] + "(" + JSON.stringify(queryData) + ")"} } }) - jsonp({url: "/item", data: {a: "b", c: "d"}}).then(function(data) { + jsonp({url: "/item", data: {a: "b", c: "d"}}).map(function(data) { delete data["callback"] o(data).deepEquals({a: "b", c: "d"}) - }).then(done) + }).map(done) }) o("works w/ custom callbackKey", function(done) { mock.$defineRoutes({ @@ -42,9 +42,9 @@ o.spec("jsonp", function() { return {status: 200, responseText: queryData["cb"] + "(" + JSON.stringify({a: 2}) + ")"} } }) - jsonp({url: "/item", callbackKey: "cb"}).then(function(data) { + jsonp({url: "/item", callbackKey: "cb"}).map(function(data) { o(data).deepEquals({a: 2}) - }).then(done) + }).map(done) }) o("handles error", function(done) { jsonp({url: "/item", callbackKey: "cb"}).catch(function(e) { diff --git a/request/tests/test-xhr.js b/request/tests/test-xhr.js index 7be6e14e..88057603 100644 --- a/request/tests/test-xhr.js +++ b/request/tests/test-xhr.js @@ -8,7 +8,7 @@ o.spec("xhr", function() { var mock, xhr o.beforeEach(function() { mock = xhrMock() - xhr = new Request(mock, Promise).xhr + xhr = new Request(mock).xhr }) o.spec("success", function() { @@ -19,9 +19,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: 1})} } }) - xhr({method: "GET", url: "/item"}).then(function(data) { + xhr({method: "GET", url: "/item"}).map(function(data) { o(data).deepEquals({a: 1}) - }).then(function() { + }).map(function() { done() }) }) @@ -31,9 +31,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: 1})} } }) - xhr({method: "GET", url: "/item"}).then(function(data) { + xhr({method: "GET", url: "/item"}).map(function(data) { o(data).deepEquals({a: 1}) - }).then(done) + }).map(done) }) o("works w/ parameterized data via GET", function(done) { mock.$defineRoutes({ @@ -41,9 +41,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: request.query})} } }) - xhr({method: "GET", url: "/item", data: {x: "y"}}).then(function(data) { + xhr({method: "GET", url: "/item", data: {x: "y"}}).map(function(data) { o(data).deepEquals({a: "?x=y"}) - }).then(done) + }).map(done) }) o("works w/ parameterized data via POST", function(done) { mock.$defineRoutes({ @@ -51,9 +51,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: JSON.parse(request.body)})} } }) - xhr({method: "POST", url: "/item", data: {x: "y"}}).then(function(data) { + xhr({method: "POST", url: "/item", data: {x: "y"}}).map(function(data) { o(data).deepEquals({a: {x: "y"}}) - }).then(done) + }).map(done) }) o("works w/ parameterized data containing colon via GET", function(done) { mock.$defineRoutes({ @@ -61,9 +61,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: request.query})} } }) - xhr({method: "GET", url: "/item", data: {x: ":y"}}).then(function(data) { + xhr({method: "GET", url: "/item", data: {x: ":y"}}).map(function(data) { o(data).deepEquals({a: "?x=%3Ay"}) - }).then(done) + }).map(done) }) o("works w/ parameterized data containing colon via POST", function(done) { mock.$defineRoutes({ @@ -71,9 +71,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: JSON.parse(request.body)})} } }) - xhr({method: "POST", url: "/item", data: {x: ":y"}}).then(function(data) { + xhr({method: "POST", url: "/item", data: {x: ":y"}}).map(function(data) { o(data).deepEquals({a: {x: ":y"}}) - }).then(done) + }).map(done) }) o("works w/ parameterized url via GET", function(done) { mock.$defineRoutes({ @@ -81,9 +81,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: request.url, b: request.query})} } }) - xhr({method: "GET", url: "/item/:x", data: {x: "y"}}).then(function(data) { + xhr({method: "GET", url: "/item/:x", data: {x: "y"}}).map(function(data) { o(data).deepEquals({a: "/item/y", b: {}}) - }).then(done) + }).map(done) }) o("works w/ parameterized url via POST", function(done) { mock.$defineRoutes({ @@ -91,9 +91,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: request.url, b: JSON.parse(request.body)})} } }) - xhr({method: "POST", url: "/item/:x", data: {x: "y"}}).then(function(data) { + xhr({method: "POST", url: "/item/:x", data: {x: "y"}}).map(function(data) { o(data).deepEquals({a: "/item/y", b: {}}) - }).then(done) + }).map(done) }) o("ignores unresolved parameter via GET", function(done) { mock.$defineRoutes({ @@ -101,9 +101,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: request.url})} } }) - xhr({method: "GET", url: "/item/:x"}).then(function(data) { + xhr({method: "GET", url: "/item/:x"}).map(function(data) { o(data).deepEquals({a: "/item/:x"}) - }).then(done) + }).map(done) }) o("ignores unresolved parameter via POST", function(done) { mock.$defineRoutes({ @@ -111,9 +111,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({a: request.url})} } }) - xhr({method: "GET", url: "/item/:x"}).then(function(data) { + xhr({method: "GET", url: "/item/:x"}).map(function(data) { o(data).deepEquals({a: "/item/:x"}) - }).then(done) + }).map(done) }) o("type parameter works for Array responses", function(done) { var Entity = function(args) { @@ -125,9 +125,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify([{id: 1}, {id: 2}, {id: 3}])} } }) - xhr({method: "GET", url: "/item", type: Entity}).then(function(data) { + xhr({method: "GET", url: "/item", type: Entity}).map(function(data) { o(data).deepEquals([{_id: 1}, {_id: 2}, {_id: 3}]) - }).then(done) + }).map(done) }) o("type parameter works for Object responses", function(done) { var Entity = function(args) { @@ -139,9 +139,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({id: 1})} } }) - xhr({method: "GET", url: "/item", type: Entity}).then(function(data) { + xhr({method: "GET", url: "/item", type: Entity}).map(function(data) { o(data).deepEquals({_id: 1}) - }).then(done) + }).map(done) }) o("serialize parameter works in GET", function(done) { var serialize = function(data) { @@ -153,9 +153,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({body: request.query})} } }) - xhr({method: "GET", url: "/item", serialize: serialize, data: {id: 1}}).then(function(data) { + xhr({method: "GET", url: "/item", serialize: serialize, data: {id: 1}}).map(function(data) { o(data.body).equals("?id=1") - }).then(done) + }).map(done) }) o("serialize parameter works in POST", function(done) { var serialize = function(data) { @@ -167,9 +167,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({body: request.body})} } }) - xhr({method: "POST", url: "/item", serialize: serialize, data: {id: 1}}).then(function(data) { + xhr({method: "POST", url: "/item", serialize: serialize, data: {id: 1}}).map(function(data) { o(data.body).equals("id=1") - }).then(done) + }).map(done) }) o("deserialize parameter works in GET", function(done) { var deserialize = function(data) { @@ -181,9 +181,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({test: 123})} } }) - xhr({method: "GET", url: "/item", deserialize: deserialize}).then(function(data) { + xhr({method: "GET", url: "/item", deserialize: deserialize}).map(function(data) { o(data).equals("{\"test\":123}") - }).then(done) + }).map(done) }) o("deserialize parameter works in POST", function(done) { var deserialize = function(data) { @@ -195,9 +195,9 @@ o.spec("xhr", function() { return {status: 200, responseText: JSON.stringify({test: 123})} } }) - xhr({method: "POST", url: "/item", deserialize: deserialize}).then(function(data) { + xhr({method: "POST", url: "/item", deserialize: deserialize}).map(function(data) { o(data).equals("{\"test\":123}") - }).then(done) + }).map(done) }) o("extract parameter works in GET", function(done) { var extract = function(data) { @@ -209,9 +209,9 @@ o.spec("xhr", function() { return {status: 200, responseText: ""} } }) - xhr({method: "GET", url: "/item", extract: extract}).then(function(data) { + xhr({method: "GET", url: "/item", extract: extract}).map(function(data) { o(data).deepEquals({test: 123}) - }).then(done) + }).map(done) }) o("extract parameter works in POST", function(done) { var extract = function(data) { @@ -223,9 +223,9 @@ o.spec("xhr", function() { return {status: 200, responseText: ""} } }) - xhr({method: "POST", url: "/item", extract: extract}).then(function(data) { + xhr({method: "POST", url: "/item", extract: extract}).map(function(data) { o(data).deepEquals({test: 123}) - }).then(done) + }).map(done) }) o("config parameter works", function(done) { mock.$defineRoutes({ @@ -233,7 +233,7 @@ o.spec("xhr", function() { return {status: 200, responseText: ""} } }) - xhr({method: "POST", url: "/item", config: config}).then(done) + xhr({method: "POST", url: "/item", config: config}).map(done) function config(xhr) { o(typeof xhr.setRequestHeader).equals("function") @@ -251,7 +251,7 @@ o.spec("xhr", function() { }) xhr({method: "GET", url: "/item"}).catch(function(e) { o(e.message).equals(JSON.stringify({error: "error"})) - }).then(done) + }).map(done) }) o("rejects on non-JSON server error", function(done) { mock.$defineRoutes({ @@ -261,7 +261,7 @@ o.spec("xhr", function() { }) xhr({method: "GET", url: "/item"}).catch(function(e) { o(e.message).equals("error") - }).then(done) + }).map(done) }) }) }) \ No newline at end of file diff --git a/util/prop.js b/util/prop.js deleted file mode 100644 index d5174796..00000000 --- a/util/prop.js +++ /dev/null @@ -1,8 +0,0 @@ -"use strict" - -module.exports = function(store) { - return function() { - if (arguments.length > 0) store = arguments[0] - return store - } -} \ No newline at end of file diff --git a/util/stream.js b/util/stream.js new file mode 100644 index 00000000..222ec078 --- /dev/null +++ b/util/stream.js @@ -0,0 +1,159 @@ +"use strict" + +var guid = 0, noop = function() {}, HALT = {} +function createStream() { + function stream() { + if (arguments.length > 0) updateStream(stream, arguments[0], undefined) + return stream._state.value + } + initStream(stream, arguments) + + if (arguments.length > 0) updateStream(stream, arguments[0], undefined) + + return stream +} +function initStream(stream, args) { + stream.constructor = createStream + stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined} + stream.map = map, stream.ap = ap, stream.of = createStream + stream.valueOf = valueOf + stream.catch = doCatch + + Object.defineProperties(stream, { + error: {get: function() { + if (!stream._state.errorStream) { + var errorStream = function() { + if (arguments.length > 0) updateStream(stream, undefined, arguments[0]) + return stream._state.error + } + initStream(errorStream, []) + initDependency(errorStream, [stream], noop, noop) + stream._state.errorStream = errorStream + } + return stream._state.errorStream + }}, + end: {get: function() { + if (!stream._state.endStream) { + var endStream = createStream() + endStream.map(function(value) { + if (value === true) unregisterStream(stream), unregisterStream(endStream) + return value + }) + stream._state.endStream = endStream + } + return stream._state.endStream + }} + }) +} +function updateStream(stream, value, error) { + if (!absorbStream(stream, value, false) && !absorbStream(stream, error, true)) { + updateState(stream, value, error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + finalize(stream) + } +} +function updateState(stream, value, error) { + if (error !== undefined && typeof stream._state.recover === "function") { + try {updateValues(stream, stream._state.recover(), undefined)} + catch (e) {updateValues(stream, undefined, e)} + } + else updateValues(stream, value, error) + stream._state.changed = true + if (stream._state.state !== 2) stream._state.state = 1 +} +function updateValues(stream, value, error) { + stream._state.value = value + stream._state.error = error +} +function updateDependency(stream, mustSync) { + var state = stream._state, parents = state.parents + if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) { + var failed = parents.filter(errored) + if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error) + else { + try { + var value = state.derive() + if (!absorbStream(stream, value)) updateState(stream, value, undefined) + } + catch (e) { + updateState(stream, undefined, e) + } + } + } +} +function absorbStream(stream, value, isError) { + if (value != null && value.constructor === createStream) { + if (value._state.state === 2) { + stream.end(true) + stream(value()) + } + else if (value._state.error) stream.error(value.error()) + else if (value._state.state === 0) return true + else if (!isError) stream(value()) + else stream.error(value()) + return true + } + return false +} +function finalize(stream) { + stream._state.changed = false + for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false +} + +function doCatch(fn) { + var stream = this + var derive = function() {return stream._state.value} + var recover = function() {return fn(stream._state.error)} + return initDependency(createStream(), [stream], derive, recover) +} +function combine(fn, streams) { + return initDependency(createStream(), streams, function() { + var failed = streams.filter(errored) + if (failed.length > 0) throw failed[0]._state.error + return fn.apply(this, streams.concat([streams.filter(changed)])) + }, undefined) +} + +function initDependency(dep, streams, derive, recover) { + var state = dep._state + state.derive = derive + state.recover = recover + state.parents = streams.filter(notEnded) + + registerDependency(dep, state.parents) + updateDependency(dep, true) + + return dep +} +function registerDependency(stream, parents) { + for (var i = 0; i < parents.length; i++) { + parents[i]._state.deps[stream._state.id] = stream + registerDependency(stream, parents[i]._state.parents) + } +} +function unregisterStream(stream) { + for (var id in stream._state.deps) { + var dependent = stream._state.deps[id] + var index = dependent._state.parents.indexOf(stream) + if (index > -1) dependent._state.parents.splice(index, 1) + } + stream._state.state = 2 //ended + stream._state.deps = {} +} + +function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} +function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])} +function valueOf() {return this._state.value} + +function active(stream) {return stream._state.state === 1} +function changed(stream) {return stream._state.changed} +function notEnded(stream) {return stream._state.state !== 2} +function errored(stream) {return stream._state.error} + +function reject(e) { + var stream = createStream() + stream.error(e) + return stream +} + +module.exports = {stream: createStream, combine: combine, reject: reject} diff --git a/util/tests/index.html b/util/tests/index.html index e156fad4..26954980 100644 --- a/util/tests/index.html +++ b/util/tests/index.html @@ -6,11 +6,12 @@ + - - + + diff --git a/util/tests/test-prop.js b/util/tests/test-prop.js deleted file mode 100644 index 124c5d5a..00000000 --- a/util/tests/test-prop.js +++ /dev/null @@ -1,16 +0,0 @@ -"use strict" - -var o = require("../../ospec/ospec") -var prop = require("../../util/prop") - -o.spec("prop", function() { - o("works", function() { - var store = prop(1) - var initialValue = store() - store(2) - var newValue = store() - - o(initialValue).equals(1) - o(newValue).equals(2) - }) -}) \ No newline at end of file diff --git a/util/tests/test-stream.js b/util/tests/test-stream.js new file mode 100644 index 00000000..099ee4b6 --- /dev/null +++ b/util/tests/test-stream.js @@ -0,0 +1,779 @@ +"use strict" + +var o = require("../../ospec/ospec") +var callAsync = require("../../test-utils/callAsync") +var Stream = require("../../util/stream") + +o.spec("stream", function() { + o.spec("stream", function() { + o("works as getter/setter", function() { + var stream = Stream.stream(1) + var initialValue = stream() + stream(2) + var newValue = stream() + + o(initialValue).equals(1) + o(newValue).equals(2) + }) + o("has undefined value by default", function() { + var stream = Stream.stream() + + o(stream()).equals(undefined) + }) + o("can update to undefined", function() { + var stream = Stream.stream(1) + stream(undefined) + + o(stream()).equals(undefined) + }) + }) + o.spec("combine", function() { + o("transforms value", function() { + var stream = Stream.stream() + var doubled = Stream.combine(function(s) {return s() * 2}, [stream]) + + stream(2) + + o(doubled()).equals(4) + }) + o("transforms default value", function() { + var stream = Stream.stream(2) + var doubled = Stream.combine(function(s) {return s() * 2}, [stream]) + + o(doubled()).equals(4) + }) + o("transforms multiple values", function() { + var s1 = Stream.stream() + var s2 = Stream.stream() + var added = Stream.combine(function(s1, s2) {return s1() + s2()}, [s1, s2]) + + s1(2) + s2(3) + + o(added()).equals(5) + }) + o("transforms multiple default values", function() { + var s1 = Stream.stream(2) + var s2 = Stream.stream(3) + var added = Stream.combine(function(s1, s2) {return s1() + s2()}, [s1, s2]) + + o(added()).equals(5) + }) + o("transforms mixed default and late-bound values", function() { + var s1 = Stream.stream(2) + var s2 = Stream.stream() + var added = Stream.combine(function(s1, s2) {return s1() + s2()}, [s1, s2]) + + s2(3) + + o(added()).equals(5) + }) + o("combines atomically", function() { + var count = 0 + var a = Stream.stream() + var b = Stream.combine(function(a) {return a() * 2}, [a]) + var c = Stream.combine(function(a) {return a() * a()}, [a]) + var d = Stream.combine(function(b, c) { + count++ + return b() + c() + }, [b, c]) + + a(3) + + o(d()).equals(15) + o(count).equals(1) + }) + o("combines default value atomically", function() { + var count = 0 + var a = Stream.stream(3) + var b = Stream.combine(function(a) {return a() * 2}, [a]) + var c = Stream.combine(function(a) {return a() * a()}, [a]) + var d = Stream.combine(function(b, c) { + count++ + return b() + c() + }, [b, c]) + + o(d()).equals(15) + o(count).equals(1) + }) + o("combine lists only changed upstreams in last arg", function() { + var streams = [] + var a = Stream.stream() + var b = Stream.stream() + var c = Stream.combine(function(a, b, changed) { + streams = changed + }, [a, b]) + + a(3) + b(5) + + o(streams.length).equals(1) + o(streams[0]).equals(b) + }) + o("combine lists only changed upstreams in last arg with default value", function() { + var streams = [] + var a = Stream.stream(3) + var b = Stream.stream(5) + var c = Stream.combine(function(a, b, changed) { + streams = changed + }, [a, b]) + + a(7) + + o(streams.length).equals(1) + o(streams[0]).equals(a) + }) + o("combine can return undefined", function() { + var a = Stream.stream(1) + var b = Stream.combine(function(a) { + return undefined + }, [a]) + + o(b()).equals(undefined) + }) + o("combine absorbs streams", function() { + var a = Stream.stream(1) + var b = Stream.combine(function(a) { + return Stream.stream(2) + }, [a]) + + o(b()).equals(2) + }) + o("combine absorbs errored streams", function() { + var a = Stream.stream(1) + var b = Stream.combine(function(a) { + return Stream.reject(new Error("error")) + }, [a]) + + o(b()).equals(undefined) + o(b.error().message).equals("error") + }) + o("combine absorbs ready streams", function() { + var count = 0 + var a = Stream.stream(1) + var b = Stream.combine(function(a) { + return Stream.stream() + }, [a]) + var c = Stream.combine(function(b) { + count++ + return 2 + }, [b]) + + o(c()).equals(undefined) + o(count).equals(0) + }) + }) + o.spec("end", function() { + o("end stream works", function() { + var stream = Stream.stream() + var doubled = Stream.combine(function(stream) {return stream() * 2}, [stream]) + + stream.end(true) + + stream(3) + + o(doubled()).equals(undefined) + }) + o("end stream works with default value", function() { + var stream = Stream.stream(2) + var doubled = Stream.combine(function(stream) {return stream() * 2}, [stream]) + + stream.end(true) + + stream(3) + + o(doubled()).equals(4) + }) + o("cannot add downstream to ended stream", function() { + var stream = Stream.stream(2) + stream.end(true) + + var doubled = Stream.combine(function(stream) {return stream() * 2}, [stream]) + stream(3) + + o(doubled()).equals(undefined) + }) + }) + o.spec("error", function() { + o("error() works", function() { + var stream = Stream.stream() + var errored = Stream.combine(function(stream) {throw new Error("error")}, [stream]) + + stream(3) + + o(errored()).equals(undefined) + o(errored.error().message).equals("error") + }) + o("error() works with default value", function() { + var stream = Stream.stream(3) + var errored = Stream.combine(function(stream) {throw new Error("error")}, [stream]) + + o(errored()).equals(undefined) + o(errored.error().message).equals("error") + }) + o("error() removes error on valid value", function() { + var stream = Stream.stream("a") + var doubled = Stream.combine(function(stream) { + if (typeof stream() !== "number") throw new Error("error") + else return stream() * 2 + }, [stream]) + + stream(3) + + o(doubled()).equals(6) + o(doubled.error()).equals(undefined) + }) + o("error() triggers catch", function() { + var count = 0 + var stream = Stream.stream(1) + var handled = stream.catch(function() { + count++ + return 2 + }) + + stream.error(new Error("error")) + + o(handled()).equals(2) + o(handled.error()).equals(undefined) + o(count).equals(1) + }) + o("thrown error propagates downstream", function() { + var count = 0 + var stream = Stream.stream(1) + .map(function() {throw new Error("error")}) + .map(function(value) { + count++ + return value * 2 + }) + .map(function(value) { + count++ + return value * 3 + }) + + o(stream()).equals(undefined) + o(stream.error().message).equals("error") + o(count).equals(0) + }) + o("set error propagates downstream", function() { + var count = 0 + var stream = Stream.stream() + var mapped = stream.map(function(value) { + count++ + return value * 2 + }) + .map(function(value) { + count++ + return value * 3 + }) + stream.error(new Error("error")) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + o(count).equals(0) + }) + o("error.map works", function() { + var stream = Stream.stream(1) + var mappedFromError = stream.error.map(function(value) { + return "from" + value.message + }) + + o(mappedFromError()).equals(undefined) + + stream.error(new Error("error")) + + o(mappedFromError()).equals("fromerror") + }) + o("error from error.map propagates", function() { + var stream = Stream.stream(1) + var mappedFromError = stream.error.map(function(value) { + return "from" + value.message + }).map(function(value) { + return "a" + value + }) + + o(mappedFromError()).equals(undefined) + + stream.error(new Error("error")) + + o(mappedFromError()).equals("afromerror") + }) + o("error thrown from error.map propagates downstream", function() { + var count = 0 + var stream = Stream.stream(1) + var mappedFromError = stream.error.map(function(value) { + throw new Error("b") + }) + + var downstream = mappedFromError.map(function() { + count++ + }) + + o(mappedFromError()).equals(undefined) + + stream.error(new Error("a")) + + o(mappedFromError()).equals(undefined) + o(mappedFromError.error().message).equals("b") + o(downstream()).equals(undefined) + o(downstream.error().message).equals("b") + o(count).equals(0) + }) + o("error.map absorbs streams", function() { + var stream = Stream.reject(new Error("error")) + var error = stream.error.map(function(value) { + return Stream.stream(1) + }) + + o(error()).equals(1) + }) + }) + o.spec("reject", function() { + o("reject works", function() { + var stream = Stream.reject(new Error("error")) + + o(stream()).equals(undefined) + o(stream.error().message).equals("error") + }) + o("rejected propagates downstream", function() { + var count = 0 + var stream = Stream.reject(new Error("error")) + .map(function(value) { + count++ + return value * 2 + }) + .map(function(value) { + count++ + return value * 3 + }) + + o(stream()).equals(undefined) + o(stream.error().message).equals("error") + }) + o("rejected removes error on value", function() { + var stream = Stream.reject(new Error("error")) + var doubled = stream.map(function(value) { + return value * 2 + }) + + stream(1) + + o(doubled()).equals(2) + o(stream.error()).equals(undefined) + }) + o("combined rejected yields first error", function() { + var count = 0 + var a = Stream.reject(new Error("a")) + var b = Stream.reject(new Error("b")) + var combined = Stream.combine(function(a, b) { + count++ + return a() + b() + }, [a, b]) + + o(combined()).equals(undefined) + o(combined.error().message).equals("a") + o(count).equals(0) + }) + }) + o.spec("catch", function() { + o("catch works from reject", function() { + var count = 0 + var stream = Stream.reject(new Error("error")).catch(function(e) { + count++ + return "no" + e.message + }).map(function(value) { + return value + "mapped" + }) + + o(count).equals(1) + o(stream()).equals("noerrormapped") + o(stream.error()).equals(undefined) + }) + o("catch works from combine", function() { + var count = 0 + var stream = Stream.combine(function() {throw new Error("error")}, [Stream.stream(1)]).catch(function(e) { + count++ + return "no" + e.message + }).map(function(value) { + return value + "mapped" + }) + + o(count).equals(1) + o(stream()).equals("noerrormapped") + o(stream.error()).equals(undefined) + }) + o("catch is not called if no error", function() { + var count = 0 + var stream = Stream.stream() + var handled = stream.map(function(value) {return value + value}).catch(function(e) { + count++ + return "no" + e.message + }).map(function(value) { + return value + "mapped" + }) + + stream("a") + + o(count).equals(0) + o(handled()).equals("aamapped") + o(handled.error()).equals(undefined) + }) + o("catch is not called if no error with default value", function() { + var count = 0 + var stream = Stream.stream("a").map(function(value) {return value + value}).catch(function(e) { + count++ + return "no" + e.message + }).map(function(value) { + return value + "mapped" + }) + + o(count).equals(0) + o(stream()).equals("aamapped") + o(stream.error()).equals(undefined) + }) + o("throwing from catch rejects", function() { + var stream = Stream.reject(new Error("a")).catch(function(e) { + throw new Error("b") + }) + var mapped = stream.map(function(value) {return value + "ok"}) + + o(stream()).equals(undefined) + o(stream.error().message).equals("b") + o(mapped()).equals(undefined) + o(mapped.error().message).equals("b") + }) + o("catch can return undefined", function() { + var stream = Stream.reject(new Error("b")).catch(function(e) {}).map(function(value) {return String(value)}) + + o(stream()).equals("undefined") + o(stream.error()).equals(undefined) + }) + o("catch does not prevent sibling error propagation", function() { + var a = Stream.reject(new Error("a")) + var b = a.map(function(value) {return value + "b"}).catch(function(e) {}) + var c = a.map(function(value) {return value + "c"}) + var d = Stream.combine(function(b, c) {return b() + c()}, [b, c]) + + o(d()).equals(undefined) + o(d.error().message).equals("a") + }) + o("catches absorbed rejected stream", function() { + var caught + var stream = Stream.stream(1).map(function() { + return Stream.reject(new Error("error")) + }).catch(function(value) { + caught = value + return "no" + value.message + }).map(function(value) { + return value + "mapped" + }) + + o(stream()).equals("noerrormapped") + }) + }) + o.spec("map", function() { + o("works", function() { + var stream = Stream.stream() + var doubled = stream.map(function(value) {return value * 2}) + + stream(3) + + o(doubled()).equals(6) + }) + o("works with default value", function() { + var stream = Stream.stream(3) + var doubled = stream.map(function(value) {return value * 2}) + + o(doubled()).equals(6) + }) + o("works with undefined value", function() { + var stream = Stream.stream() + var mapped = stream.map(function(value) {return String(value)}) + + stream(undefined) + + o(mapped()).equals("undefined") + }) + o("works with default undefined value", function() { + var stream = Stream.stream(undefined) + var mapped = stream.map(function(value) {return String(value)}) + + o(mapped()).equals("undefined") + }) + }) + o.spec("ap", function() { + o("works", function() { + var apply = Stream.stream(function(value) {return value * 2}) + var stream = Stream.stream(3) + var applied = apply.ap(stream) + + o(applied()).equals(6) + + apply(function(value) {return value / 3}) + + o(applied()).equals(1) + + stream(9) + + o(applied()).equals(3) + }) + o("works with undefined value", function() { + var apply = Stream.stream(function(value) {return String(value)}) + var stream = Stream.stream(undefined) + var applied = apply.ap(stream) + + o(applied()).equals("undefined") + + apply(function(value) {return String(value) + "a"}) + + o(applied()).equals("undefineda") + }) + }) + o.spec("absorbing semantics", function() { + o("absorbs stream", function() { + var stream = Stream.stream(Stream.stream(1)) + var doubled = stream.map(function(value) {return value * 2}) + + o(stream()).equals(1) + o(doubled()).equals(2) + }) + o("absorbs stream on update", function() { + var stream = Stream.stream() + var doubled = stream.map(function(value) {return value * 2}) + + stream(Stream.stream(1)) + + o(stream()).equals(1) + o(doubled()).equals(2) + }) + o("absorbs errored stream", function() { + var stream = Stream.stream(Stream.reject(1)) + var doubled = stream.error.map(function(value) {return value * 2}) + + o(stream()).equals(undefined) + o(stream.error()).equals(1) + o(doubled()).equals(2) + }) + o("absorbs pending stream", function() { + var count = 0 + var stream = Stream.stream(Stream.stream()) + var mapped = stream.map(function() { + count++ + }) + + o(stream()).equals(undefined) + o(count).equals(0) + }) + o("absorbs ended stream", function() { + var count = 0 + var ended = Stream.stream() + ended.end(true) + + var stream = Stream.stream(ended) + var mapped = stream.map(function() { + count++ + }) + + stream(1) + + o(count).equals(0) + }) + o("combine absorbs stream", function() { + var stream = Stream.stream(1) + var combined = Stream.combine(function(stream) { + return Stream.stream(2) + }, [stream]) + var doubled = combined.map(function(value) {return value * 2}) + + o(combined()).equals(2) + o(doubled()).equals(4) + }) + o("combine absorbs errored stream", function() { + var stream = Stream.stream(1) + var combined = Stream.combine(function(stream) { + return Stream.reject(2) + }, [stream]) + var doubled = combined.error.map(function(value) {return value * 2}) + + o(combined()).equals(undefined) + o(combined.error()).equals(2) + o(doubled()).equals(4) + }) + o("combine absorbs pending stream", function() { + var count = 0 + var stream = Stream.stream(1) + var combined = Stream.combine(function(stream) { + return Stream.stream() + }, [stream]) + var mapped = combined.map(function() { + count++ + }) + + o(combined()).equals(undefined) + o(count).equals(0) + }) + o("combine absorbs ended stream", function() { + var count = 0 + var stream = Stream.stream(1) + var combined = Stream.combine(function(stream) { + var ended = Stream.stream(2) + ended.end(true) + return ended + }, [stream]) + var mapped = combined.map(function() { + count++ + }) + + o(combined()).equals(2) + o(count).equals(0) + }) + o("reject absorbs stream", function() { + var stream = Stream.reject(Stream.stream(1)) + var doubled = stream.error.map(function(value) {return value * 2}) + + o(stream.error()).equals(1) + o(doubled()).equals(2) + }) + o("reject absorbs errored stream", function() { + var count = 0 + var stream = Stream.reject(Stream.reject(new Error("error"))) + var mapped = stream.error.map(function() { + count++ + }) + + o(stream.error().message).equals("error") + o(count).equals(1) + }) + o("reject absorbs pending stream", function() { + var count = 0 + var stream = Stream.reject(Stream.stream()) + var mapped = stream.error.map(function() { + count++ + }) + + o(stream.error()).equals(undefined) + o(count).equals(0) + }) + o("reject absorbs ended stream", function() { + var count = 0 + var ended = Stream.stream(1) + ended.end(true) + + var stream = Stream.reject(ended) + var mapped = stream.error.map(function() { + count++ + }) + + o(count).equals(0) + }) + o("ended stream absorbs stream", function() { + var stream = Stream.stream() + stream.end(true) + + var doubled = stream.map(function(value) {return value * 2}) + stream(Stream.stream(1)) + + o(stream()).equals(1) + o(doubled()).equals(undefined) + }) + o("ended stream absorbs pending stream", function() { + var stream = Stream.stream() + stream.end(true) + + var doubled = stream.map(function(value) {return value * 2}) + stream(Stream.stream()) + + o(stream()).equals(undefined) + o(doubled()).equals(undefined) + }) + o("ended stream absorbs errored stream", function() { + var stream = Stream.stream() + stream.end(true) + + var doubled = stream.map(function(value) {return value * 2}) + stream(Stream.reject(new Error("error"))) + + o(stream.error().message).equals("error") + o(doubled()).equals(undefined) + }) + o("ended stream absorbs ended stream", function() { + var stream = Stream.stream(1) + stream.end(true) + + var ended = Stream.stream(2) + ended.end(true) + + var doubled = stream.map(function(value) {return value * 2}) + stream(ended) + + o(stream()).equals(2) + o(doubled()).equals(undefined) + }) + }) + o.spec("fantasy-land", function() { + o.spec("functor", function() { + o("identity", function() { + var stream = Stream.stream(3) + var mapped = stream.map(function(value) {return value}) + + o(stream()).equals(mapped()) + }) + o("composition", function() { + function f(x) {return x * 2} + function g(x) {return x * x} + + var stream = Stream.stream(3) + + var mapped = stream.map(function(value) {return f(g(value))}) + var composed = stream.map(g).map(f) + + o(mapped()).equals(18) + o(mapped()).equals(composed()) + }) + }) + o.spec("apply", function() { + o("composition", function() { + var a = Stream.stream(function(value) {return value * 2}) + var u = Stream.stream(function(value) {return value * 3}) + var v = Stream.stream(5) + + var mapped = a.map(function(f) { + return function(g) { + return function(x) { + return f(g(x)) + } + } + }).ap(u).ap(v) + + var composed = a.ap(u.ap(v)) + + o(mapped()).equals(30) + o(mapped()).equals(composed()) + }) + }) + o.spec("applicative", function() { + o("identity", function() { + var a = Stream.stream().of(function(value) {return value}) + var v = Stream.stream(5) + + o(a.ap(v)()).equals(5) + o(a.ap(v)()).equals(v()) + }) + o("homomorphism", function() { + var a = Stream.stream(0) + var f = function(value) {return value * 2} + var x = 3 + + o(a.of(f).ap(a.of(x))()).equals(6) + o(a.of(f).ap(a.of(x))()).equals(a.of(f(x))()) + }) + o("interchange", function() { + var u = Stream.stream(function(value) {return value * 2}) + var a = Stream.stream() + var y = 3 + + o(u.ap(a.of(y))()).equals(6) + o(u.ap(a.of(y))()).equals(a.of(function(f) {return f(y)}).ap(u)()) + }) + }) + }) +}) \ No newline at end of file