Вход

Просмотр полной версии : Как трансформировать Rx Subject?


Shitbox2
18.04.2018, 14:09
Пример кода:
const stream = new Subject()

stream.subscribe(event => console.log)
stream.next({type: 'some', payload: 'thing1'})
// {type: 'some', payload: 'thing1'}

const someStream = fn('some', stream)

someStream.subscribe(event => console.log)
someStream.next('thing2')
// 'thing1'
// 'thing2'

Нужно реализовать функцию fn, которая фильтрует и мапит исходный поток.

Если бы она возвращала Observable, то всё было бы просто:
fn(type, baseStream) {
return baseStream.filter(event => type === event.type).pluck('payload');
}

А так нужно делать или что-то подобное + кучу перекрестных проверок, чтобы не было зацикливания, или как-то переопределять методы next и subscribe...
fn(type, baseStream) {
const subject = new Subject();

baseStream.subscribe(action => {
if (type === action.type) {
subject.next(action.payload);
}
});

subject.subscribe(payload => {
baseStream.next({type: type, payload: payload});
});

return subject;
}

В общем, пока нет мыслей, как сделать это красиво. Ваши варианты?

Shitbox2
18.04.2018, 17:33
Такая штука работает, правда, не понятно, почему она внутри себя не зацикливается
function fn(type, baseStream) {
const observer = {
next: payload => baseStream.next({type: type, payload: payload}),
error: error => baseStream.error({type: type, error: error}),
complete: () => baseStream.complete(),
};
const observable = baseStream.filter(event => type === event.type).pluck('payload');

return Subject.create(observer, observable);
}