99 lines
No EOL
1.6 KiB
Go
99 lines
No EOL
1.6 KiB
Go
package queue
|
|
|
|
import (
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// Item is one unit of work carried by a Queue and handed to a HandlerFunc.
type Item struct {
	// Platform is a free-form label; presumably it identifies the platform
	// the item belongs to (confirm against callers).
	Platform string

	// Request holds loosely typed key/value data for the item; its schema is
	// defined by whoever produces and handles the item.
	Request map[string]any
}
|
|
|
|
// HandlerFunc defines a function that processes queue items
|
|
type HandlerFunc func(item Item)
|
|
|
|
// Queue represents a message queue
|
|
type Queue struct {
|
|
items chan Item
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
logger *slog.Logger
|
|
running bool
|
|
runMutex sync.Mutex
|
|
}
|
|
|
|
// New creates a new Queue instance
|
|
func New(logger *slog.Logger) *Queue {
|
|
return &Queue{
|
|
items: make(chan Item, 100),
|
|
quit: make(chan struct{}),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Start starts processing queue items
|
|
func (q *Queue) Start(handler HandlerFunc) {
|
|
q.runMutex.Lock()
|
|
defer q.runMutex.Unlock()
|
|
|
|
if q.running {
|
|
return
|
|
}
|
|
|
|
q.running = true
|
|
|
|
// Start worker
|
|
q.wg.Add(1)
|
|
go q.worker(handler)
|
|
}
|
|
|
|
// Stop stops processing queue items
|
|
func (q *Queue) Stop() {
|
|
q.runMutex.Lock()
|
|
defer q.runMutex.Unlock()
|
|
|
|
if !q.running {
|
|
return
|
|
}
|
|
|
|
q.running = false
|
|
close(q.quit)
|
|
q.wg.Wait()
|
|
}
|
|
|
|
// Add adds an item to the queue
|
|
func (q *Queue) Add(item Item) {
|
|
select {
|
|
case q.items <- item:
|
|
// Item added successfully
|
|
default:
|
|
// Queue is full
|
|
q.logger.Info("Queue is full, dropping message")
|
|
}
|
|
}
|
|
|
|
// worker processes queue items
|
|
func (q *Queue) worker(handler HandlerFunc) {
|
|
defer q.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case item := <-q.items:
|
|
// Process item
|
|
func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
q.logger.Error("Panic in queue worker", "error", r)
|
|
}
|
|
}()
|
|
|
|
handler(item)
|
|
}()
|
|
case <-q.quit:
|
|
// Quit worker
|
|
return
|
|
}
|
|
}
|
|
} |