Такая штука работает, правда, не понятно, почему она внутри себя не зацикливается
function fn(type, baseStream) {
const observer = {
next: payload => baseStream.next({type: type, payload: payload}),
error: error => baseStream.error({type: type, error: error}),
complete: () => baseStream.complete(),
};
const observable = baseStream.filter(event => type === event.type).pluck('payload');
return Subject.create(observer, observable);
}