This commit is contained in:
parent
9c78ea2d48
commit
7c684af8c3
79 changed files with 3594 additions and 3257 deletions
99
internal/queue/queue.go
Normal file
99
internal/queue/queue.go
Normal file
|
@ -0,0 +1,99 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Item represents a queue item
|
||||
type Item struct {
|
||||
Platform string
|
||||
Request map[string]interface{}
|
||||
}
|
||||
|
||||
// HandlerFunc defines a function that processes queue items
|
||||
type HandlerFunc func(item Item)
|
||||
|
||||
// Queue represents a message queue
|
||||
type Queue struct {
|
||||
items chan Item
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
logger *slog.Logger
|
||||
running bool
|
||||
runMutex sync.Mutex
|
||||
}
|
||||
|
||||
// New creates a new Queue instance
|
||||
func New(logger *slog.Logger) *Queue {
|
||||
return &Queue{
|
||||
items: make(chan Item, 100),
|
||||
quit: make(chan struct{}),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts processing queue items
|
||||
func (q *Queue) Start(handler HandlerFunc) {
|
||||
q.runMutex.Lock()
|
||||
defer q.runMutex.Unlock()
|
||||
|
||||
if q.running {
|
||||
return
|
||||
}
|
||||
|
||||
q.running = true
|
||||
|
||||
// Start worker
|
||||
q.wg.Add(1)
|
||||
go q.worker(handler)
|
||||
}
|
||||
|
||||
// Stop stops processing queue items
|
||||
func (q *Queue) Stop() {
|
||||
q.runMutex.Lock()
|
||||
defer q.runMutex.Unlock()
|
||||
|
||||
if !q.running {
|
||||
return
|
||||
}
|
||||
|
||||
q.running = false
|
||||
close(q.quit)
|
||||
q.wg.Wait()
|
||||
}
|
||||
|
||||
// Add adds an item to the queue
|
||||
func (q *Queue) Add(item Item) {
|
||||
select {
|
||||
case q.items <- item:
|
||||
// Item added successfully
|
||||
default:
|
||||
// Queue is full
|
||||
q.logger.Info("Queue is full, dropping message")
|
||||
}
|
||||
}
|
||||
|
||||
// worker processes queue items
|
||||
func (q *Queue) worker(handler HandlerFunc) {
|
||||
defer q.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case item := <-q.items:
|
||||
// Process item
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
q.logger.Error("Panic in queue worker", "error", r)
|
||||
}
|
||||
}()
|
||||
|
||||
handler(item)
|
||||
}()
|
||||
case <-q.quit:
|
||||
// Quit worker
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue