97 lines
2.2 KiB
Go
97 lines
2.2 KiB
Go
package queue
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Job represents a task that can be executed
|
|
type Job interface {
|
|
Execute()
|
|
}
|
|
|
|
// JobQueue manages the execution of jobs with limited concurrency
|
|
type JobQueue struct {
|
|
jobs chan Job
|
|
maxWorkers int
|
|
wg sync.WaitGroup
|
|
running int
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// NewJobQueue creates a new job queue with specified max concurrent workers
|
|
func NewJobQueue(maxWorkers int) *JobQueue {
|
|
log.Printf("[QUEUE] Initializing job queue with %d workers and buffer size 100", maxWorkers)
|
|
jq := &JobQueue{
|
|
jobs: make(chan Job, 100), // Buffer size of 100 jobs
|
|
maxWorkers: maxWorkers,
|
|
}
|
|
jq.start()
|
|
return jq
|
|
}
|
|
|
|
// start initializes the worker pool
|
|
func (jq *JobQueue) start() {
|
|
// Start the workers
|
|
for i := 0; i < jq.maxWorkers; i++ {
|
|
workerId := i + 1
|
|
log.Printf("[WORKER-%d] Starting worker", workerId)
|
|
jq.wg.Add(1)
|
|
go func(id int) {
|
|
defer jq.wg.Done()
|
|
for job := range jq.jobs {
|
|
jq.mu.Lock()
|
|
jq.running++
|
|
queueLen := len(jq.jobs)
|
|
jq.mu.Unlock()
|
|
|
|
log.Printf("[WORKER-%d] Processing job (running: %d, queued: %d)",
|
|
id, jq.running, queueLen)
|
|
|
|
startTime := time.Now()
|
|
job.Execute()
|
|
elapsed := time.Since(startTime)
|
|
|
|
jq.mu.Lock()
|
|
jq.running--
|
|
jq.mu.Unlock()
|
|
|
|
log.Printf("[WORKER-%d] Completed job in %v (running: %d, queued: %d)",
|
|
id, elapsed, jq.running, len(jq.jobs))
|
|
}
|
|
log.Printf("[WORKER-%d] Worker shutting down", id)
|
|
}(workerId)
|
|
}
|
|
}
|
|
|
|
// Enqueue adds a job to the queue
|
|
func (jq *JobQueue) Enqueue(job Job) {
|
|
jq.mu.Lock()
|
|
queueLen := len(jq.jobs)
|
|
jq.mu.Unlock()
|
|
|
|
log.Printf("[QUEUE] Job enqueued (queue length: %d, running: %d)", queueLen, jq.running)
|
|
jq.jobs <- job
|
|
}
|
|
|
|
// Stop gracefully shuts down the job queue
|
|
func (jq *JobQueue) Stop() {
|
|
log.Println("[QUEUE] Stopping job queue, waiting for running jobs to complete")
|
|
close(jq.jobs)
|
|
jq.wg.Wait()
|
|
log.Println("[QUEUE] Job queue shutdown complete")
|
|
}
|
|
|
|
// QueueStats returns statistics about the queue
|
|
func (jq *JobQueue) QueueStats() map[string]int {
|
|
jq.mu.Lock()
|
|
defer jq.mu.Unlock()
|
|
|
|
return map[string]int{
|
|
"queue_length": len(jq.jobs),
|
|
"max_workers": jq.maxWorkers,
|
|
"running_jobs": jq.running,
|
|
}
|
|
}
|