package queue import ( "log/slog" "sync" ) // Item represents a queue item type Item struct { Platform string Request map[string]interface{} } // HandlerFunc defines a function that processes queue items type HandlerFunc func(item Item) // Queue represents a message queue type Queue struct { items chan Item wg sync.WaitGroup quit chan struct{} logger *slog.Logger running bool runMutex sync.Mutex } // New creates a new Queue instance func New(logger *slog.Logger) *Queue { return &Queue{ items: make(chan Item, 100), quit: make(chan struct{}), logger: logger, } } // Start starts processing queue items func (q *Queue) Start(handler HandlerFunc) { q.runMutex.Lock() defer q.runMutex.Unlock() if q.running { return } q.running = true // Start worker q.wg.Add(1) go q.worker(handler) } // Stop stops processing queue items func (q *Queue) Stop() { q.runMutex.Lock() defer q.runMutex.Unlock() if !q.running { return } q.running = false close(q.quit) q.wg.Wait() } // Add adds an item to the queue func (q *Queue) Add(item Item) { select { case q.items <- item: // Item added successfully default: // Queue is full q.logger.Info("Queue is full, dropping message") } } // worker processes queue items func (q *Queue) worker(handler HandlerFunc) { defer q.wg.Done() for { select { case item := <-q.items: // Process item func() { defer func() { if r := recover(); r != nil { q.logger.Error("Panic in queue worker", "error", r) } }() handler(item) }() case <-q.quit: // Quit worker return } } }