原文译文操作

Redis comes packed with a simple yet powerful PubSub API.  It provides low latency and scales well.  A message published on a channel is received by subscriber(s) at the other end.  However, if no active subscriber is found the message is simply lost.  This drawback puts Redis out of the probables list for several use cases where message persistence of unprocessed published messages is desired.  It’s also probably a reason why several open source projects that support Redis as a broker are based upon it’s list push / pop API.  In this post I will demonstrate how to modify Redis PubSub API to support message persistence, opening possibilities for several interesting use cases.

Redis 有一个简单但功能强大的  PubSub API 。 它延迟低并具有很好的扩展性。 一条频道上发布的消息由订阅该频道的订阅者接收。 但是,如果没有找到活跃的订阅者时,该条消息就会丢失。对于那些需要使用未处理的已发布消息的消息持久化用例来说,这个缺点将 Redis 拒之门外。这也可能是支持 Redis 作为代理的几个开源项目使用它的列表 push / pop API 的原因。 在本文中,我将演示如何修改 Redis PubSub API 以支持消息持久化,使它可以应用到几个有趣的用例当中。

纠正翻译

Last Published Message

Ability to fetch the last published message on a particular channel without subscribing to the channel opens doors for several interesting use cases.  src/pubsub.c:publishCommand is where Redis handles publish command.  Let’s add a line of code to persist the most recently published message on a channel:

void publishCommand(redisClient *c) {
    ....

    /* Persist last published message in channel specific key */
    setKey(c->db, c->argv[1], c->argv[2]);

    addReplyLongLong(....);
}

Above, we added a call to src/db.c:setKey function that sets the value of key c->argv[1] (channel name) to c->argv[2] (published message).

最后发布的消息

能够在不订阅频道的情况下获取特定频道上最后发布的消息,这个功能为几个有趣的用例敞开了大门。src/pubsub.c:publishCommand 是 Redis 处理发布命令的地方。 让我们添加一行代码,将最近发布的消息保留在频道上:

void publishCommand(redisClient *c) {
    ....

    /* Persist last published message in channel specific key */
    setKey(c->db, c->argv[1], c->argv[2]);

    addReplyLongLong(....);
}

上面,我们添加了一个对 src/db.c:setKey  函数的调用,该函数将 c-> argv [1](频道名称)的值设置为 c-> argv [2](发布的消息)。

纠正翻译

Run make from the project root directory and start ./src/redis-server. Now we can do something like:

127.0.0.1:6379> publish channel1 c1m1
(integer) 0
127.0.0.1:6379> get channel1
"c1m1"
127.0.0.1:6379> publish channel1 c1m2
(integer) 0
127.0.0.1:6379> get channel1
"c1m2"

Voila. We published a message with no subscriber. However, an incoming user can still be served with the last published message on the channel by fetching the value of key channel1 without explicitly subscribing to the channel.

Let’s take this idea one step ahead. XMPP Publish-Subscribe (XEP-0060) defines a specification for receiving the last published item. It says,

在项目的根目录运行 make 命令并启动 ./src/redis-server. 现在我们可以这样操作:

127.0.0.1:6379> publish channel1 c1m1
(integer) 0
127.0.0.1:6379> get channel1
"c1m1"
127.0.0.1:6379> publish channel1 c1m2
(integer) 0
127.0.0.1:6379> get channel1
"c1m2"

瞧。 现在我们可以在没有订阅者的情况下发布消息。 然而,一个接入的用户仍然可以在通道上通过键  channel1 获取值而不用显式地订阅通道来使用最后发布的消息。

让我们更进一步。 XMPP 发布-订阅(XEP-0060)  定义了接收上次发布消息的规范。 它提到,

纠正翻译

When a subscription request is successfully processed, the service MAY send the last published item to the new subscriber.

Let’s add this idea to Redis PubSub mechanism. src/pubsub.c:subscribeCommand function is where Redis processes channel subscription requests. Add the following lines of code at the end of this function.

void subscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the subscribed channel(s) */
    robj *o;
    for (j = 1; j < c->argc; j++) {
    	o = lookupKeyRead(c->db, c->argv[j]);
    	if(o != NULL) {
		addReply(c,shared.mbulkhdr[3]);
		addReply(c,shared.messagebulk);
		addReplyBulk(c,c->argv[j]);
		addReplyBulk(c,o);
    	}
    }
}

当成功处理订阅请求时,服务可以将最近发布的消息发送给新的订阅者。

让我们将这个想法添加到 Redis PubSub 机制。 .src/pubsub.c:subscribeCommand  函数是 Redis 处理频道订阅请求的地方。 在该函数的末尾添加以下代码行。

void subscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the subscribed channel(s) */
    robj *o;
    for (j = 1; j < c->argc; j++) {
    	o = lookupKeyRead(c->db, c->argv[j]);
    	if(o != NULL) {
		addReply(c,shared.mbulkhdr[3]);
		addReply(c,shared.messagebulk);
		addReplyBulk(c,c->argv[j]);
		addReplyBulk(c,o);
    	}
    }
}
纠正翻译

Here, post subscription, we fetch and send the last published message for all channels that the client just subscribed to. make and restart ./src/redis-server. Now on a new ./src/redis-cli terminal subscribe to channel1:

127.0.0.1:6379> subscribe channel1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "message"
2) "channel1"
3) "c1m2"

Voila! Now Redis server will send the last published message upon subscription. But what about PSUBSCRIBE use case?

src/pubsub.c:psubscribeCommand handles pattern based channel subscription logic. Add following lines of code at the end of this function:

void psubscribeCommand(redisClient *c) {
    ....

    /* Send last received message on the channel(s) matching subscribed patterns */
    for (j = 1; j < c->argc; j++) {
    	robj *pat = c->argv[j];
    	dictIterator *di = dictGetIterator(server.pubsub_channels);
    	dictEntry *de;
    	while((de = dictNext(di)) != NULL) {
		robj *cobj = dictGetKey(de);
		sds channel = cobj->ptr;
		if (stringmatchlen((char*)pat->ptr,
				sdslen(pat->ptr),
				(char*)channel,
				sdslen(channel), 0)) {
			robj *o = lookupKeyRead(c->db, cobj);
			if(o != NULL) {
	                addReply(c,shared.mbulkhdr[4]);
	                addReply(c,shared.pmessagebulk);
	                addReplyBulk(c,pat);
	                addReplyBulk(c,cobj);
	                addReplyBulk(c,o);
			}
		}
	}
	dictReleaseIterator(di);
    }
}

 

修改翻译

Above, for every subscribed pattern, we iterate over active server.pubsub_channels and check if the active channel matches the subscription pattern. On match we fetch and send the last published message on the channel to the client.

With previous redis-cli subscribe terminal running, open a new terminal and try:

127.0.0.1:6379> psubscribe channel*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1
1) "pmessage"
2) "channel*"
3) "channel1"
4) "c1m2"

Next

You can checkout my Redis fork and commits under pubsub-persistencebranch. Enhancements described above can also be found on this github commit.

修改翻译

Currently it is unclear what Antirez (Sanfilippo Salvatore) plans to do further with PubSub in Redis. It stands on a solid base and recent efforts are rightly put behind Redis cluster. However, I see some interesting enhancements that can be made to Redis PubSub mainline. In the next post I will take the current idea one step ahead and add persistence support for all or only unprocessed published messages in a Redis list (possibly with a cap or expiration on persisted messages).

修改翻译