并发控制:限制异步任务的并发数量
8 分钟
题目一:基础并发控制器
实现一个并发控制器,限制同时执行的异步任务数量。要求:
- 限制并发数:同时最多执行 N 个异步任务
- 队列管理:超出限制的任务进入等待队列
- 自动调度:任务完成后自动执行下一个任务
- 保留顺序:结果按照添加顺序返回
测试用例
const createTask = (id, delay) => {
return () => new Promise((resolve) => {
console.log(`任务 ${id} 开始`)
setTimeout(() => {
console.log(`任务 ${id} 完成`)
resolve(`结果 ${id}`)
}, delay)
})
}
const controller = new ConcurrencyController(2)
const tasks = [
createTask(1, 1000),
createTask(2, 500),
createTask(3, 300),
createTask(4, 400),
createTask(5, 200),
]
Promise.all(tasks.map(task => controller.add(task)))
.then(results => {
console.log('所有任务完成:', results)
})
// 输出:
// 任务 1 开始
// 任务 2 开始
// 任务 2 完成 (500ms后)
// 任务 3 开始
// 任务 3 完成 (300ms后)
// 任务 4 开始
// 任务 1 完成 (1000ms后)
// 任务 5 开始
// 任务 5 完成 (200ms后)
// 任务 4 完成 (400ms后)
// 所有任务完成: ['结果 1', '结果 2', '结果 3', '结果 4', '结果 5']
解法
class ConcurrencyController {
constructor(limit) {
this.limit = limit
this.running = 0
this.queue = []
}
add(asyncFn) {
return new Promise((resolve, reject) => {
this.queue.push({ asyncFn, resolve, reject })
this.run()
})
}
run() {
if (this.running >= this.limit || this.queue.length === 0) {
return
}
this.running++
const { asyncFn, resolve, reject } = this.queue.shift()
asyncFn()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--
this.run()
})
}
}
题目二:支持优先级和取消
在题目一的基础上,增加以下功能:
- 支持任务优先级:优先级高的任务优先执行
- 支持任务取消:可以取消等待队列中的任务
测试用例
const controller = new AdvancedConcurrencyController(2)
// 测试优先级
controller.add(() => Promise.resolve('低优先级'), 0)
controller.add(() => Promise.resolve('高优先级'), 10)
controller.add(() => Promise.resolve('中优先级'), 5)
// 测试取消
const { promise, cancel } = controller.add(() => Promise.resolve('可取消任务'))
cancel() // 取消任务
promise.catch(err => console.log('任务已取消'))
解法
class AdvancedConcurrencyController {
constructor(limit) {
this.limit = limit
this.running = 0
this.queue = []
this.taskId = 0
}
add(asyncFn, priority = 0) {
const id = this.taskId++
let canceled = false
const promise = new Promise((resolve, reject) => {
this.queue.push({
id,
asyncFn,
resolve,
reject,
priority,
isCanceled: () => canceled,
})
// 按优先级排序
this.queue.sort((a, b) => b.priority - a.priority)
this.run()
})
return {
promise,
cancel: () => {
canceled = true
const index = this.queue.findIndex(item => item.id === id)
if (index !== -1) {
this.queue.splice(index, 1)
}
},
}
}
run() {
if (this.running >= this.limit || this.queue.length === 0) {
return
}
const task = this.queue.shift()
// 跳过已取消的任务
if (task.isCanceled()) {
this.run()
return
}
this.running++
const { asyncFn, resolve, reject } = task
asyncFn()
.then(resolve)
.catch(reject)
.finally(() => {
this.running--
this.run()
})
}
}
题目三:使用 serial 和 parallel 实现并发控制(字节真题)
实现三个函数:serial(串行)、parallel(并行)、batch(并发控制),要求 batch 只能使用 serial 和 parallel 实现。
测试用例
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
const mock = (ms) => () => sleep(ms).then(() => console.log(ms))
const tasks = [mock(200), mock(1000), mock(700), mock(2000), mock(300)]
// 串行执行:按顺序执行,总耗时 4200ms
await serial(tasks)
// 并行执行:同时执行,总耗时 2000ms
await parallel(tasks)
// 并发执行:最多 2 个并发,总耗时 2900ms
await batch(tasks, 2)
/* batch(tasks, 2) 执行流程:
时间轴:
0ms: worker1 取 task1(200ms),worker2 取 task2(1000ms)
200ms: worker1 完成,立即取 task3(700ms)
900ms: worker1 完成,立即取 task4(2000ms)
1000ms: worker2 完成,立即取 task5(300ms)
1300ms: worker2 完成
2900ms: worker1 完成
输出顺序:200 -> 700 -> 1000 -> 300 -> 2000
*/
解法
核心思路:创建 max 个 worker,每个 worker 串行地从共享队列中取任务执行,所有 worker 并行工作。
// 串行执行:一个任务完成后再执行下一个
const serial = async (tasks) => {
while (tasks.length > 0) {
const task = tasks.shift()
if (task) await task()
}
}
// 并行执行:所有任务同时执行
const parallel = async (tasks) => {
await Promise.all(tasks.map((task) => task()))
}
// 并发执行:限制并发数量
const batch = async (tasks, max) => {
const queue = [...tasks]
// 创建 worker:串行地从队列中取任务执行
const createWorker = () => async () => {
await serial(queue)
}
// 创建 max 个 worker 并行工作
const workers = Array(Math.min(max, tasks.length))
.fill(null)
.map(createWorker)
await parallel(workers)
}
关键点:
- 每个 worker 内部使用
serial(queue)串行消费共享队列 - 多个 worker 使用
parallel(workers)并行工作 - 哪个 worker 空闲,就自动取下一个任务,实现负载均衡
- 任务完成立即启动下一个,充分利用并发槽位
题目四:批量加载图片
实现一个批量加载图片的函数,要求:
- 支持批量加载多个图片 URL
- 限制并发请求数量
- 支持失败重试
- 返回所有图片的加载结果
测试用例
const imageUrls = [
'https://example.com/image1.jpg',
'https://example.com/image2.jpg',
'https://example.com/image3.jpg',
'https://example.com/image4.jpg',
'https://example.com/image5.jpg',
]
const results = await loadImages(imageUrls, {
maxConcurrency: 3, // 最多 3 个并发
maxRetry: 2, // 失败重试 2 次
})
console.log('加载结果:', results)
/* 输出示例:
加载结果: [
{ url: 'https://example.com/image1.jpg', status: 'success', image: Image },
{ url: 'https://example.com/image2.jpg', status: 'success', image: Image },
{ url: 'https://example.com/image3.jpg', status: 'failed', error: 'Load failed' },
{ url: 'https://example.com/image4.jpg', status: 'success', image: Image },
{ url: 'https://example.com/image5.jpg', status: 'success', image: Image },
]
*/
解法
async function loadImages(urls, options = {}) {
const { maxConcurrency = 3, maxRetry = 2 } = options
const results = []
const executing = []
for (const [index, url] of urls.entries()) {
// 创建加载任务
const promise = loadImageWithRetry(url, maxRetry)
.then(image => {
results[index] = { url, status: 'success', image }
})
.catch(error => {
results[index] = { url, status: 'failed', error: error.message }
})
// 并发控制
if (maxConcurrency <= urls.length) {
const e = promise.then(() => executing.splice(executing.indexOf(e), 1))
executing.push(e)
if (executing.length >= maxConcurrency) {
await Promise.race(executing)
}
}
}
// 等待所有任务完成
await Promise.all(results.map((_, i) =>
results[i] ? Promise.resolve() : Promise.reject()
))
return results
}
// 加载单张图片
function loadImage(url) {
return new Promise((resolve, reject) => {
const img = new Image()
img.onload = () => resolve(img)
img.onerror = () => reject(new Error('Load failed'))
img.src = url
})
}
// 支持重试的加载
async function loadImageWithRetry(url, maxRetry) {
let retryCount = 0
while (retryCount <= maxRetry) {
try {
return await loadImage(url)
} catch (error) {
retryCount++
if (retryCount > maxRetry) {
throw error
}
// 等待后重试(指数退避)
await new Promise(resolve => setTimeout(resolve, 1000 * retryCount))
}
}
}
关键点:
- 使用
new Image()加载图片,通过onload和onerror监听加载状态 - 复用函数式并发控制的实现(
executing数组 +Promise.race) - 通过
loadImageWithRetry封装重试逻辑,失败后指数退避 - 结果按照 URL 添加顺序返回
- 部分图片加载失败不影响其他图片