Skip to content

任务队列 &I

实现核心思路

  • addTask 添加任务
    • _getShortestLineIndex 获取最短的队列索引
    • return new Promise
      • 声明一个 runTask 来执行task
        • task.then中 调用上Promise的resolve,
        • 并且 执行_next,如果队列有值,则shift 执行,如果没有则,停止
      • 如果 当前队列没有执行,则runTask
      • 如果 当前队列正在执行,则将runTask 添加到队列中
  • addTasks 添加多个任务 Array.map 调用addTask
js
export default class MultiLineQueue {
  constructor(concurrentLines = 2) {
    this.concurrentLines = concurrentLines;
    this.lines = Array.from({ length: concurrentLines }, () => ({
      queue: [],
      running: false,
    }));
  }

  // 单个任务添加
  addTask(task) {
    if (typeof task !== 'function') {
      throw new Error('任务必须是返回Promise的函数');
    }

    let lineIndex = this._getShortestLineIndex();

    return new Promise((resolve, reject) => {
      const runTask = () => {
        this.lines[lineIndex].running = true;
        task()
          .then((res) => {
            resolve(res);
            this._next(lineIndex);
          })
          .catch((err) => {
            reject(err);
            this._next(lineIndex);
          });
      };

      if (!this.lines[lineIndex].running) {
        runTask();
      } else {
        this.lines[lineIndex].queue.push(runTask);
      }
    });
  }

  // 批量任务添加,tasks数组为返回Promise的函数数组,返回所有任务的Promise数组
  addTasks(tasks) {
    if (!Array.isArray(tasks)) {
      throw new Error('批量任务参数必须是数组');
    }
    return tasks.map((task) => this.addTask(task));
  }

  // 获取当前任务最少的队列索引,用于负载均衡分配任务
  _getShortestLineIndex() {
    let minIndex = 0;
    let minLength =
      this.lines[0].queue.length + (this.lines[0].running ? 1 : 0);

    for (let i = 1; i < this.concurrentLines; i++) {
      const currLength =
        this.lines[i].queue.length + (this.lines[i].running ? 1 : 0);
      if (currLength < minLength) {
        minLength = currLength;
        minIndex = i;
      }
    }
    return minIndex;
  }

  _next(lineIndex) {
    if (this.lines[lineIndex].queue.length > 0) {
      const nextTask = this.lines[lineIndex].queue.shift();
      nextTask();
    } else {
      this.lines[lineIndex].running = false;
    }
  }
}

参考:p-queue