/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.producer.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.clients.Metadata;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.requests.MetadataRequest;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.requests.MetadataResponse;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public class ProducerMetadata
extends Metadata {
    private final long metadataIdleMs;
    private final Map<String, Long> topics = new HashMap<String, Long>();
    private final Set<String> newTopics = new HashSet<String>();
    private final Logger log;
    private final Time time;

    public ProducerMetadata(long refreshBackoffMs, long metadataExpireMs, long metadataIdleMs, LogContext logContext, ClusterResourceListeners clusterResourceListeners, Time time) {
        super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners);
        this.metadataIdleMs = metadataIdleMs;
        this.log = logContext.logger(ProducerMetadata.class);
        this.time = time;
    }

    @Override
    public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
        return new MetadataRequest.Builder(new ArrayList<String>(this.topics.keySet()), true);
    }

    @Override
    public synchronized MetadataRequest.Builder newMetadataRequestBuilderForNewTopics() {
        return new MetadataRequest.Builder(new ArrayList<String>(this.newTopics), true);
    }

    public synchronized void add(String topic, long nowMs) {
        Objects.requireNonNull(topic, "topic cannot be null");
        if (this.topics.put(topic, nowMs + this.metadataIdleMs) == null) {
            this.newTopics.add(topic);
            this.requestUpdateForNewTopics();
        }
    }

    public synchronized int requestUpdateForTopic(String topic) {
        if (this.newTopics.contains(topic)) {
            return this.requestUpdateForNewTopics();
        }
        return this.requestUpdate();
    }

    synchronized Set<String> topics() {
        return this.topics.keySet();
    }

    synchronized Set<String> newTopics() {
        return this.newTopics;
    }

    public synchronized boolean containsTopic(String topic) {
        return this.topics.containsKey(topic);
    }

    @Override
    public synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
        Long expireMs = this.topics.get(topic);
        if (expireMs == null) {
            return false;
        }
        if (this.newTopics.contains(topic)) {
            return true;
        }
        if (expireMs <= nowMs) {
            this.log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", new Object[]{topic, expireMs, nowMs});
            this.topics.remove(topic);
            return false;
        }
        return true;
    }

    public synchronized void awaitUpdate(int lastVersion, long timeoutMs) throws InterruptedException {
        long currentTimeMs = this.time.milliseconds();
        long deadlineMs = currentTimeMs + timeoutMs < 0L ? Long.MAX_VALUE : currentTimeMs + timeoutMs;
        this.time.waitObject(this, () -> {
            this.maybeThrowFatalException();
            return this.updateVersion() > lastVersion || this.isClosed();
        }, deadlineMs);
        if (this.isClosed()) {
            throw new KafkaException("Requested metadata update after close");
        }
    }

    @Override
    public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
        super.update(requestVersion, response, isPartialUpdate, nowMs);
        if (!this.newTopics.isEmpty()) {
            for (MetadataResponse.TopicMetadata metadata : response.topicMetadata()) {
                this.newTopics.remove(metadata.topic());
            }
        }
        this.notifyAll();
    }

    @Override
    public synchronized void fatalError(KafkaException fatalException) {
        super.fatalError(fatalException);
        this.notifyAll();
    }

    @Override
    public synchronized void close() {
        super.close();
        this.notifyAll();
    }
}

