This commit is contained in:
parent
21e4c434fd
commit
72c6dd6982
12 changed files with 695 additions and 48 deletions
|
@ -3,6 +3,9 @@ package queue
|
|||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.nakama.town/fmartingr/butterrobot/internal/model"
|
||||
)
|
||||
|
||||
// Item represents a queue item
|
||||
|
@ -14,14 +17,19 @@ type Item struct {
|
|||
// HandlerFunc defines a function that processes queue items
|
||||
type HandlerFunc func(item Item)
|
||||
|
||||
// ReminderHandlerFunc defines a function that processes reminder items
|
||||
type ReminderHandlerFunc func(reminder *model.Reminder)
|
||||
|
||||
// Queue represents a message queue
|
||||
type Queue struct {
|
||||
items chan Item
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
logger *slog.Logger
|
||||
running bool
|
||||
runMutex sync.Mutex
|
||||
items chan Item
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
logger *slog.Logger
|
||||
running bool
|
||||
runMutex sync.Mutex
|
||||
reminderTicker *time.Ticker
|
||||
reminderHandler ReminderHandlerFunc
|
||||
}
|
||||
|
||||
// New creates a new Queue instance
|
||||
|
@ -49,6 +57,24 @@ func (q *Queue) Start(handler HandlerFunc) {
|
|||
go q.worker(handler)
|
||||
}
|
||||
|
||||
// StartReminderScheduler starts the reminder scheduler
|
||||
func (q *Queue) StartReminderScheduler(handler ReminderHandlerFunc) {
|
||||
q.runMutex.Lock()
|
||||
defer q.runMutex.Unlock()
|
||||
|
||||
if q.reminderTicker != nil {
|
||||
return
|
||||
}
|
||||
|
||||
q.reminderHandler = handler
|
||||
|
||||
// Check for reminders every minute
|
||||
q.reminderTicker = time.NewTicker(1 * time.Minute)
|
||||
|
||||
q.wg.Add(1)
|
||||
go q.reminderWorker()
|
||||
}
|
||||
|
||||
// Stop stops processing queue items
|
||||
func (q *Queue) Stop() {
|
||||
q.runMutex.Lock()
|
||||
|
@ -59,6 +85,12 @@ func (q *Queue) Stop() {
|
|||
}
|
||||
|
||||
q.running = false
|
||||
|
||||
// Stop reminder ticker if it exists
|
||||
if q.reminderTicker != nil {
|
||||
q.reminderTicker.Stop()
|
||||
}
|
||||
|
||||
close(q.quit)
|
||||
q.wg.Wait()
|
||||
}
|
||||
|
@ -96,4 +128,34 @@ func (q *Queue) worker(handler HandlerFunc) {
|
|||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reminderWorker processes reminder items on a schedule
|
||||
func (q *Queue) reminderWorker() {
|
||||
defer q.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-q.reminderTicker.C:
|
||||
// This is triggered every minute to check for pending reminders
|
||||
q.logger.Debug("Checking for pending reminders")
|
||||
|
||||
if q.reminderHandler != nil {
|
||||
// The handler is responsible for fetching and processing reminders
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
q.logger.Error("Panic in reminder worker", "error", r)
|
||||
}
|
||||
}()
|
||||
|
||||
// Call the handler with a nil reminder to indicate it should check the database
|
||||
q.reminderHandler(nil)
|
||||
}()
|
||||
}
|
||||
case <-q.quit:
|
||||
// Quit worker
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue