/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"EI_EXPOSE_REP2"})
public class PatternConsumerUpdateQueue {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PatternConsumerUpdateQueue.class);
    private static final Pair<UpdateSubscriptionType, Collection<String>> RECHECK_OP = Pair.of(UpdateSubscriptionType.RECHECK, null);
    private final LinkedBlockingQueue<Pair<UpdateSubscriptionType, Collection<String>>> pendingTasks;
    private final PatternMultiTopicsConsumerImpl patternConsumer;
    private final PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener;
    private Pair<UpdateSubscriptionType, CompletableFuture<Void>> taskInProgress = null;
    private boolean recheckTaskInQueue = false;
    private volatile long lastRecheckTaskStartingTimestamp = 0L;
    private boolean closed;

    public PatternConsumerUpdateQueue(PatternMultiTopicsConsumerImpl patternConsumer) {
        this(patternConsumer, patternConsumer.topicsChangeListener);
    }

    @VisibleForTesting
    public PatternConsumerUpdateQueue(PatternMultiTopicsConsumerImpl patternConsumer, PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener) {
        this.patternConsumer = patternConsumer;
        this.topicsChangeListener = topicsChangeListener;
        this.pendingTasks = new LinkedBlockingQueue();
        this.doAppend(Pair.of(UpdateSubscriptionType.CONSUMER_INIT, null));
    }

    synchronized void appendTopicsAddedOp(Collection<String> topics) {
        if (topics == null || topics.isEmpty()) {
            return;
        }
        this.doAppend(Pair.of(UpdateSubscriptionType.TOPICS_ADDED, topics));
    }

    synchronized void appendTopicsRemovedOp(Collection<String> topics) {
        if (topics == null || topics.isEmpty()) {
            return;
        }
        this.doAppend(Pair.of(UpdateSubscriptionType.TOPICS_REMOVED, topics));
    }

    synchronized void appendRecheckOp() {
        this.doAppend(RECHECK_OP);
    }

    synchronized void doAppend(Pair<UpdateSubscriptionType, Collection<String>> task) {
        if (log.isDebugEnabled()) {
            log.debug("Pattern consumer [{}] try to append task. {} {}", new Object[]{this.patternConsumer.getSubscription(), task.getLeft(), task.getRight() == null ? "" : task.getRight()});
        }
        if (this.recheckTaskInQueue) {
            return;
        }
        if (this.pendingTasks.size() >= 30 && !task.getLeft().equals((Object)UpdateSubscriptionType.RECHECK)) {
            this.appendRecheckOp();
            return;
        }
        this.pendingTasks.add(task);
        if (task.getLeft().equals((Object)UpdateSubscriptionType.RECHECK)) {
            this.recheckTaskInQueue = true;
        }
        if (this.taskInProgress == null) {
            this.triggerNextTask();
        }
    }

    synchronized void triggerNextTask() {
        if (this.closed) {
            return;
        }
        Pair<UpdateSubscriptionType, Collection<String>> task = this.pendingTasks.poll();
        if (task == null) {
            this.taskInProgress = null;
            return;
        }
        if (this.recheckTaskInQueue && !task.getLeft().equals((Object)UpdateSubscriptionType.RECHECK)) {
            this.triggerNextTask();
            return;
        }
        CompletionStage<Void> newTaskFuture = null;
        switch (task.getLeft().ordinal()) {
            case 0: {
                newTaskFuture = ((CompletableFuture)this.patternConsumer.getSubscribeFuture().thenAccept(__ -> {})).exceptionally(ex -> {
                    PatternConsumerUpdateQueue patternConsumerUpdateQueue = this;
                    synchronized (patternConsumerUpdateQueue) {
                        this.closed = true;
                        this.patternConsumer.closeAsync().exceptionally(ex2 -> {
                            log.error("Pattern consumer failed to close, this error may left orphan consumers. Subscription: {}", (Object)this.patternConsumer.getSubscription());
                            return null;
                        });
                    }
                    return null;
                });
                break;
            }
            case 1: {
                newTaskFuture = this.topicsChangeListener.onTopicsAdded(task.getRight());
                break;
            }
            case 2: {
                newTaskFuture = this.topicsChangeListener.onTopicsRemoved(task.getRight());
                break;
            }
            case 3: {
                this.recheckTaskInQueue = false;
                this.lastRecheckTaskStartingTimestamp = System.currentTimeMillis();
                newTaskFuture = this.patternConsumer.recheckTopicsChange();
                break;
            }
            default: {
                throw new RuntimeException("Un-support UpdateSubscriptionType");
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Pattern consumer [{}] starting task. {} {} ", new Object[]{this.patternConsumer.getSubscription(), task.getLeft(), task.getRight() == null ? "" : task.getRight()});
        }
        this.taskInProgress = Pair.of(task.getLeft(), newTaskFuture);
        ((CompletableFuture)((CompletableFuture)newTaskFuture).thenAccept(ignore -> {
            if (log.isDebugEnabled()) {
                log.debug("Pattern consumer [{}] task finished. {} {} ", new Object[]{this.patternConsumer.getSubscription(), task.getLeft(), task.getRight() == null ? "" : task.getRight()});
            }
            this.triggerNextTask();
        })).exceptionally(ex -> {
            log.error("Pattern consumer [{}] task finished. {} {}. But it failed", new Object[]{this.patternConsumer.getSubscription(), task.getLeft(), task.getRight() == null ? "" : task.getRight(), ex});
            PatternConsumerUpdateQueue patternConsumerUpdateQueue = this;
            synchronized (patternConsumerUpdateQueue) {
                if (this.recheckTaskInQueue || this.closed) {
                    return null;
                }
            }
            long failedTime = System.currentTimeMillis();
            this.patternConsumer.getClient().timer().newTimeout(timeout -> {
                if (this.lastRecheckTaskStartingTimestamp <= failedTime) {
                    this.appendRecheckOp();
                }
            }, 10L, TimeUnit.SECONDS);
            this.triggerNextTask();
            return null;
        });
    }

    public synchronized CompletableFuture<Void> cancelAllAndWaitForTheRunningTask() {
        this.closed = true;
        if (this.taskInProgress == null) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.taskInProgress.getLeft().equals((Object)UpdateSubscriptionType.CONSUMER_INIT)) {
            return CompletableFuture.completedFuture(null);
        }
        return ((CompletableFuture)this.taskInProgress.getRight().thenAccept(__ -> {})).exceptionally(ex -> null);
    }

    private static enum UpdateSubscriptionType {
        CONSUMER_INIT,
        TOPICS_ADDED,
        TOPICS_REMOVED,
        RECHECK;

    }
}

