# Queues

Background job processing with multiple drivers and automatic retries.

RapidGo's queue system lets you defer time-consuming tasks to background workers with automatic retries and failure handling.
## Configuration

```env
QUEUE_DRIVER=database   # database, redis, memory, sync
QUEUE_DEFAULT=default   # Default queue name
QUEUE_MAX_ATTEMPTS=3    # Max retry attempts
QUEUE_RETRY_DELAY=30    # Seconds between retries
QUEUE_TIMEOUT=60        # Job timeout in seconds
```
## Available Drivers

| Driver | Description | Best For |
|---|---|---|
| `database` | GORM-backed persistent queue | Production without Redis |
| `redis` | Redis-backed queue | High-throughput production |
| `memory` | In-process queue | Development and testing |
| `sync` | Synchronous (no queue) | Testing and debugging |
## Defining Jobs

Register job handlers by type name:

```go
// In app/jobs/jobs.go
package jobs

import (
	"context"
	"encoding/json"

	"github.com/RAiWorks/RapidGo/v2/core/queue"
)

func RegisterJobs() {
	queue.RegisterHandler("send_welcome_email", HandleWelcomeEmail)
	queue.RegisterHandler("process_payment", HandlePayment)
}

func HandleWelcomeEmail(ctx context.Context, payload json.RawMessage) error {
	var data struct {
		UserID uint   `json:"user_id"`
		Email  string `json:"email"`
	}
	if err := json.Unmarshal(payload, &data); err != nil {
		return err
	}
	// Send the email...
	return nil
}
```
Dispatching Jobs
// Dispatch to the default queue
dispatcher.Dispatch(ctx, "default", "send_welcome_email", map[string]interface{}{
"user_id": user.ID,
"email": user.Email,
})
// Dispatch with a delay
dispatcher.DispatchDelayed(ctx, "default", "process_payment", payload, 5*time.Minute)
## Running Workers

Start a worker process to consume jobs:

```bash
go run cmd/main.go work
```
### Worker Configuration

```go
worker := queue.NewWorker(driver, queue.WorkerConfig{
	Queues:       []string{"default", "emails"},
	Concurrency:  4, // Number of goroutines
	PollInterval: 3 * time.Second,
	MaxAttempts:  3,
	RetryDelay:   30 * time.Second,
	Timeout:      60 * time.Second,
}, logger)

worker.Run(ctx) // Blocks until ctx is cancelled
```
Job Lifecycle
- Dispatch — Job is serialized and pushed to the driver
- Pop — Worker picks up the next available job
- Execute — Handler runs with timeout and panic recovery
- Success — Job is deleted from the queue
- Failure — Job is released back for retry (up to MaxAttempts)
- Dead — After max attempts, job is moved to failed storage
## Job Struct

```go
type Job struct {
	ID          uint64
	Queue       string
	Type        string // Maps to registered handler
	Payload     json.RawMessage
	Attempts    uint
	MaxAttempts uint
	AvailableAt time.Time  // For delayed jobs
	ReservedAt  *time.Time // Set when a worker picks it up
	CreatedAt   time.Time
}
```