RocketMQ 延迟消息
什么是延迟消息 (RocketMQ 4.5.1)
RocketMq支持的消息类型
- Simple Message
- Order Message (FIFO)
- Broadcasting
- Scheduled messages
- Batch
- Transaction messages
延迟消息跟普通消息的不一样之处就是, 他们不会立即发送, 而是等到给定时间后才投递
如何使用延迟消息
从官网给出的例子看, 只需要设置消息属性delayTimeLevel
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
RocketMQ 内部是怎么实现的
// DefaultMQProducerImpl#sendKernelImpl
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
// 标识为延迟消息
context.setMsgType(MessageType.Delay_Msg);
}
发送前判定是否含有属性值, 如果有, 设置为延迟消息的消息类型MessageType.Delay_Msg
BrokerController 启动会调用 ScheduleMessageService#start
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
- 根据配置的delayLevelTable, 创建 Timer 添加 TimerTask.
DeliverDelayedMessageTimerTask
- level对应延迟等级, offset 是已经转化成普通消息的偏移量
ScheduleMessageService.this.persist()
10s
DeliverDelayedMessageTimerTask
public void executeOnTimeup() {
// 读取队列 SCHEDULE_TOPIC_XXXX,
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 读延迟消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
// 投递时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 计算是否需要处理
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// 还原普通消息
// 跟事物消息差不多. 还原真实的topic,清除延迟消息特殊属性
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 剩下的走普通消息逻辑
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// 计算下次的偏移量
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新偏移量缓存表 `offsetTable`
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
- 根据不同延迟级别读取延迟消息
- 获取此级别的偏移量
- 还原普通消息(与事务消息差不多. 清除特定标识, 还原真实topic), 走普通消息逻辑
- 修改保存偏移量, 创建新 TimeTask
ScheduleMessageService.this.persist()
持久化ScheduleMessageService
相关进度到磁盘
broker
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 如果是commit的事务消息
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 设置延迟消息topic
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 备份真实topic queueId
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
- 备份真实topic
- 保存到特殊的延迟消息topic里
SCHEDULE_TOPIC_XXXX
总结
- 延迟消息保存在
SCHEDULE_TOPIC_XXX
中 - 延迟粒度对应是queueId
问题
RocketMq 使用一个Timer处理延迟消息有个缺陷, Timer 是单线程处理的, 如果处理调用卡了, 就会有高延迟发生, 延迟消息就非常的延迟…