/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.detector.AddBrokerPlan;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyUtils;
import com.linkedin.kafka.cruisecontrol.detector.DemoteBrokerPlan;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEvent;
import com.linkedin.kafka.cruisecontrol.detector.MaintenanceEventReader;
import com.linkedin.kafka.cruisecontrol.detector.MaintenancePlan;
import com.linkedin.kafka.cruisecontrol.detector.RemoveBrokerPlan;
import com.linkedin.kafka.cruisecontrol.detector.TopicReplicationFactorPlan;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MaintenanceEventTopicReader
implements MaintenanceEventReader {
    private static final Logger LOG = LoggerFactory.getLogger(MaintenanceEventTopicReader.class);
    protected String _maintenanceEventTopic;
    protected Consumer<String, MaintenancePlan> _consumer;
    protected Set<TopicPartition> _currentPartitionAssignment;
    protected volatile boolean _shutdown = false;
    protected long _lastEventReadPeriodEndTimeMs;
    protected KafkaCruiseControl _kafkaCruiseControl;
    protected long _maintenancePlanExpirationMs;
    public static final String MAINTENANCE_PLAN_EXPIRATION_MS_CONFIG = "maintenance.plan.expiration.ms";
    public static final long DEFAULT_MAINTENANCE_PLAN_EXPIRATION_MS = Duration.ofMinutes(15L).toMillis();
    public static final String MAINTENANCE_EVENT_TOPIC_CONFIG = "maintenance.event.topic";
    public static final String DEFAULT_MAINTENANCE_EVENT_TOPIC = "__MaintenanceEvent";
    public static final String MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG = "maintenance.event.topic.replication.factor";
    public static final short DEFAULT_MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR = 2;
    public static final String MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG = "maintenance.event.topic.partition.count";
    public static final int DEFAULT_MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT = 8;
    public static final String MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG = "maintenance.event.topic.retention.ms";
    public static final long DEFAULT_MAINTENANCE_EVENT_TOPIC_RETENTION_TIME_MS = Duration.ofHours(6L).toMillis();
    public static final Duration CONSUMER_CLOSE_TIMEOUT = Duration.ofSeconds(2L);
    public static final String CONSUMER_CLIENT_ID_PREFIX = MaintenanceEventTopicReader.class.getSimpleName();
    public static final long INIT_MAINTENANCE_HISTORY_MS = 60000L;

    protected Map<TopicPartition, Long> seekToRelevantOffsets() throws SamplingException {
        HashMap<TopicPartition, Long> timestampToSeek = new HashMap<TopicPartition, Long>(this._currentPartitionAssignment.size());
        for (TopicPartition tp : this._currentPartitionAssignment) {
            timestampToSeek.put(tp, this._lastEventReadPeriodEndTimeMs);
        }
        HashSet<TopicPartition> assignment = new HashSet<TopicPartition>(this._currentPartitionAssignment);
        Map endOffsets = this._consumer.endOffsets(assignment);
        Map offsetsForTimes = this._consumer.offsetsForTimes(timestampToSeek);
        KafkaCruiseControlUtils.sanityCheckOffsetFetch(endOffsets, offsetsForTimes);
        for (Map.Entry entry : offsetsForTimes.entrySet()) {
            TopicPartition tp = (TopicPartition)entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp)entry.getValue();
            this._consumer.seek(tp, offsetAndTimestamp != null ? offsetAndTimestamp.offset() : ((Long)endOffsets.get(tp)).longValue());
        }
        return endOffsets;
    }

    protected void addMaintenancePlan(MaintenancePlan maintenancePlan, Set<MaintenanceEvent> maintenanceEvents) {
        LOG.debug("Retrieved maintenance plan {}.", (Object)maintenancePlan);
        HashMap<String, Object> parameterConfigOverrides = new HashMap<String, Object>(4);
        parameterConfigOverrides.put("kafka.cruise.control.object", this._kafkaCruiseControl);
        parameterConfigOverrides.put("anomaly.detection.time.ms.object", this._kafkaCruiseControl.timeMs());
        parameterConfigOverrides.put("maintenance.event.type", (Object)maintenancePlan.maintenanceEventType());
        switch (maintenancePlan.maintenanceEventType()) {
            case ADD_BROKER: {
                parameterConfigOverrides.put("brokers.object", ((AddBrokerPlan)maintenancePlan).brokers());
                break;
            }
            case REMOVE_BROKER: {
                parameterConfigOverrides.put("brokers.object", ((RemoveBrokerPlan)maintenancePlan).brokers());
                break;
            }
            case FIX_OFFLINE_REPLICAS: 
            case REBALANCE: {
                break;
            }
            case DEMOTE_BROKER: {
                parameterConfigOverrides.put("brokers.object", ((DemoteBrokerPlan)maintenancePlan).brokers());
                break;
            }
            case TOPIC_REPLICATION_FACTOR: {
                parameterConfigOverrides.put("topics.with.rf.update", ((TopicReplicationFactorPlan)maintenancePlan).topicRegexWithRFUpdate());
                break;
            }
            default: {
                throw new IllegalStateException(String.format("Unrecognized event type %s", new Object[]{maintenancePlan.maintenanceEventType()}));
            }
        }
        maintenanceEvents.add(this._kafkaCruiseControl.config().getConfiguredInstance("maintenance.event.class", MaintenanceEvent.class, parameterConfigOverrides));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Set<MaintenanceEvent> readEvents(Duration timeout) throws SamplingException {
        LOG.debug("Reading maintenance events.");
        long eventReadPeriodEndMs = this._kafkaCruiseControl.timeMs();
        if (this.refreshPartitionAssignment()) {
            this._lastEventReadPeriodEndTimeMs = eventReadPeriodEndMs;
            return Collections.emptySet();
        }
        long timeoutEndMs = eventReadPeriodEndMs + timeout.toMillis();
        HashSet<MaintenanceEvent> maintenanceEvents = new HashSet<MaintenanceEvent>();
        try {
            Map<TopicPartition, Long> endOffsets = this.seekToRelevantOffsets();
            LOG.debug("Started to consume from maintenance event topic partitions {}.", this._currentPartitionAssignment);
            this._consumer.resume((Collection)this._consumer.paused());
            HashSet<TopicPartition> partitionsToPause = new HashSet<TopicPartition>();
            do {
                ConsumerRecords records = this._consumer.poll(timeout);
                for (ConsumerRecord record : records) {
                    if (record == null) {
                        LOG.warn("Cannot parse record, please update your Cruise Control version.");
                        continue;
                    }
                    long planGenerationTimeMs = ((MaintenancePlan)record.value()).timeMs();
                    if (planGenerationTimeMs + this._maintenancePlanExpirationMs < eventReadPeriodEndMs) {
                        LOG.warn("Discarding the expired plan {}. (Expired: {} Evaluated: {}).", new Object[]{record.value(), planGenerationTimeMs + this._maintenancePlanExpirationMs, eventReadPeriodEndMs});
                        continue;
                    }
                    if (planGenerationTimeMs >= eventReadPeriodEndMs) {
                        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                        LOG.debug("Saw plan {} generated after the end time of event read period {}. Pausing {} at offset {}.", new Object[]{record.value(), eventReadPeriodEndMs, tp, record.offset()});
                        partitionsToPause.add(tp);
                        continue;
                    }
                    this.addMaintenancePlan((MaintenancePlan)record.value(), maintenanceEvents);
                }
                if (partitionsToPause.isEmpty()) continue;
                this._consumer.pause(partitionsToPause);
                partitionsToPause.clear();
            } while (!KafkaCruiseControlUtils.consumptionDone(this._consumer, endOffsets) && this._kafkaCruiseControl.timeMs() < timeoutEndMs);
            if (maintenanceEvents.size() > 0) {
                LOG.info("Retrieved {} maintenance plans from partitions {} (range [{},{}]).", new Object[]{maintenanceEvents.size(), this._currentPartitionAssignment, this._lastEventReadPeriodEndTimeMs, eventReadPeriodEndMs});
            }
        }
        finally {
            this._lastEventReadPeriodEndTimeMs = eventReadPeriodEndMs;
        }
        return maintenanceEvents;
    }

    protected boolean refreshPartitionAssignment() {
        List remotePartitionInfo = this._consumer.partitionsFor(this._maintenanceEventTopic);
        if (remotePartitionInfo == null) {
            LOG.error("Consumer returned null for maintenance event topic {}.", (Object)this._maintenanceEventTopic);
            return true;
        }
        if (remotePartitionInfo.isEmpty()) {
            this._currentPartitionAssignment = Collections.emptySet();
            LOG.error("The set of partitions currently assigned to the maintenance event consumer is empty.");
            return true;
        }
        if (remotePartitionInfo.size() == this._currentPartitionAssignment.size()) {
            return false;
        }
        this._currentPartitionAssignment = new HashSet<TopicPartition>(remotePartitionInfo.size());
        for (PartitionInfo partitionInfo : remotePartitionInfo) {
            this._currentPartitionAssignment.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        this._consumer.assign(this._currentPartitionAssignment);
        return false;
    }

    public void configure(Map<String, ?> configs) {
        this._kafkaCruiseControl = AnomalyUtils.extractKafkaCruiseControlObjectFromConfig(configs, KafkaAnomalyType.MAINTENANCE_EVENT);
        String maintenanceEventTopic = (String)configs.get(MAINTENANCE_EVENT_TOPIC_CONFIG);
        this._maintenanceEventTopic = maintenanceEventTopic == null || maintenanceEventTopic.isEmpty() ? DEFAULT_MAINTENANCE_EVENT_TOPIC : maintenanceEventTopic;
        String maintenancePlanExpirationMs = (String)configs.get(MAINTENANCE_PLAN_EXPIRATION_MS_CONFIG);
        this._maintenancePlanExpirationMs = maintenancePlanExpirationMs == null || maintenancePlanExpirationMs.isEmpty() ? DEFAULT_MAINTENANCE_PLAN_EXPIRATION_MS : Long.parseLong(maintenancePlanExpirationMs);
        this._consumer = AnomalyDetectorUtils.createMaintenanceEventConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
        this.ensureTopicCreated(configs);
        this._currentPartitionAssignment = Collections.emptySet();
        if (this.refreshPartitionAssignment()) {
            throw new IllegalStateException("Cannot find the maintenance event topic " + this._maintenanceEventTopic + " in the cluster.");
        }
        this._lastEventReadPeriodEndTimeMs = this._kafkaCruiseControl.timeMs() - 60000L;
    }

    protected static short maintenanceEventTopicReplicationFactor(Map<String, ?> config, AdminClient adminClient) {
        String maintenanceEventTopicRF = (String)config.get(MAINTENANCE_EVENT_TOPIC_REPLICATION_FACTOR_CONFIG);
        if (maintenanceEventTopicRF == null || maintenanceEventTopicRF.isEmpty()) {
            short numberOfBrokersInCluster;
            try {
                numberOfBrokersInCluster = (short)((Collection)adminClient.describeCluster().nodes().get(30000L, TimeUnit.MILLISECONDS)).size();
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                throw new IllegalStateException("Auto creation of maintenance event topic failed due to failure to describe cluster.", e);
            }
            return (short)Math.min(2, numberOfBrokersInCluster);
        }
        return Short.parseShort(maintenanceEventTopicRF);
    }

    protected static long maintenanceEventTopicRetentionMs(Map<String, ?> config) {
        String maintenanceEventTopicRetentionMs = (String)config.get(MAINTENANCE_EVENT_TOPIC_RETENTION_MS_CONFIG);
        return maintenanceEventTopicRetentionMs == null || maintenanceEventTopicRetentionMs.isEmpty() ? DEFAULT_MAINTENANCE_EVENT_TOPIC_RETENTION_TIME_MS : Long.parseLong(maintenanceEventTopicRetentionMs);
    }

    protected static int maintenanceEventTopicPartitionCount(Map<String, ?> config) {
        String maintenanceEventTopicPartitionCount = (String)config.get(MAINTENANCE_EVENT_TOPIC_PARTITION_COUNT_CONFIG);
        return maintenanceEventTopicPartitionCount == null || maintenanceEventTopicPartitionCount.isEmpty() ? 8 : Integer.parseInt(maintenanceEventTopicPartitionCount);
    }

    protected void ensureTopicCreated(Map<String, ?> config) {
        AdminClient adminClient = this._kafkaCruiseControl.adminClient();
        short replicationFactor = MaintenanceEventTopicReader.maintenanceEventTopicReplicationFactor(config, adminClient);
        long retentionMs = MaintenanceEventTopicReader.maintenanceEventTopicRetentionMs(config);
        int partitionCount = MaintenanceEventTopicReader.maintenanceEventTopicPartitionCount(config);
        NewTopic maintenanceEventTopic = KafkaCruiseControlUtils.wrapTopic(this._maintenanceEventTopic, partitionCount, replicationFactor, retentionMs);
        this.maybeCreateOrUpdateTopic(adminClient, maintenanceEventTopic);
    }

    protected void maybeCreateOrUpdateTopic(AdminClient adminClient, NewTopic maintenanceEventTopic) {
        if (!KafkaCruiseControlUtils.createTopic(adminClient, maintenanceEventTopic)) {
            KafkaCruiseControlUtils.maybeUpdateTopicConfig(adminClient, maintenanceEventTopic);
            KafkaCruiseControlUtils.maybeIncreasePartitionCount(adminClient, maintenanceEventTopic);
        }
    }

    @Override
    public void close() {
        this._shutdown = true;
        this._consumer.close(CONSUMER_CLOSE_TIMEOUT);
        this._currentPartitionAssignment.clear();
    }
}

