diff --git a/docs/prop.md b/docs/prop.md index 897f11be..08061983 100644 --- a/docs/prop.md +++ b/docs/prop.md @@ -4,20 +4,27 @@ - [Static members](#static-members) - [prop.combine](#prop-combine) - [prop.reject](#prop-reject) + - [prop.HALT](#prop-halt) - [Instance members](#static-members) - - [stream.map](#stream-map) + - [stream.run](#stream-run) - [stream.end](#stream-end) - [stream.error](#stream-error) - [stream.catch](#stream-catch) - [stream.of](#stream-of) + - [stream.map](#stream-map) - [stream.ap](#stream-ap) - [Basic usage](#basic-usage) + - [Streams as variables](#streams-as-variables) + - [Bidirectional bindings](#bidirectional-bindings) + - [Computed properties](#computed-properties) + - [Loading icons and error messages](#loading-icons-and-error-messages) - [Streams vs promises](#streams-vs-promises) - [Chaining streams](#chaining-streams) - [Combining streams](#combining-streams) - [Absorbing streams](#absorbing-streams) - [Stream states](#stream-states) - [Handling errors](#handling-errors) +- [Serializing streams](#serializing-streams) --- @@ -77,15 +84,19 @@ Argument | Type | Required | Description [How to read signatures](signatures.md) +##### prop.HALT + +A special value that can be returned to stream callbacks to halt execution of downstreams + #### Instance members -##### stream.map +##### stream.run Creates a dependent stream whose value is set to the result of the callback function. See [chaining streams](#chaining-streams) -This method exists to conform to [Fantasy Land's Applicative specification](https://github.com/fantasyland/fantasy-land) +If the callback returns a stream, it is absorbed, and `dependentStream` adopts its value and state. -`dependentStream = m.prop().map(callback)` +`dependentStream = m.prop().run(callback)` Argument | Type | Required | Description ------------ | -------------------- | -------- | --- @@ -130,6 +141,23 @@ Argument | Type | Required | Description `value` | `any` | No | If this argument is present, the value of the prop is set to it **returns** | `Stream` | | Returns a stream +##### stream.map + +Creates a dependent stream whose value is set to the result of the callback function. See [chaining streams](#chaining-streams) + +This method is almost functionally identical to [`stream.run()`](#stream-run), except that if the return value is a stream, the stream is not absorbed. + +This method exists to conform to [Fantasy Land's Applicative specification](https://github.com/fantasyland/fantasy-land) + +`dependentStream = m.prop().map(callback)` + +Argument | Type | Required | Description +------------ | -------------------- | -------- | --- +`callback` | `any -> any` | Yes | A callback whose return value becomes the value of the stream +**returns** | `Stream` | | Returns a stream + +[How to read signatures](signatures.md) + ##### stream.ap The name of this method stands for `apply`. If a stream has a function as its value, calling `ap` will call the function with the value of the input stream as its argument, and it will return another stream whose value is the result of the function call. This method exists to conform to [Fantasy Land's Applicative specification](https://github.com/fantasyland/fantasy-land) @@ -145,6 +173,8 @@ Argument | Type | Required | Description ### Basic usage +#### Streams as variables + `m.prop()` returns a stream. At its most basic level, a stream works similar to a variable or a getter-setter property: it can hold state, which can be modified. ```javascript @@ -166,7 +196,11 @@ fetch("/api/users") .then(users) ``` -In the example above, the `users` stream is populated with the response data when the request resolves. It can also be populated from other higher order functions, such as [`m.withAttr`](withAttr.md) +In the example above, the `users` stream is populated with the response data when the request resolves. + +#### Bidirectional bindings + +Streams can also be populated from other higher order functions, such as [`m.withAttr`](withAttr.md) ```javascript // a stream @@ -181,14 +215,88 @@ m("input", { In the example above, when the user types in the input, the `user` stream is updated to the value of the input field. +#### Computed properties + +Streams are useful for implementing computed properties: + +```javascript +var title = m.prop("") +var slug = title.run(function(value) { + return value.toLowerCase().replace(/\W/g, "-") +}) + +title("Hello world") +console.log(slug()) // logs "hello-world" +``` + +In the example above, the value of `slug` is computed when `title` is updated, not when `slug` is read. + +It's of course also possible to compute properties based on multiple streams: + +```javascript +var firstName = m.prop("John") +var lastName = m.prop("Doe") +var fullName = m.prop.combine(function(first, last) { + return first() + last() +}, [firstName, lastName]) + +firstName("Mary") + +console.log(fullName()) // logs "Mary Doe" +``` + +Computed properties in Mithril are updated atomically: the callback to `combine` will never be called more than once per value update, no matter how complex the computed property's dependency graph is. + +#### Loading icons and error messages + +Here's an example using [`m.request`](request.md) that uses streams to implement a loading indicator and an error message for an AJAX call: + +```javascript +var RobustExample = { + oninit: function(vnode) { + var req = m.request({ + method: "GET", + url: "/api/items", + }) + vnode.state.items = req.catch(function() { + return [] + }) + vnode.state.error = req.error.run(this.errorView) + }, + view: function(vnode) { + return [ + vnode.state.items() ? vnode.state.items().map(function(item) { + return m("div", item.name) + }) : m(".loading-icon"), + vnode.state.error(), + ] + }, + errorView: function(e) { + return m(".error", "An error occurred") + } +} + +m.route(document.body, "/", { + "/": MyComponent +}) +``` + +When this component is initialized, `m.request` is called and its return value is assigned to `req`. Before the request completes, the `req` stream remains in a pending state, and therefore has a value of `undefined`. `req.error` is the error stream for the request. Since `req` is pending, the `req.error` stream also remain in a pending state, and likewise, `vnode.state.error` stays pending and does not call `this.errorView`. + +Then the component renders. Both `vnode.state.items()` and `vnode.state.error()` return `undefined`, so the component returns `[m(".loading-icon"), undefined]`, which in turn creates a loading icon element in the DOM. + +When the request to the server completes, `req` is populated with the response data, which is propagated to the `vnode.state.items` dependent stream. (Note that the function in `catch` is not called if there's no error). After the request completes, the component is re-rendered. `vnode.state.error()` is still `undefined`, but now `view` returns a list of vnodes containing item names, and therefore the loading icon is replaced by a list of `div` elements are created in the DOM. + +If the request to the server fails, `catch` is called and `vnode.state.items()` is set to an empty array. Also, `req.error` is populated with the error, and `vnode.state.error` is populated with the vnode tree returned by `errorView`. Therefore, `view` returns `[[], m(".error", "An error occurred")]`, which replaces the loading icon with the error message in the DOM. + --- ### Streams vs promises -Mithril streams have many similarities to [ES6 promises](https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise): +Mithril streams have some similarities to [ES6 promises](https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise): - streams can be [chained](#chaining-streams) (analogous to `promise.then(callback)`) -- streams can be [absorbed by other streams](#absorbing-streams) (analogous to `Promise.resolve(promise)`) +- streams can [absorb other streams](#absorbing-streams) (analogous to `promise.then(function() {return Promise.resolve(1)})`) - streams have [composable error handling semantics](#handling-errors) (analogous to `promise.catch`) These semantic similarities are designed to make it easy to migrate from promise-based asynchronous code to stream-based code. @@ -209,7 +317,7 @@ And here's equivalent stream-based code: ```javascript m.request({url: "/api/users", method: "GET"}) - .map(function(users) { + .run(function(users) { if (users.length === 0) return m.prop.reject("No users found") }) .catch(function(e) { @@ -217,13 +325,17 @@ m.request({url: "/api/users", method: "GET"}) }) ``` -Aside from the syntax differences between the [`fetch API`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) and [`m.request()`](request.md) in the first line of each snippet above, the only other syntax difference is that streams use the method `map` to chain, instead of `then`. +Aside from the syntax differences between the [`fetch API`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) and [`m.request()`](request.md) in the first line of each snippet above, the only other syntax difference is that streams use the method `.run()` to chain, instead of `.then()`. #### Differences -In most use cases, streams can be used as replacements for promises without much effort. However, because streams are more powerful, they have some important differences. +In most use cases, streams can be used as replacements for promises without much effort, by simply renaming `.then()` to `.run()`. -Promises are *immutable*; in other words, a promise can only ever resolve to one value. Streams, on the other hand, are *reactive*: a stream's value can be changed freely, and it automatically updates the values of other streams that depend on it. +One major difference that can affect a migration is that `.run()` only accepts one argument (whereas `.then()` accepts an error handler as the second argument. Misplacement of error handlers is a common source of bugs in promise-based code, and it's generally recommended that error handlers be attached using `.catch()` rather than passed as a second argument to `.then()`. To avoid those issues, error handlers in streams can only be defined using the `.catch()` method. + +Another more obscure functional difference is that if a promise is passed as an argument to `Promise.resolve()` and `Promise.reject()`, the promise is absorbed, whereas absorption does not occur in their stream counterparts `m.prop()` and `m.prop.reject()`. + +There are also a few important differences in semantics between promises and streams. A promise can only ever resolve to one value. Streams, on the other hand, are *reactive*: a stream's value can be changed freely, and it automatically updates the values of other streams that depend on it. Promises are required by spec to resolve asynchronously, even if the resolution value is known in advance (e.g. `Promise.resolve("hello")`). Mithril streams are guaranteed to update synchronously and atomically. @@ -259,14 +371,14 @@ var promise = Promise.resolve(stream()) ### Chaining streams -Streams can be chained using the `map` method. A chained stream is also known as a *dependent stream*. +Streams can be chained using the `run` method. A chained stream is also known as a *dependent stream*. ```javascript // parent stream var stream = m.prop(1) // dependent stream -var doubled = stream.map(function(value) { +var doubled = stream.run(function(value) { return value * 2 }) @@ -275,6 +387,18 @@ console.log(doubled()) // logs 2 Dependent streams are *reactive*: their values are updated any time the value of their parent stream is updated. This happens regardless of whether the dependent stream was created before or after the value of the parent stream was set. +You can prevent dependent streams from being updated by returning the special value `m.prop.HALT` + +```javascript +var halted = m.prop(1).run(function(value) { + return m.prop.HALT +}) + +halted.run(function() { + //never runs +}) +``` + --- ### Combining streams @@ -292,44 +416,48 @@ var added = m.prop.combine(function(a, b) { console.log(added()) // logs 12 ``` -A stream can depend on any number of streams and it's guaranteed to update atomically. For example, if a stream A has two dependent streams B and C, and a fourth stream D is dependent on both B and C, the stream D will only update once if the value of A changes. This guarantees that the callback for stream D is never called with unstable values such as receiving the new value of C but the old value of D. This also bring performance benefits of not recomputing downstreams unnecessarily. +A stream can depend on any number of streams and it's guaranteed to update atomically. For example, if a stream A has two dependent streams B and C, and a fourth stream D is dependent on both B and C, the stream D will only update once if the value of A changes. This guarantees that the callback for stream D is never called with unstable values such as when B has a new value but C has the old value. Atomicity also bring the performance benefits of not recomputing downstreams unnecessarily. + +You can prevent dependent streams from being updated by returning the special value `m.prop.HALT` + +```javascript +var halted = m.prop.combine(function(stream) { + return m.prop.HALT +}, [m.prop(1)]) + +halted.run(function() { + //never runs +}) +``` --- ### Absorbing streams -It's not possible to set the value of a stream to another streams. Doing so will cause wrapper stream to *absorb* the inner stream and adopt its value and [state](#stream-states): +Similar to promises, stream can absorb other streams. Returning a stream from the callback to `.run()` or `.catch()` will cause the wrapper stream to *absorb* the inner stream and adopt its value and [state](#stream-states): ```javascript -var stream = m.prop(m.prop(1)) +var stream = m.prop() +var mapped = m.prop(1).run(function(value) { + return stream(value * 2) +}) -console.log(stream()) // logs 1 +console.log(mapped()) // logs 2 + +stream(4) + +console.log(mapped()) // logs 4 ``` ```javascript -var pending = m.prop(m.prop()) - -console.log(stream()) // logs undefined because the stream is pending -``` - -This behavior also applies when mapping and combining streams: - -```javascript -var mapped = m.prop(1).map(function(value) { - return m.prop(value * 2) +var mapped = m.prop.reject(new Error("error")).catch(function(e) { + return m.prop(2) }) console.log(mapped()) // logs 2 ``` -```javascript -var stream = m.prop(1) -var combined = m.prop.combine(function(stream) { - return m.prop(stream() * 2) -}, [stream]) - -console.log(combined()) // logs 2 -``` +Stream absorption does not occur in fantasy-land methods (i.e. `.map()`, `.ap()`, `.of()`) --- @@ -360,11 +488,11 @@ console.log(added()) // logs undefined In the example above, `added` is a pending stream, because its parent `b` is also pending. -This also applies to dependent streams created via `stream.map`: +This also applies to dependent streams created via `stream.run`: ```javascript var stream = m.prop() -var doubled = stream.map(function(value) {return value * 2}) +var doubled = stream.run(function(value) {return value * 2}) console.log(doubled()) // logs undefined because `doubled` is pending ``` @@ -392,10 +520,10 @@ Errored streams can be created by calling `m.prop.reject()` var erroredStream = m.prop.reject(new Error("Server is offline")) ``` -A stream can also become errored if it's a dependent stream and its [`combiner`](#combiner) or [`map`](#stream-map) function throws an error +A stream can also become errored if it's a dependent stream and its [`combiner`](#combiner) or [`run`](#stream-run) function throws an error ```javascript -var errored1 = m.prop(1).map(function(value) { +var errored1 = m.prop(1).run(function(value) { if (typeof value !== "string") { throw new Error("Not a string") } @@ -430,7 +558,7 @@ A stream can stop affecting its dependent streams by calling `stream.end(true)`. ```javascript var stream = m.prop() -var doubled = stream.map(function(value) {return value * 2}) +var doubled = stream.run(function(value) {return value * 2}) stream.end(true) // set to ended state @@ -480,7 +608,7 @@ errored.error("Server is offline") console.log(errored2.error()) // logs "Server is offline" // by throwing an error in a chain -var errored3 = m.prop("hello").map(function() { +var errored3 = m.prop("hello").run(function() { throw "Server is offline" }) console.log(errored3.error()) // logs "Server is offline" @@ -491,7 +619,7 @@ var errored4 = m.prop.combine(function() { console.log(errored4.error()) // logs "Server is offline" // by returning an errored stream in a chain -var errored5 = m.prop("hello").map(function() { +var errored5 = m.prop("hello").run(function() { return m.prop.reject("Server is offline") }) console.log(errored5.error()) // logs "Server is offline" @@ -506,7 +634,7 @@ console.log(errored6.error()) // logs "Server is offline" Errors in stream chains propagate: if a stream is in an errored state, all of its dependent streams will have the same errored state, unless the error is handled via a `catch` method. ```javascript -var dependentStream = erroredStream.map(function(value) {return value}) +var dependentStream = erroredStream.run(function(value) {return value}) console.log(dependentStream()) // logs undefined console.log(dependentStream.error()) // logs "Server is offline" @@ -527,4 +655,19 @@ console.log(recoveredStream()) // logs "hi" console.log(recoveredStream.error()) // logs undefined ``` +### Serializing streams +Streams implement a `.toJSON()` method. When a stream is passed as the argument to `JSON.stringify()`, the value of the stream is serialized. + +``` +var stream = m.prop(123) +var serialized = JSON.stringify(stream) +console.log(serialized) // logs 123 +``` + +Streams also implement a `valueOf` method that returns the value of the stream. + +``` +var stream = m.prop(123) +console.log("test " + stream) // logs "test 123" +``` \ No newline at end of file diff --git a/docs/request.md b/docs/request.md index 00821aaa..dbe6c3d4 100644 --- a/docs/request.md +++ b/docs/request.md @@ -2,7 +2,7 @@ - [API](#api) - [How it works](#how-it-works) -- [Typical workflow](#typical-workflow) +- [Typical usage](#typical-usage) - [Dynamic URLs](#dynamic-urls) - [Why JSON instead of HTML](#why-json-instead-of-html) - [Why XMLHttpRequest instead of fetch](#why-xmlhttprequest-instead-of-fetch) @@ -43,7 +43,7 @@ The `m.request` utility is a thin wrapper around [`XMLHttpRequest`](https://deve m.request({ method: "GET", url: "/api/v1/users", -}).map(function(users) { +}).run(function(users) { console.log(users) }) ``` @@ -52,7 +52,7 @@ Calls to `m.request` return a [stream](prop.md). --- -### Typical workflow +### Typical usage Here's an illustrative example of a self-contained component that uses `m.request` to retrieve some data from a server. @@ -81,6 +81,8 @@ Let's assume making a request to the server URL `/api/items` returns an array of When `m.route` is called at the bottom, `MyComponent` is initialized. `oninit` is called, which calls `m.request` and assigns its return value (a stream) to `vnode.state.items`. This stream contains the `initialValue` (i.e. an empty array), and this value can be retrieved by calling the stream as a function (i.e. `value = vnode.state.items()`). After the oninit method returns, the component is then rendered. Since `vnode.state.items()` returns an empty array, the component's `view` method also returns an empty array, so no DOM elements are created. When the request to the server completes, `m.request` parses the response data into a Javascript array of objects and sets the value of the stream to that array. Then, the component is rendered again. This time, `vnode.state.items()` returns a non-empty array, so the component's `view` method returns an array of vnodes, which in turn are rendered into `div` DOM elements. +#### Loading icons and error messages + Here's an expanded version of the example above that implements a loading indicator and an error message: ```javascript @@ -93,7 +95,7 @@ var RobustExample = { vnode.state.items = req.catch(function() { return [] }) - vnode.state.error = req.error.map(this.errorView) + vnode.state.error = req.error.run(this.errorView) }, view: function(vnode) { return [ @@ -132,7 +134,7 @@ m.request({ method: "GET", url: "/api/v1/users/:id", data: {id: 123}, -}).map(function(user) { +}).run(function(user) { console.log(user.id) // logs 123 }) ``` @@ -279,9 +281,9 @@ function getProjectOwnerID(project) { function doStuffWithProjectOwner(projectID) { return findProject(projectID) - .map(getProjectOwnerID) - .map(findUser) - .map(doStuff) + .run(getProjectOwnerID) + .run(findUser) + .run(doStuff) .catch(function(e) { console.log(e) }) @@ -290,7 +292,7 @@ function doStuffWithProjectOwner(projectID) { doStuffWithProjectOwner(123) ``` -Aside from the API signature difference between `fetch` and `m.request`, the only change required to achieve the same functionality was to replace all instances of `then` with `map`. +Aside from the API signature difference between `fetch` and `m.request`, the only change required to achieve the same functionality was to replace all instances of `.then` with `.run`. However, stream have some additional interesting properties. Let's suppose project objects have a `team` property that contains a list of user objects, and we wanted to display a list of designers and a list of developers in a project: @@ -304,11 +306,11 @@ function getTeamUsersByType(team, type) { }) } var project = findProject(123) -var team = project.map(getProjectTeam) -var designers = team.map(function(team) { +var team = project.run(getProjectTeam) +var designers = team.run(function(team) { return getTeamUsersByType(team, "designer") }) -var developers = team.map(function(team) { +var developers = team.run(function(team) { return getTeamUsersByType(team, "developer") }) ``` @@ -318,7 +320,7 @@ Now let's suppose that the team changed for the project and we need to fetch the Fortunately, `project` is a stream, and `team`, `designers` and `developers` are streams derived from `project`. So to update the state of all these streams, we only need to do this: ```javascript -findProject(123).map(project) +findProject(123).run(project) ``` Doing so updates all the streams, and therefore there's no need to place the filtering code in the view, where the filtering code would recompute the same thing on every render. diff --git a/util/stream.js b/util/stream.js index 07a6480e..1897f72b 100644 --- a/util/stream.js +++ b/util/stream.js @@ -16,8 +16,8 @@ function initStream(stream, args) { stream.constructor = createStream stream._state = {id: guid++, value: undefined, error: undefined, state: 0, derive: undefined, recover: undefined, deps: {}, parents: [], errorStream: undefined, endStream: undefined} stream.map = map, stream.ap = ap, stream.of = createStream - stream.valueOf = valueOf - stream.catch = doCatch + stream.valueOf = valueOf, stream.toJSON = toJSON + stream.run = run, stream.catch = doCatch Object.defineProperties(stream, { error: {get: function() { @@ -46,15 +46,18 @@ function initStream(stream, args) { }) } function updateStream(stream, value, error) { - if (!absorbStream(stream, value, false) && !absorbStream(stream, error, true)) { - updateState(stream, value, error) - for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) - finalize(stream) - } + updateState(stream, value, error) + for (var id in stream._state.deps) updateDependency(stream._state.deps[id], false) + finalize(stream) } function updateState(stream, value, error) { + error = unwrapError(value, error) if (error !== undefined && typeof stream._state.recover === "function") { - try {updateValues(stream, stream._state.recover(), undefined)} + try { + var recovered = stream._state.recover() + if (recovered === HALT) return + updateValues(stream, recovered, undefined) + } catch (e) {updateValues(stream, undefined, e)} } else updateValues(stream, value, error) @@ -73,7 +76,8 @@ function updateDependency(stream, mustSync) { else { try { var value = state.derive() - if (!absorbStream(stream, value)) updateState(stream, value, undefined) + if (value === HALT) return + updateState(stream, value, undefined) } catch (e) { updateState(stream, undefined, e) @@ -81,30 +85,29 @@ function updateDependency(stream, mustSync) { } } } -function absorbStream(stream, value, isError) { +function unwrapError(value, error) { if (value != null && value.constructor === createStream) { - if (value._state.state === 2) { - stream.end(true) - stream(value()) - } - else if (value._state.error) stream.error(value.error()) - else if (value._state.state === 0) return true - else if (!isError) stream(value()) - else stream.error(value()) - return true + if (value._state.error !== undefined) error = value._state.error + else error = unwrapError(value._state.value, value._state.error) } - return false + return error } function finalize(stream) { stream._state.changed = false for (var id in stream._state.deps) stream._state.deps[id]._state.changed = false } +function run(fn) { + var self = createStream(), stream = this + return initDependency(self, [stream], function() { + return absorb(self, fn(stream())) + }, undefined) +} function doCatch(fn) { - var stream = this + var self = createStream(), stream = this var derive = function() {return stream._state.value} - var recover = function() {return fn(stream._state.error)} - return initDependency(createStream(), [stream], derive, recover) + var recover = function() {return absorb(self, fn(stream._state.error))} + return initDependency(self, [stream], derive, recover) } function combine(fn, streams) { return initDependency(createStream(), streams, function() { @@ -113,6 +116,16 @@ function combine(fn, streams) { return fn.apply(this, streams.concat([streams.filter(changed)])) }, undefined) } +function absorb(stream, value) { + if (value != null && value.constructor === createStream) { + value.error.map(stream.error) + value.map(stream) + if (value._state.state === 0) return HALT + if (value._state.error) throw value._state.error + value = value._state.value + } + return value +} function initDependency(dep, streams, derive, recover) { var state = dep._state @@ -148,6 +161,7 @@ function unregisterStream(stream) { function map(fn) {return combine(function(stream) {return fn(stream())}, [this])} function ap(stream) {return combine(function(s1, s2) {return s1()(s2())}, [this, stream])} function valueOf() {return this._state.value} +function toJSON() {return JSON.stringify(this._state.value)} function active(stream) {return stream._state.state === 1} function changed(stream) {return stream._state.changed} @@ -160,4 +174,4 @@ function reject(e) { return stream } -module.exports = {stream: createStream, combine: combine, reject: reject} +module.exports = {stream: createStream, combine: combine, reject: reject, HALT: HALT} diff --git a/util/tests/test-stream.js b/util/tests/test-stream.js index 1741fe3a..fabbf771 100644 --- a/util/tests/test-stream.js +++ b/util/tests/test-stream.js @@ -26,6 +26,11 @@ o.spec("stream", function() { o(stream()).equals(undefined) }) + o("can be stream of streams", function() { + var stream = Stream.stream(Stream.stream(1)) + + o(stream()()).equals(1) + }) }) o.spec("combine", function() { o("transforms value", function() { @@ -131,36 +136,34 @@ o.spec("stream", function() { o(b()).equals(undefined) }) - o("combine absorbs streams", function() { + o("combine can return stream", function() { var a = Stream.stream(1) var b = Stream.combine(function(a) { return Stream.stream(2) }, [a]) - o(b()).equals(2) + o(b()()).equals(2) }) - o("combine absorbs errored streams", function() { - var a = Stream.stream(1) - var b = Stream.combine(function(a) { - return Stream.reject(new Error("error")) - }, [a]) - - o(b()).equals(undefined) - o(b.error().message).equals("error") - }) - o("combine absorbs ready streams", function() { - var count = 0 + o("combine can return pending stream", function() { var a = Stream.stream(1) var b = Stream.combine(function(a) { return Stream.stream() }, [a]) - var c = Stream.combine(function(b) { - count++ - return 2 - }, [b]) - o(c()).equals(undefined) - o(count).equals(0) + o(b()()).equals(undefined) + }) + o("combine can halt", function() { + var count = 0 + var a = Stream.stream(1) + var b = Stream.combine(function(a) { + return Stream.HALT + }, [a]) + .map(function() { + count++ + return 1 + }) + + o(b()).equals(undefined) }) }) o.spec("end", function() { @@ -297,7 +300,8 @@ o.spec("stream", function() { var stream = Stream.stream(1) var mappedFromError = stream.error.map(function(value) { return "from" + value.message - }).map(function(value) { + }) + .map(function(value) { return "a" + value }) @@ -328,13 +332,35 @@ o.spec("stream", function() { o(downstream.error().message).equals("b") o(count).equals(0) }) - o("error.map absorbs streams", function() { + o("error can halt", function() { + var count = 0 + var stream = Stream.reject(1).error.map(function() { + return Stream.HALT + }) + .map(function() { + count++ + return 1 + }) + + o(stream()).equals(undefined) + o(count).equals(0) + }) + o("error.map can return streams", function() { var stream = Stream.reject(new Error("error")) var error = stream.error.map(function(value) { return Stream.stream(1) }) - o(error()).equals(1) + o(error()()).equals(1) + }) + o("combined stream of two errored streams adopts error from first", function() { + var a = Stream.stream(1) + var b = Stream.combine(function(a) {throw new Error("error from b")}, [a]) + var c = Stream.combine(function(a) {throw new Error("error from c")}, [a]) + var d = Stream.combine(function(a, b) {return 2}, [a, b]) + + o(d()).equals(undefined) + o(d.error().message).equals("error from b") }) }) o.spec("reject", function() { @@ -384,13 +410,158 @@ o.spec("stream", function() { o(count).equals(0) }) }) + o.spec("run", function() { + o("works", function() { + var stream = Stream.stream() + var doubled = stream.run(function(value) {return value * 2}) + + stream(3) + + o(doubled()).equals(6) + }) + o("works with default value", function() { + var stream = Stream.stream(3) + var doubled = stream.run(function(value) {return value * 2}) + + o(doubled()).equals(6) + }) + o("works with undefined value", function() { + var stream = Stream.stream() + var mapped = stream.run(function(value) {return String(value)}) + + stream(undefined) + + o(mapped()).equals("undefined") + }) + o("works with default undefined value", function() { + var stream = Stream.stream(undefined) + var mapped = stream.run(function(value) {return String(value)}) + + o(mapped()).equals("undefined") + }) + o("works with stream that throws", function() { + var count = 0 + var stream = Stream.stream(undefined) + var errored = stream.run(function(value) {throw new Error("error")}) + var mapped = errored.map(function(value) { + count++ + return value + }) + + o(errored()).equals(undefined) + o(errored.error().message).equals("error") + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + o(count).equals(0) + }) + o("works with pending stream", function() { + var count = 0 + var stream = Stream.stream(undefined) + var absorber = stream.run(function(value) {return Stream.stream()}) + var mapped = absorber.map(function(value) { + count++ + return value + }) + + o(mapped()).equals(undefined) + o(count).equals(0) + }) + o("works with active stream", function() { + var stream = Stream.stream(undefined) + var mapped = stream.run(function(value) {return Stream.stream(1)}) + + o(mapped()).equals(1) + }) + o("works with errored stream", function() { + var stream = Stream.stream(undefined) + var mapped = stream.run(function(value) {return Stream.reject(new Error("error"))}) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + }) + o("works with ended stream", function() { + var stream = Stream.stream(1) + var mapped = stream.run(function(value) { + var ended = Stream.stream(2) + ended.end(true) + return ended + }) + + stream(3) + + o(mapped()).equals(2) + }) + o("works when active stream updates", function() { + var stream = Stream.stream(undefined) + var absorbed = Stream.stream(1) + var mapped = stream.run(function(value) {return absorbed}) + + absorbed(2) + + o(mapped()).equals(2) + + absorbed(3) + + o(mapped()).equals(3) + }) + o("works when updating stream to errored state", function() { + var stream = Stream.stream(undefined) + var absorbed = Stream.stream(1) + var mapped = stream.run(function(value) {return absorbed}) + + absorbed.error(new Error("error")) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + + absorbed.error(new Error("another error")) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("another error") + }) + o("works when pending stream updates", function() { + var stream = Stream.stream(undefined) + var absorbed = Stream.stream() + var mapped = stream.run(function(value) {return absorbed}) + + absorbed(2) + + o(mapped()).equals(2) + }) + o("works when updating pending stream to errored state", function() { + var stream = Stream.stream(undefined) + var absorbed = Stream.stream() + var mapped = stream.run(function(value) {return absorbed}) + + absorbed.error(new Error("error")) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + }) + o("works when updating stream to active state", function() { + var stream = Stream.stream(undefined) + var absorbed = Stream.stream(1) + var mapped = stream.run(function(value) {return absorbed}) + + absorbed.error(new Error("error")) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("error") + + absorbed(2) + + o(mapped()).equals(2) + o(mapped.error()).equals(undefined) + }) + }) o.spec("catch", function() { o("catch works from reject", function() { var count = 0 var stream = Stream.reject(new Error("error")).catch(function(e) { count++ return "no" + e.message - }).map(function(value) { + }) + .map(function(value) { return value + "mapped" }) @@ -403,7 +574,8 @@ o.spec("stream", function() { var stream = Stream.combine(function() {throw new Error("error")}, [Stream.stream(1)]).catch(function(e) { count++ return "no" + e.message - }).map(function(value) { + }) + .map(function(value) { return value + "mapped" }) @@ -417,7 +589,8 @@ o.spec("stream", function() { var handled = stream.map(function(value) {return value + value}).catch(function(e) { count++ return "no" + e.message - }).map(function(value) { + }) + .map(function(value) { return value + "mapped" }) @@ -432,7 +605,8 @@ o.spec("stream", function() { var stream = Stream.stream("a").map(function(value) {return value + value}).catch(function(e) { count++ return "no" + e.message - }).map(function(value) { + }) + .map(function(value) { return value + "mapped" }) @@ -457,6 +631,39 @@ o.spec("stream", function() { o(stream()).equals("undefined") o(stream.error()).equals(undefined) }) + o("catch absorbs pending stream", function() { + var count = 0 + var stream = Stream.stream() + var mapped = Stream.reject(new Error("b")).catch(function(e) { + return stream + }) + .map(function(value) { + count++ + return String(value) + }) + + o(mapped()).equals(undefined) + o(count).equals(0) + }) + o("catch absorbs active stream", function() { + var stream = Stream.stream(1) + var mapped = Stream.reject(new Error("b")).catch(function(e) { + return stream + }) + .map(function(value) {return String(value)}) + + o(mapped()).equals("1") + }) + o("catch absorbs errored stream", function() { + var stream = Stream.reject(new Error("a")) + var mapped = Stream.reject(new Error("b")).catch(function(e) { + return stream + }) + .map(function(value) {return String(value)}) + + o(mapped()).equals(undefined) + o(mapped.error().message).equals("a") + }) o("catch does not prevent sibling error propagation", function() { var a = Stream.reject(new Error("a")) var b = a.map(function(value) {return value + "b"}).catch(function(e) {}) @@ -466,19 +673,62 @@ o.spec("stream", function() { o(d()).equals(undefined) o(d.error().message).equals("a") }) - o("catches absorbed rejected stream", function() { + o("catches wrapped rejected stream", function() { var caught var stream = Stream.stream(1).map(function() { return Stream.reject(new Error("error")) - }).catch(function(value) { + }) + .catch(function(value) { caught = value return "no" + value.message - }).map(function(value) { + }) + .map(function(value) { return value + "mapped" }) o(stream()).equals("noerrormapped") }) + o("catches nested wrapped rejected stream", function() { + var caught + var stream = Stream.stream(1).map(function() { + return Stream.stream(2).map(function() { + return Stream.reject(new Error("error")) + }) + }) + .catch(function(value) { + caught = value + return "no" + value.message + }) + .map(function(value) { + return value + "mapped" + }) + + o(stream()).equals("noerrormapped") + }) + }) + o.spec("valueOf", function() { + o("works", function() { + o(Stream.stream(1).valueOf()).equals(1) + o(Stream.stream("a").valueOf()).equals("a") + o(Stream.stream(true).valueOf()).equals(true) + o(Stream.stream(null).valueOf()).equals(null) + o(Stream.stream(undefined).valueOf()).equals(undefined) + o(Stream.stream({a: 1}).valueOf()).deepEquals({a: 1}) + o(Stream.stream([1, 2, 3]).valueOf()).deepEquals([1, 2, 3]) + o(Stream.stream().valueOf()).equals(undefined) + }) + }) + o.spec("toJSON", function() { + o("works", function() { + o(Stream.stream(1).toJSON()).equals("1") + o(Stream.stream("a").toJSON()).equals("\"a\"") + o(Stream.stream(true).toJSON()).equals("true") + o(Stream.stream(null).toJSON()).equals("null") + o(Stream.stream(undefined).toJSON()).equals(undefined) + o(Stream.stream({a: 1}).toJSON()).deepEquals("{\"a\":1}") + o(Stream.stream([1, 2, 3]).toJSON()).deepEquals("[1,2,3]") + o(Stream.stream().toJSON()).equals(undefined) + }) }) o.spec("map", function() { o("works", function() { @@ -509,6 +759,12 @@ o.spec("stream", function() { o(mapped()).equals("undefined") }) + o("works with pending stream", function() { + var stream = Stream.stream(undefined) + var mapped = stream.map(function(value) {return Stream.stream()}) + + o(mapped()()).equals(undefined) + }) }) o.spec("ap", function() { o("works", function() { @@ -538,187 +794,6 @@ o.spec("stream", function() { o(applied()).equals("undefineda") }) }) - o.spec("absorbing semantics", function() { - o("absorbs stream", function() { - var stream = Stream.stream(Stream.stream(1)) - var doubled = stream.map(function(value) {return value * 2}) - - o(stream()).equals(1) - o(doubled()).equals(2) - }) - o("absorbs stream on update", function() { - var stream = Stream.stream() - var doubled = stream.map(function(value) {return value * 2}) - - stream(Stream.stream(1)) - - o(stream()).equals(1) - o(doubled()).equals(2) - }) - o("absorbs errored stream", function() { - var stream = Stream.stream(Stream.reject(1)) - var doubled = stream.error.map(function(value) {return value * 2}) - - o(stream()).equals(undefined) - o(stream.error()).equals(1) - o(doubled()).equals(2) - }) - o("absorbs pending stream", function() { - var count = 0 - var stream = Stream.stream(Stream.stream()) - var mapped = stream.map(function() { - count++ - }) - - o(stream()).equals(undefined) - o(count).equals(0) - }) - o("absorbs ended stream", function() { - var count = 0 - var ended = Stream.stream() - ended.end(true) - - var stream = Stream.stream(ended) - var mapped = stream.map(function() { - count++ - }) - - stream(1) - - o(count).equals(0) - }) - o("combine absorbs stream", function() { - var stream = Stream.stream(1) - var combined = Stream.combine(function(stream) { - return Stream.stream(2) - }, [stream]) - var doubled = combined.map(function(value) {return value * 2}) - - o(combined()).equals(2) - o(doubled()).equals(4) - }) - o("combine absorbs errored stream", function() { - var stream = Stream.stream(1) - var combined = Stream.combine(function(stream) { - return Stream.reject(2) - }, [stream]) - var doubled = combined.error.map(function(value) {return value * 2}) - - o(combined()).equals(undefined) - o(combined.error()).equals(2) - o(doubled()).equals(4) - }) - o("combine absorbs pending stream", function() { - var count = 0 - var stream = Stream.stream(1) - var combined = Stream.combine(function(stream) { - return Stream.stream() - }, [stream]) - var mapped = combined.map(function() { - count++ - }) - - o(combined()).equals(undefined) - o(count).equals(0) - }) - o("combine absorbs ended stream", function() { - var count = 0 - var stream = Stream.stream(1) - var combined = Stream.combine(function(stream) { - var ended = Stream.stream(2) - ended.end(true) - return ended - }, [stream]) - var mapped = combined.map(function() { - count++ - }) - - o(combined()).equals(2) - o(count).equals(0) - }) - o("reject absorbs stream", function() { - var stream = Stream.reject(Stream.stream(1)) - var doubled = stream.error.map(function(value) {return value * 2}) - - o(stream.error()).equals(1) - o(doubled()).equals(2) - }) - o("reject absorbs errored stream", function() { - var count = 0 - var stream = Stream.reject(Stream.reject(new Error("error"))) - var mapped = stream.error.map(function() { - count++ - }) - - o(stream.error().message).equals("error") - o(count).equals(1) - }) - o("reject absorbs pending stream", function() { - var count = 0 - var stream = Stream.reject(Stream.stream()) - var mapped = stream.error.map(function() { - count++ - }) - - o(stream.error()).equals(undefined) - o(count).equals(0) - }) - o("reject absorbs ended stream", function() { - var count = 0 - var ended = Stream.stream(1) - ended.end(true) - - var stream = Stream.reject(ended) - var mapped = stream.error.map(function() { - count++ - }) - - o(count).equals(0) - }) - o("ended stream absorbs stream", function() { - var stream = Stream.stream() - stream.end(true) - - var doubled = stream.map(function(value) {return value * 2}) - stream(Stream.stream(1)) - - o(stream()).equals(1) - o(doubled()).equals(undefined) - }) - o("ended stream absorbs pending stream", function() { - var stream = Stream.stream() - stream.end(true) - - var doubled = stream.map(function(value) {return value * 2}) - stream(Stream.stream()) - - o(stream()).equals(undefined) - o(doubled()).equals(undefined) - }) - o("ended stream absorbs errored stream", function() { - var stream = Stream.stream() - stream.end(true) - - var doubled = stream.map(function(value) {return value * 2}) - stream(Stream.reject(new Error("error"))) - - o(stream.error().message).equals("error") - o(doubled()).equals(undefined) - }) - o("ended stream absorbs ended stream", function() { - var stream = Stream.stream(1) - stream.end(true) - - var ended = Stream.stream(2) - ended.end(true) - - var doubled = stream.map(function(value) {return value * 2}) - stream(ended) - - o(stream()).equals(2) - o(doubled()).equals(undefined) - }) - }) o.spec("fantasy-land", function() { o.spec("functor", function() { o("identity", function() {