package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/MaintenanceEventTopicReader.class */
public class MaintenanceEventTopicReader implements MaintenanceEventReader {
    protected String _maintenanceEventTopic;
    protected Consumer<String, MaintenancePlan> _consumer;
    protected Set<TopicPartition> _currentPartitionAssignment;
    protected volatile boolean _shutdown = false;
    protected long _lastEventReadPeriodEndTimeMs;
    protected KafkaCruiseControl _kafkaCruiseControl;
    protected long _maintenancePlanExpirationMs;
    public static final String MAINTENANCE_PLAN_EXPIRATION_MS_CONFIG = "maintenance.plan.expiration.ms";
    public static final String MAINTENANCE_EVENT_TOPIC_CONFIG = "maintenance.event.topic";
    public static final String DEFAULT_MAINTENANCE_EVENT_TOPIC = "__MaintenanceEvent";
    public static final String MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG = "maintenance.event.topic.replication.factor";
    public static final short DEFAULT_MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR = 2;
    public static final String MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG = "maintenance.event.topic.partition.count";
    public static final int DEFAULT_MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT = 8;
    public static final String MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG = "maintenance.event.topic.retention.ms";
    public static final long INIT_MAINTENANCE_HISTORY_MS = 60000;
    private static final Logger LOG = LoggerFactory.getLogger(MaintenanceEventTopicReader.class);
    public static final long DEFAULT_MAINTENANCE_PLAN_EXPIRATION_MS = Duration.ofMinutes(15).toMillis();
    public static final long DEFAULT_MAINTENANCE_EVENT_TOPIC_RETENTION_TIME_MS = Duration.ofHours(6).toMillis();
    public static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(2);
    public static final String CONSUMER_CLIENT_ID_PREFIX = MaintenanceEventTopicReader.class.getSimpleName();

    protected Map<TopicPartition, Long> seekToRelevantOffsets() throws SamplingException {
        HashMap hashMap = new HashMap(this._currentPartitionAssignment.size());
        Iterator<TopicPartition> it = this._currentPartitionAssignment.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Long.valueOf(this._lastEventReadPeriodEndTimeMs));
        }
        Map<TopicPartition, Long> endOffsets = this._consumer.endOffsets(new HashSet(this._currentPartitionAssignment));
        Map offsetsForTimes = this._consumer.offsetsForTimes(hashMap);
        KafkaCruiseControlUtils.sanityCheckOffsetFetch(endOffsets, offsetsForTimes);
        for (Map.Entry entry : offsetsForTimes.entrySet()) {
            TopicPartition topicPartition = (TopicPartition) entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) entry.getValue();
            this._consumer.seek(topicPartition, offsetAndTimestamp != null ? offsetAndTimestamp.offset() : endOffsets.get(topicPartition).longValue());
        }
        return endOffsets;
    }

    protected void addMaintenancePlan(MaintenancePlan maintenancePlan, Set<MaintenanceEvent> set) {
        LOG.debug("Retrieved maintenance plan {}.", maintenancePlan);
        HashMap hashMap = new HashMap(4);
        hashMap.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this._kafkaCruiseControl);
        hashMap.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, Long.valueOf(this._kafkaCruiseControl.timeMs()));
        hashMap.put(MaintenanceEvent.MAINTENANCE_EVENT_TYPE_CONFIG, maintenancePlan.maintenanceEventType());
        switch (maintenancePlan.maintenanceEventType()) {
            case ADD_BROKER:
                hashMap.put(MaintenanceEvent.BROKERS_OBJECT_CONFIG, ((AddBrokerPlan) maintenancePlan).brokers());
                break;
            case REMOVE_BROKER:
                hashMap.put(MaintenanceEvent.BROKERS_OBJECT_CONFIG, ((RemoveBrokerPlan) maintenancePlan).brokers());
                break;
            case FIX_OFFLINE_REPLICAS:
            case REBALANCE:
                break;
            case DEMOTE_BROKER:
                hashMap.put(MaintenanceEvent.BROKERS_OBJECT_CONFIG, ((DemoteBrokerPlan) maintenancePlan).brokers());
                break;
            case TOPIC_REPLICATION_FACTOR:
                hashMap.put(MaintenanceEvent.TOPICS_WITH_RF_UPDATE_CONFIG, ((TopicReplicationFactorPlan) maintenancePlan).topicRegexWithRFUpdate());
                break;
            default:
                throw new IllegalStateException(String.format("Unrecognized event type %s", maintenancePlan.maintenanceEventType()));
        }
        set.add((MaintenanceEvent) this._kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.MAINTENANCE_EVENT_CLASS_CONFIG, MaintenanceEvent.class, hashMap));
    }

    @Override // com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventReader
    public Set<MaintenanceEvent> readEvents(Duration duration) throws SamplingException {
        LOG.debug("Reading maintenance events.");
        long timeMs = this._kafkaCruiseControl.timeMs();
        if (refreshPartitionAssignment()) {
            this._lastEventReadPeriodEndTimeMs = timeMs;
            return Collections.emptySet();
        }
        long millis = timeMs + duration.toMillis();
        HashSet hashSet = new HashSet();
        try {
            Map<TopicPartition, Long> seekToRelevantOffsets = seekToRelevantOffsets();
            LOG.debug("Started to consume from maintenance event topic partitions {}.", this._currentPartitionAssignment);
            this._consumer.resume(this._consumer.paused());
            HashSet hashSet2 = new HashSet();
            do {
                Iterator it = this._consumer.poll(duration).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    if (consumerRecord == null) {
                        LOG.warn("Cannot parse record, please update your Cruise Control version.");
                    } else {
                        long timeMs2 = ((MaintenancePlan) consumerRecord.value()).timeMs();
                        if (timeMs2 + this._maintenancePlanExpirationMs < timeMs) {
                            LOG.warn("Discarding the expired plan {}. (Expired: {} Evaluated: {}).", new Object[]{consumerRecord.value(), Long.valueOf(timeMs2 + this._maintenancePlanExpirationMs), Long.valueOf(timeMs)});
                        } else if (timeMs2 >= timeMs) {
                            TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                            LOG.debug("Saw plan {} generated after the end time of event read period {}. Pausing {} at offset {}.", new Object[]{consumerRecord.value(), Long.valueOf(timeMs), topicPartition, Long.valueOf(consumerRecord.offset())});
                            hashSet2.add(topicPartition);
                        } else {
                            addMaintenancePlan((MaintenancePlan) consumerRecord.value(), hashSet);
                        }
                    }
                }
                if (!hashSet2.isEmpty()) {
                    this._consumer.pause(hashSet2);
                    hashSet2.clear();
                }
                if (KafkaCruiseControlUtils.consumptionDone(this._consumer, seekToRelevantOffsets)) {
                    break;
                }
            } while (this._kafkaCruiseControl.timeMs() < millis);
            if (hashSet.size() > 0) {
                LOG.info("Retrieved {} maintenance plans from partitions {} (range [{},{}]).", new Object[]{Integer.valueOf(hashSet.size()), this._currentPartitionAssignment, Long.valueOf(this._lastEventReadPeriodEndTimeMs), Long.valueOf(timeMs)});
            }
            return hashSet;
        } finally {
            this._lastEventReadPeriodEndTimeMs = timeMs;
        }
    }

    protected boolean refreshPartitionAssignment() {
        List<PartitionInfo> partitionsFor = this._consumer.partitionsFor(this._maintenanceEventTopic);
        if (partitionsFor == null) {
            LOG.error("Consumer returned null for maintenance event topic {}.", this._maintenanceEventTopic);
            return true;
        }
        if (partitionsFor.isEmpty()) {
            this._currentPartitionAssignment = Collections.emptySet();
            LOG.error("The set of partitions currently assigned to the maintenance event consumer is empty.");
            return true;
        }
        if (partitionsFor.size() == this._currentPartitionAssignment.size()) {
            return false;
        }
        this._currentPartitionAssignment = new HashSet(partitionsFor.size());
        for (PartitionInfo partitionInfo : partitionsFor) {
            this._currentPartitionAssignment.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        this._consumer.assign(this._currentPartitionAssignment);
        return false;
    }

    public void configure(Map<String, ?> map) {
        this._kafkaCruiseControl = AnomalyUtils.extractKafkaCruiseControlObjectFromConfig(map, KafkaAnomalyType.MAINTENANCE_EVENT);
        String str = (String) map.get(MAINTENANCE_EVENT_TOPIC_CONFIG);
        this._maintenanceEventTopic = (str == null || str.isEmpty()) ? DEFAULT_MAINTENANCE_EVENT_TOPIC : str;
        String str2 = (String) map.get(MAINTENANCE_PLAN_EXPIRATION_MS_CONFIG);
        this._maintenancePlanExpirationMs = (str2 == null || str2.isEmpty()) ? DEFAULT_MAINTENANCE_PLAN_EXPIRATION_MS : Long.parseLong(str2);
        this._consumer = AnomalyDetectorUtils.createMaintenanceEventConsumer(map, CONSUMER_CLIENT_ID_PREFIX);
        ensureTopicCreated(map);
        this._currentPartitionAssignment = Collections.emptySet();
        if (refreshPartitionAssignment()) {
            throw new IllegalStateException("Cannot find the maintenance event topic " + this._maintenanceEventTopic + " in the cluster.");
        }
        this._lastEventReadPeriodEndTimeMs = this._kafkaCruiseControl.timeMs() - 60000;
    }

    protected static short maintenanceEventTopicReplicationFactor(Map<String, ?> map, AdminClient adminClient) {
        String str = (String) map.get(MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG);
        if (str != null && !str.isEmpty()) {
            return Short.parseShort(str);
        }
        try {
            return (short) Math.min(2, (int) ((short) ((Collection) adminClient.describeCluster().nodes().get(30000L, TimeUnit.MILLISECONDS)).size()));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Auto creation of maintenance event topic failed due to failure to describe cluster.", e);
        }
    }

    protected static long maintenanceEventTopicRetentionMs(Map<String, ?> map) {
        String str = (String) map.get(MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG);
        return (str == null || str.isEmpty()) ? DEFAULT_MAINTENANCE_EVENT_TOPIC_RETENTION_TIME_MS : Long.parseLong(str);
    }

    protected static int maintenanceEventTopicPartitionCount(Map<String, ?> map) {
        String str = (String) map.get(MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG);
        if (str == null || str.isEmpty()) {
            return 8;
        }
        return Integer.parseInt(str);
    }

    protected void ensureTopicCreated(Map<String, ?> map) {
        AdminClient adminClient = this._kafkaCruiseControl.adminClient();
        short maintenanceEventTopicReplicationFactor = maintenanceEventTopicReplicationFactor(map, adminClient);
        long maintenanceEventTopicRetentionMs = maintenanceEventTopicRetentionMs(map);
        maybeCreateOrUpdateTopic(adminClient, KafkaCruiseControlUtils.wrapTopic(this._maintenanceEventTopic, maintenanceEventTopicPartitionCount(map), maintenanceEventTopicReplicationFactor, maintenanceEventTopicRetentionMs));
    }

    protected void maybeCreateOrUpdateTopic(AdminClient adminClient, NewTopic newTopic) {
        if (KafkaCruiseControlUtils.createTopic(adminClient, newTopic)) {
            return;
        }
        KafkaCruiseControlUtils.maybeUpdateTopicConfig(adminClient, newTopic);
        KafkaCruiseControlUtils.maybeIncreasePartitionCount(adminClient, newTopic);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this._shutdown = true;
        this._consumer.close(CONSUMER_CLOSE_TIMEOUT);
        this._currentPartitionAssignment.clear();
    }
}
