今天来实现 xxxMap 三剑客中的最后一位 concatMap 。我们知道这三个操作符都是与内部订阅有关的,mergeMap 的特点是对每个上游事件都作出响应,switchMap 的特点是先检查有没有发生过内部订阅,发生过,先取消,再做订阅。concatMap 的特点就像 concat 操作符一样,关注的是处理事件的顺序性。也就是说,当上游有值到来后,concatMap 会把值放到缓存中,等待上次的订阅发出 complete 事件后才会把新值从缓存中取出来做后续操作。
同样的,我们先来看看 concatMap 的会带来什么样的效果:
fromEvent(document, "click")
.pipe(
scan(i => i + 1, 0),
concatMap(value => of(value).pipe(delay(500)))
)
.subscribe(value => {
console.log(value);
});
无论我们在页面上点击速度有多快,数字的输出一定是按照 500 毫秒一个来的,大家可以对比一下 mergeMap,会发现其中的区别。
下面就来实现 myConcatMap。还是老规矩,拷贝之前的代码:
class MyConcatMapSubscriber extends Subscriber {
constructor(sub, fn) {
super(sub);
this.fn = fn;
this.innerSubscription = null;
}
_next(value) {
const innerObservable$ = this.fn(value);
this.innerSubscription = innerObservable$.subscribe(value => {
this.destination.next(value);
});
}
}
const myConcatMap = fn => source => {
return source.lift({
call(sub, source) {
source.subscribe(new MyConcatMapSubscriber(sub, fn));
}
});
};
这可以说是自己写高阶 Observable 操作符的基础骨架,我们只是把自定义 Subscriber 和自定义操作符 myConcatMap的名字改好。而真正的逻辑修改是在关键函数 _next 中。根据之前说到的 concatMap 操作符的逻辑,我们需要做的是检查内部订阅是否完毕。代码如下:
_next(value) {
const innerObservable$ = this.fn(value);
const { isStopped } = this.innerSubscription || {
isStopped: true
}
this.innerSubscription = innerObservable$.subscribe(value => {
this.destination.next(value);
});
}
}
如果 this.innerSubscription 是 null 或者 undefined 都会返回内部订阅已经停止了。如果它有值,就说明它还没有结束。接下来我们要做的就是把新来的值放到缓存里,因此要有个缓存变量。代码如下:
class MyConcatMapSubscriber extends Subscriber {
constructor(sub, fn) {
super(sub);
this.fn = fn;
this.innerSubscription = null;
this.buffer = [];
}
_next(value) {
const { isStopped } = this.innerSubscription || {
isStopped: true
};
if (!isStopped) {
this.buffer = [...this.buffer, value];
} else {
const innerObservable$ = this.fn(value);
this.innerSubscription = innerObservable$.subscribe(value => {
this.destination.next(value);
});
}
}
}
在自定义 Subscriber 类的构造函数中,我们添加了一个 buffer 数组变量作为缓存。然后我们在 _next 添加了逻辑,如果订阅没有结束,就把值放到 buffer 中,如果订阅结束了,就把值传递下去。接下来,然而这样就完了吗?我们在把值传递下去的时候想一想,buffer 中的值是不是也应该属于内部 Observable 需要处理的值啊,当然了。那么就有如下代码:
class MyConcatMapSubscriber extends Subscriber {
constructor(sub, fn) {
super(sub);
this.fn = fn;
this.innerSubscription = null;
this.buffer = [];
}
_next(value) {
const { isStopped } = this.innerSubscription || {
isStopped: true
};
if (!isStopped) {
this.buffer = [...this.buffer, value];
} else {
const innerObservable$ = this.fn(value);
this.innerSubscription = innerObservable$.subscribe({
next: value => {
this.destination.next(value);
},
complete: () => {
if (this.buffer.length) {
const [first, ...rest] = this.buffer;
this.buffer = rest;
this._next(first);
}
}
});
}
}
}
我们在订阅函数中添加了 complete 事件的回调函数,内部的 Observable 发送完事件后,会触发 complete 事件,紧接着会调用回调函数。在这个回调函数中,我们就来判断是否真的应该 complete 。如果 buffer 中有值,就说明应该继续处理这些值,我们就取出第一个值递归调用 this._next ,剩下的值赋值给 buffer。这样,我们的代码逻辑就完整了,自定义版 concatMap操作符也完成了。我记得之前的文章说过 complete 事件貌似没什么用处,在这里体现了它的重要性。
如有任何问题,请添加微信公众号“读一读我”。