diff --git a/api/router.js b/api/router.js index 496b5684..2ca45757 100644 --- a/api/router.js +++ b/api/router.js @@ -11,13 +11,13 @@ module.exports = function($window, renderer, pubsub) { var replay = router.defineRoutes(routes, function(payload, args, path, route) { if (typeof payload.view !== "function") { if (typeof payload.render !== "function") payload.render = function(vnode) {return vnode} - var render = function(component) { + var use = function(component) { current.path = path, current.component = component renderer.render(root, payload.render(Vnode(component, null, args, undefined, undefined, undefined))) } - if (typeof payload.resolve !== "function") payload.resolve = function() {render(current.component)} - if (path !== current.path) payload.resolve(render, args, path, route) - else render(current.component) + if (typeof payload.resolve !== "function") payload.resolve = function() {use(current.component)} + if (path !== current.path) payload.resolve(use, args, path, route) + else use(current.component) } else { renderer.render(root, Vnode(payload, null, args, undefined, undefined, undefined)) diff --git a/index.js b/index.js index 710df2fb..2c63a54f 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,6 @@ "use strict" -var Stream = require("./util/stream") +var Stream = require("./util/stream")(console.error.bind(console)) var m = require("./render/hyperscript") var renderService = require("./render/render")(window) var requestService = require("./request/request")(window) diff --git a/mithril.js b/mithril.js index e279e368..930a359b 100644 --- a/mithril.js +++ b/mithril.js @@ -1,186 +1,188 @@ new function() { -var guid = 0, noop = function() {}, HALT = {} -function createStream() { - function stream() { - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) - return stream._state.value - } - initStream(stream, arguments) - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) - return stream -} -function initStream(stream, args) { - stream.constructor = createStream - stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined} - stream.map = map, stream.ap = ap, stream.of = createStream - stream.valueOf = valueOf, stream.toJSON = toJSON, stream.toString = valueOf - stream.run = run, stream.catch = doCatch - Object.defineProperties(stream, { - error: {get: function() { - if (!stream._state.errorStream) { - var errorStream = function() { - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, undefined, arguments[0]) - return stream._state.error - } - initStream(errorStream, []) - initDependency(errorStream, [stream], noop, noop) - stream._state.errorStream = errorStream - } - return stream._state.errorStream - }}, - end: {get: function() { - if (!stream._state.endStream) { - var endStream = createStream() - endStream.map(function(value) { - if (value === true) unregisterStream(stream), unregisterStream(endStream) - return value - }) - stream._state.endStream = endStream - } - return stream._state.endStream - }} - }) -} -function updateStream(stream, value, error) { - updateState(stream, value, error) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) - finalize(stream) -} -function updateState(stream, value, error) { - error = unwrapError(value, error) - if (error !== undefined && typeof stream._state.recover === "function") { - if (!resolve(stream, updateValues, true)) return - } - else updateValues(stream, value, error) - stream._state.changed = true - if (stream._state.state !== 2) stream._state.state = 1 -} -function updateValues(stream, value, error) { - stream._state.value = value - stream._state.error = error -} -function updateDependency(stream, mustSync) { - var state = stream._state, parents = state.parents - if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) { - var failed = parents.filter(errored) - if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error) - else resolve(stream, updateState, false) - } -} -function resolve(stream, update, shouldRecover) { - try { - var value = shouldRecover ? stream._state.recover() : stream._state.derive() - if (value === HALT) return false - update(stream, value, undefined) - } - catch (e) { - update(stream, undefined, e) - reportUncaughtError(stream, e) - } - return true -} -function unwrapError(value, error) { - if (value != null && value.constructor === createStream) { - if (value._state.error !== undefined) error = value._state.error - else error = unwrapError(value._state.value, value._state.error) - } - return error -} -function finalize(stream) { - stream._state.changed = false - for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false -} -function reportUncaughtError(stream, e) { - if (Object.keys(stream._state.deps).length === 0 && stream._state.derive == null) { - setTimeout(function() { - if (Object.keys(stream._state.deps).length === 0) console.error(e) - }, 0) - } -} -function run(fn) { - var self = createStream(), stream = this - return initDependency(self, [stream], function() { - return absorb(self, fn(stream())) - }, undefined) -} -function doCatch(fn) { - var self = createStream(), stream = this - var derive = function() {return stream._state.value} - var recover = function() {return absorb(self, fn(stream._state.error))} - return initDependency(self, [stream], derive, recover) -} -function combine(fn, streams) { - return initDependency(createStream(), streams, function() { - var failed = streams.filter(errored) - if (failed.length > 0) throw failed[0]._state.error - return fn.apply(this, streams.concat([streams.filter(changed)])) - }, undefined) -} -function absorb(stream, value) { - if (value != null && value.constructor === createStream) { - var absorbable = value - var update = function() { - updateState(stream, absorbable._state.value, absorbable._state.error) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) +var Stream = function(log) { + var guid = 0, noop = function() {}, HALT = {} + function createStream() { + function stream() { + if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) + return stream._state.value } - absorbable.map(update).catch(function(e) { - update() - throw e + initStream(stream, arguments) + if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) + return stream + } + function initStream(stream, args) { + stream.constructor = createStream + stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined} + stream.map = map, stream.ap = ap, stream.of = createStream + stream.valueOf = valueOf, stream.toJSON = toJSON, stream.toString = valueOf + stream.run = run, stream.catch = doCatch + Object.defineProperties(stream, { + error: {get: function() { + if (!stream._state.errorStream) { + var errorStream = function() { + if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, undefined, arguments[0]) + return stream._state.error + } + initStream(errorStream, []) + initDependency(errorStream, [stream], noop, noop) + stream._state.errorStream = errorStream + } + return stream._state.errorStream + }}, + end: {get: function() { + if (!stream._state.endStream) { + var endStream = createStream() + endStream.map(function(value) { + if (value === true) unregisterStream(stream), unregisterStream(endStream) + return value + }) + stream._state.endStream = endStream + } + return stream._state.endStream + }} }) - - if (absorbable._state.state === 0) return HALT - if (absorbable._state.error) throw absorbable._state.error - value = absorbable._state.value } - return value -} -function initDependency(dep, streams, derive, recover) { - var state = dep._state - state.derive = derive - state.recover = recover - state.parents = streams.filter(notEnded) - registerDependency(dep, state.parents) - updateDependency(dep, true) - return dep -} -function registerDependency(stream, parents) { - for (var i = 0; i < parents.length; i++) { - parents[i]._state.deps[stream._state.id] = stream - registerDependency(stream, parents[i]._state.parents) + function updateStream(stream, value, error) { + updateState(stream, value, error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + finalize(stream) } -} -function unregisterStream(stream) { - for (var i = 0; i < stream._state.parents.length; i++) { - var parent = stream._state.parents[i] - delete parent._state.deps[stream._state.id] + function updateState(stream, value, error) { + error = unwrapError(value, error) + if (error !== undefined && typeof stream._state.recover === "function") { + if (!resolve(stream, updateValues, true)) return + } + else updateValues(stream, value, error) + stream._state.changed = true + if (stream._state.state !== 2) stream._state.state = 1 } - for (var id in stream._state.deps) { - var dependent = stream._state.deps[id] - var index = dependent._state.parents.indexOf(stream) - if (index > -1) dependent._state.parents.splice(index, 1) + function updateValues(stream, value, error) { + stream._state.value = value + stream._state.error = error } - stream._state.state = 2 //ended - stream._state.deps = {} -} -function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} -function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])} -function valueOf() {return this._state.value} -function toJSON() {return JSON.stringify(this._state.value)} -function active(stream) {return stream._state.state === 1} -function changed(stream) {return stream._state.changed} -function notEnded(stream) {return stream._state.state !== 2} -function errored(stream) {return stream._state.error} -function reject(e) { - var stream = createStream() - stream.error(e) - return stream -} -function merge(streams) { - return combine(function () { - return streams.map(function(s) {return s()}) - }, streams) -} -var Stream = {stream: createStream, merge: merge, combine: combine, reject: reject, HALT: HALT} + function updateDependency(stream, mustSync) { + var state = stream._state, parents = state.parents + if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) { + var failed = parents.filter(errored) + if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error) + else resolve(stream, updateState, false) + } + } + function resolve(stream, update, shouldRecover) { + try { + var value = shouldRecover ? stream._state.recover() : stream._state.derive() + if (value === HALT) return false + update(stream, value, undefined) + } + catch (e) { + update(stream, undefined, e.__error != null ? e.__error : e) + if (e.__error == null) reportUncaughtError(stream, e) + } + return true + } + function unwrapError(value, error) { + if (value != null && value.constructor === createStream) { + if (value._state.error !== undefined) error = value._state.error + else error = unwrapError(value._state.value, value._state.error) + } + return error + } + function finalize(stream) { + stream._state.changed = false + for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false + } + function reportUncaughtError(stream, e) { + if (Object.keys(stream._state.deps).length === 0) { + setTimeout(function() { + if (Object.keys(stream._state.deps).length === 0) log(e) + }, 0) + } + } + function run(fn) { + var self = createStream(), stream = this + return initDependency(self, [stream], function() { + return absorb(self, fn(stream())) + }, undefined) + } + function doCatch(fn) { + var self = createStream(), stream = this + var derive = function() {return stream._state.value} + var recover = function() {return absorb(self, fn(stream._state.error))} + return initDependency(self, [stream], derive, recover) + } + function combine(fn, streams) { + return initDependency(createStream(), streams, function() { + var failed = streams.filter(errored) + if (failed.length > 0) throw {__error: failed[0]._state.error} + return fn.apply(this, streams.concat([streams.filter(changed)])) + }, undefined) + } + function absorb(stream, value) { + if (value != null && value.constructor === createStream) { + var absorbable = value + var update = function() { + updateState(stream, absorbable._state.value, absorbable._state.error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + } + absorbable.map(update).catch(function(e) { + update() + throw {__error: e} + }) + + if (absorbable._state.state === 0) return HALT + if (absorbable._state.error) throw {__error: absorbable._state.error} + value = absorbable._state.value + } + return value + } + function initDependency(dep, streams, derive, recover) { + var state = dep._state + state.derive = derive + state.recover = recover + state.parents = streams.filter(notEnded) + registerDependency(dep, state.parents) + updateDependency(dep, true) + return dep + } + function registerDependency(stream, parents) { + for (var i = 0; i < parents.length; i++) { + parents[i]._state.deps[stream._state.id] = stream + registerDependency(stream, parents[i]._state.parents) + } + } + function unregisterStream(stream) { + for (var i = 0; i < stream._state.parents.length; i++) { + var parent = stream._state.parents[i] + delete parent._state.deps[stream._state.id] + } + for (var id in stream._state.deps) { + var dependent = stream._state.deps[id] + var index = dependent._state.parents.indexOf(stream) + if (index > -1) dependent._state.parents.splice(index, 1) + } + stream._state.state = 2 //ended + stream._state.deps = {} + } + function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} + function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])} + function valueOf() {return this._state.value} + function toJSON() {return this._state.value != null && typeof this._state.value.toJSON === "function" ? this._state.value.toJSON() : this._state.value} + function active(stream) {return stream._state.state === 1} + function changed(stream) {return stream._state.changed} + function notEnded(stream) {return stream._state.state !== 2} + function errored(stream) {return stream._state.error} + function reject(e) { + var stream = createStream() + stream.error(e) + return stream + } + function merge(streams) { + return combine(function () { + return streams.map(function(s) {return s()}) + }, streams) + } + return {stream: createStream, merge: merge, combine: combine, reject: reject, HALT: HALT} +}(console.error.bind(console)) function Vnode(tag, key, attrs, children, text, dom) { return {tag: tag, key: key, attrs: attrs, children: children, text: text, dom: dom, domSize: undefined, state: {}, events: undefined, instance: undefined} } @@ -1087,13 +1089,13 @@ m.route = function($window, renderer, pubsub) { var replay = router.defineRoutes(routes, function(payload, args, path, route) { if (typeof payload.view !== "function") { if (typeof payload.render !== "function") payload.render = function(vnode) {return vnode} - var render = function(component) { + var use = function(component) { current.path = path, current.component = component renderer.render(root, payload.render(Vnode(component, null, args, undefined, undefined, undefined))) } - if (typeof payload.resolve !== "function") payload.resolve = function() {render(current.component)} - if (path !== current.path) payload.resolve(render, args, path, route) - else render(current.component) + if (typeof payload.resolve !== "function") payload.resolve = function() {use(current.component)} + if (path !== current.path) payload.resolve(use, args, path, route) + else use(current.component) } else { renderer.render(root, Vnode(payload, null, args, undefined, undefined, undefined)) diff --git a/util/stream.js b/util/stream.js index d61df9aa..27640abf 100644 --- a/util/stream.js +++ b/util/stream.js @@ -1,196 +1,198 @@ "use strict" -var guid = 0, noop = function() {}, HALT = {} -function createStream() { - function stream() { - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) - return stream._state.value - } - initStream(stream, arguments) - - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) - - return stream -} -function initStream(stream, args) { - stream.constructor = createStream - stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined} - stream.map = map, stream.ap = ap, stream.of = createStream - stream.valueOf = valueOf, stream.toJSON = toJSON, stream.toString = valueOf - stream.run = run, stream.catch = doCatch - - Object.defineProperties(stream, { - error: {get: function() { - if (!stream._state.errorStream) { - var errorStream = function() { - if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, undefined, arguments[0]) - return stream._state.error - } - initStream(errorStream, []) - initDependency(errorStream, [stream], noop, noop) - stream._state.errorStream = errorStream - } - return stream._state.errorStream - }}, - end: {get: function() { - if (!stream._state.endStream) { - var endStream = createStream() - endStream.map(function(value) { - if (value === true) unregisterStream(stream), unregisterStream(endStream) - return value - }) - stream._state.endStream = endStream - } - return stream._state.endStream - }} - }) -} -function updateStream(stream, value, error) { - updateState(stream, value, error) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) - finalize(stream) -} -function updateState(stream, value, error) { - error = unwrapError(value, error) - if (error !== undefined && typeof stream._state.recover === "function") { - if (!resolve(stream, updateValues, true)) return - } - else updateValues(stream, value, error) - stream._state.changed = true - if (stream._state.state !== 2) stream._state.state = 1 -} -function updateValues(stream, value, error) { - stream._state.value = value - stream._state.error = error -} -function updateDependency(stream, mustSync) { - var state = stream._state, parents = state.parents - if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) { - var failed = parents.filter(errored) - if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error) - else resolve(stream, updateState, false) - } -} -function resolve(stream, update, shouldRecover) { - try { - var value = shouldRecover ? stream._state.recover() : stream._state.derive() - if (value === HALT) return false - update(stream, value, undefined) - } - catch (e) { - update(stream, undefined, e) - reportUncaughtError(stream, e) - } - return true -} -function unwrapError(value, error) { - if (value != null && value.constructor === createStream) { - if (value._state.error !== undefined) error = value._state.error - else error = unwrapError(value._state.value, value._state.error) - } - return error -} -function finalize(stream) { - stream._state.changed = false - for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false -} -function reportUncaughtError(stream, e) { - if (Object.keys(stream._state.deps).length === 0 && stream._state.derive == null) { - setTimeout(function() { - if (Object.keys(stream._state.deps).length === 0) console.error(e) - }, 0) - } -} - -function run(fn) { - var self = createStream(), stream = this - return initDependency(self, [stream], function() { - return absorb(self, fn(stream())) - }, undefined) -} -function doCatch(fn) { - var self = createStream(), stream = this - var derive = function() {return stream._state.value} - var recover = function() {return absorb(self, fn(stream._state.error))} - return initDependency(self, [stream], derive, recover) -} -function combine(fn, streams) { - return initDependency(createStream(), streams, function() { - var failed = streams.filter(errored) - if (failed.length > 0) throw failed[0]._state.error - return fn.apply(this, streams.concat([streams.filter(changed)])) - }, undefined) -} -function absorb(stream, value) { - if (value != null && value.constructor === createStream) { - var absorbable = value - var update = function() { - updateState(stream, absorbable._state.value, absorbable._state.error) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) +module.exports = function(log) { + var guid = 0, noop = function() {}, HALT = {} + function createStream() { + function stream() { + if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) + return stream._state.value } - absorbable.map(update).catch(function(e) { - update() - throw e + initStream(stream, arguments) + + if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, arguments[0], undefined) + + return stream + } + function initStream(stream, args) { + stream.constructor = createStream + stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined} + stream.map = map, stream.ap = ap, stream.of = createStream + stream.valueOf = valueOf, stream.toJSON = toJSON, stream.toString = valueOf + stream.run = run, stream.catch = doCatch + + Object.defineProperties(stream, { + error: {get: function() { + if (!stream._state.errorStream) { + var errorStream = function() { + if (arguments.length > 0 && arguments[0] !== HALT) updateStream(stream, undefined, arguments[0]) + return stream._state.error + } + initStream(errorStream, []) + initDependency(errorStream, [stream], noop, noop) + stream._state.errorStream = errorStream + } + return stream._state.errorStream + }}, + end: {get: function() { + if (!stream._state.endStream) { + var endStream = createStream() + endStream.map(function(value) { + if (value === true) unregisterStream(stream), unregisterStream(endStream) + return value + }) + stream._state.endStream = endStream + } + return stream._state.endStream + }} }) - - if (absorbable._state.state === 0) return HALT - if (absorbable._state.error) throw absorbable._state.error - value = absorbable._state.value } - return value -} - -function initDependency(dep, streams, derive, recover) { - var state = dep._state - state.derive = derive - state.recover = recover - state.parents = streams.filter(notEnded) - - registerDependency(dep, state.parents) - updateDependency(dep, true) - - return dep -} -function registerDependency(stream, parents) { - for (var i = 0; i < parents.length; i++) { - parents[i]._state.deps[stream._state.id] = stream - registerDependency(stream, parents[i]._state.parents) + function updateStream(stream, value, error) { + updateState(stream, value, error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + finalize(stream) } -} -function unregisterStream(stream) { - for (var i = 0; i < stream._state.parents.length; i++) { - var parent = stream._state.parents[i] - delete parent._state.deps[stream._state.id] + function updateState(stream, value, error) { + error = unwrapError(value, error) + if (error !== undefined && typeof stream._state.recover === "function") { + if (!resolve(stream, updateValues, true)) return + } + else updateValues(stream, value, error) + stream._state.changed = true + if (stream._state.state !== 2) stream._state.state = 1 } - for (var id in stream._state.deps) { - var dependent = stream._state.deps[id] - var index = dependent._state.parents.indexOf(stream) - if (index > -1) dependent._state.parents.splice(index, 1) + function updateValues(stream, value, error) { + stream._state.value = value + stream._state.error = error } - stream._state.state = 2 //ended - stream._state.deps = {} + function updateDependency(stream, mustSync) { + var state = stream._state, parents = state.parents + if (parents.length > 0 && parents.filter(active).length === parents.length && (mustSync || parents.filter(changed).length > 0)) { + var failed = parents.filter(errored) + if (failed.length > 0) updateState(stream, undefined, failed[0]._state.error) + else resolve(stream, updateState, false) + } + } + function resolve(stream, update, shouldRecover) { + try { + var value = shouldRecover ? stream._state.recover() : stream._state.derive() + if (value === HALT) return false + update(stream, value, undefined) + } + catch (e) { + update(stream, undefined, e.__error != null ? e.__error : e) + if (e.__error == null) reportUncaughtError(stream, e) + } + return true + } + function unwrapError(value, error) { + if (value != null && value.constructor === createStream) { + if (value._state.error !== undefined) error = value._state.error + else error = unwrapError(value._state.value, value._state.error) + } + return error + } + function finalize(stream) { + stream._state.changed = false + for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false + } + function reportUncaughtError(stream, e) { + if (Object.keys(stream._state.deps).length === 0) { + setTimeout(function() { + if (Object.keys(stream._state.deps).length === 0) log(e) + }, 0) + } + } + + function run(fn) { + var self = createStream(), stream = this + return initDependency(self, [stream], function() { + return absorb(self, fn(stream())) + }, undefined) + } + function doCatch(fn) { + var self = createStream(), stream = this + var derive = function() {return stream._state.value} + var recover = function() {return absorb(self, fn(stream._state.error))} + return initDependency(self, [stream], derive, recover) + } + function combine(fn, streams) { + return initDependency(createStream(), streams, function() { + var failed = streams.filter(errored) + if (failed.length > 0) throw {__error: failed[0]._state.error} + return fn.apply(this, streams.concat([streams.filter(changed)])) + }, undefined) + } + function absorb(stream, value) { + if (value != null && value.constructor === createStream) { + var absorbable = value + var update = function() { + updateState(stream, absorbable._state.value, absorbable._state.error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + } + absorbable.map(update).catch(function(e) { + update() + throw {__error: e} + }) + + if (absorbable._state.state === 0) return HALT + if (absorbable._state.error) throw {__error: absorbable._state.error} + value = absorbable._state.value + } + return value + } + + function initDependency(dep, streams, derive, recover) { + var state = dep._state + state.derive = derive + state.recover = recover + state.parents = streams.filter(notEnded) + + registerDependency(dep, state.parents) + updateDependency(dep, true) + + return dep + } + function registerDependency(stream, parents) { + for (var i = 0; i < parents.length; i++) { + parents[i]._state.deps[stream._state.id] = stream + registerDependency(stream, parents[i]._state.parents) + } + } + function unregisterStream(stream) { + for (var i = 0; i < stream._state.parents.length; i++) { + var parent = stream._state.parents[i] + delete parent._state.deps[stream._state.id] + } + for (var id in stream._state.deps) { + var dependent = stream._state.deps[id] + var index = dependent._state.parents.indexOf(stream) + if (index > -1) dependent._state.parents.splice(index, 1) + } + stream._state.state = 2 //ended + stream._state.deps = {} + } + + function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} + function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])} + function valueOf() {return this._state.value} + function toJSON() {return this._state.value != null && typeof this._state.value.toJSON === "function" ? this._state.value.toJSON() : this._state.value} + + function active(stream) {return stream._state.state === 1} + function changed(stream) {return stream._state.changed} + function notEnded(stream) {return stream._state.state !== 2} + function errored(stream) {return stream._state.error} + + function reject(e) { + var stream = createStream() + stream.error(e) + return stream + } + + function merge(streams) { + return combine(function () { + return streams.map(function(s) {return s()}) + }, streams) + } + + return {stream: createStream, merge: merge, combine: combine, reject: reject, HALT: HALT} } - -function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} -function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])} -function valueOf() {return this._state.value} -function toJSON() {return JSON.stringify(this._state.value)} - -function active(stream) {return stream._state.state === 1} -function changed(stream) {return stream._state.changed} -function notEnded(stream) {return stream._state.state !== 2} -function errored(stream) {return stream._state.error} - -function reject(e) { - var stream = createStream() - stream.error(e) - return stream -} - -function merge(streams) { - return combine(function () { - return streams.map(function(s) {return s()}) - }, streams) -} - -module.exports = {stream: createStream, merge: merge, combine: combine, reject: reject, HALT: HALT} diff --git a/util/tests/test-stream.js b/util/tests/test-stream.js index e0ca9260..4fec7dcc 100644 --- a/util/tests/test-stream.js +++ b/util/tests/test-stream.js @@ -2,9 +2,15 @@ var o = require("../../ospec/ospec") var callAsync = require("../../test-utils/callAsync") -var Stream = require("../../util/stream") +var StreamFactory = require("../../util/stream") o.spec("stream", function() { + var Stream, spy + o.beforeEach(function() { + spy = o.spy() + Stream = StreamFactory(spy) + }) + o.spec("stream", function() { o("works as getter/setter", function() { var stream = Stream.stream(1) @@ -518,11 +524,12 @@ o.spec("stream", function() { o(mapped()).equals(1) }) o("works with errored stream", function() { + var rejected var stream = Stream.stream(undefined) - var mapped = stream.run(function(value) {return Stream.reject(new Error("error"))}) + var mapped = stream.run(function(value) { + return Stream.reject(new Error("error")) + }) - mapped.catch(function() {}) //silence reportUncaughtException - o(mapped()).equals(undefined) o(mapped.error().message).equals("error") }) @@ -803,14 +810,45 @@ o.spec("stream", function() { }) o.spec("toJSON", function() { o("works", function() { - o(Stream.stream(1).toJSON()).equals("1") - o(Stream.stream("a").toJSON()).equals("\"a\"") - o(Stream.stream(true).toJSON()).equals("true") - o(Stream.stream(null).toJSON()).equals("null") + o(Stream.stream(1).toJSON()).equals(1) + o(Stream.stream("a").toJSON()).equals("a") + o(Stream.stream(true).toJSON()).equals(true) + o(Stream.stream(null).toJSON()).equals(null) o(Stream.stream(undefined).toJSON()).equals(undefined) - o(Stream.stream({a: 1}).toJSON()).deepEquals("{\"a\":1}") - o(Stream.stream([1, 2, 3]).toJSON()).deepEquals("[1,2,3]") + o(Stream.stream({a: 1}).toJSON()).deepEquals({a: 1}) + o(Stream.stream([1, 2, 3]).toJSON()).deepEquals([1, 2, 3]) o(Stream.stream().toJSON()).equals(undefined) + o(Stream.stream(new Date(0)).toJSON()).equals(new Date(0).toJSON()) + }) + o("works w/ JSON.stringify", function() { + o(JSON.stringify(Stream.stream(1))).equals(JSON.stringify(1)) + o(JSON.stringify(Stream.stream("a"))).equals(JSON.stringify("a")) + o(JSON.stringify(Stream.stream(true))).equals(JSON.stringify(true)) + o(JSON.stringify(Stream.stream(null))).equals(JSON.stringify(null)) + o(JSON.stringify(Stream.stream(undefined))).equals(JSON.stringify(undefined)) + o(JSON.stringify(Stream.stream({a: 1}))).deepEquals(JSON.stringify({a: 1})) + o(JSON.stringify(Stream.stream([1, 2, 3]))).deepEquals(JSON.stringify([1, 2, 3])) + o(JSON.stringify(Stream.stream())).equals(JSON.stringify(undefined)) + o(JSON.stringify(Stream.stream(new Date(0)))).equals(JSON.stringify(new Date(0))) + }) + }) + o.spec("uncaught exception reporting", function() { + o("reports thrown errors", function(done) { + Stream.stream(1).map(function() {throw new Error("error")}) + + setTimeout(function() { + o(spy.callCount).equals(1) + o(spy.args[0].message).equals("error") + done() + }, 0) + }) + o("does not report explicit rejections", function(done) { + Stream.reject(1) + + setTimeout(function() { + o(spy.callCount).equals(0) + done() + }, 0) }) }) o.spec("map", function() {