jqz1226 ZUEL 发布于2025-06-29
回复 17
浏览 3837
53
多人反复询问stream模式,而且不知道接受信号的代码放在哪儿。对于实时性要求不高的策略,可以简单地把 consumer的代码放到handle_bar里面,1分钟读取一次。如果对实时性要求高,就需要开辟单独的线程进行堵塞式读取。
声明:以下代码由DeepSeek生成
```Python
from typing import Dict, List
import redis
class RedisStreamManager:
def __init__(self, stream_name: str, redis_url: str = "", timeout: int = 2, max_length: int = 100000):
self.redis_url = redis_url or "redis://:password@host_ip:6379"
self.redis = self._connect_to_redis()
self.stream_name = stream_name
self.timeout = timeout
self.max_length = max_length
self.redis.xtrim(self.stream_name, maxlen=self.max_length, approximate=False)
def _connect_to_redis(self):
try:
client = redis.StrictRedis.from_url(self.redis_url, decode_responses=True)
client.ping()
return client
except redis.ConnectionError as e:
raise e
def add(self, message_data: Dict[str, str]) -> str:
return self.redis.xadd(self.stream_name, message_data, maxlen=self.max_length)
def ensure_group(self, group_name: str):
try:
self.redis.xgroup_create(self.stream_name, group_name, id="0", mkstream=True)
except redis.exceptions.ResponseError as e:
if "BUSYGROUP Consumer Group name already exists" not in str(e):
raise e
def consume(self, group_name: str, consumer_name: str, count: int = 10) -> List[Dict]:
self.ensure_group(group_name)
raw_messages = self.redis.xreadgroup(
group_name, consumer_name, {self.stream_name: ">"}, count=count, block=5000
)
messages = []
for _, message_list in raw_messages:
for message_id, message_data in message_list:
messages.append({
"message_id": message_id,
"message_data": dict(message_data)
})
return messages
def ack(self, group_name: str, message_id: str):
return self.redis.xack(self.stream_name, group_name, message_id)
```
这个类将为生产者和消费者公用。
```Python
# Producer 生产消息
stream_manager = RedisStreamManager("test_stream")
message = {"event": "user_login", "user_id": "12345"}
msg_id = stream_manager.add(message)
print(f"Produced message with ID: {msg_id}")
```
```Python
# Consumer 消费消息
stream_manager = RedisStreamManager("test_stream")
while True:
messages = stream_manager.consume("test_group", "consumer1")
for msg in messages:
print(f"Consumed message: {msg['message_data']}")
stream_manager.ack("test_group", msg["message_id"])
```
Redis的pub/sub模式,是收音机,你没有打开收音机之前(sub之前),无线电台发送的信息(pub),永远消失在宇宙里面了。你能收到的节目,只是开机(sub)之后的。
< -
Redis的stream模式,具有持久性。可以想象成Producer.XADD生产的消息“存储在磁盘上”,消费者睡了懒觉起来,也不会错过。
Redis的stream,可以被多个消费组(Consumer Group)重复消费,Group1消费了,Group2还可以再消费,并不会因为已经被Group1消费过了,Group2再来,就货架空空了。
Redis的Stream,对同一个消费组来说,**消费者之间是竞争关系**。消费组Group1中的消费者consumer1一旦读取了stream中的某条消息,那同组的消费者consumer2,就不可能再看见这条消息了。想象一下,一个饭店突然一下子涌进来十几桌客人,一个服务员肯定忙不过来,需要多个服务员同时服务。而服务员A一旦开始接待message1,服务员B就不可能再凑过去也服务于message1了。
最后,服务员A给message1服务完毕了,需要告知(ACKnowledge)一下大家,免得其它服务员看见message1还在桌子上坐着但没有服务员在边上站着了,就又过来端茶倒水点菜啥的。
评论
点赞,现在redis相比旧版还有个好处是ACL可以设置权限,可以创建个用户只有pub/sub权限,那么连接就不需要怕管理员账密泄露了
2025-06-29
点赞收藏,蒋老师出品必属精品
2025-06-29
我有一个思路,之前一直用的wxpusher消息推送出了浏览器插件,可以推送到浏览器上,收到的推送信息会进入windows的通知里,通过实时监听Windows通知,实现自动下单。不知道是否可行
2025-06-29
stream都用上了, 你其实在搞高频吧? 能分享些经验吗?
2025-06-30
@O_iX 就自己用,也不用担心管理员账号泄密的问题。Windows版的Redis6及以上的版本,资源比较少,我尝试自己编译了一个。
2025-07-01
感谢蒋老师!
大家在用P T_R_E_A_D的时候,会出现外部的redis服务器连不上么?
2025-07-01
蒋老师怎么获得?我邮箱885412596@qq.com,能发我一份学习吗,谢谢
2025-07-02
@玄铁重剑 PT开放外网了?redis库有吗?
2025-07-12
向蒋老师学习。现在有一个策略,可以看看。桐峰小市值-debug第二版,10年千倍。
2025-07-19
@玄铁重剑 同问,解决了吗?
2025-08-15
将老师是华中科大计算机院校的老师吗?嘻嘻
2025-09-06
@猪猪向前冲 我去,蒋老师真是学校老师!
2025-11-06
蒋老师能给个交易端代码学习吗?690933977@qq.com,感谢
2025-11-07