/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ConsumerInterceptors;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PatternConsumerUpdateQueue;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TopicListWatcher;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.topics.TopicsPattern;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MultiTopicsConsumerImpl} whose set of subscribed topics is derived from a
 * {@link TopicsPattern} evaluated against the topics of a single namespace.
 *
 * <p>Topic changes are discovered in one of two ways:
 * <ul>
 *   <li>for {@code PERSISTENT} subscription mode a {@link TopicListWatcher} is created; if its
 *       creation fails (and the consumer is not closed) the consumer falls back to polling;</li>
 *   <li>for every other subscription mode no watcher is created and the consumer polls.</li>
 * </ul>
 * Polling is driven by a timer: this class is the {@link TimerTask}, and every firing appends a
 * recheck operation to the {@link PatternConsumerUpdateQueue}. A non-null
 * {@link #recheckPatternTimeout} doubles as the "polling is enabled" marker.
 *
 * @param <T> the message payload type
 */
public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T> implements TimerTask {
    private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);

    /** Pattern that decides which topics of {@link #namespaceName} are subscribed. */
    private final TopicsPattern topicsPattern;
    /** Listener that applies added/removed topics to this consumer. */
    final TopicsChangedListener topicsChangeListener;
    private final CommandGetTopicsOfNamespace.Mode subscriptionMode;
    /** Only non-null when the subscription mode is {@code PERSISTENT}. */
    private final TopicListWatcher topicListWatcher;
    /** Handed to the watcher; completed with {@code null} right away when no watcher is created. */
    private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>();
    protected NamespaceName namespaceName;
    /** Incremented on every recheck so that only the most recent recheck applies its result. */
    private final AtomicInteger recheckPatternEpoch = new AtomicInteger();
    /** Pending polling timeout; {@code null} means polling is disabled (or the consumer was closed). */
    private volatile Timeout recheckPatternTimeout = null;
    /** Hash of the last topic list received; sent with the next lookup. */
    private volatile String topicsHash;
    private PatternConsumerUpdateQueue updateTaskQueue;
    /** Set at the very beginning of {@link #closeAsync()}. */
    private volatile boolean closed = false;

    /**
     * Creates the pattern consumer and starts topic discovery (watcher or polling timer, depending on
     * {@code subscriptionMode}).
     *
     * @param topicsPattern    pattern selecting the topics; also supplies the namespace
     * @param topicsHash       initial hash of the known topic list
     * @param client           owning client; provides the timer, the lookup service and watcher ids
     * @param conf             consumer configuration (subscription name, auto-discovery period, ...)
     * @param executorProvider passed through to the super class
     * @param subscribeFuture  passed through to the super class
     * @param schema           passed through to the super class
     * @param subscriptionMode which kind of topics is looked up; only {@code PERSISTENT} uses a watcher
     * @param interceptors     passed through to the super class
     */
    public PatternMultiTopicsConsumerImpl(TopicsPattern topicsPattern, String topicsHash, PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, CommandGetTopicsOfNamespace.Mode subscriptionMode, ConsumerInterceptors<T> interceptors) {
        super(client, conf, executorProvider, subscribeFuture, schema, interceptors, false);
        this.topicsPattern = topicsPattern;
        this.topicsHash = topicsHash;
        this.subscriptionMode = subscriptionMode;
        this.namespaceName = topicsPattern.namespace();
        this.topicsChangeListener = new PatternTopicsChangedListener();
        this.updateTaskQueue = new PatternConsumerUpdateQueue(this);
        if (subscriptionMode == CommandGetTopicsOfNamespace.Mode.PERSISTENT) {
            long watcherId = client.newTopicListWatcherId();
            this.topicListWatcher = new TopicListWatcher(this.updateTaskQueue, client, topicsPattern, watcherId,
                    this.namespaceName, topicsHash, this.watcherFuture, () -> this.recheckTopicsChangeAfterReconnect());
            this.watcherFuture.exceptionally(ex -> {
                if (this.closed) {
                    log.warn("Pattern consumer [{}] was closed while creating topic list watcher",
                            conf.getSubscriptionName(), ex);
                } else {
                    log.warn("Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling for new topics",
                            conf.getSubscriptionName(), ex);
                    this.recheckPatternTimeout = this.scheduleRecheckTimeout();
                }
                return null;
            });
        } else {
            log.debug("Pattern consumer [{}] not creating topic list watcher for subscription mode {}",
                    conf.getSubscriptionName(), subscriptionMode);
            this.topicListWatcher = null;
            this.watcherFuture.complete(null);
            this.recheckPatternTimeout = this.scheduleRecheckTimeout();
        }
    }

    /**
     * Registers this consumer with the client timer so that {@link #run(Timeout)} fires after the
     * configured pattern auto-discovery period (at least one second).
     *
     * @return the newly created timeout; the caller decides whether to store it
     */
    private Timeout scheduleRecheckTimeout() {
        long delaySeconds = Math.max(1, this.conf.getPatternAutoDiscoveryPeriod());
        return this.client.timer().newTimeout(this, delaySeconds, TimeUnit.SECONDS);
    }

    /** Callback given to the topic list watcher: queues a recheck unless the consumer is closing/closed. */
    private void recheckTopicsChangeAfterReconnect() {
        HandlerState.State state = this.getState();
        if (state == HandlerState.State.Closing || state == HandlerState.State.Closed) {
            return;
        }
        this.updateTaskQueue.appendRecheckOp();
    }

    /**
     * Timer callback: queues a recheck of the topic list unless the timeout was cancelled or the
     * consumer has been closed.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled() || this.closed) {
            return;
        }
        this.updateTaskQueue.appendRecheckOp();
    }

    /**
     * Looks up the topics of the namespace and applies the difference to the current subscriptions.
     *
     * <p>If a newer recheck was started while the lookup was in flight, this (older) one does nothing.
     * When polling is enabled, the next timer is armed once the recheck finishes, whether it succeeded
     * or failed, so that a single failed lookup does not stop periodic discovery.
     *
     * @return a future completed when the subscriptions have been updated; it fails if the lookup or
     *         the subscription update failed
     */
    CompletableFuture<Void> recheckTopicsChange() {
        String pattern = this.topicsPattern.inputPattern();
        int epoch = this.recheckPatternEpoch.incrementAndGet();
        CompletableFuture<Void> updateFuture = this.client.getLookup()
                .getTopicsUnderNamespace(this.namespaceName, this.subscriptionMode, pattern, this.topicsHash)
                .thenCompose(getTopicsResult -> {
                    synchronized (this) {
                        // Only the latest recheck is allowed to apply its result.
                        if (this.recheckPatternEpoch.get() > epoch) {
                            return CompletableFuture.<Void>completedFuture(null);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
                                    this.getSubscription(), this.namespaceName, getTopicsResult.getTopics().size(),
                                    getTopicsResult.getTopicsHash(), getTopicsResult.isFiltered());
                            getTopicsResult.getTopics().forEach(topicName ->
                                    log.debug("Get topics under namespace {}, topic: {}", this.namespaceName, topicName));
                        }
                        List<String> oldTopics = new ArrayList<String>(this.getPartitions());
                        return updateSubscriptions(this.topicsPattern, this::setTopicsHash, getTopicsResult,
                                this.topicsChangeListener, oldTopics, this.subscription);
                    }
                });
        // whenComplete keeps the outcome (value or failure) of updateFuture for the caller.
        return updateFuture.whenComplete((ignored, failure) -> {
            // A null timeout means polling is disabled; never re-arm after close.
            if (this.recheckPatternTimeout != null && !this.closed) {
                this.recheckPatternTimeout = this.scheduleRecheckTimeout();
            }
        });
    }

    /**
     * Applies a topic-list lookup result: records the new hash and, if the list changed, notifies the
     * listener about added and removed topics.
     *
     * @param topicsPattern         pattern used to filter the result when the result is not already filtered
     * @param topicsHashSetter      receives the hash carried by {@code getTopicsResult} (always called)
     * @param getTopicsResult       the lookup result
     * @param topicsChangedListener receives the added and the removed topics
     * @param oldTopics             the topics currently known to the consumer
     * @param subscriptionForLog    subscription name, used for logging only
     * @return a future completed when both listener callbacks completed; already completed when the
     *         result reports no change
     */
    static CompletableFuture<Void> updateSubscriptions(TopicsPattern topicsPattern, java.util.function.Consumer<String> topicsHashSetter, GetTopicsResult getTopicsResult, TopicsChangedListener topicsChangedListener, List<String> oldTopics, String subscriptionForLog) {
        topicsHashSetter.accept(getTopicsResult.getTopicsHash());
        if (!getTopicsResult.isChanged()) {
            return CompletableFuture.completedFuture(null);
        }
        List<String> newTopics;
        if (getTopicsResult.isFiltered()) {
            newTopics = getTopicsResult.getNonPartitionedOrPartitionTopics();
        } else {
            newTopics = getTopicsResult.filterTopics(topicsPattern).getNonPartitionedOrPartitionTopics();
        }
        Set<String> topicsAdded = TopicList.minus(newTopics, oldTopics);
        Set<String> topicsRemoved = TopicList.minus(oldTopics, newTopics);
        if (log.isDebugEnabled()) {
            log.debug("Pattern consumer [{}] Recheck pattern consumer's topics. topicsAdded: {}, topicsRemoved: {}",
                    subscriptionForLog, topicsAdded, topicsRemoved);
        }
        List<CompletableFuture<Void>> listenersCallback = new ArrayList<CompletableFuture<Void>>(2);
        listenersCallback.add(topicsChangedListener.onTopicsAdded(topicsAdded));
        listenersCallback.add(topicsChangedListener.onTopicsRemoved(topicsRemoved));
        return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
    }

    /** @return the pattern this consumer subscribes with */
    public TopicsPattern getPattern() {
        return this.topicsPattern;
    }

    @VisibleForTesting
    void setTopicsHash(String topicsHash) {
        this.topicsHash = topicsHash;
    }

    /**
     * Closes the consumer: marks it closed, cancels the pending polling timeout, closes the topic list
     * watcher (if any), cancels queued update tasks and waits for the running one, and finally
     * delegates to the super class. Failures of the watcher close and of the task cancellation are
     * ignored so that the super class close always runs.
     */
    @Override
    @SuppressFBWarnings
    public CompletableFuture<Void> closeAsync() {
        this.closed = true;
        Timeout timeout = this.recheckPatternTimeout;
        if (timeout != null) {
            timeout.cancel();
            this.recheckPatternTimeout = null;
        }
        CompletableFuture<Void> topicListWatcherCloseFuture = Optional.ofNullable(this.topicListWatcher)
                .map(TopicListWatcher::closeAsync)
                .orElse(CompletableFuture.completedFuture(null))
                .exceptionally(t -> null);
        CompletableFuture<Void> runningTaskCancelFuture = this.updateTaskQueue.cancelAllAndWaitForTheRunningTask();
        return FutureUtil.waitForAll(Lists.newArrayList(topicListWatcherCloseFuture, runningTaskCancelFuture))
                .exceptionally(t -> null)
                .thenCompose(ignored -> super.closeAsync());
    }

    @VisibleForTesting
    int getRecheckPatternEpoch() {
        return this.recheckPatternEpoch.get();
    }

    @VisibleForTesting
    Timeout getRecheckPatternTimeout() {
        return this.recheckPatternTimeout;
    }

    /** Fails the per-topic subscribe future with the given error and does nothing else. */
    @Override
    protected void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) {
        subscribeFuture.completeExceptionally(error);
    }

    /** Applies topic additions/removals to the enclosing consumer's internal consumers. */
    private class PatternTopicsChangedListener implements TopicsChangedListener {
        private PatternTopicsChangedListener() {
        }

        /**
         * Closes the internal consumer of every removed topic and, once all closes have finished,
         * forgets partitioned topics for which no partition consumer is left.
         *
         * @return a future that completes normally after the clean-up ran, even if closing some
         *         consumers failed (those failures are logged)
         */
        @Override
        public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
            if (removedTopics.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            List<CompletableFuture<Void>> unsubscribeList = new ArrayList<CompletableFuture<Void>>(removedTopics.size());
            Set<String> partialRemoved = new HashSet<String>(removedTopics.size());
            Set<String> partialRemovedForLog = new HashSet<String>(removedTopics.size());
            for (String tp : removedTopics) {
                TopicName topicName = TopicName.get(tp);
                ConsumerImpl<T> consumer = PatternMultiTopicsConsumerImpl.this.consumers.get(topicName.toString());
                if (consumer == null) {
                    continue;
                }
                CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<Void>();
                consumer.closeAsync().whenComplete((ignored, ex) -> {
                    if (ex != null) {
                        log.error("Pattern consumer [{}] failed to unsubscribe from topics: {}",
                                PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString(), ex);
                        unsubscribeFuture.completeExceptionally(ex);
                    } else {
                        // Two-argument remove: only drop the entry if it still maps to this consumer.
                        PatternMultiTopicsConsumerImpl.this.consumers.remove(topicName.toString(), consumer);
                        unsubscribeFuture.complete(null);
                    }
                });
                unsubscribeList.add(unsubscribeFuture);
                partialRemoved.add(topicName.getPartitionedTopicName());
                partialRemovedForLog.add(topicName.toString());
            }
            if (log.isDebugEnabled()) {
                log.debug("Pattern consumer [{}] remove topics. {}",
                        PatternMultiTopicsConsumerImpl.this.getSubscription(), partialRemovedForLog);
            }
            // handle(...) runs on success and on failure and always yields a normal completion.
            return FutureUtil.waitForAll(unsubscribeList).handle((ignored, ex) -> {
                List<String> removedPartitionedTopicsForLog = new ArrayList<String>();
                for (String groupedTopicRemoved : partialRemoved) {
                    Integer partitions = PatternMultiTopicsConsumerImpl.this.partitionedTopics.get(groupedTopicRemoved);
                    if (partitions == null) {
                        continue;
                    }
                    boolean allPartitionsHasBeenRemoved = true;
                    for (int i = 0; i < partitions; ++i) {
                        String partitionName = TopicName.get(groupedTopicRemoved).getPartition(i).toString();
                        if (PatternMultiTopicsConsumerImpl.this.consumers.containsKey(partitionName)) {
                            allPartitionsHasBeenRemoved = false;
                            break;
                        }
                    }
                    if (!allPartitionsHasBeenRemoved) {
                        continue;
                    }
                    removedPartitionedTopicsForLog.add(String.format("%s with %s partitions", groupedTopicRemoved, partitions));
                    PatternMultiTopicsConsumerImpl.this.partitionedTopics.remove(groupedTopicRemoved, partitions);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Pattern consumer [{}] remove partitioned topics because all partitions have been removed. {}",
                            PatternMultiTopicsConsumerImpl.this.getSubscription(), removedPartitionedTopicsForLog);
                }
                return null;
            });
        }

        /**
         * Subscribes to newly discovered topics.
         *
         * <p>Topics are handled in three steps: (1) collect the partitioned-topic name of every added
         * topic; (2) for names that are already known, subscribe only to the individual new partition
         * (or skip on a partitioned/non-partitioned name clash) and drop the name from the collected
         * set; (3) subscribe to every name still left in the set as a whole topic.
         *
         * @return a future completed when all started subscriptions completed
         */
        @Override
        public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
            if (addedTopics.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(addedTopics.size());
            Set<String> groupedTopics = new HashSet<String>();
            List<String> expendPartitionsForLog = new ArrayList<String>();
            for (String tp : addedTopics) {
                groupedTopics.add(TopicName.get(tp).getPartitionedTopicName());
            }
            for (String tp : addedTopics) {
                // Declared per iteration so the lambda below captures an effectively final variable.
                TopicName topicName = TopicName.get(tp);
                String partitionedName = topicName.getPartitionedTopicName();
                if (PatternMultiTopicsConsumerImpl.this.partitionedTopics.containsKey(partitionedName)) {
                    // The partitioned topic is already subscribed: only a new partition may be added.
                    if (!PatternMultiTopicsConsumerImpl.this.consumers.containsKey(topicName.toString())) {
                        if (topicName.getPartitionIndex() < 0) {
                            log.error("Pattern consumer [{}] skip to subscribe to the non-partitioned topic {}, because has subscribed a partitioned topic with the same name",
                                    PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName.toString());
                        } else {
                            if (topicName.getPartitionIndex() + 1 > PatternMultiTopicsConsumerImpl.this.partitionedTopics.get(partitionedName)) {
                                PatternMultiTopicsConsumerImpl.this.partitionedTopics.put(partitionedName, topicName.getPartitionIndex() + 1);
                            }
                            expendPartitionsForLog.add(topicName.toString());
                            // NOTE(review): the second argument 0 presumably means "no partitions" - confirm against subscribeAsync.
                            CompletableFuture<Void> consumerFuture = PatternMultiTopicsConsumerImpl.this.subscribeAsync(topicName.toString(), 0);
                            consumerFuture.whenComplete((ignored, ex) -> {
                                if (ex != null) {
                                    log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}",
                                            PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName, ex);
                                }
                            });
                            futures.add(consumerFuture);
                        }
                    }
                    groupedTopics.remove(partitionedName);
                    continue;
                }
                if (PatternMultiTopicsConsumerImpl.this.consumers.containsKey(topicName.toString())) {
                    // Already subscribed to exactly this topic.
                    groupedTopics.remove(partitionedName);
                    continue;
                }
                if (PatternMultiTopicsConsumerImpl.this.consumers.containsKey(partitionedName) && topicName.getPartitionIndex() >= 0) {
                    // A partition was added but a consumer already exists under the base name.
                    log.error("Pattern consumer [{}] skip to subscribe to the partitioned topic {}, because has subscribed a non-partitioned topic with the same name",
                            PatternMultiTopicsConsumerImpl.this.getSubscription(), topicName);
                    groupedTopics.remove(partitionedName);
                }
            }
            for (String partitionedTopic : groupedTopics) {
                CompletableFuture<Void> consumerFuture = PatternMultiTopicsConsumerImpl.this.subscribeAsync(partitionedTopic, false);
                consumerFuture.whenComplete((ignored, ex) -> {
                    if (ex != null) {
                        log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}",
                                PatternMultiTopicsConsumerImpl.this.getSubscription(), partitionedTopic, ex);
                    }
                });
                futures.add(consumerFuture);
            }
            if (log.isDebugEnabled()) {
                log.debug("Pattern consumer [{}] add topics. expend partitions {}, new subscribing {}",
                        PatternMultiTopicsConsumerImpl.this.getSubscription(), expendPartitionsForLog, groupedTopics);
            }
            return FutureUtil.waitForAll(futures);
        }
    }

    /** Receives the topics that appeared in / disappeared from the set matched by the pattern. */
    static interface TopicsChangedListener {
        /**
         * @param removedTopics topics that are no longer part of the matched set
         * @return a future completed when the removal has been processed
         */
        public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);

        /**
         * @param addedTopics topics that newly belong to the matched set
         * @return a future completed when the addition has been processed
         */
        public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
    }
}

