/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class StandbyTask
extends AbstractTask {
    private boolean updateOffsetLimits;
    private final Sensor closeTaskSensor;
    private final Map<TopicPartition, Long> offsetLimits = new HashMap<TopicPartition, Long>();
    private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<TopicPartition, Long>();

    StandbyTask(TaskId id, Set<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory) {
        super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
        this.closeTaskSensor = metrics.threadLevelSensor(Thread.currentThread().getName(), "task-closed", Sensor.RecordingLevel.INFO, new Sensor[0]);
        this.processorContext = new StandbyContextImpl(id, config, this.stateMgr, metrics);
        HashSet<String> changelogTopicNames = new HashSet<String>(topology.storeToChangelogTopic().values());
        partitions.stream().filter(tp -> changelogTopicNames.contains(tp.topic())).forEach(tp -> this.offsetLimits.put((TopicPartition)tp, 0L));
        this.updateOffsetLimits = true;
    }

    @Override
    public void initializeMetadata() {
    }

    @Override
    public boolean initializeStateStores() {
        this.log.trace("Initializing state stores");
        this.registerStateStores();
        this.checkpointedOffsets = Collections.unmodifiableMap(this.stateMgr.checkpointed());
        this.processorContext.initialize();
        this.taskInitialized = true;
        return true;
    }

    @Override
    public void initializeTopology() {
    }

    @Override
    public void resume() {
        this.log.debug("Resuming");
        this.allowUpdateOfOffsetLimit();
    }

    @Override
    public void commit() {
        this.log.trace("Committing");
        this.flushAndCheckpointState();
        this.allowUpdateOfOffsetLimit();
        this.commitNeeded = false;
    }

    private void flushAndCheckpointState() {
        this.stateMgr.flush();
        this.stateMgr.checkpoint(Collections.emptyMap());
    }

    @Override
    public void close(boolean clean, boolean isZombie) {
        this.closeTaskSensor.record();
        if (!this.taskInitialized) {
            return;
        }
        this.log.debug("Closing");
        try {
            if (clean) {
                this.commit();
            }
        }
        finally {
            this.closeStateManager(true);
        }
        this.taskClosed = true;
    }

    public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
        if (records.isEmpty()) {
            return Collections.emptyList();
        }
        this.log.trace("Updating standby replicas of its state store for partition [{}]", (Object)partition);
        long limit = this.offsetLimits.getOrDefault(partition, Long.MAX_VALUE);
        long lastOffset = -1L;
        ArrayList<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>(records.size());
        ArrayList<ConsumerRecord<byte[], byte[]>> remainingRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() >= limit && this.updateOffsetLimits) {
                limit = this.updateOffsetLimits(partition);
            }
            if (record.offset() < limit) {
                restoreRecords.add(record);
                lastOffset = record.offset();
                continue;
            }
            remainingRecords.add(record);
        }
        if (!restoreRecords.isEmpty()) {
            this.stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset);
            this.commitNeeded = true;
        }
        return remainingRecords;
    }

    Map<TopicPartition, Long> checkpointedOffsets() {
        return this.checkpointedOffsets;
    }

    private long updateOffsetLimits(TopicPartition partition) {
        if (!this.offsetLimits.containsKey(partition)) {
            throw new IllegalArgumentException("Topic is not both a source and a changelog: " + partition);
        }
        Map<TopicPartition, Long> newLimits = this.committedOffsetForPartitions(this.offsetLimits.keySet());
        for (Map.Entry<TopicPartition, Long> newlimit : newLimits.entrySet()) {
            Long previousLimit = this.offsetLimits.get(newlimit.getKey());
            if (previousLimit == null || previousLimit <= newlimit.getValue()) continue;
            throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. New limit: " + newlimit.getValue() + ". Previous limit: " + previousLimit);
        }
        this.offsetLimits.putAll(newLimits);
        this.updateOffsetLimits = false;
        return this.offsetLimits.get(partition);
    }

    private Map<TopicPartition, Long> committedOffsetForPartitions(Set<TopicPartition> partitions) {
        try {
            return this.consumer.committed(partitions).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : ((OffsetAndMetadata)e.getValue()).offset()));
        }
        catch (AuthorizationException e2) {
            throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", this.id, partitions), e2);
        }
        catch (WakeupException e3) {
            throw e3;
        }
        catch (KafkaException e4) {
            throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", this.id, partitions), e4);
        }
    }

    void allowUpdateOfOffsetLimit() {
        this.updateOffsetLimits = true;
    }
}

