RapidGo
No results found

Queues

Background job processing with multiple drivers and automatic retries.

RapidGo's queue system lets you defer time-consuming tasks to background workers with automatic retries and failure handling.

Configuration

QUEUE_DRIVER=database    # database, redis, memory, sync
QUEUE_DEFAULT=default    # Default queue name
QUEUE_MAX_ATTEMPTS=3     # Max retry attempts
QUEUE_RETRY_DELAY=30     # Seconds between retries
QUEUE_TIMEOUT=60         # Job timeout in seconds

Available Drivers

Driver Description Best For
database GORM-backed persistent queue Production without Redis
redis Redis-backed queue High-throughput production
memory In-process queue Development and testing
sync Synchronous (no queue) Testing and debugging

Defining Jobs

Register job handlers by type name:

import "github.com/RAiWorks/RapidGo/v2/core/queue"

// In app/jobs/jobs.go
func RegisterJobs() {
    queue.RegisterHandler("send_welcome_email", HandleWelcomeEmail)
    queue.RegisterHandler("process_payment", HandlePayment)
}

func HandleWelcomeEmail(ctx context.Context, payload json.RawMessage) error {
    var data struct {
        UserID uint   `json:"user_id"`
        Email  string `json:"email"`
    }
    if err := json.Unmarshal(payload, &data); err != nil {
        return err
    }
    // Send the email...
    return nil
}

Dispatching Jobs

// Dispatch to the default queue
dispatcher.Dispatch(ctx, "default", "send_welcome_email", map[string]interface{}{
    "user_id": user.ID,
    "email":   user.Email,
})

// Dispatch with a delay
dispatcher.DispatchDelayed(ctx, "default", "process_payment", payload, 5*time.Minute)

Running Workers

Start a worker process to consume jobs:

go run cmd/main.go work

Worker Configuration

worker := queue.NewWorker(driver, queue.WorkerConfig{
    Queues:       []string{"default", "emails"},
    Concurrency:  4,           // Number of goroutines
    PollInterval: 3 * time.Second,
    MaxAttempts:  3,
    RetryDelay:   30 * time.Second,
    Timeout:      60 * time.Second,
}, logger)

worker.Run(ctx)  // Blocks until ctx is cancelled

Job Lifecycle

  1. Dispatch — Job is serialized and pushed to the driver
  2. Pop — Worker picks up the next available job
  3. Execute — Handler runs with timeout and panic recovery
  4. Success — Job is deleted from the queue
  5. Failure — Job is released back for retry (up to MaxAttempts)
  6. Dead — After max attempts, job is moved to failed storage

Job Struct

type Job struct {
    ID          uint64
    Queue       string
    Type        string          // Maps to registered handler
    Payload     json.RawMessage
    Attempts    uint
    MaxAttempts uint
    AvailableAt time.Time       // For delayed jobs
    ReservedAt  *time.Time      // Set when a worker picks it up
    CreatedAt   time.Time
}