Marble testing redux-observable epics

One great benefit of defining the sequence of redux actions as an observable is the ability to mock the scheduler to unit-test the asynchronous behavior of the application, redux-observable epics in particular. One can even take advantage of the marble testing approach of RxJS to make for particularly precise descriptions of the action stimuli and the expected outcome.

Epic definition

We can illustrate this with the classical example of debouncing search input. To setup such a behavior with redux-observable, we will define two epics: a first epic concerned with debouncing search input and issuing search actions, and a second epic in charge of carrying out the actual search.

// This epic drives the as-you-type experience, throttling the input every 100ms to avoid overloading the server
export const searchOnInputChangedEpic =
    (action$: ActionsObservable<Actions>, store: MiddlewareAPI<SearchState>, services: IServices): Rx.Observable<Actions> => {
        return action$
            .actionsOfType<InputChangedAction>(InputChangedActionType)
            .map(a => a.payload)
            .debounceTime(100, services.scheduler)  
            .distinctUntilChanged()
            .map(search);
    };

// This second epic drives the search, submitting searches to the service, dealing with responses and cancelling in-flight searches if no longer relevant
export const searchEpic =
    (action$: ActionsObservable<Actions>,
     store: MiddlewareAPI<SearchState>, services: IServices): Rx.Observable<Actions> => {
        return action$
            .actionsOfType<SearchAction>(SearchActionType)
            .map(a => a.payload)
            .switchMap(input =>
                input.length > 0 ?
                    services.search.doSearch(input)
                        .map(p => new ContentResult(p))
                        .catch((error: Error) => 
                             Rx.Observable.of<SearchResult>(new ErrorResult(error.message))) 
                    : Rx.Observable.of<SearchResult>(EmptyResult.Instance))
            .map(searchFulfilled);
    };

Let’s now look more in detail at each component of these epics, starting with the signatures:

Epic Signature

const searchOnInputChangedEpic = (action$: ActionsObservable<Actions>, store: MiddlewareAPI<SearchState>, services: IServices): Rx.Observable<Actions>

You might have noticed that we are “injecting” dependencies through an IServices interface, which provides the services required by in the epic logic, as well as the Rx scheduler. Epic middleware indeed allows passing an object as the third parameter of an epic. Here’s how this interface is defined:

import { ISearchService } from "services/search";
import { IScheduler } from "rxjs/Scheduler";

export interface IServices {
    readonly search: ISearchService;
    readonly scheduler: IScheduler;
}

The actual IServices instance is configured at the middleware creation, like so:

const services = { 
   search: new SomeSearchService(),
   scheduler:  Rx.Scheduler.async // setup the default rx scheduler for epics as the async scheduler, to be overriden in tests
};
const epicMiddleware = createEpicMiddleware(rootEpic, { dependencies: services });

When unit testing, this interface is mocked with e.g. a mock search service and Rx’s TestScheduler instead of the standard one.

Action selection

The following lines only lets input_changed and search actions filter through, respectively:

action$.actionsOfType<InputChangedAction>(InputChangedActionType)
action$.actionsOfType<SearchAction>(SearchActionType)

actionsOfType<T>() is a helper operator which both calls redux-observable built-in ofType() operator and typecasts the action to the proper type:

export function actionsOfType<A>(this: ActionsObservable<any>, type: string): Observable<A> {
    return this.ofType(type).map(t => (t as A));
}

// Add the operator to the Observable prototype:
ActionsObservable.prototype.actionsOfType = actionsOfType;

Input processing

The next part of the input_changed epic “pipeline” performs input throttling, and only emits searches if the input actually changed between debounces:

.map(a => a.payload)
   .debounceTime(100, services.scheduler)  // 100ms throttling delay - important: we specify the scheduler here, to allow for testing
   .distinctUntilChanged()

Note that we are passing our injected scheduler to the debounceTime operator. This is essential, since it will allow us to unit test debouncing behavior by injecting a TestScheduler instance rather than the standard asynchronous scheduler.

The second searchEpic launches a search query for each search action emitted by the first epic and deals with the response, also cancelling any prior search that would still be in-flight (thanks to the switchMap operator).

.switchMap(input =>
                input.length > 0 ?
                    services.search.doSearch(input)  // delegating to the injected search service interface here the actual request emission
                        .map(p => new ContentResult(p))
                        .catch((error: Error) => Rx.Observable.of<SearchResult>(new ErrorResult(error.message))) 
                    : Rx.Observable.of<SearchResult>(EmptyResult.Instance))

It is important to note here that we are not emitting requests directly, but rather using our ISearchService abstraction. This will make unit testing easier, since epic tests will not have to deal will the additional concern of mocking http requests (e.g. via libraries such as nock).

Action creators

The final line .map(searchFulfilled); concludes the epic, by mapping the search result with the searchFulfilled action creator function, which is defined like so:

export const searchFulfilled = (response: SearchResult): SearchFulfilledAction => createAction(SearchFulfilledActionType, response);

You might have noticed that actions are typed. SearchFulfilledAction is defined in the following way:

export type SearchFulfilledAction = TypedAction<typeof SearchFulfilledActionType, SearchResult>;

The TypedAction<T, P> is a helper type, with the accompanying createAction function

export interface TypedAction<T, P> extends Action {
    readonly type: T;
    readonly payload: P;
    error?: boolean;
    meta?: {};
}

export function createAction<T extends string, P>(type: T, payload: P): TypedAction<T, P> {
    return ({type, payload});
}

You might have noticed also that our epic signature is typed with an Actions type. This is a discriminated union allowing for a common type for all actions:

export type Actions =
    InputChangedAction
    | SearchAction
    | SearchFulfilledAction
    | ...

Unit testing

With our epics now defined, let’s proceed to to unit testing. We will be taking advantage of Rx’s TestScheduler as well as a mocking library to mock our services (in our case we are using TypeMoq).

Testing debouncing behavior

Here’s the test for searchOnInputChangedEpic:

 test("searchOnInputChangedEpic debounces input", () => {
        // Create test scheduler
        const testScheduler = createTestScheduler();

        // Mock services instance, passing in our test scheduler
        const servicesMock = TypeMoq.Mock.ofType<IServices>();
        servicesMock.setup(m => m.scheduler).returns(() => testScheduler);
        const services = servicesMock.object;

        // Define marbles
        const inputActions = {
            a: inputChanged("r"),
            b: inputChanged("rx"),
            c: inputChanged("rxjs")
        };
        const outputActions = {
            b: search("rx"),
            c: search("rxjs")
        };
        const inputMarble =     "-ab-----------c";
        const outputMarble =    "------------b-----------c";

        // Mock input actions stream
        const action$ = createTestAction$FromMarbles<Actions>(testScheduler, inputMarble, inputActions);

        // Apply epic on actions observable
        const outputAction$ = searchOnInputChangedEpic(action$, store, services);

        // Assert on the resulting actions observable
        testScheduler.expectObservable(outputAction$).toBe(outputMarble, outputActions);

        // Run test
        testScheduler.flush();
    });

Let’s break this down into its parts:

Test scheduler

The first line

const testScheduler = createTestScheduler();

creates Rx TestScheduler instance. It is wrapped in a helper function, since this is used recurrently in all the tests. Here’s its definition:

import * as Rx from "rxjs";
export const createTestScheduler = () => new Rx.TestScheduler((actual, expected) => expect(actual).toEqual(expected));

The TestScheduler supports writing “Marble Tests”, as we will see below. It allows specifying the deep equality used for comparing values in the marbles: we are using the standard equality in our case.

IServices mocking

With the following block

const servicesMock = TypeMoq.Mock.ofType<IServices>();
servicesMock.setup(m => m.scheduler).returns(() => testScheduler);
const services = servicesMock.object;

we are mocking the IServices instance which our epic relies upon. We first grab a Mock instance typed with IServices from TypeMoq, which allows setting up the behavior for the mock with a fluent API. In our case, we set it up so that a call to the scheduler property returns our testScheduler (the search property is left out for now, since this test will not be looking at the search proper). Finally we obtain the actual IServices instance by means of TypeMoq’s `.object’ operator.

Input and output marbles

Here’s the interesting part of the test and where the magic happens:

const inputActions = {
    a: inputChanged("r"),
    b: inputChanged("rx"),
    c: inputChanged("rxjs")
};
const outputActions = {
    b: search("rx"),
    c: search("rxjs")
};
const inputMarble =     "-ab-----------c";
const outputMarble =    "------------b-----------c";

inputActions is an object defining what actions we want to stimulate the epic with, and inputMarble indicates when and in what order these actions will be emitted. Similarly, outputActions and outputMarble represent the expected output sequence. This makes for a visual representation of what is the expected behavior. In these diagrams, a slot (- or an actual value, e.g. a) represents a unit of time of 10ms. For instance, -ab represents an initial delay of 10ms, then a rapid succession of r and rx input strings (within a 20ms window).

The expected outcome is that r is debounced, and a search action with payload rx at exactly 130ms (since we debounce with a 100ms delay). Marbles support more semantics than the simple ones illustrated here, for more information check out “Marble Tests”.

Note that such tests can lose clarity for real-life delays (typically 200ms for debouncing, or seconds for timeouts). In such cases, instead of including dashes for each 10ms unit, one can use the following helper method:

export function frames(n: number, unit: string = "-"): string {
    return n === 1 ? unit : unit + frames(n - 1, unit);
}

The marbles now become:

const inputMarble =  "-ab" + frames(10) + "-c";
const outputMarble = "--" + frames(10) + "b" + frames(10) + "-c";

which also have the advantage of outlining more clearly the action of the debouncing delay.

Action stream mocking

The following line in the test generates the action stream from the inputMarble definition:

const action$ = createTestAction$FromMarbles<Actions>(testScheduler, inputMarble, inputActions);

This makes use of a createTestAction$FromMarbles<T> helper method which wraps this logic, common to all tests:

import { TypedAction } from "./typed-action";
import { ActionsObservable } from "redux-observable";
import { TestScheduler } from "rxjs/Rx";
import * as Rx from "rxjs";

export function createTestAction$FromMarbles<A>(testScheduler: TestScheduler, marbles: string, values?: any) {
    return new ActionsObservable<A>(testScheduler.createHotObservable(marbles, values));
}

Epic actuation

Epics are actually functions, and we now have the required arguments to invoke it and grab the resulting output actions stream:

const outputAction$ = searchOnInputChangedEpic(action$, store, services);

Assertion

We can finally assert on the expected marble, like so:

// Assert on the resulting actions observable
testScheduler.expectObservable(outputAction$).toBe(outputMarble, outputActions);
// Run test
testScheduler.flush();

Flushing the scheduler actually performs the test, in some sense virtually incrementing the time and carrying out scheduled tasks in order. That allows for defining tests of asynchronous behavior potentially spanning a very long time that run instantaneously!

Testing search behavior

To test searchEpic, we will define two unit tests, one looking at behavior in the case of a actual result, and the other looking at error handling.

Testing search with result

Here’s the full test:

test("searchEpic with content", (done) => {
    const searchInput = "rxjs";
    const searchResult = "Reactive programming";

    // Mock services 
    const servicesMock = TypeMoq.Mock.ofType<IServices>();
    const searchMock = TypeMoq.Mock.ofType<ISearchService>();
    searchMock.setup(s => s.doSearch(searchInput)).returns(() => Rx.Observable.of(searchResult));
    servicesMock.setup(s => s.search).returns(() => searchMock.object);
    const services = servicesMock.object;

     // Mock input actions stream
    const action$ = createTestAction$<Actions>(search(searchInput));

    // Apply epic on actions observable
    const outputAction$ = searchEpic(action$, store, services);

    // Assert on the resulting actions observable   
    outputAction$.subscribe(
        action => expect(action).toEqual(searchFulfilled(new ContentResult(searchResult))), 
        error => fail(error), 
        done
    );
});

We are not using marbles in this case as this test is not concerned with the asynchronicity but rather the search operation. We are thus not configuring the scheduler, since the epic doesn’t involve any time-related Rx operator. Instead, we are now mocking the search service. We are using TypeMock to setup the service so that when receiving a call to doSearch with the input rxjs, we return an observable simply containing the “Reactive programming” string as unique immediate value:

searchMock.setup(m => m.doSearch(searchInput)).returns(() => Rx.Observable.of(searchResult));

Asserting on the result now involves subscribing the the output actions stream: we expect a searchFullfilled action with a ContentResult instance wrapping the “Reactive programming” string. Occurrence of an error in this context indicates a test failure:

outputAction$.subscribe(
    action => expect(action).toEqual(searchFulfilled(new ContentResult(searchResult))), 
    error => fail(error), 
    done
);

As mentioned previously, the advantage of mocking the search service is that this test does not need to burden itself with additional aspects of the actual http requests carrying out the search. This is a separate concern which is covered separately in the unit tests of the search service itself (which might involve usage of more intrusive mocking mechanisms such as nock)

Testing search with error

Here’s the equivalent test covering the error scenario:

test("searchEpic with error", (done) => {
    const searchInput = "rxjs";
    const searchError = "Some error";

    // Mock services 
    const servicesMock = TypeMoq.Mock.ofType<IServices>();
    const searchMock = TypeMoq.Mock.ofType<ISearchService>();
    searchMock.setup(s => s.doSearch(searchInput)).returns(() => Rx.Observable.throw(new Error(searchError)));
    searchMock.setup(s => s.search).returns(() => searchMock.object);
    const services = servicesMock.object;
     // Mock input actions stream
    const action$ = createTestAction$<Actions>(search(searchInput));
    // Apply epic on actions observable
    const outputAction$ = searchEpic(action$, store, services);
    // Assert on the resulting actions observable   
    outputAction$.subscribe(
        action => expect(action).toEqual(searchFulfilled(new ErrorResult(searchError))), 
        error => fail(error),
        done
    );
});

Note that we are now using the throw operator to fake doSearch() throwing an exception. Our expectation in this case is that the search epic itself does not terminate with an exception, but rather wraps the error into an ErrorResult instance which can be further dealt with accordingly by the app. Terminating the epic would actually be fatal, since it would destroy the search pipeline - not the behavior we want in this case.

Conclusion

We have demonstrated how to take advantage of RxJS’s marble tests to unit test asynchronous behavior of redux-observable epics. In the course of this, we have introduced various helpers, including some allowing for a fully typed experience when defining the epic and tests. We have also demonstrated how to limit epic tests concern to actual epic behavior, and not extraneous aspects such as http request emission, by introducing interfaces for the epic dependencies and mocking them with the TypeMock library.