/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.impl.MultiMessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicConsumerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PartitionsChangedListener;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.UnAckedMessageTracker;
import org.apache.pulsar.client.impl.UnAckedTopicMessageTracker;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableMap;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.javax.annotation.Nullable;
import org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiTopicsConsumerImpl<T>
extends ConsumerBase<T> {
    public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
    protected final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
    protected final ConcurrentHashMap<String, Integer> partitionedTopics;
    protected final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
    AtomicInteger allTopicPartitionsNumber;
    private volatile boolean paused = false;
    private final Object pauseMutex = new Object();
    private volatile Timeout partitionsAutoUpdateTimeout = null;
    TopicsPartitionChangedListener topicsPartitionChangedListener;
    CompletableFuture<Void> partitionsAutoUpdateFuture = null;
    private final MultiTopicConsumerStatsRecorderImpl stats;
    private final ConsumerConfigurationData<T> internalConfig;
    private final MessageIdAdv startMessageId;
    private volatile boolean duringSeek = false;
    private final long startMessageRollbackDurationInSec;
    private final ConsumerInterceptors<T> internalConsumerInterceptors;
    private TimerTask partitionsAutoUpdateTimerTask = new TimerTask(){

        @Override
        public void run(Timeout timeout) throws Exception {
            try {
                if (timeout.isCancelled() || MultiTopicsConsumerImpl.this.getState() != HandlerState.State.Ready) {
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] run partitionsAutoUpdateTimerTask", (Object)MultiTopicsConsumerImpl.this.topic);
                }
                if (MultiTopicsConsumerImpl.this.partitionsAutoUpdateFuture == null || MultiTopicsConsumerImpl.this.partitionsAutoUpdateFuture.isDone()) {
                    MultiTopicsConsumerImpl.this.partitionsAutoUpdateFuture = MultiTopicsConsumerImpl.this.topicsPartitionChangedListener.onTopicsExtended(MultiTopicsConsumerImpl.this.partitionedTopics.keySet());
                }
            }
            catch (Throwable th) {
                log.warn("Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled.", th);
            }
            finally {
                MultiTopicsConsumerImpl.this.partitionsAutoUpdateTimeout = MultiTopicsConsumerImpl.this.client.timer().newTimeout(MultiTopicsConsumerImpl.this.partitionsAutoUpdateTimerTask, MultiTopicsConsumerImpl.this.conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
            }
        }
    };
    private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);

    MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
        this(client, DUMMY_TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric(5), conf, executorProvider, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist);
    }

    MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, long startMessageRollbackDurationInSec) {
        this(client, DUMMY_TOPIC_NAME_PREFIX + RandomStringUtils.randomAlphanumeric(5), conf, executorProvider, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, startMessageId, startMessageRollbackDurationInSec);
    }

    MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
        this(client, singleTopic, conf, executorProvider, subscribeFuture, schema, interceptors, createTopicIfDoesNotExist, null, 0L);
    }

    MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist, MessageId startMessageId, long startMessageRollbackDurationInSec) {
        super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors);
        this.internalConsumerInterceptors = interceptors != null ? this.getInternalConsumerInterceptors(interceptors) : null;
        Preconditions.checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer");
        this.partitionedTopics = new ConcurrentHashMap();
        this.consumers = new ConcurrentHashMap();
        this.pausedConsumers = new ConcurrentLinkedQueue();
        this.allTopicPartitionsNumber = new AtomicInteger(0);
        this.startMessageId = (MessageIdAdv)startMessageId;
        this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
        this.paused = conf.isStartPaused();
        this.internalConfig = this.getInternalConsumerConfig();
        MultiTopicConsumerStatsRecorderImpl multiTopicConsumerStatsRecorderImpl = this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new MultiTopicConsumerStatsRecorderImpl(this) : null;
        if (conf.isAutoUpdatePartitions()) {
            this.topicsPartitionChangedListener = new TopicsPartitionChangedListener();
            this.partitionsAutoUpdateTimeout = client.timer().newTimeout(this.partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
        }
        if (conf.getTopicNames().isEmpty()) {
            this.setState(HandlerState.State.Ready);
            this.subscribeFuture().complete(this);
            return;
        }
        Preconditions.checkArgument(MultiTopicsConsumerImpl.topicNamesValid(conf.getTopicNames()), "Subscription topics include duplicate items or invalid names.");
        List futures = conf.getTopicNames().stream().map(t2 -> this.subscribeAsync((String)t2, createTopicIfDoesNotExist)).collect(Collectors.toList());
        ((CompletableFuture)FutureUtil.waitForAll(futures).thenAccept(finalFuture -> {
            if (this.allTopicPartitionsNumber.get() > this.getCurrentReceiverQueueSize()) {
                this.setCurrentReceiverQueueSize(this.allTopicPartitionsNumber.get());
            }
            this.setState(HandlerState.State.Ready);
            this.startReceivingMessages(new ArrayList<ConsumerImpl<T>>(this.consumers.values()));
            log.info("[{}] [{}] Created topics consumer with {} sub-consumers", new Object[]{this.topic, this.subscription, this.allTopicPartitionsNumber.get()});
            this.subscribeFuture().complete(this);
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to subscribe topics: {}, closing consumer", (Object)this.topic, (Object)ex.getMessage());
            this.closeAsync().whenComplete((res, closeEx) -> {
                if (closeEx != null) {
                    log.error("[{}] Failed to unsubscribe after failed consumer creation: {}", (Object)this.topic, (Object)closeEx.getMessage());
                }
                subscribeFuture.completeExceptionally((Throwable)ex);
            });
            return null;
        });
    }

    private static boolean topicNamesValid(Collection<String> topics) {
        Preconditions.checkState(topics != null && topics.size() >= 1, "topics should contain more than 1 topic");
        HashSet<TopicName> topicNames = new HashSet<TopicName>();
        for (String topic : topics) {
            if (!TopicName.isValid(topic)) {
                log.warn("Received invalid topic name: {}", (Object)topic);
                return false;
            }
            topicNames.add(TopicName.get(topic));
        }
        if (topicNames.size() == topics.size()) {
            return true;
        }
        log.warn("Topic names not unique. unique/all : {}/{}", (Object)topicNames.size(), (Object)topics.size());
        return false;
    }

    private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}", new Object[]{this.topic, newConsumers.size(), this.getState()});
        }
        if (this.getState() == HandlerState.State.Ready) {
            newConsumers.forEach(consumer -> {
                consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), consumer.getCurrentReceiverQueueSize());
                this.internalPinnedExecutor.execute(() -> this.receiveMessageFromConsumer((ConsumerImpl<T>)consumer, true));
            });
        }
    }

    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
        if (this.duringSeek) {
            log.info("[{}] Pause receiving messages for topic {} due to seek", (Object)this.subscription, (Object)consumer.getTopic());
            return;
        }
        CompletionStage messagesFuture = batchReceive ? consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl)msgs).getMessageList()) : consumer.receiveAsync().thenApply(Collections::singletonList);
        ((CompletableFuture)((CompletableFuture)messagesFuture).thenAcceptAsync(messages -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Receive message from sub consumer:{}", new Object[]{this.topic, this.subscription, consumer.getTopic()});
            }
            if (this.getState() == HandlerState.State.Closed) {
                return;
            }
            messages.forEach(msg -> {
                boolean skipDueToSeek = this.duringSeek;
                MessageImpl msgImpl = (MessageImpl)msg;
                ClientCnx cnx = msgImpl.getCnx();
                boolean isValidEpoch = this.isValidConsumerEpoch(msgImpl);
                if (isValidEpoch && !skipDueToSeek) {
                    this.messageReceived(consumer, (Message<T>)msg);
                } else if (!isValidEpoch) {
                    consumer.increaseAvailablePermits(cnx);
                } else if (skipDueToSeek) {
                    log.info("[{}] [{}] Skip processing message {} received during seek", new Object[]{this.topic, this.subscription, msg.getMessageId()});
                }
            });
            int size = this.incomingMessages.size();
            int maxReceiverQueueSize = this.getCurrentReceiverQueueSize();
            int sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
            if (size >= maxReceiverQueueSize || size > sharedQueueResumeThreshold && !this.pausedConsumers.isEmpty()) {
                this.pausedConsumers.add(consumer);
                this.resumeReceivingFromPausedConsumersIfNeeded();
            } else {
                this.receiveMessageFromConsumer(consumer, messages.size() > 0);
            }
        }, (Executor)this.internalPinnedExecutor)).exceptionally(ex -> {
            if (ex instanceof PulsarClientException.AlreadyClosedException || ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                return null;
            }
            log.error("Receive operation failed on consumer {} - Retrying later", (Object)consumer, ex);
            ((ScheduledExecutorService)this.client.getScheduledExecutorProvider().getExecutor()).schedule(() -> this.receiveMessageFromConsumer(consumer, true), 10L, TimeUnit.SECONDS);
            return null;
        });
    }

    private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
        CompletableFuture receivedFuture;
        Preconditions.checkArgument(message instanceof MessageImpl);
        TopicMessageImpl<T> topicMessage = new TopicMessageImpl<T>(consumer.getTopic(), message, consumer);
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message from topics-consumer {}", new Object[]{this.topic, this.subscription, message.getMessageId()});
        }
        if ((receivedFuture = this.nextPendingReceive()) != null) {
            this.unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
            Message<T> interceptMessage = this.beforeConsume(topicMessage);
            this.completePendingReceive(receivedFuture, interceptMessage);
        } else if (this.enqueueMessageAndCheckBatchReceive(topicMessage) && this.hasPendingBatchReceive()) {
            this.notifyPendingBatchReceivedCallBack();
        }
        this.tryTriggerListener();
    }

    @Override
    protected synchronized void messageProcessed(Message<?> msg) {
        this.unAckedMessageTracker.add(msg.getMessageId(), msg.getRedeliveryCount());
        this.decreaseIncomingMessageSize(msg);
    }

    private void resumeReceivingFromPausedConsumersIfNeeded() {
        if (this.incomingMessages.size() <= this.getCurrentReceiverQueueSize() / 2 && !this.pausedConsumers.isEmpty()) {
            ConsumerImpl<T> consumer;
            while ((consumer = this.pausedConsumers.poll()) != null) {
                this.internalPinnedExecutor.execute(() -> this.receiveMessageFromConsumer(consumer, true));
            }
        }
    }

    @Override
    public int minReceiverQueueSize() {
        int size = Math.min(1, this.maxReceiverQueueSize);
        if (this.batchReceivePolicy.getMaxNumMessages() > 0) {
            size = Math.max(size, this.batchReceivePolicy.getMaxNumMessages());
        }
        if (this.allTopicPartitionsNumber != null) {
            size = Math.max(this.allTopicPartitionsNumber.get(), size);
        }
        return size;
    }

    @Override
    protected Message<T> internalReceive() throws PulsarClientException {
        try {
            if (this.incomingMessages.isEmpty()) {
                this.expectMoreIncomingMessages();
            }
            Message message = (Message)this.incomingMessages.take();
            this.decreaseIncomingMessageSize(message);
            Preconditions.checkState(message instanceof TopicMessageImpl);
            this.unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
            this.resumeReceivingFromPausedConsumersIfNeeded();
            return this.beforeConsume(message);
        }
        catch (Exception e) {
            ExceptionHandler.handleInterruptedException(e);
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    @Override
    protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException {
        try {
            Message message;
            if (this.incomingMessages.isEmpty()) {
                this.expectMoreIncomingMessages();
            }
            if ((message = (Message)this.incomingMessages.poll(timeout, unit)) != null) {
                this.decreaseIncomingMessageSize(message);
                Preconditions.checkArgument(message instanceof TopicMessageImpl);
                this.trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
                message = this.listener == null ? this.beforeConsume(message) : message;
            }
            this.resumeReceivingFromPausedConsumersIfNeeded();
            return message;
        }
        catch (Exception e) {
            ExceptionHandler.handleInterruptedException(e);
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    @Override
    protected Messages<T> internalBatchReceive() throws PulsarClientException {
        try {
            return this.internalBatchReceiveAsync().get();
        }
        catch (InterruptedException | ExecutionException e) {
            ExceptionHandler.handleInterruptedException(e);
            HandlerState.State state = this.getState();
            if (state != HandlerState.State.Closing && state != HandlerState.State.Closed) {
                this.stats.incrementNumBatchReceiveFailed();
                throw PulsarClientException.unwrap((Throwable)e);
            }
            return null;
        }
    }

    @Override
    protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
        CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            if (this.hasEnoughMessagesForBatchReceive()) {
                this.notifyPendingBatchReceivedCallBack(result);
            } else {
                this.expectMoreIncomingMessages();
                ConsumerBase.OpBatchReceive opBatchReceive = ConsumerBase.OpBatchReceive.of(result);
                this.pendingBatchReceives.add(opBatchReceive);
                this.triggerBatchReceiveTimeoutTask();
                cancellationHandler.setCancelAction(() -> this.pendingBatchReceives.remove(opBatchReceive));
            }
            this.resumeReceivingFromPausedConsumersIfNeeded();
        });
        return result;
    }

    @Override
    protected CompletableFuture<Message<T>> internalReceiveAsync() {
        CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
        this.internalPinnedExecutor.execute(() -> {
            Message message = (Message)this.incomingMessages.poll();
            if (message == null) {
                this.expectMoreIncomingMessages();
                this.pendingReceives.add(result);
                cancellationHandler.setCancelAction(() -> this.pendingReceives.remove(result));
            } else {
                this.decreaseIncomingMessageSize(message);
                Preconditions.checkState(message instanceof TopicMessageImpl);
                this.unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
                this.resumeReceivingFromPausedConsumersIfNeeded();
                result.complete(this.beforeConsume(message));
            }
        });
        return result;
    }

    @Override
    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txnImpl) {
        if (!(messageId instanceof TopicMessageId)) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotAllowedException("Only TopicMessageId is allowed to acknowledge for a multi-topics consumer, while messageId is " + messageId.getClass().getName()));
        }
        if (this.getState() != HandlerState.State.Ready) {
            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
        }
        ConsumerImpl<T> consumer = this.consumers.get(((TopicMessageId)messageId).getOwnerTopic());
        if (consumer == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotConnectedException());
        }
        if (ackType == CommandAck.AckType.Cumulative) {
            return consumer.acknowledgeCumulativeAsync(messageId);
        }
        return consumer.doAcknowledgeWithTxn(messageId, ackType, properties, txnImpl).thenRun(() -> this.unAckedMessageTracker.remove(messageId));
    }

    @Override
    protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, CommandAck.AckType ackType, Map<String, Long> properties, TransactionImpl txn) {
        for (MessageId messageId2 : messageIdList) {
            if (messageId2 instanceof TopicMessageId) continue;
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotAllowedException("Only TopicMessageId is allowed to acknowledge for a multi-topics consumer, while messageId is " + messageId2.getClass().getName()));
        }
        ArrayList resultFutures = new ArrayList();
        if (ackType == CommandAck.AckType.Cumulative) {
            messageIdList.forEach(messageId -> resultFutures.add(this.doAcknowledge((MessageId)messageId, ackType, properties, txn)));
        } else {
            if (this.getState() != HandlerState.State.Ready) {
                return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
            }
            HashMap topicToMessageIdMap = new HashMap();
            for (MessageId messageId3 : messageIdList) {
                String ownerTopic = ((TopicMessageId)messageId3).getOwnerTopic();
                topicToMessageIdMap.putIfAbsent(ownerTopic, new ArrayList());
                ((List)topicToMessageIdMap.get(ownerTopic)).add(messageId3);
            }
            IdentityHashMap<ConsumerImpl, List> consumerToMessageIds = new IdentityHashMap<ConsumerImpl, List>();
            for (Map.Entry entry : topicToMessageIdMap.entrySet()) {
                ConsumerImpl<T> consumer2 = this.consumers.get(entry.getKey());
                if (consumer2 == null) {
                    return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotConnectedException());
                }
                consumerToMessageIds.put(consumer2, (List)entry.getValue());
            }
            consumerToMessageIds.forEach((consumer, messageIds) -> resultFutures.add(consumer.doAcknowledgeWithTxn((List<MessageId>)messageIds, ackType, properties, txn).thenAccept(res -> messageIdList.forEach(this.unAckedMessageTracker::remove))));
        }
        return CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0]));
    }

    @Override
    protected CompletableFuture<Void> doReconsumeLater(Message<?> message, CommandAck.AckType ackType, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
        MessageId messageId = message.getMessageId();
        if (messageId == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId"));
        }
        if (!(messageId instanceof TopicMessageId)) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotAllowedException("Only TopicMessageId is allowed for reconsumeLater for a multi-topics consumer, while messageId is " + message.getClass().getName()));
        }
        TopicMessageId topicMessageId = (TopicMessageId)messageId;
        if (this.getState() != HandlerState.State.Ready) {
            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
        }
        if (ackType == CommandAck.AckType.Cumulative) {
            Consumer individualConsumer = this.consumers.get(topicMessageId.getOwnerTopic());
            if (individualConsumer != null) {
                return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
            }
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotConnectedException());
        }
        ConsumerImpl<T> consumer = this.consumers.get(topicMessageId.getOwnerTopic());
        return consumer.doReconsumeLater(message, ackType, customProperties, delayTime, unit).thenRun(() -> this.unAckedMessageTracker.remove((MessageId)topicMessageId));
    }

    public void negativeAcknowledge(MessageId messageId) {
        Preconditions.checkArgument(messageId instanceof TopicMessageId);
        ConsumerImpl<T> consumer = this.consumers.get(((TopicMessageId)messageId).getOwnerTopic());
        consumer.negativeAcknowledge(messageId);
        this.unAckedMessageTracker.remove(messageId);
    }

    @Override
    public void negativeAcknowledge(Message<?> message) {
        MessageId messageId = message.getMessageId();
        Preconditions.checkArgument(messageId instanceof TopicMessageId);
        ConsumerImpl<T> consumer = this.consumers.get(((TopicMessageId)messageId).getOwnerTopic());
        consumer.negativeAcknowledge(message);
        this.unAckedMessageTracker.remove(messageId);
    }

    @Override
    public CompletableFuture<Void> unsubscribeAsync(boolean force) {
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
        }
        this.setState(HandlerState.State.Closing);
        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
        List futureList = this.consumers.values().stream().map(c -> c.unsubscribeAsync(force)).collect(Collectors.toList());
        ((CompletableFuture)FutureUtil.waitForAll(futureList).thenComposeAsync(r -> {
            this.setState(HandlerState.State.Closed);
            this.cleanupMultiConsumer();
            log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer", new Object[]{this.topic, this.subscription, this.consumerName});
            return this.failPendingReceive();
        }, (Executor)this.internalPinnedExecutor)).whenComplete((r, ex) -> {
            if (ex == null) {
                unsubscribeFuture.complete(null);
            } else {
                this.setState(HandlerState.State.Failed);
                unsubscribeFuture.completeExceptionally((Throwable)ex);
                log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer", new Object[]{this.topic, this.subscription, this.consumerName, ex.getCause()});
            }
        });
        return unsubscribeFuture;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            if (this.unAckedMessageTracker != null) {
                this.unAckedMessageTracker.close();
            }
            return CompletableFuture.completedFuture(null);
        }
        this.setState(HandlerState.State.Closing);
        if (this.partitionsAutoUpdateTimeout != null) {
            this.partitionsAutoUpdateTimeout.cancel();
            this.partitionsAutoUpdateTimeout = null;
        }
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        List futureList = this.consumers.values().stream().map(consumer -> consumer.closeAsync().exceptionally(t2 -> {
            Throwable cause = FutureUtil.unwrapCompletionException(t2);
            if (!(cause instanceof PulsarClientException.AlreadyClosedException)) {
                log.warn("[{}] [{}] Error closing individual consumer", new Object[]{consumer.getTopic(), consumer.getSubscription(), cause});
            }
            return null;
        })).collect(Collectors.toList());
        ((CompletableFuture)FutureUtil.waitForAll(futureList).thenComposeAsync(r -> {
            this.setState(HandlerState.State.Closed);
            this.cleanupMultiConsumer();
            log.info("[{}] [{}] Closed Topics Consumer", (Object)this.topic, (Object)this.subscription);
            return this.failPendingReceive();
        }, (Executor)this.internalPinnedExecutor)).whenComplete((r, ex) -> {
            if (ex == null) {
                closeFuture.complete(null);
            } else {
                this.setState(HandlerState.State.Failed);
                closeFuture.completeExceptionally((Throwable)ex);
                log.error("[{}] [{}] Could not close Topics Consumer", new Object[]{this.topic, this.subscription, ex.getCause()});
            }
        });
        return closeFuture;
    }

    private void cleanupMultiConsumer() {
        if (this.unAckedMessageTracker != null) {
            this.unAckedMessageTracker.close();
            this.unAckedMessageTracker = null;
        }
        if (this.partitionsAutoUpdateTimeout != null) {
            this.partitionsAutoUpdateTimeout.cancel();
            this.partitionsAutoUpdateTimeout = null;
        }
        this.client.cleanupConsumer(this);
    }

    public boolean isConnected() {
        return this.consumers.values().stream().allMatch(consumer -> consumer.isConnected());
    }

    @Override
    String getHandlerName() {
        return this.subscription;
    }

    private ConsumerConfigurationData<T> getInternalConsumerConfig() {
        Object internalConsumerConfig = this.conf.clone();
        ((ConsumerConfigurationData)internalConsumerConfig).setSubscriptionName(this.subscription);
        ((ConsumerConfigurationData)internalConsumerConfig).setConsumerName(this.consumerName);
        ((ConsumerConfigurationData)internalConsumerConfig).setMessageListener(null);
        return internalConsumerConfig;
    }

    public void redeliverUnacknowledgedMessages() {
        this.internalPinnedExecutor.execute(() -> {
            this.incomingQueueLock.lock();
            try {
                CONSUMER_EPOCH.incrementAndGet(this);
                this.consumers.values().stream().forEach(consumer -> {
                    consumer.redeliverUnacknowledgedMessages();
                    consumer.unAckedChunkedMessageIdSequenceMap.clear();
                });
                this.clearIncomingMessages();
                this.unAckedMessageTracker.clear();
                this.resumeReceivingFromPausedConsumersIfNeeded();
            }
            finally {
                this.incomingQueueLock.unlock();
            }
        });
    }

    @Override
    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
        if (messageIds.isEmpty()) {
            return;
        }
        for (MessageId messageId : messageIds) {
            Preconditions.checkArgument(messageId instanceof TopicMessageId);
        }
        if (this.conf.getSubscriptionType() != SubscriptionType.Shared && this.conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
            this.redeliverUnacknowledgedMessages();
            return;
        }
        this.removeExpiredMessagesFromQueue(messageIds);
        messageIds.stream().collect(Collectors.groupingBy(msgId -> ((TopicMessageIdImpl)msgId).getOwnerTopic(), Collectors.toSet())).forEach((topicName, messageIds1) -> this.consumers.get(topicName).redeliverUnacknowledgedMessages((Set<MessageId>)messageIds1));
        this.resumeReceivingFromPausedConsumersIfNeeded();
    }

    @Override
    protected void updateAutoScaleReceiverQueueHint() {
        this.scaleReceiverQueueHint.set(this.incomingMessages.size() >= this.getCurrentReceiverQueueSize());
    }

    @Override
    protected void completeOpBatchReceive(ConsumerBase.OpBatchReceive<T> op) {
        this.notifyPendingBatchReceivedCallBack(op.future);
        this.resumeReceivingFromPausedConsumersIfNeeded();
    }

    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            this.seekAsync(messageId).get();
        }
        catch (Exception e) {
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    public void seek(long timestamp) throws PulsarClientException {
        try {
            this.seekAsync(timestamp).get();
        }
        catch (Exception e) {
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    public void seek(Function<String, Object> function) throws PulsarClientException {
        try {
            this.seekAsync(function).get();
        }
        catch (Exception e) {
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
        return this.seekAllAsync(consumer -> consumer.seekAsync(function));
    }

    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        ConsumerImpl<T> internalConsumer;
        if (messageId instanceof TopicMessageId) {
            TopicMessageId topicMessageId = (TopicMessageId)messageId;
            internalConsumer = this.consumers.get(topicMessageId.getOwnerTopic());
            if (internalConsumer == null) {
                return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotAllowedException("The owner topic " + topicMessageId.getOwnerTopic() + " is not subscribed"));
            }
        } else {
            internalConsumer = null;
        }
        if (internalConsumer == null && MultiTopicsConsumerImpl.isIllegalMultiTopicsMessageId(messageId)) {
            return FutureUtil.failedFuture(new PulsarClientException("Illegal messageId, messageId can only be earliest/latest"));
        }
        if (internalConsumer == null) {
            return this.seekAllAsync(consumer -> consumer.seekAsync(messageId));
        }
        return this.seekAsyncInternal(Collections.singleton(internalConsumer), __ -> __.seekAsync(messageId));
    }

    public CompletableFuture<Void> seekAsync(long timestamp) {
        return this.seekAllAsync(consumer -> consumer.seekAsync(timestamp));
    }

    private CompletableFuture<Void> seekAsyncInternal(Collection<ConsumerImpl<T>> consumers, Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
        this.beforeSeek();
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        FutureUtil.waitForAll(consumers.stream().map(seekFunc).collect(Collectors.toList())).whenComplete((__, e) -> this.afterSeek(future, (Throwable)e));
        return future;
    }

    private CompletableFuture<Void> seekAllAsync(Function<ConsumerImpl<T>, CompletableFuture<Void>> seekFunc) {
        return this.seekAsyncInternal(this.consumers.values(), seekFunc);
    }

    private void beforeSeek() {
        this.duringSeek = true;
        this.unAckedMessageTracker.clear();
        this.clearIncomingMessages();
    }

    private void afterSeek(CompletableFuture<Void> seekFuture, @Nullable Throwable throwable) {
        this.duringSeek = false;
        log.info("[{}] Resume receiving messages for {} since seek is done", (Object)this.subscription, (Object)this.consumers.keySet());
        this.startReceivingMessages(new ArrayList<ConsumerImpl<T>>(this.consumers.values()));
        if (throwable == null) {
            seekFuture.complete(null);
        } else {
            seekFuture.completeExceptionally(throwable);
        }
    }

    @Override
    public int getAvailablePermits() {
        return this.consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
    }

    public boolean hasReachedEndOfTopic() {
        return this.consumers.values().stream().allMatch(Consumer::hasReachedEndOfTopic);
    }

    public boolean hasMessageAvailable() throws PulsarClientException {
        try {
            return this.hasMessageAvailableAsync().get();
        }
        catch (Exception e) {
            throw PulsarClientException.unwrap((Throwable)e);
        }
    }

    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        if (this.numMessagesInQueue() > 0) {
            return CompletableFuture.completedFuture(true);
        }
        ArrayList<CompletionStage> futureList = new ArrayList<CompletionStage>();
        AtomicBoolean hasMessageAvailable = new AtomicBoolean(false);
        for (ConsumerImpl<T> consumer : this.consumers.values()) {
            futureList.add(consumer.hasMessageAvailableAsync().thenAccept(isAvailable -> {
                if (isAvailable.booleanValue()) {
                    hasMessageAvailable.compareAndSet(false, true);
                }
            }));
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<Boolean>();
        FutureUtil.waitForAll(futureList).whenComplete((result, exception) -> {
            if (exception != null) {
                completableFuture.completeExceptionally((Throwable)exception);
            } else {
                completableFuture.complete(hasMessageAvailable.get() || this.numMessagesInQueue() > 0);
            }
        });
        return completableFuture;
    }

    @Override
    public int numMessagesInQueue() {
        return this.incomingMessages.size() + this.consumers.values().stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
    }

    public synchronized ConsumerStats getStats() {
        if (this.stats == null) {
            return null;
        }
        this.stats.reset();
        this.consumers.forEach((partition, consumer) -> this.stats.updateCumulativeStats((String)partition, consumer.getStats()));
        return this.stats;
    }

    @Override
    public UnAckedMessageTracker getUnAckedMessageTracker() {
        return this.unAckedMessageTracker;
    }

    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
        Message peek = (Message)this.incomingMessages.peek();
        if (peek != null) {
            if (!messageIds.contains(peek.getMessageId())) {
                return;
            }
            Message message = (Message)this.incomingMessages.poll();
            Preconditions.checkState(message instanceof TopicMessageImpl);
            while (message != null) {
                this.decreaseIncomingMessageSize(message);
                MessageId messageId = message.getMessageId();
                if (!messageIds.contains(messageId)) {
                    messageIds.add(messageId);
                    break;
                }
                message.release();
                message = (Message)this.incomingMessages.poll();
            }
        }
    }

    private TopicName getTopicName(String topic) {
        try {
            return TopicName.get(topic);
        }
        catch (Exception ignored) {
            return null;
        }
    }

    private String getFullTopicName(String topic) {
        TopicName topicName = this.getTopicName(topic);
        return topicName != null ? topicName.toString() : null;
    }

    private void removeTopic(String topic) {
        String fullTopicName = this.getFullTopicName(topic);
        if (fullTopicName != null) {
            this.partitionedTopics.remove(topic);
        }
    }

    public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
        TopicName topicNameInstance = this.getTopicName(topicName);
        if (topicNameInstance == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Topic name not valid"));
        }
        String fullTopicName = topicNameInstance.toString();
        if (this.consumers.containsKey(fullTopicName) || this.partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
        }
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
        }
        CompletableFuture<Void> subscribeResult = new CompletableFuture<Void>();
        ((CompletableFuture)this.client.getPartitionedTopicMetadata(topicName, true, false).thenAccept(metadata -> this.subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, createTopicIfDoesNotExist))).exceptionally(ex1 -> {
            log.warn("[{}] Failed to get partitioned topic metadata: {}", (Object)fullTopicName, (Object)ex1.getMessage());
            subscribeResult.completeExceptionally((Throwable)ex1);
            return null;
        });
        return subscribeResult;
    }

    public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, int numPartitions, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
        Preconditions.checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer");
        Object cloneConf = conf.clone();
        String topicName = ((ConsumerConfigurationData)cloneConf).getSingleTopic();
        ((ConsumerConfigurationData)cloneConf).getTopicNames().remove(topicName);
        CompletableFuture<Consumer<T>> future = new CompletableFuture<Consumer<T>>();
        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, executorProvider, future, schema, interceptors, true);
        ((CompletableFuture)((CompletableFuture)future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))).thenRun(() -> subscribeFuture.complete(consumer))).exceptionally(e -> {
            log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}", new Object[]{topicName, numPartitions, e});
            consumer.cleanupMultiConsumer();
            subscribeFuture.completeExceptionally(PulsarClientException.wrap((Throwable)e.getCause(), (String)String.format("Failed to subscribe %s with %d partitions", topicName, numPartitions)));
            return null;
        });
        return consumer;
    }

    CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
        TopicName topicNameInstance = this.getTopicName(topicName);
        if (topicNameInstance == null) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Topic name not valid"));
        }
        String fullTopicName = topicNameInstance.toString();
        if (this.consumers.containsKey(fullTopicName)) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
        }
        if (!topicNameInstance.isPartitioned() && this.partitionedTopics.containsKey(topicNameInstance.getPartitionedTopicName())) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
        }
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
        }
        CompletableFuture<Void> subscribeResult = new CompletableFuture<Void>();
        this.subscribeTopicPartitions(subscribeResult, fullTopicName, numberPartitions, true);
        return subscribeResult;
    }

    private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions, boolean createIfDoesNotExist) {
        ((CompletableFuture)this.client.preProcessSchemaBeforeSubscribe(this.client, this.schema, topicName).thenAccept(schema -> this.doSubscribeTopicPartitions((Schema<T>)schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist))).exceptionally(cause -> {
            subscribeResult.completeExceptionally((Throwable)cause);
            return null;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doSubscribeTopicPartitions(Schema<T> schema, CompletableFuture<Void> subscribeResult, String topicName, int numPartitions, boolean createIfDoesNotExist) {
        CompletionStage subscribeAllPartitionsFuture;
        if (log.isDebugEnabled()) {
            log.debug("Subscribe to topic {} metadata.partitions: {}", (Object)topicName, (Object)numPartitions);
        }
        if (numPartitions != 0) {
            boolean isTopicBeingSubscribedForInOtherThread;
            boolean bl = isTopicBeingSubscribedForInOtherThread = this.partitionedTopics.putIfAbsent(topicName, numPartitions) != null;
            if (isTopicBeingSubscribedForInOtherThread) {
                String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. Topic is already being subscribed for in other thread.", this.topic, topicName);
                log.warn(errorMessage);
                subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
                return;
            }
            this.allTopicPartitionsNumber.addAndGet(numPartitions);
            int receiverQueueSize = Math.min(this.conf.getReceiverQueueSize(), this.conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
            ConsumerConfigurationData<T> configurationData = this.getInternalConsumerConfig();
            configurationData.setReceiverQueueSize(receiverQueueSize);
            CompletableFuture<List<Integer>> partitionsFuture = createIfDoesNotExist || !TopicName.get(topicName).isPersistent() ? CompletableFuture.completedFuture(IntStream.range(0, numPartitions).mapToObj(i -> i).collect(Collectors.toList())) : this.getExistsPartitions(topicName.toString());
            subscribeAllPartitionsFuture = partitionsFuture.thenCompose(partitions -> {
                if (partitions.isEmpty()) {
                    this.partitionedTopics.remove(topicName, numPartitions);
                    return CompletableFuture.completedFuture(null);
                }
                ArrayList<CompletableFuture<Consumer<T>>> subscribeList = new ArrayList<CompletableFuture<Consumer<T>>>();
                Iterator iterator = partitions.iterator();
                while (iterator.hasNext()) {
                    int partitionIndex = (Integer)iterator.next();
                    String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
                    CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<Consumer<T>>();
                    configurationData.setStartPaused(this.paused);
                    ConsumerImpl<T> newConsumer = this.createInternalConsumer(configurationData, partitionName, partitionIndex, subFuture, createIfDoesNotExist, schema);
                    Object object = this.pauseMutex;
                    synchronized (object) {
                        if (this.paused) {
                            newConsumer.pause();
                        } else {
                            newConsumer.resume();
                        }
                        Consumer originalValue = this.consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                        if (originalValue != null) {
                            newConsumer.closeAsync().exceptionally(ex -> {
                                log.error("[{}] [{}] Failed to close the orphan consumer", new Object[]{partitionName, this.subscription, ex});
                                return null;
                            });
                        }
                    }
                    subscribeList.add(subFuture);
                }
                return FutureUtil.waitForAll(subscribeList);
            });
        } else {
            this.allTopicPartitionsNumber.incrementAndGet();
            CompletableFuture subscribeFuture = new CompletableFuture();
            subscribeAllPartitionsFuture = subscribeFuture.thenAccept(__ -> {});
            Object object = this.pauseMutex;
            synchronized (object) {
                this.consumers.compute(topicName, (key, existingValue) -> {
                    if (existingValue != null) {
                        String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. Topic is already being subscribed for in other thread.", this.topic, topicName);
                        log.warn(errorMessage);
                        subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
                        return existingValue;
                    }
                    this.internalConfig.setStartPaused(this.paused);
                    ConsumerImpl<T> newConsumer = this.createInternalConsumer(this.internalConfig, topicName, -1, subscribeFuture, createIfDoesNotExist, schema);
                    if (this.paused) {
                        newConsumer.pause();
                    } else {
                        newConsumer.resume();
                    }
                    return newConsumer;
                });
            }
        }
        ((CompletableFuture)((CompletableFuture)subscribeAllPartitionsFuture).thenAccept(finalFuture -> {
            if (this.allTopicPartitionsNumber.get() > this.getCurrentReceiverQueueSize()) {
                this.setCurrentReceiverQueueSize(this.allTopicPartitionsNumber.get());
            }
            this.startReceivingMessages(this.consumers.values().stream().filter(consumer1 -> {
                String consumerTopicName = consumer1.getTopic();
                return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(TopicName.get(topicName).getPartitionedTopicName());
            }).collect(Collectors.toList()));
            subscribeResult.complete(null);
            log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}", new Object[]{this.topic, this.subscription, topicName, numPartitions, this.allTopicPartitionsNumber.get()});
        })).exceptionally(ex -> {
            log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", new Object[]{this.topic, topicName, ex.getMessage()});
            this.handleSubscribeOneTopicError(topicName, (Throwable)ex, subscribeResult);
            return null;
        });
    }

    private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> configurationData, String partitionName, int partitionIndex, CompletableFuture<Consumer<T>> subFuture, boolean createIfDoesNotExist, Schema<T> schema) {
        BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder().maxNumMessages(Math.max(((ConsumerConfigurationData)configurationData).getReceiverQueueSize() / 2, 1)).maxNumBytes(-1).timeout(1, TimeUnit.MILLISECONDS).build();
        ((ConsumerConfigurationData)configurationData).setBatchReceivePolicy(internalBatchReceivePolicy);
        configurationData = ((ConsumerConfigurationData)configurationData).clone();
        return ConsumerImpl.newConsumerImpl(this.client, partitionName, configurationData, this.client.externalExecutorProvider(), partitionIndex, true, this.listener != null, subFuture, this.startMessageId, schema, this.internalConsumerInterceptors, createIfDoesNotExist, this.startMessageRollbackDurationInSec);
    }

    protected void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) {
        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", new Object[]{this.topic, topicName, error.getMessage()});
        this.client.externalExecutorProvider().getExecutor().execute(() -> {
            AtomicInteger toCloseNum = new AtomicInteger(0);
            List<ConsumerImpl> filterConsumers = this.consumers.values().stream().filter(consumer1 -> {
                String consumerTopicName = consumer1.getTopic();
                if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(TopicName.get(topicName).getPartitionedTopicName())) {
                    toCloseNum.incrementAndGet();
                    return true;
                }
                return false;
            }).collect(Collectors.toList());
            if (filterConsumers.isEmpty()) {
                subscribeFuture.completeExceptionally(error);
                return;
            }
            filterConsumers.forEach(consumer2 -> consumer2.closeAsync().whenComplete((r, ex) -> {
                consumer2.subscribeFuture().completeExceptionally(error);
                this.allTopicPartitionsNumber.decrementAndGet();
                this.consumers.remove(consumer2.getTopic());
                if (toCloseNum.decrementAndGet() == 0) {
                    log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}", new Object[]{this.topic, topicName, error.getMessage()});
                    this.removeTopic(topicName);
                    subscribeFuture.completeExceptionally(error);
                }
            }));
        });
    }

    public CompletableFuture<Void> unsubscribeAsync(String topicName) {
        Preconditions.checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
        }
        if (this.partitionsAutoUpdateTimeout != null) {
            this.partitionsAutoUpdateTimeout.cancel();
            this.partitionsAutoUpdateTimeout = null;
        }
        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
        String topicPartName = TopicName.get(topicName).getPartitionedTopicName();
        List consumersToUnsub = this.consumers.values().stream().filter(consumer -> {
            String consumerTopicName = consumer.getTopic();
            return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName);
        }).collect(Collectors.toList());
        List futureList = consumersToUnsub.stream().map(ConsumerBase::unsubscribeAsync).collect(Collectors.toList());
        FutureUtil.waitForAll(futureList).whenComplete((r, ex) -> {
            if (ex == null) {
                consumersToUnsub.forEach(consumer1 -> {
                    this.consumers.remove(consumer1.getTopic());
                    this.pausedConsumers.remove(consumer1);
                    this.allTopicPartitionsNumber.decrementAndGet();
                });
                this.removeTopic(topicName);
                if (this.unAckedMessageTracker instanceof UnAckedTopicMessageTracker) {
                    ((UnAckedTopicMessageTracker)this.unAckedMessageTracker).removeTopicMessages(topicName);
                }
                unsubscribeFuture.complete(null);
                log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, allTopicPartitionsNumber: {}", new Object[]{topicName, this.subscription, this.consumerName, this.allTopicPartitionsNumber});
            } else {
                unsubscribeFuture.completeExceptionally((Throwable)ex);
                this.setState(HandlerState.State.Failed);
                log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer", new Object[]{topicName, this.subscription, this.consumerName, ex.getCause()});
            }
        });
        return unsubscribeFuture;
    }

    public List<String> getPartitionedTopics() {
        return this.partitionedTopics.keySet().stream().collect(Collectors.toList());
    }

    public List<String> getPartitions() {
        return this.consumers.keySet().stream().collect(Collectors.toList());
    }

    public List<ConsumerImpl<T>> getConsumers() {
        return this.consumers.values().stream().collect(Collectors.toList());
    }

    int getPartitionsOfTheTopicMap() {
        return this.partitionedTopics.values().stream().mapToInt(Integer::intValue).sum();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pause() {
        Object object = this.pauseMutex;
        synchronized (object) {
            this.paused = true;
            this.consumers.forEach((name, consumer) -> consumer.pause());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resume() {
        Object object = this.pauseMutex;
        synchronized (object) {
            this.paused = false;
            this.consumers.forEach((name, consumer) -> consumer.resume());
        }
    }

    public long getLastDisconnectedTimestamp() {
        long lastDisconnectedTimestamp = 0L;
        Optional<ConsumerImpl> c = this.consumers.values().stream().max(Comparator.comparingLong(ConsumerImpl::getLastDisconnectedTimestamp));
        if (c.isPresent()) {
            lastDisconnectedTimestamp = c.get().getLastDisconnectedTimestamp();
        }
        return lastDisconnectedTimestamp;
    }

    private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) {
        int oldPartitionNumber = this.partitionedTopics.get(topicName);
        return ((CompletableFuture)this.client.getPartitionsForTopic(topicName).thenCompose(list -> {
            int currentPartitionNumber = Long.valueOf(list.stream().filter(t2 -> TopicName.get(t2).isPartitioned()).count()).intValue();
            if (log.isDebugEnabled()) {
                log.debug("[{}] partitions number. old: {}, new: {}", new Object[]{topicName, oldPartitionNumber, currentPartitionNumber});
            }
            if (oldPartitionNumber == currentPartitionNumber) {
                return CompletableFuture.completedFuture(null);
            }
            if (currentPartitionNumber == 0) {
                this.partitionedTopics.put(topicName, 0);
                this.allTopicPartitionsNumber.addAndGet(-oldPartitionNumber);
                ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
                for (Map.Entry<String, ConsumerImpl<T>> e : this.consumers.entrySet()) {
                    String partitionedTopicName = TopicName.get(e.getKey()).getPartitionedTopicName();
                    if (!partitionedTopicName.equals(topicName)) continue;
                    futures.add(e.getValue().closeAsync());
                    this.consumers.remove(e.getKey());
                }
                return FutureUtil.waitForAll(futures);
            }
            if (oldPartitionNumber < currentPartitionNumber) {
                this.allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber);
                this.partitionedTopics.put(topicName, currentPartitionNumber);
                List newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
                List futureList = newPartitions.stream().map(partitionName -> {
                    int partitionIndex = TopicName.getPartitionIndex(partitionName);
                    CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<Consumer<T>>();
                    ConsumerConfigurationData<T> configurationData = this.getInternalConsumerConfig();
                    configurationData.setStartPaused(this.paused);
                    ConsumerImpl<T> newConsumer = this.createInternalConsumer(configurationData, (String)partitionName, partitionIndex, subFuture, true, (Schema<T>)this.schema);
                    Object object = this.pauseMutex;
                    synchronized (object) {
                        if (this.paused) {
                            newConsumer.pause();
                        } else {
                            newConsumer.resume();
                        }
                        this.consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] create consumer {} for partitionName: {}", new Object[]{topicName, newConsumer.getTopic(), partitionName});
                    }
                    return subFuture;
                }).collect(Collectors.toList());
                this.onPartitionsChange(topicName, currentPartitionNumber);
                return FutureUtil.waitForAll(futureList).thenAccept(finalFuture -> {
                    List<ConsumerImpl<T>> newConsumerList = newPartitions.stream().map(partitionTopic -> this.consumers.get(partitionTopic)).collect(Collectors.toList());
                    this.startReceivingMessages(newConsumerList);
                });
            }
            log.error("[{}] not support shrink topic partitions. old: {}, new: {}", new Object[]{topicName, oldPartitionNumber, currentPartitionNumber});
            return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotSupportedException("not support shrink topic partitions"));
        })).exceptionally(throwable -> {
            log.warn("Failed to get partitions for topic to determine if new partitions are added", throwable);
            return null;
        });
    }

    @VisibleForTesting
    public Timeout getPartitionsAutoUpdateTimeout() {
        return this.partitionsAutoUpdateTimeout;
    }

    @Override
    @Deprecated
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        CompletableFuture<MessageId> returnFuture = new CompletableFuture<MessageId>();
        Map<String, CompletableFuture> messageIdFutures = this.consumers.entrySet().stream().map(entry -> Pair.of((String)entry.getKey(), ((ConsumerImpl)entry.getValue()).getLastMessageIdAsync())).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        CompletableFuture.allOf(messageIdFutures.values().toArray(new CompletableFuture[0])).whenComplete((ignore, ex) -> {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            messageIdFutures.forEach((key, future) -> {
                MessageId messageId;
                try {
                    messageId = (MessageId)future.get();
                }
                catch (Exception e) {
                    log.warn("[{}] Exception when topic {} getLastMessageId.", key, (Object)e);
                    messageId = MessageId.earliest;
                }
                builder.put(key, messageId);
            });
            returnFuture.complete(new MultiMessageIdImpl(builder.build()));
        });
        return returnFuture;
    }

    public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
        List futures = this.consumers.values().stream().map(ConsumerImpl::getLastMessageIdsAsync).collect(Collectors.toList());
        return FutureUtil.waitForAll(futures).thenApply(__ -> {
            ArrayList messageIds = new ArrayList();
            futures.stream().map(CompletableFuture::join).forEach(messageIds::addAll);
            return messageIds;
        });
    }

    public static boolean isIllegalMultiTopicsMessageId(MessageId messageId) {
        return !messageId.equals((Object)MessageId.earliest) && !messageId.equals((Object)MessageId.latest);
    }

    public void tryAcknowledgeMessage(Message<T> msg) {
        if (msg != null) {
            this.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
                log.warn("[{}][{}] acknowledge message {} cumulative fail.", new Object[]{this.topic, this.subscription, msg.getMessageId(), ex});
                return null;
            });
        }
    }

    @Override
    protected void setCurrentReceiverQueueSize(int newSize) {
        Preconditions.checkArgument(newSize > 0, "receiver queue size should larger than 0");
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] setMaxReceiverQueueSize={}, previous={}", new Object[]{this.topic, this.subscription, newSize, this.getCurrentReceiverQueueSize()});
        }
        CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
        this.resumeReceivingFromPausedConsumersIfNeeded();
    }

    private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
        TopicName topicName = TopicName.get(topic);
        if (!topicName.isPersistent()) {
            return FutureUtil.failedFuture(new IllegalArgumentException("The method getExistsPartitions does not support non-persistent topic yet."));
        }
        return this.client.getLookup().getTopicsUnderNamespace(topicName.getNamespaceObject(), CommandGetTopicsOfNamespace.Mode.PERSISTENT, TopicName.getPattern(topicName.getPartitionedTopicName()), null).thenApply(getTopicsResult -> {
            if (getTopicsResult.getNonPartitionedOrPartitionTopics() == null || getTopicsResult.getNonPartitionedOrPartitionTopics().isEmpty()) {
                return Collections.emptyList();
            }
            Predicate<String> clientSideFilter = getTopicsResult.isFiltered() ? __ -> true : tp -> Pattern.compile(TopicName.getPartitionPattern(topic)).matcher((CharSequence)tp).matches();
            ArrayList<Integer> list = new ArrayList<Integer>(getTopicsResult.getNonPartitionedOrPartitionTopics().size());
            for (String partition : getTopicsResult.getNonPartitionedOrPartitionTopics()) {
                int partitionIndex = TopicName.get(partition).getPartitionIndex();
                if (partitionIndex < 0 || !clientSideFilter.test(partition)) continue;
                list.add(partitionIndex);
            }
            Collections.sort(list);
            return list;
        });
    }

    private ConsumerInterceptors<T> getInternalConsumerInterceptors(final ConsumerInterceptors<T> multiTopicInterceptors) {
        return new ConsumerInterceptors<T>(new ArrayList()){

            @Override
            public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
                return multiTopicInterceptors.onArrival(consumer, message);
            }

            @Override
            public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
                return message;
            }

            @Override
            public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
                multiTopicInterceptors.onAcknowledge(consumer, messageId, exception);
            }

            @Override
            public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
                multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception);
            }

            @Override
            public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) {
                multiTopicInterceptors.onNegativeAcksSend(consumer, set);
            }

            @Override
            public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) {
                multiTopicInterceptors.onAckTimeoutSend(consumer, set);
            }

            @Override
            public void onPartitionsChange(String topicName, int partitions) {
                multiTopicInterceptors.onPartitionsChange(topicName, partitions);
            }

            @Override
            public void close() throws IOException {
                multiTopicInterceptors.close();
            }
        };
    }

    private class TopicsPartitionChangedListener
    implements PartitionsChangedListener {
        private TopicsPartitionChangedListener() {
        }

        @Override
        public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended) {
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            if (topicsExtended.isEmpty()) {
                future.complete(null);
                return future;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}]  run onTopicsExtended: {}, size: {}", new Object[]{MultiTopicsConsumerImpl.this.topic, topicsExtended.toString(), topicsExtended.size()});
            }
            ArrayList futureList = Lists.newArrayListWithExpectedSize(topicsExtended.size());
            topicsExtended.forEach(topic -> futureList.add(MultiTopicsConsumerImpl.this.subscribeIncreasedTopicPartitions(topic)));
            ((CompletableFuture)FutureUtil.waitForAll(futureList).thenAccept(finalFuture -> future.complete(null))).exceptionally(ex -> {
                log.warn("[{}] Failed to subscribe increased topics partitions: {}", (Object)MultiTopicsConsumerImpl.this.topic, (Object)ex.getMessage());
                future.completeExceptionally((Throwable)ex);
                return null;
            });
            return future;
        }
    }
}

