专业编程基础技术教程

网站首页 > 基础教程 正文

RxJS——自制 concatMap 操作符(rxjava concatmap)

ccvgpt 2024-07-23 01:17:41 基础教程 13 ℃


今天来实现 xxxMap 三剑客中的最后一位 concatMap 。我们知道这三个操作符都是与内部订阅有关的,mergeMap 的特点是对每个上游事件都作出响应,switchMap 的特点是先检查有没有发生过内部订阅,发生过,先取消,再做订阅。concatMap 的特点就像 concat 操作符一样,关注的是处理事件的顺序性。也就是说,当上游有值到来后,concatMap 会把值放到缓存中,等待上次的订阅发出 complete 事件后才会把新值从缓存中取出来做后续操作。

RxJS——自制 concatMap 操作符(rxjava concatmap)

同样的,我们先来看看 concatMap 的会带来什么样的效果:

fromEvent(document, "click")
  .pipe(
    scan(i => i + 1, 0),
    concatMap(value => of(value).pipe(delay(500)))
  )
  .subscribe(value => {
    console.log(value);
  });

无论我们在页面上点击速度有多快,数字的输出一定是按照 500 毫秒一个来的,大家可以对比一下 mergeMap,会发现其中的区别。

下面就来实现 myConcatMap。还是老规矩,拷贝之前的代码:

class MyConcatMapSubscriber extends Subscriber {
  constructor(sub, fn) {
    super(sub);
    this.fn = fn;
    this.innerSubscription = null;
  }

  _next(value) {
    const innerObservable$ = this.fn(value);

    this.innerSubscription = innerObservable$.subscribe(value => {
      this.destination.next(value);
    });
  }
}

const myConcatMap = fn => source => {
  return source.lift({
    call(sub, source) {
      source.subscribe(new MyConcatMapSubscriber(sub, fn));
    }
  });
};

这可以说是自己写高阶 Observable 操作符的基础骨架,我们只是把自定义 Subscriber 和自定义操作符 myConcatMap的名字改好。而真正的逻辑修改是在关键函数 _next 中。根据之前说到的 concatMap 操作符的逻辑,我们需要做的是检查内部订阅是否完毕。代码如下:

_next(value) {
    const innerObservable$ = this.fn(value);

    const { isStopped } = this.innerSubscription || {
      isStopped: true
    }

    this.innerSubscription = innerObservable$.subscribe(value => {
      this.destination.next(value);
    });
  }
}

如果 this.innerSubscriptionnull 或者 undefined 都会返回内部订阅已经停止了。如果它有值,就说明它还没有结束。接下来我们要做的就是把新来的值放到缓存里,因此要有个缓存变量。代码如下:

class MyConcatMapSubscriber extends Subscriber {
  constructor(sub, fn) {
    super(sub);
    this.fn = fn;
    this.innerSubscription = null;
    this.buffer = [];
  }

  _next(value) {
    const { isStopped } = this.innerSubscription || {
      isStopped: true
    };

    if (!isStopped) {
      this.buffer = [...this.buffer, value];
    } else {
      const innerObservable$ = this.fn(value);

      this.innerSubscription = innerObservable$.subscribe(value => {
        this.destination.next(value);
      });
    }
  }
}

在自定义 Subscriber 类的构造函数中,我们添加了一个 buffer 数组变量作为缓存。然后我们在 _next 添加了逻辑,如果订阅没有结束,就把值放到 buffer 中,如果订阅结束了,就把值传递下去。接下来,然而这样就完了吗?我们在把值传递下去的时候想一想,buffer 中的值是不是也应该属于内部 Observable 需要处理的值啊,当然了。那么就有如下代码:

class MyConcatMapSubscriber extends Subscriber {
  constructor(sub, fn) {
    super(sub);
    this.fn = fn;
    this.innerSubscription = null;
    this.buffer = [];
  }

  _next(value) {
    const { isStopped } = this.innerSubscription || {
      isStopped: true
    };

    if (!isStopped) {
      this.buffer = [...this.buffer, value];
    } else {
      const innerObservable$ = this.fn(value);

      this.innerSubscription = innerObservable$.subscribe({
        next: value => {
          this.destination.next(value);
        },
        complete: () => {
          if (this.buffer.length) {
            const [first, ...rest] = this.buffer;
            this.buffer = rest;
            this._next(first);
          }
        }
      });
    }
  }
}

我们在订阅函数中添加了 complete 事件的回调函数,内部的 Observable 发送完事件后,会触发 complete 事件,紧接着会调用回调函数。在这个回调函数中,我们就来判断是否真的应该 complete 。如果 buffer 中有值,就说明应该继续处理这些值,我们就取出第一个值递归调用 this._next ,剩下的值赋值给 buffer。这样,我们的代码逻辑就完整了,自定义版 concatMap操作符也完成了。我记得之前的文章说过 complete 事件貌似没什么用处,在这里体现了它的重要性。

如有任何问题,请添加微信公众号“读一读我”。

Tags:

最近发表
标签列表