文档结构  
翻译进度:57%     翻译赏金:0 元 (?)    ¥ 我要打赏

Redis 有一个简单但功能强大的  PubSub API 。 它延迟低并具有很好的扩展性。 一条频道上发布的消息由订阅该频道的订阅者接收。 但是,如果没有找到活跃的订阅者时,该条消息就会丢失。对于那些需要使用未处理的已发布消息的消息持久化用例来说,这个缺点将 Redis 拒之门外。这也可能是支持 Redis 作为代理的几个开源项目使用它的列表 push / pop API 的原因。 在本文中,我将演示如何修改 Redis PubSub API 以支持消息持久化,使它可以应用到几个有趣的用例当中。

第 1 段(可获 1.44 积分)

最后发布的消息

能够在不订阅频道的情况下获取特定频道上最后发布的消息,这个功能为几个有趣的用例敞开了大门。src/pubsub.c:publishCommand 是 Redis 处理发布命令的地方。 让我们添加一行代码,将最近发布的消息保留在频道上:

void publishCommand(redisClient *c) {
    ....

    /* Persist last published message in channel specific key */
    setKey(c->db, c->argv[1], c->argv[2]);

    addReplyLongLong(....);
}

上面,我们添加了一个对 src/db.c:setKey  函数的调用,该函数将 c-> argv [1](频道名称)的值设置为 c-> argv [2](发布的消息)。

第 2 段(可获 0.84 积分)

在项目的根目录运行 make 命令并启动 ./src/redis-server. 现在我们可以这样操作:

127.0.0.1:6379> publish channel1 c1m1
(integer) 0
127.0.0.1:6379> get channel1
"c1m1"
127.0.0.1:6379> publish channel1 c1m2
(integer) 0
127.0.0.1:6379> get channel1
"c1m2"

瞧。 现在我们可以在没有订阅者的情况下发布消息。 然而,一个接入的用户仍然可以在通道上通过键  channel1 获取值而不用显式地订阅通道来使用最后发布的消息。

让我们更进一步。 XMPP 发布-订阅(XEP-0060)  定义了接收上次发布消息的规范。 它提到,

第 3 段(可获 0.9 积分)

当成功处理订阅请求时,服务可以将最近发布的消息发送给新的订阅者。

让我们将这个想法添加到 Redis PubSub 机制。 .src/pubsub.c:subscribeCommand  函数是 Redis 处理频道订阅请求的地方。 在该函数的末尾添加以下代码行。

void subscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the subscribed channel(s) */
    robj *o;
    for (j = 1; j < c->argc; j++) {
    	o = lookupKeyRead(c->db, c->argv[j]);
    	if(o != NULL) {
		addReply(c,shared.mbulkhdr[3]);
		addReply(c,shared.messagebulk);
		addReplyBulk(c,c->argv[j]);
		addReplyBulk(c,o);
    	}
    }
}
第 4 段(可获 0.6 积分)

Here, post subscription, we fetch and send the last published message for all channels that the client just subscribed to. make and restart ./src/redis-server. Now on a new ./src/redis-cli terminal subscribe to channel1:

127.0.0.1:6379> subscribe channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "c1m2"

Voila! Now Redis server will send the last published message upon subscription. But what about PSUBSCRIBE use case?

src/pubsub.c:psubscribeCommand handles pattern based channel subscription logic. Add following lines of code at the end of this function:

void psubscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the channel(s) matching subscribed patterns */
    for (j = 1; j < c->argc; j++) {
    	robj *pat = c->argv[j];
    	dictIterator *di = dictGetIterator(server.pubsub_channels);
    	dictEntry *de;
    	while((de = dictNext(di)) != NULL) {
		robj *cobj = dictGetKey(de);
		sds channel = cobj->ptr;
		if (stringmatchlen((char*)pat->ptr,
				sdslen(pat->ptr),
				(char*)channel,
				sdslen(channel), 0)) {
			robj *o = lookupKeyRead(c->db, cobj);
			if(o != NULL) {
	                addReply(c,shared.mbulkhdr[4]);
	                addReply(c,shared.pmessagebulk);
	                addReplyBulk(c,pat);
	                addReplyBulk(c,cobj);
	                addReplyBulk(c,o);
			}
		}
	}
	dictReleaseIterator(di);
    }
}

 

第 5 段(可获 0.81 积分)

Above, for every subscribed pattern, we iterate over active server.pubsub_channels and check if the active channel matches the subscription pattern. On match we fetch and send the last published message on the channel to the client.

With previous redis-cli subscribe terminal running, open a new terminal and try:

127.0.0.1:6379> psubscribe channel*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
1) "pmessage"
2) "channel*"
3) "channel1"
4) "c1m2"

Next

You can checkout my Redis fork and commits under pubsub-persistencebranch. Enhancements described above can also be found on this github commit.

第 6 段(可获 0.85 积分)

Currently it is unclear what Antirez (Sanfilippo Salvatore) plans to do further with PubSub in Redis. It stands on a solid base and recent efforts are rightly put behind Redis cluster. However, I see some interesting enhancements that can be made to Redis PubSub mainline. In the next post I will take the current idea one step ahead and add persistence support for all or only unprocessed published messages in a Redis list (possibly with a cap or expiration on persisted messages).

第 7 段(可获 1.03 积分)

文章评论