并发控制:限制异步任务的并发数量

8 分钟

题目一:基础并发控制器

实现一个并发控制器,限制同时执行的异步任务数量。要求:

  1. 限制并发数:同时最多执行 N 个异步任务
  2. 队列管理:超出限制的任务进入等待队列
  3. 自动调度:任务完成后自动执行下一个任务
  4. 保留顺序:结果按照添加顺序返回

测试用例

const createTask = (id, delay) => {
  return () => new Promise((resolve) => {
    console.log(`任务 ${id} 开始`)
    setTimeout(() => {
      console.log(`任务 ${id} 完成`)
      resolve(`结果 ${id}`)
    }, delay)
  })
}

const controller = new ConcurrencyController(2)

const tasks = [
  createTask(1, 1000),
  createTask(2, 500),
  createTask(3, 300),
  createTask(4, 400),
  createTask(5, 200),
]

Promise.all(tasks.map(task => controller.add(task)))
  .then(results => {
    console.log('所有任务完成:', results)
  })

// 输出:
// 任务 1 开始
// 任务 2 开始
// 任务 2 完成 (500ms后)
// 任务 3 开始
// 任务 3 完成 (300ms后)
// 任务 4 开始
// 任务 1 完成 (1000ms后)
// 任务 5 开始
// 任务 5 完成 (200ms后)
// 任务 4 完成 (400ms后)
// 所有任务完成: ['结果 1', '结果 2', '结果 3', '结果 4', '结果 5']

解法

class ConcurrencyController {
  constructor(limit) {
    this.limit = limit
    this.running = 0
    this.queue = []
  }

  add(asyncFn) {
    return new Promise((resolve, reject) => {
      this.queue.push({ asyncFn, resolve, reject })
      this.run()
    })
  }

  run() {
    if (this.running >= this.limit || this.queue.length === 0) {
      return
    }

    this.running++
    const { asyncFn, resolve, reject } = this.queue.shift()

    asyncFn()
      .then(resolve)
      .catch(reject)
      .finally(() => {
        this.running--
        this.run()
      })
  }
}

题目二:支持优先级和取消

在题目一的基础上,增加以下功能:

  1. 支持任务优先级:优先级高的任务优先执行
  2. 支持任务取消:可以取消等待队列中的任务

测试用例

const controller = new AdvancedConcurrencyController(2)

// 测试优先级
controller.add(() => Promise.resolve('低优先级'), 0)
controller.add(() => Promise.resolve('高优先级'), 10)
controller.add(() => Promise.resolve('中优先级'), 5)

// 测试取消
const { promise, cancel } = controller.add(() => Promise.resolve('可取消任务'))
cancel() // 取消任务
promise.catch(err => console.log('任务已取消'))

解法

class AdvancedConcurrencyController {
  constructor(limit) {
    this.limit = limit
    this.running = 0
    this.queue = []
    this.taskId = 0
  }

  add(asyncFn, priority = 0) {
    const id = this.taskId++
    let canceled = false

    const promise = new Promise((resolve, reject) => {
      this.queue.push({
        id,
        asyncFn,
        resolve,
        reject,
        priority,
        isCanceled: () => canceled,
      })
      
      // 按优先级排序
      this.queue.sort((a, b) => b.priority - a.priority)
      this.run()
    })

    return {
      promise,
      cancel: () => {
        canceled = true
        const index = this.queue.findIndex(item => item.id === id)
        if (index !== -1) {
          this.queue.splice(index, 1)
        }
      },
    }
  }

  run() {
    if (this.running >= this.limit || this.queue.length === 0) {
      return
    }

    const task = this.queue.shift()
    
    // 跳过已取消的任务
    if (task.isCanceled()) {
      this.run()
      return
    }

    this.running++
    const { asyncFn, resolve, reject } = task

    asyncFn()
      .then(resolve)
      .catch(reject)
      .finally(() => {
        this.running--
        this.run()
      })
  }
}

题目三:使用 serial 和 parallel 实现并发控制(字节真题)

实现三个函数:serial(串行)、parallel(并行)、batch(并发控制),要求 batch 只能使用 serialparallel 实现。

测试用例

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
const mock = (ms) => () => sleep(ms).then(() => console.log(ms))

const tasks = [mock(200), mock(1000), mock(700), mock(2000), mock(300)]

// 串行执行:按顺序执行,总耗时 4200ms
await serial(tasks)

// 并行执行:同时执行,总耗时 2000ms
await parallel(tasks)

// 并发执行:最多 2 个并发,总耗时 2900ms
await batch(tasks, 2)

/* batch(tasks, 2) 执行流程:
时间轴:
0ms:    worker1 取 task1(200ms),worker2 取 task2(1000ms)
200ms:  worker1 完成,立即取 task3(700ms)
900ms:  worker1 完成,立即取 task4(2000ms)
1000ms: worker2 完成,立即取 task5(300ms)
1300ms: worker2 完成
2900ms: worker1 完成

输出顺序:200 -> 700 -> 1000 -> 300 -> 2000
*/

解法

核心思路:创建 max 个 worker,每个 worker 串行地从共享队列中取任务执行,所有 worker 并行工作。

// 串行执行:一个任务完成后再执行下一个
const serial = async (tasks) => {
  while (tasks.length > 0) {
    const task = tasks.shift()
    if (task) await task()
  }
}

// 并行执行:所有任务同时执行
const parallel = async (tasks) => {
  await Promise.all(tasks.map((task) => task()))
}

// 并发执行:限制并发数量
const batch = async (tasks, max) => {
  const queue = [...tasks]

  // 创建 worker:串行地从队列中取任务执行
  const createWorker = () => async () => {
    await serial(queue)
  }

  // 创建 max 个 worker 并行工作
  const workers = Array(Math.min(max, tasks.length))
    .fill(null)
    .map(createWorker)

  await parallel(workers)
}

关键点

  • 每个 worker 内部使用 serial(queue) 串行消费共享队列
  • 多个 worker 使用 parallel(workers) 并行工作
  • 哪个 worker 空闲,就自动取下一个任务,实现负载均衡
  • 任务完成立即启动下一个,充分利用并发槽位

题目四:批量加载图片

实现一个批量加载图片的函数,要求:

  1. 支持批量加载多个图片 URL
  2. 限制并发请求数量
  3. 支持失败重试
  4. 返回所有图片的加载结果

测试用例

const imageUrls = [
  'https://example.com/image1.jpg',
  'https://example.com/image2.jpg',
  'https://example.com/image3.jpg',
  'https://example.com/image4.jpg',
  'https://example.com/image5.jpg',
]

const results = await loadImages(imageUrls, {
  maxConcurrency: 3,  // 最多 3 个并发
  maxRetry: 2,        // 失败重试 2 次
})

console.log('加载结果:', results)

/* 输出示例:
加载结果: [
  { url: 'https://example.com/image1.jpg', status: 'success', image: Image },
  { url: 'https://example.com/image2.jpg', status: 'success', image: Image },
  { url: 'https://example.com/image3.jpg', status: 'failed', error: 'Load failed' },
  { url: 'https://example.com/image4.jpg', status: 'success', image: Image },
  { url: 'https://example.com/image5.jpg', status: 'success', image: Image },
]
*/

解法

async function loadImages(urls, options = {}) {
  const { maxConcurrency = 3, maxRetry = 2 } = options
  
  const results = []
  const executing = []

  for (const [index, url] of urls.entries()) {
    // 创建加载任务
    const promise = loadImageWithRetry(url, maxRetry)
      .then(image => {
        results[index] = { url, status: 'success', image }
      })
      .catch(error => {
        results[index] = { url, status: 'failed', error: error.message }
      })

    // 并发控制
    if (maxConcurrency <= urls.length) {
      const e = promise.then(() => executing.splice(executing.indexOf(e), 1))
      executing.push(e)

      if (executing.length >= maxConcurrency) {
        await Promise.race(executing)
      }
    }
  }

  // 等待所有任务完成
  await Promise.all(results.map((_, i) => 
    results[i] ? Promise.resolve() : Promise.reject()
  ))

  return results
}

// 加载单张图片
function loadImage(url) {
  return new Promise((resolve, reject) => {
    const img = new Image()
    
    img.onload = () => resolve(img)
    img.onerror = () => reject(new Error('Load failed'))
    
    img.src = url
  })
}

// 支持重试的加载
async function loadImageWithRetry(url, maxRetry) {
  let retryCount = 0
  
  while (retryCount <= maxRetry) {
    try {
      return await loadImage(url)
    } catch (error) {
      retryCount++
      
      if (retryCount > maxRetry) {
        throw error
      }
      
      // 等待后重试(指数退避)
      await new Promise(resolve => setTimeout(resolve, 1000 * retryCount))
    }
  }
}

关键点

  • 使用 new Image() 加载图片,通过 onloadonerror 监听加载状态
  • 复用函数式并发控制的实现(executing 数组 + Promise.race
  • 通过 loadImageWithRetry 封装重试逻辑,失败后指数退避
  • 结果按照 URL 添加顺序返回
  • 部分图片加载失败不影响其他图片