butterrobot/internal/queue/queue.go
Felipe M. 72c6dd6982
All checks were successful
ci/woodpecker/tag/release Pipeline was successful
feat: remindme plugin
2025-04-22 11:29:39 +02:00

161 lines
3 KiB
Go

package queue
import (
"log/slog"
"sync"
"time"
"git.nakama.town/fmartingr/butterrobot/internal/model"
)
// Item represents a queue item
type Item struct {
Platform string
Request map[string]interface{}
}
// HandlerFunc defines a function that processes queue items
type HandlerFunc func(item Item)
// ReminderHandlerFunc defines a function that processes reminder items
type ReminderHandlerFunc func(reminder *model.Reminder)
// Queue represents a message queue
type Queue struct {
items chan Item
wg sync.WaitGroup
quit chan struct{}
logger *slog.Logger
running bool
runMutex sync.Mutex
reminderTicker *time.Ticker
reminderHandler ReminderHandlerFunc
}
// New creates a new Queue instance
func New(logger *slog.Logger) *Queue {
return &Queue{
items: make(chan Item, 100),
quit: make(chan struct{}),
logger: logger,
}
}
// Start starts processing queue items
func (q *Queue) Start(handler HandlerFunc) {
q.runMutex.Lock()
defer q.runMutex.Unlock()
if q.running {
return
}
q.running = true
// Start worker
q.wg.Add(1)
go q.worker(handler)
}
// StartReminderScheduler starts the reminder scheduler
func (q *Queue) StartReminderScheduler(handler ReminderHandlerFunc) {
q.runMutex.Lock()
defer q.runMutex.Unlock()
if q.reminderTicker != nil {
return
}
q.reminderHandler = handler
// Check for reminders every minute
q.reminderTicker = time.NewTicker(1 * time.Minute)
q.wg.Add(1)
go q.reminderWorker()
}
// Stop stops processing queue items
func (q *Queue) Stop() {
q.runMutex.Lock()
defer q.runMutex.Unlock()
if !q.running {
return
}
q.running = false
// Stop reminder ticker if it exists
if q.reminderTicker != nil {
q.reminderTicker.Stop()
}
close(q.quit)
q.wg.Wait()
}
// Add adds an item to the queue
func (q *Queue) Add(item Item) {
select {
case q.items <- item:
// Item added successfully
default:
// Queue is full
q.logger.Info("Queue is full, dropping message")
}
}
// worker processes queue items
func (q *Queue) worker(handler HandlerFunc) {
defer q.wg.Done()
for {
select {
case item := <-q.items:
// Process item
func() {
defer func() {
if r := recover(); r != nil {
q.logger.Error("Panic in queue worker", "error", r)
}
}()
handler(item)
}()
case <-q.quit:
// Quit worker
return
}
}
}
// reminderWorker processes reminder items on a schedule
func (q *Queue) reminderWorker() {
defer q.wg.Done()
for {
select {
case <-q.reminderTicker.C:
// This is triggered every minute to check for pending reminders
q.logger.Debug("Checking for pending reminders")
if q.reminderHandler != nil {
// The handler is responsible for fetching and processing reminders
func() {
defer func() {
if r := recover(); r != nil {
q.logger.Error("Panic in reminder worker", "error", r)
}
}()
// Call the handler with a nil reminder to indicate it should check the database
q.reminderHandler(nil)
}()
}
case <-q.quit:
// Quit worker
return
}
}
}