Ostatnio poruszyłem na blogu temat obsługi strumieni danych za pomocą biblioteki RxJS. To wszystko na prośbę jednego z czytelników bloga, który napisał do mnie czy nie dało by się pokazać połączenia Redux + RxJS w aplikacji React. Oczywiście ochoczo się zgodziłem, jednak stwierdziłem, że zanim przedstawię ten temat, warto byłoby pokazać co to w ogóle jest RxJS.

Poźniej z kolei, kiedy zacząłem research pod dzisiejszy wpis stwierdziłem, że nie omówiłem jeszcze na blogu obsługi danych asynchronicznych w kontekście Redux. Stąd też, na blogu pojawił się ostatnio wpis na ten temat.

I wreszcie teraz, kiedy te dwa wpisy wiszą już na blogu, mogę wreszcie spełnić prośbę czytelnika! W związku z tym dziś przedstawiam Redux + RxJS na podstawie 2 przykładów wykorzystania tego połączenia - zapraszam do lektury!

Wprowadzenie

Zanim przejdę do przedstawienia konkretnych przykładów użycia Redux + RxJS konieczne jest omówienie kilku kwestii. Przede wszystkim: po co właściwie łączyć “reactive extensions” z Reduxem? Postaram się za chwilę odpowiedzieć na to pytanie. Najpierw jednak kilka słów na temat tego, co należy wiedzieć zanim przystąpi się do dalszej lektury…

W swoich dzisiejszych rozważaniach wykorzystam Redux middleware o nazwie redux-observable. Z tego względu fajnie jakbyś był już obeznany z podstawami Reduxa łącznie z tym co to jest middleware. Zresztą ogólnie przyda nam się wiedza, którą przedstawiłem już w poniższych wpisach:

Do powyższych wpisów będę się odwoływać w dalszej części tekstu.

Jeśli chodzi o źródła wiedzy, to polecam obejrzeć fajną prezentację Jaya Phelpsa na konferencji Netflix JavaScript Talks, która wiele wyjaśnia:

Sporo informacji i inspiracji dla tego wpisu zaczerpnąłem właśnie z tego wystąpienia. Poza tym warto przejrzeć dokumentację middleware redux-observable oraz zapoznać się z tym wpisem autorstwa Robina Wierucha.

Po co w ogóle Redux + RxJS?

Pierwszym pytaniem, które musimy sobie zadać jest “po co w ogóle łączyć RxJS z Reduxem”? Przecież, jak już przedstawiłem w poprzednim wpisie, operacje asynchroniczne w oparciu o zwykłe “promisy” działają z Reduxem bardzo dobrze…

No cóż, głównym ograniczeniem “promisów” jest to, że nie można ich anulować. Od czasu do czasu powoduje to problemy. Najbardziej oczywistym jest sytuacja (pokazana dobrze w powyższym filmie), gdy w trakcie pobierania danych zmienimy “route” w naszej aplikacji - rozpoczęte pobieranie nie może zostać anulowane więc cały czas wykonuje się “w tle”, a w tym czasie startuje już kolejne żądanie. A nie daj boże jeśli oba żądania finalnie aktualizują tę samą właściwość w “store”. Problem, często trudny do wychwycenia, gotowy…

Dlatego też idealnie byłoby mieć, w momencie zmiany aktualnego “route”, możliwość anulowania wszystkich rozpoczętych do tej pory żądań. Zresztą t0 samo byłoby przydatne w różnych innych “asynchronicznych” sytuacjach - na przykład szybkie klikanie przycisków, formularze “autocomplete” gdzie strzelamy do serwera po każdym wciśnięciu klawisza przez użytkownika i wiele innych.

Jak się domyślasz, z pomocą przyjdzie nam wykorzystanie RxJS! W sumie… przedstawiłem już pewne jego zalety w moim wcześniejszym wpisie, ale od tej strony na “reactive extensions” jeszcze nie patrzyliśmy…

Przykład #1 - co to jest Epic?

Podstawowym pojęciem wprowadzonym przez bibliotekę redux-observable jest “Epic”. Jeśli zajrzysz do dokumentacji, przeczytasz tam, że Epic jest funkcją, która przyjmuje strumień akcji (Reduxowych), wykonuje na nim odpowiednie operacje (dzięki operatorom RxJS) i na koniec zwraca ten zmodyfikowany strumień akcji. Ten wynikowy strumień jest następnie przechwytywany przez middleware i wszystkie akcje, które zawiera, są natychmiast rozgłaszane.

Myślę, że dla lepszego zrozumienia czym jest Epic, najlepiej będzie omówić przykład dostępny w dokumentacji biblioteki redux-observable. Spójrz na poniższy kawałek kodu:

const PING = 'PING';
const PONG = 'PONG';

const ping = () => ({ type: PING });

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000) // Asynchronously wait 1000ms then continue
    .mapTo({ type: PONG });

const pingReducer = (state = { isPinging: false }, action) => {
  switch (action.type) {
    case PING:
      return { isPinging: true };

    case PONG:
      return { isPinging: false };

    default:
      return state;
  }
};

Omówienie przykładu

Jak widzisz, mamy tutaj zdefiniowane dwie stałe: PING oraz PONG. Jak to zwykle w Reduxie, są to nazwy typów dostępnych akcji. W kolejnej linii widzimy kreator akcji ping(). Zwraca on akcję typu PING.

Dalej mamy nasz Epic (funkcja przypisana do stałej pingEpic). Jak już wspomniałem, jest to funkcja przyjmująca strumień akcji. Strumień ten reprezentowany jest przez parametr funkcji o nazwie action$. Zwróć uwagę na znak dolara na końcu nazwy parametru - jest to konwencja informująca o tym, że zmienna ta zawiera strumień danych.

Spójrz teraz na ciało funkcji anonimowej (strzałkowej) przypisanej do stałej pingEpic. Na początku widzimy wywołanie funkcji operatora ofType():

action$.ofType(PING)

Middleware redux-observable wprowadza dodatkowy typ obserwowalny ActionsObservable, który dziedziczy po standardowej klasie Observable dostępnej w ramach biblioteki RxJS. I właśnie tego typu jest strumień przypisany jest do parametru action$. Klasa ActionsObservable posiada metodę ofType() pozwalającą na filtrowanie akcji po jej typie. Wywołanie metody ofType() z przekazaniem jej parametru PING jest więc tożsame z wywołaniem metody filter z RxJS:

action$.filter(action => action.type === 'PING')

W ten sposób filtrujemy więc wszystkie akcje znajdujące się w strumieniu akcji. Na tak przefiltrowanym strumieniu wywołujemy następnie metodę delay(1000) powodując opóźnienie wywołania kolejnych operacji o 1 sekundę. Na koniec mapujemy obiekt akcji na nowy obiekt o typie PONG.

Na końcu przykładu widać typowy Reduxowy “reducer”. Jak widać dla akcji PING ustawia on wartość właściwości isPinging stanu na true, a dla akcji PONG zmienia tę wartość na false.

Jak to działa?

Wszystko fajnie ale zastanawiasz się pewnie jak to wszystko działa? Spieszę z wyjaśnieniem:

  1. Najpierw rozgłaszana jest akcja PING, na przykład w momencie gdy użytkownik kliknie jakiś guzik: dispatch(ping())
  2. Middleware redux-observable przechwytuje tę akcję, dodaje ją do strumienia i wywołuje funkcję pingEpic
  3. Oprócz tego przekazuje ją dalej, trafia więc ona normalnie do “reducera” i jest w nim obsługiwana w sposób standardowy
  4. W tak zwanym “międzyczasie” wykonywane są operacje w pingEpic, w rezultacie których zwracany jest zmodyfikowany strumień akcji
  5. Wszystkie akcje, które “wypadną” ze strumienia są natychmiast rozgłaszane i w rezultacie również obsługiwane przez “reducer”

Czyli w skrócie, w momencie wciśnięcia guzika wykonywany kod, który w rezultacie tożsamy byłby z poniższym:

dispatch({ type: 'PING' });
// one second delay
dispatch({ type: 'PONG' });

Mówiąc więc obrazowo, redux-observable wyłapuje wszystkie rozgłaszane akcje i tworzy z nich strumień akcji. Epic natomiast pozwala na nasłuchiwanie tego strumienia, wyłapywanie konkretnych akcji i wykonywanie na nich różnych operacji.

Konfiguracja

Skoro wiemy już co to jest Epic, przydałoby się jeszcze zobaczyć jak konfiguruje się middleware redux-observable. Nie jest to nic trudnego:

import { createStore, applyMiddleware } from 'redux';
import { createEpicMiddleware } from 'redux-observable';
import { rootEpic, rootReducer } from './modules/root';

const epicMiddleware = createEpicMiddleware(rootEpic);

const store = createStore(
  rootReducer,
  applyMiddleware(epicMiddleware)
);

Middleware redux-observable to typowe middleware Reduxa. Aby go skonfigurować importujemy funkcję createEpicMiddleware i wywołujemy ją, przekazując jako parametr obiekt rootEpic (o nim za chwilę). Wynik działania tej funkcji przekazujemy standardowo do funkcji createStore, tak jak robimy to z każdym innym middleware.

Jeśli chodzi o obiekt rootEpic, to jest to twór podobny do obiektu rootReducer Reduxa. Do jego utworzenia wykorzystujemy funkcję combineEpics (analogicznie do combineReducers w przypadku “reducera”):

import { combineEpics } from 'redux-observable';
import { combineReducers } from 'redux';
import ping, { pingEpic } from './ping';
import users, { usersEpic } from './users';

export const rootEpic = combineEpics(
  pingEpic,
  usersEpic
);

export const rootReducer = combineReducers({
  ping,
  users
});

To w zasadzie tyle jeśli chodzi o konfigurację - myślę, że jest to dość prosto ogarnięte i każdy sobie z tym poradzi.

Przykład #2 - wywołania AJAX

Kolejny przykład będzie już bardziej życiowy. Jest to sytuacja, z którą stykamy się najczęściej, a więc wywołania asynchroniczne AJAX.

Myślę, że aby zachować ciągłość, skorzystam tutaj z przykładu kodu, którego użyłem w poprzednim wpisie, na temat wywołań asynchronicznych w Redux. Poniżej znajduje się modyfikacja pliku z kreatorami akcji z użyciem RxJS:

import { ajax } from 'rxjs/observable/dom/ajax';

export const GET_DATA_REQUESTED = 'GET_DATA_REQUESTED';
export const GET_DATA_DONE = 'GET_DATA_DONE';
export const GET_DATA_FAILED = 'GET_DATA_FAILED';

export function getDataRequested() {
  return {
    type: 'GET_DATA_REQUESTED'
  };
}

export function getDataDone(data) {
  return {
    type: 'GET_DATA_DONE',
    payload: data
  };
}

export function getDataFailed(error) {
  return {
    type: 'GET_DATA_FAILED',
    payload: error
  };
}

export function getDataEpic(action$) {
  return action$.ofType(GET_DATA_REQUESTED)
    .mergeMap(action =>
      ajax.getJSON('https://api.github.com/users/burczu/repos')
        .map(response => getDataDone(response))
        .catch(error => getDataFailed(error))
    );
}

Objaśnienie kodu

Pierwsze co rzuca się w oczy to to, że nie importuję już tutaj metody fetch z biblioteki isomorphic-fetch. Zamiast tego korzystam z metody ajax wchodzącej w skład biblioteki RxJS. Dzięki niej możliwe jest wysyłanie żądań i odbieranie strumieni a nie “promisów”. Tutaj jeszcze mała uwaga: biblioteka redux-observable nie dostarcza nam automatycznie rozszerzeń RxJS. Musimy je sami zaimportować je w pliku index.js lub importować konkretne jej operatory w plikach, w których akurat ich potrzebujemy.

Biblioteka redux-observable nie dostarcza nam automatycznie rozszerzeń RxJS. Musimy je sami zainstalować (z npm), a następnie zaimportować je w pliku index.js lub importować konkretne jej metody w plikach, w których akurat ich potrzebujemy.

Stałe zawierające nazwy typów akcji oraz kreatory akcji nie zmieniły się. Zamiast jednak funkcji getData, która rozgłaszała inne akcje w zależności od wyników operacji asynchronicznej, mamy teraz funkcję getDataEpic. Nasłuchuje ona strumień w oczekiwaniu na akcję GET_DATA_REQUESTED.

Następnie wywoływana jest metoda mergeMap, która “mapuje każdą wartość strumienia wejściowego na obiekt Observable, a następnie spłaszcza ją za pomocą metody mergeAll” (tak, wiele metod w RxJS sprawia, że chce się płakać… tutaj więcej o mergeMap). Generalnie chodzi o to, że ajax.getJSON również zwraca strumień, i trzeba go odpowiednio zmapować i zmergować ze strumieniem wejściowym tak aby uzyskać strumień wyjściowy.

Jeśli chodzi o ajax.getJSON to tak jak wspomniałem zwraca on strumień, na którym od razu możemy pracować. Dzięki możliwości wywołania na tym strumieniu metody map możemy przekierować dane do odpowiedniego kreatora akcji (to samo dla błędu - metoda catch). Tak więc w zależności od tego czy pobieranie danych z API się powiedzie czy nie, jako wynik Epica zwracana jest akcja GET_DATA_DONE lub GET_DATA_FAILED, która jest oczywiście od razu rozgłaszana.

Poza zmianami konfiguracyjnymi nie ma potrzeby niczego zmieniać w “reducerze”. Kod tego przykładu jest jak zwykle do pobrania z mojego GitHuba - zachęcam do tego, bo temat trzeba trochę “pomacać” żeby poczuć się w nim w miarę pewnie.

Anulowanie żądania

Na początku napisałem, że główną wadę “promisów” jest brak możliwości ich anulowania. Wykorzystanie RxJS miało być remedium na ten problem… Wypadałoby w tej sytuacji pokazać, jak można obsłużyć anulowanie przetwarzania strumienia RxJS. Poniżej drobna modyfikacja przedstawionego powyżej Epica:

export function getDataEpic(action$) {
  return action$.ofType(GET_DATA_REQUESTED)
    .mergeMap(action =>
      ajax.getJSON('https://api.github.com/users/burczu/repos')
        .map(response => getDataDone(response))
        .catch(error => getDataFailed(error))
        .takeUntil(action$.ofType(GET_DATA_CANCEL))
    );
}

Jak widzisz, doszło nam tutaj wywołanie metody takeUntil(), które służy do przerywania strumienia. Jako parametr przekazujemy jej wywołanie action$.ofType(GET_DATA_CANCEL). W ten sposób, jeśli akcja GET_DATA_CANCEL zostanie rozgłoszona, strumień zwracany przez ajax.getJSON() zostanie przerwany. Nie wpłynie to oczywiście na strumień całego Epica.

Redux + RxJS - podsumowanie

Jak widzisz, tematyka związana z RxJS jest bardzo przydatna i chyba ciekawa. Jest to jednak zagadnienie dość trudne do nauki… o przekazywaniu wiedzy innym nie wspominając. Bardzo dobrze obrazuje to poniższy tweet:

Mam nadzieję, że udało mi się w miarę dobrze wyjaśnić połączenie Redux + RxJS. Przyznam szczerze, że ciężko się pisało ten wpis i nadal nie jestem pewny czy wszystko co napisałem ma sens. Na pewno starałem się “zrobić moje najlepsze” (ang. “do my best”)!

Jeśli masz pytania, wątpliwości, skargi lub zażalenia - pisz w komentarzach. Postaramy się wspólnie rozwiać wątpliwości!