Location>code7788 >text

ThinkPHP Integrated Redis Queue: From Beginner to Actual Technology Sharing

Popularity:726 ℃/2025-04-24 10:45:26

1. Introduction

In a distributed system architecture, asynchronous processing, service decoupling and traffic peak cutting are the core requirements for improving system performance.Redis As a high-performance in-memory database, with its rich data structures (e.g.ListStreamSorted Set) and lightweight features are ideal for implementing queue functions. This article will combineThinkPHP The characteristics of the framework explain in detail how to passRedis The queue builds a highly available and scalable asynchronous processing system, covering basic concepts, environmental configuration, practical cases and best practices.

two,Redis Analysis of the core concept of queue

2.1 Why chooseRedis queue?

Redis The core advantages of queues are reflected in three aspects:

  1. Extreme performance: Based on memory operation, single node supports tens of thousands of levels QPS, meet the real-time response needs in high concurrency scenarios.
  2. Lightweight deployment: No need to Kafka/RabbitMQ Complex configurations of middleware such asPHP Extended integration, suitable for the rapid implementation of small and medium-sized businesses.
  3. Flexible structure: Provides multiple data structures to adapt to different business scenarios:

◦ FIFO queue(List: Based on left in and right out (LPUSH/RPOP) Implement simple asynchronous tasks such as order status updates.

◦ Priority queue (Sorted Set: Pass the score (Score) Controls the task execution order and is suitable for urgent processing of high-priority orders.

◦ Persistence queue (Stream: Supports message persistence, group consumption and confirmation mechanisms, suitable for reliable message delivery under the microservice architecture.

2.2 Comparison of core data structures

 

Data structure

characteristic

Typical scenarios

Redis Core Commands

ThinkPHP Operation example

List

First-in, first-out, simple and efficient

SMS sending and log asynchronous writing

lpush/rpop, brpop

$redis->lpush('queue:log', json_encode($log))

Stream

Persistence, group consumption

Distributed task scheduling, message retry

xadd, xgroup, xreadgroup

$redis->xadd('stream:task', '*', $fields)

Sorted Set

Priority / Delay processing

Coupon expiration reminder, timeout order cancellation

zadd, zrange, zrem

$redis->zadd('delay:order', time()+60, $oid)

3. Development environment construction and configuration

3.1 Depend on installation

3.1.1 PHP Redis Extended installation

 

# Method 1: ByPECL Installphpredis(recommend)

pecl install redis  

# Method 2: PassComposer InstallPredis(Applicable to cluster environment)

composer require predis/predis  

3.1.2 ThinkPHP Configuration adjustment

Revise config/, configuration Redis Connection parameters:

 

return [  

    'default' => [  

        'type'       => 'redis',  

        'host'       => env('', '127.0.0.1'),  // Support environment variable injection

        'port'       => env('', 6379),  

        'password'   => env('', ''),  

        'select'     => 0,                               // Database index (0-15)  

        'timeout'    => 5,                               // Connection timeout (seconds)

        'persistent' => true,                             // Enable long connection (recommended to enable it in production environment)

    ],  

    // Cluster configuration example (suitable for high availability scenarios)

    'cluster' => [  

        'type'      => 'redis',  

        'mode'      => 'cluster',  

        'nodes'     => [  

            ['host' => '', 'port' => 6380],  

            ['host' => '', 'port' => 6381],  

        ],  

        'password'  => 'cluster_pass',  

        'timeout'   => 3,  

    ]  

];  

IV. Based on List Basic queue combat

4.1 Queue operation core code

4.1.1 Join the queue operation (left stack press)

 

use think\facade\Cache;  

$redis = Cache::store('redis')->handler();  

// storageJSON Format task data (recommended method)

$task = [  

    'task_id'   => uniqid(),  

    'type'      => 'order_process',  

    'data'      => ['order_id' => '20231205001', 'amount' => 299.99]  

];  

$redis->lpush('queue:default', json_encode($task));  

4.1.2 Dequeue operation (blocking right pop-up)

 

// Dedicated to consumer scripts (blocking and waiting for tasks to avoid empty polling)

$result = $redis->brpop('queue:default', 10); // 10 Second timeout

if ($result) {  

    [$queueName, $taskJson] = $result;  

    $task = json_decode($taskJson, true);  

    // Execute business logic

    $this->handleTask($task);  

}  

4.2 Asynchronous order processing case

4.2.1 Front-end order interface (controller)

 

// app/controller/  

public function submitOrder() {  

    $orderData = $this->request->post();  

    // Verify order data...  

    // Asynchronous processing of joining the queue

    $redis = Cache::store('redis')->handler();  

    $redis->lpush('queue:order', json_encode([  

        'order_id'   => $orderData['order_id'],  

        'product_id' => $orderData['product_id'],  

        'quantity'   => $orderData['quantity']  

    ]));  

    return json(['code' => 200, 'msg' => 'The order is successfully placed, the system is processing']);  

}  

4.2.2 Backend consumer scripts (scripts/order_consumer.php

 

<?php  

require __DIR__ . '/../../thinkphp/';  

$redis = app(\think\cache\driver\Redis::class)->handler();  

while (true) {  

    $result = $redis->brpop('queue:order', 10);  

    if (!$result) continue;  

    $task = json_decode($result[1], true);  

    try {  

        // Simulated inventory deduction (actually call service)

        $this->deductStock($task['product_id'], $task['quantity']);  

        // Simulated logistics notification

        $this->sendLogisticsNotice($task['order_id']);  

        echo "[".date('Y-m-d H:i:s')."] Task completion:{$task['order_id']}\n";  

    } catch (\Exception $e) {  

        // Retry mechanism (most3 Second-rate)

        $this->retryTask($task, $e, 3);  

    }  

}  

4.2.3 Start consumer services

 

# Front desk operation (easy to debug)

php scripts/order_consumer.php  

# Background daemon running

nohup php scripts/order_consumer.php > 2>&1 &  

V. Based on Stream Advanced Queue Applications

5.1 Stream Queue Core Features

  • Persistent storage: The message is persisted to disk by default, and it supports continuing to process unfinished tasks after restarting.
  • Group consumption: Multiple consumers form a consumer group (Consumer Group), realize task load balancing (such as multipleworker Nodes jointly process orders).
  • Message confirmation mechanism:pass XACK The command marks the message has been processed to avoid duplicate execution or data loss.

5.2 Distributed task processing example

5.2.1 createStream And produce messages

 

// Production side: Add a task with retry times

$redis->xadd('stream:task', '*', [  

    'task_type' => 'payment_notify',  

    'order_id'  => '20231206001',  

    'retry'     => 0, // Initial retry times

    'create_at' => time()  

]);  

5.2.2 Initialize the consumer group

 

// Create a consumption group when running for the first time (consumption starts with the latest news)

$redis->xgroup('CREATE', 'stream:task', 'group_workers', '$', true);  

// If you need consumption history news,'$' Replace with'0-0'  

5.2.3 Consumer group node processing logic

 

// Consumer node1)  

$messages = $redis->xreadgroup(  

    'GROUP', 'group_workers', 'worker_1',  

    'STREAMS', 'stream:task', '>' // Get unconfirmed messages

);  

if ($messages) {  

    foreach ($messages[0][1] as $msgId => $fields) {  

        try {  

            $this->handlePaymentNotify($fields['order_id']);  

            $redis->xack('stream:task', 'group_workers', $msgId); // Confirm message

            echo "Worker1 deal with:{$fields['order_id']}\n";  

        } catch (\Exception $e) {  

            if ((int)$fields['retry'] < 3) {  

                // Increase the number of retry and re-enter

                $fields['retry'] = (int)$fields['retry'] + 1;  

                $redis->xadd('stream:task', '*', $fields);  

            } else {  

                // Record dead letter queue

                $redis->xadd('stream:deadletter', '*', $fields);  

            }  

        }  

    }  

}  

6. Best practices in production environment

6.1 Message Serialization Specification

  • Forced use JSON Format

 

// Recommended practices

$redis->lpush('queue', json_encode($data, JSON_UNESCAPED_UNICODE));  

// No usePHP Native serialization

// $redis->lpush('queue', serialize($data));  

  • Data verification: The consumer side needs to perform field verification on the deserialized data to avoid service abnormalities due to format errors.
  • AOF model: Recommended configuration appendfsync everysec, taking into account performance and data security (up to lose 1 seconds data).
  • RDB Backup: Generate regularly RDB Snapshots are used for disaster recovery, and it is recommended to cooperate with cloud storage (e.g.S3) Implement off-site backup.
  • Redis Cluster: Suitable for hyperscale data, supporting automatic sharding and failover.
  • Sentinel Sentinel mode: Monitor the status of the master and slave nodes, and automatically complete the master and slave switching. Configuration example:

6.2 Persistence and high availability configurations

6.2.1 Redis Persistence strategy

6.2.2 Cluster Solution

 

// ThinkPHP Sentinel mode configuration

'sentinel' => [  

    'type'      => 'redis',  

    'mode'      => 'sentinel',  

    'master'    => 'mymaster',  

    'sentinels' => [  

        ['host' => '', 'port' => 26379],  

        ['host' => '', 'port' => 26379],  

    ],  

    'password'  => 'sentinel_pass',  

]  

6.3 Performance optimization tips

  1. Batch operation:use LPUSH Push multiple tasks at once to reduce the network I/O frequency:

 

$redis->lpush('queue:batch', $task1, $task2, $task3);  

  1. Queue length control:pass LTRIM Limit the maximum queue length to prevent memory overflow:

 

$redis->ltrim('queue:order', 0, 999); // Keep the latest1000 Message

  1. Connection pool multiplexing:exist ThinkPHP Open long connection in medium (persistent => true) to avoid the overhead of frequent connection creation.

6.4 Idepotency design

  • Only task ID: Carry each task UUID Or a unique business identification (such as an order number), the consumer side passesRedis Distributed locks ensure idempotence:

 

$lockKey = "lock:task:{$task['task_id']}";  

if ($redis->set($lockKey, 1, ['NX', 'PX' => 60000])) {  

    // Execute business logic

}  

7. Extended functions and architecture evolution

7.1 Delay queue implementation

use Sorted Set The score (timestamp) of the task delays execution:

 

// Set the delay time (unit: seconds) when joining the queue

$delayTime = 60; // Delay1 Minute execution

$redis->zadd('delay:queue', time() + $delayTime, json_encode($task));  

// Consumer timed scan expired tasks

$now = time();  

$tasks = $redis->zrangebyscore('delay:queue', 0, $now, ['LIMIT' => 0, 100]);  

foreach ($tasks as $taskJson) {  

    $redis->zrem('delay:queue', $taskJson);  

    $this->handleDelayedTask(json_decode($taskJson, true));  

}  

7.2 Dead letter queue and monitoring

  • Dead letter queue: Transfer tasks that failed to retry to a standalone queue (e.g. stream:deadletter), manual intervention.
  • Monitoring system

◦ Queue length warning: when LLEN queue:order > 1000 triggers an alarm.

◦ Consumer status monitoring: via LASTMSGID Command to check the lag of consumption groups.

7.3 Technical selection suggestions

 

Business scenarios

Recommended data structure

Core advantages

Typical configuration

Simple asynchronous notifications

List

Lightweight and efficient, millisecond response

Single node + Non-persistent

Distributed task scheduling

Stream

Group consumption, guarantee message reliability

Consumer group + AOF Persistence

High priority task processing

Sorted Set

Dynamic priority adjustment

Score (Score+ Regular scan

8. Summary

Redis Queue andThinkPHP The combination of asynchronous processing provides a lightweight solution from the basicList Queue to advancedStream Group consumption can meet the needs of businesses of different sizes. In actual development, we need to focus on message reliability (persistence, retry mechanism), performance optimization (batch operations, connection pooling), and system stability (idempotence, monitoring and alarm). Through rational useRedis Data structure andThinkPHP The framework features can effectively improve the scalability and risk resistance of the system, laying a solid foundation for distributed architecture.

9. Reference resources

  1. Redis official documentation
  2. ThinkPHP Cache Driver Development Guide
  3. Redis design and implementation
  4. PHP Redis extension manual

This article covers the entire ThinkPHP integratedRedis The entire process of the queue provides imaginable code examples from basic concepts to production practices. If you need to further explore the optimization solutions or extension functions for specific scenarios, you are welcome to provide more business details.