靠山

今天在脉脉上面看到了一个帖子,对照有意思:

这个帖子的意思是:在使用Kafka的时刻,我们已经设置了多个分区,若何去提升消费能力?若是使用线程池的方式去提升若何保证重启时新闻不丢。

这个题实在问了两个点,第一个是若何提升消费能力,第二个是若是选择线程池,我们若何做到新闻不丢。

这里先解释一下这两个问题到底是怎么回事,在许多新闻行列中都有一个看法叫partion,代表着分区,分区是我们提高新闻行列消费的要害,我们的消费者消费的渠道就是从每个分区中来的,一个分区只能被一个消费者持有,如下图所示:

有点类似银行排队,行列的个数越多,排队的时间相对来说就会越少,固然也可以通过异步的方式去处置,好比线程池,把所有的新闻都扔到线程池中去执行,这就引出了作者说的第二个问题,首先我们来看看同步消费为什么不会丢新闻呢?

若是我们使用的是同步模子,当我们消费了之后会将offset ack回去,若是我们泛起了重启,没有乐成offset,那么这部分数据将会再次消费,若是是用线程池举行消费,那么我们若何举行ack呢,好比我们用线程池消费了 10,11,12 三条新闻若是12先消费完,那么我们ack 13吗?若是这样做的话,这个时刻重启,kafka就会以为你已经处置了10,11的新闻,这个时刻新闻就会泛起丢失,而发这个帖子的同砚就是对于这一块是对照疑惑。

网友的回覆

我们来看看网友的一些回覆:

网友A:

这名网友的回覆本质照样使用线程池,作者也回复了,并没有解决线程池的问题。

网友B:


这个方式类似银行排队,只要行列多,那么处置速率就会加速,的确是第一个问题的解决设施之一。

网友C:

,

欧博手机版下载

欢迎进入欧博手机版下载(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。

,

这一类主要解决了第二个问题,通过外部维护offset,好比通过offset入库的方式,我们就能找到准确的应该消费的offset,这个相对来说对照复杂,使用一个MQ还得配套一个数据库,万一我使用MQ的服务基本都没有数据库,还得单独去申请。

网友D:

另有另外一种看法就是,代码写好一点,让消费的速率提高,那消费能力自然就上去了,这个的确是一个很主要的点,通常被其他人给忽略,有时刻消费对照慢,许多人可能一上来就是思量中间件应该怎么设置,往往会忽略自己的代码。

看了这么多帖子的一个回复,感受没有真正能让我满足的谜底,下面来说说我心中的一些思绪。

我的想法

对于第一个问题的话,若何提升消费能力?这个问题实在可以总结为三个设施:

  1. 若是每台消费者机械消费线程是牢固的,那么我们可以扩容消费机械和partion,类似银行排队增添排队窗口一样。
  2. 若是机械和partion是牢固的,增添消费线程就是一个对照好的设施,然则若是是顺序消费,就不能通过增添线程数的方式来提升消费能力,由于顺序消费每个partion都是一个单独的线程,只能通过第一种方式去解决。
  3. 增添自身代码的消费能力,你想想若是银行做事,若是柜员的做事效率能提升的异常高,那么整个排队速率一定也是很快的。
    对于第二个问题,若是我们使用线程池模子,若何去解决新闻丢失问题,这里我对照推荐的是RocketMQ中的做法,我们之前说了用数据库去保留offset对照复杂,性能还对照差,在RocketMQ中使用了一个TreeMap的结构做了我们上面提到的数据库的事:
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

这个TreeMap的key是每个message的offset,value就是这条新闻的一些信息,TreeMap的底层是使用红黑树去实现的,我们可以很快获取其中的最小值和最大值,当我们每次处置完某一条新闻的时刻我们会将这条新闻从msgTreeMap中移除,

public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                            msgSize.addAndGet(0 - msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }
        return result;
    }

removeMessage这个方式就是移除已经消费过的新闻,而且返回当前最新的消费offset,这里返回的效果就是msgTreeMap.firstKey(),我们ack给新闻行列server的值实在也是这个,回到我们这个问题上,若是我们发生重启,那么实在也不需要忧郁我们会泛起新闻丢失。

最后

这里只是简朴的对新闻行列提升新闻能力做了一些先容,若是人人对新闻行列有兴趣的,可以看我之前的一些文章:

  • 你必须要知道的kafka
  • 你应该知道的RocketMQ
  • 深入明白RocketMq通俗新闻和顺序新闻使用,原理,优化
  • 深度剖析若何实现事务新闻
    若是人人以为这篇文章对你有辅助,你的关注和转发是对我最大的支持,