Backend changes
This commit is contained in:
@@ -1,7 +1,9 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Job represents a task that can be executed
|
||||
@@ -20,6 +22,7 @@ type JobQueue struct {
|
||||
|
||||
// NewJobQueue creates a new job queue with specified max concurrent workers
|
||||
func NewJobQueue(maxWorkers int) *JobQueue {
|
||||
log.Printf("[QUEUE] Initializing job queue with %d workers and buffer size 100", maxWorkers)
|
||||
jq := &JobQueue{
|
||||
jobs: make(chan Job, 100), // Buffer size of 100 jobs
|
||||
maxWorkers: maxWorkers,
|
||||
@@ -32,33 +35,52 @@ func NewJobQueue(maxWorkers int) *JobQueue {
|
||||
func (jq *JobQueue) start() {
|
||||
// Start the workers
|
||||
for i := 0; i < jq.maxWorkers; i++ {
|
||||
workerId := i + 1
|
||||
log.Printf("[WORKER-%d] Starting worker", workerId)
|
||||
jq.wg.Add(1)
|
||||
go func() {
|
||||
go func(id int) {
|
||||
defer jq.wg.Done()
|
||||
for job := range jq.jobs {
|
||||
jq.mu.Lock()
|
||||
jq.running++
|
||||
queueLen := len(jq.jobs)
|
||||
jq.mu.Unlock()
|
||||
|
||||
log.Printf("[WORKER-%d] Processing job (running: %d, queued: %d)",
|
||||
id, jq.running, queueLen)
|
||||
|
||||
startTime := time.Now()
|
||||
job.Execute()
|
||||
elapsed := time.Since(startTime)
|
||||
|
||||
jq.mu.Lock()
|
||||
jq.running--
|
||||
jq.mu.Unlock()
|
||||
|
||||
log.Printf("[WORKER-%d] Completed job in %v (running: %d, queued: %d)",
|
||||
id, elapsed, jq.running, len(jq.jobs))
|
||||
}
|
||||
}()
|
||||
log.Printf("[WORKER-%d] Worker shutting down", id)
|
||||
}(workerId)
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue adds a job to the queue
|
||||
func (jq *JobQueue) Enqueue(job Job) {
|
||||
jq.mu.Lock()
|
||||
queueLen := len(jq.jobs)
|
||||
jq.mu.Unlock()
|
||||
|
||||
log.Printf("[QUEUE] Job enqueued (queue length: %d, running: %d)", queueLen, jq.running)
|
||||
jq.jobs <- job
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the job queue
|
||||
func (jq *JobQueue) Stop() {
|
||||
log.Println("[QUEUE] Stopping job queue, waiting for running jobs to complete")
|
||||
close(jq.jobs)
|
||||
jq.wg.Wait()
|
||||
log.Println("[QUEUE] Job queue shutdown complete")
|
||||
}
|
||||
|
||||
// QueueStats returns statistics about the queue
|
||||
|
||||
Reference in New Issue
Block a user