Skip to content

Commit 6cf9571

Browse files
jdaltonclaude
andcommitted
Add PromiseQueue utility with comprehensive tests
Implements a promise queue with configurable concurrency limits based on patterns from coana-package-manager. Features include: - Configurable max concurrency - Optional queue size limits to prevent memory buildup - Progress tracking (activeCount, pendingCount) - Idle detection with onIdle() - Proper error propagation Includes 7 comprehensive unit tests covering all functionality. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent a907f6f commit 6cf9571

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed

src/utils/promise-queue.mts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/**
2+
* A promise queue that limits concurrent execution of async tasks.
3+
* Based on patterns from coana-package-manager for resource-aware async operations.
4+
*/
5+
6+
type QueuedTask<T> = {
7+
fn: () => Promise<T>
8+
resolve: (value: T) => void
9+
reject: (error: unknown) => void
10+
}
11+
12+
export class PromiseQueue {
13+
private queue: Array<QueuedTask<unknown>> = []
14+
private running = 0
15+
16+
private readonly maxConcurrency: number
17+
private readonly maxQueueLength?: number
18+
19+
/**
20+
* Creates a new PromiseQueue
21+
* @param maxConcurrency - Maximum number of promises that can run concurrently
22+
* @param maxQueueLength - Maximum queue size (older tasks are dropped if exceeded)
23+
*/
24+
constructor(maxConcurrency: number, maxQueueLength?: number) {
25+
this.maxConcurrency = maxConcurrency
26+
this.maxQueueLength = maxQueueLength
27+
if (maxConcurrency < 1) {
28+
throw new Error('maxConcurrency must be at least 1')
29+
}
30+
}
31+
32+
/**
33+
* Add a task to the queue
34+
* @param fn - Async function to execute
35+
* @returns Promise that resolves with the function's result
36+
*/
37+
async add<T>(fn: () => Promise<T>): Promise<T> {
38+
return new Promise<T>((resolve, reject) => {
39+
const task: QueuedTask<T> = { fn, resolve, reject }
40+
41+
if (this.maxQueueLength && this.queue.length >= this.maxQueueLength) {
42+
// Drop oldest task to prevent memory buildup
43+
this.queue.shift()
44+
}
45+
46+
this.queue.push(task as QueuedTask<unknown>)
47+
this.runNext()
48+
})
49+
}
50+
51+
private runNext(): void {
52+
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
53+
return
54+
}
55+
56+
const task = this.queue.shift()
57+
if (!task) {
58+
return
59+
}
60+
61+
this.running++
62+
63+
task
64+
.fn()
65+
.then(task.resolve)
66+
.catch(task.reject)
67+
.finally(() => {
68+
this.running--
69+
this.runNext()
70+
})
71+
}
72+
73+
/**
74+
* Wait for all queued and running tasks to complete
75+
*/
76+
async onIdle(): Promise<void> {
77+
return new Promise<void>(resolve => {
78+
const check = () => {
79+
if (this.running === 0 && this.queue.length === 0) {
80+
resolve()
81+
} else {
82+
setImmediate(check)
83+
}
84+
}
85+
check()
86+
})
87+
}
88+
89+
/**
90+
* Get the number of tasks currently running
91+
*/
92+
get activeCount(): number {
93+
return this.running
94+
}
95+
96+
/**
97+
* Get the number of tasks waiting in the queue
98+
*/
99+
get pendingCount(): number {
100+
return this.queue.length
101+
}
102+
103+
/**
104+
* Clear all pending tasks from the queue (does not affect running tasks)
105+
*/
106+
clear(): void {
107+
this.queue = []
108+
}
109+
}

src/utils/promise-queue.test.mts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import { describe, expect, it } from 'vitest'
2+
3+
import { PromiseQueue } from './promise-queue.mts'
4+
5+
describe('PromiseQueue', () => {
6+
it('should execute tasks with limited concurrency', async () => {
7+
const queue = new PromiseQueue(2)
8+
const executed: number[] = []
9+
const tasks = Array.from({ length: 5 }, (_, i) => {
10+
return () =>
11+
new Promise<number>(resolve => {
12+
executed.push(i)
13+
setTimeout(() => resolve(i), 10)
14+
})
15+
})
16+
17+
const results = await Promise.all(tasks.map(task => queue.add(task)))
18+
19+
expect(results).toEqual([0, 1, 2, 3, 4])
20+
})
21+
22+
it('should track active and pending counts', async () => {
23+
const queue = new PromiseQueue(1)
24+
const task = () => new Promise(resolve => setTimeout(resolve, 50))
25+
26+
const p1 = queue.add(task)
27+
const p2 = queue.add(task)
28+
29+
expect(queue.activeCount).toBe(1)
30+
expect(queue.pendingCount).toBe(1)
31+
32+
await Promise.all([p1, p2])
33+
34+
expect(queue.activeCount).toBe(0)
35+
expect(queue.pendingCount).toBe(0)
36+
})
37+
38+
it('should limit queue size when maxQueueLength is set', () => {
39+
const queue = new PromiseQueue(1, 2)
40+
41+
// Add one running task
42+
queue.add(() => new Promise(resolve => setTimeout(resolve, 100)))
43+
44+
// Queue should be empty initially
45+
expect(queue.pendingCount).toBe(0)
46+
47+
// Add 3 tasks to queue (max 2, so oldest will be dropped)
48+
queue.add(() => Promise.resolve())
49+
expect(queue.pendingCount).toBe(1)
50+
51+
queue.add(() => Promise.resolve())
52+
expect(queue.pendingCount).toBe(2)
53+
54+
queue.add(() => Promise.resolve())
55+
// Should still be 2 because oldest was dropped
56+
expect(queue.pendingCount).toBe(2)
57+
})
58+
59+
it('should wait for all tasks to complete with onIdle', async () => {
60+
const queue = new PromiseQueue(2)
61+
const results: number[] = []
62+
63+
for (let i = 0; i < 5; i++) {
64+
queue.add(async () => {
65+
await new Promise(resolve => setTimeout(resolve, 10))
66+
results.push(i)
67+
})
68+
}
69+
70+
await queue.onIdle()
71+
72+
expect(results).toHaveLength(5)
73+
})
74+
75+
it('should clear pending tasks', async () => {
76+
const queue = new PromiseQueue(1)
77+
const results: string[] = []
78+
79+
const task = (id: string) => () =>
80+
new Promise<void>(resolve => {
81+
results.push(id)
82+
setTimeout(resolve, 50)
83+
})
84+
85+
queue.add(task('a'))
86+
queue.add(task('b'))
87+
queue.add(task('c'))
88+
89+
queue.clear()
90+
91+
await queue.onIdle()
92+
93+
// Only the first task should have run
94+
expect(results).toEqual(['a'])
95+
})
96+
97+
it('should handle task errors', async () => {
98+
const queue = new PromiseQueue(1)
99+
100+
const goodTask = () => Promise.resolve('success')
101+
const badTask = () => Promise.reject(new Error('failure'))
102+
103+
const result1 = await queue.add(goodTask)
104+
await expect(queue.add(badTask)).rejects.toThrow('failure')
105+
const result2 = await queue.add(goodTask)
106+
107+
expect(result1).toBe('success')
108+
expect(result2).toBe('success')
109+
})
110+
111+
it('should throw error for invalid concurrency', () => {
112+
expect(() => new PromiseQueue(0)).toThrow(
113+
'maxConcurrency must be at least 1',
114+
)
115+
expect(() => new PromiseQueue(-1)).toThrow(
116+
'maxConcurrency must be at least 1',
117+
)
118+
})
119+
})

0 commit comments

Comments
 (0)