/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.offsets;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.confluent.parallelconsumer.offsets.EncodedOffsetPair;
import io.confluent.parallelconsumer.offsets.NoEncodingPossibleException;
import io.confluent.parallelconsumer.offsets.OffsetDecodingError;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetSimpleSerialisation;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.PartitionStateManager;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Encodes the per-partition set of incomplete offsets into the Base64 metadata string that is
 * committed alongside an offset, and decodes that metadata back into {@link PartitionState}
 * when partitions are assigned.
 *
 * <p>Note that {@link #forcedCodec} and the invalid-metadata error policy are held in
 * {@code static} fields and are therefore shared by every instance of this class.
 *
 * @param <K> key type of the owning {@link PCModule}
 * @param <V> value type of the owning {@link PCModule}
 */
public class OffsetMapCodecManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(OffsetMapCodecManager.class);

    /** Constant key; per its own text the value is irrelevant. Not read inside this class. */
    public static final String METADATA_DATA_SIZE_RESOURCE_LOCK = "Value doesn't matter, just needs a constant";

    /** Mutable, publicly writable default. Not read inside this class. */
    public static int DefaultMaxMetadataSize = 4096;

    /** Charset constant exposed for collaborators. Not read inside this class. */
    public static final Charset CHARSET_TO_USE = StandardCharsets.UTF_8;

    /**
     * Number of additional attempts made to fetch committed offsets after the first one was
     * interrupted, giving {@code MAX_COMMITTED_FETCH_RETRIES + 1} attempts in total.
     */
    private static final int MAX_COMMITTED_FETCH_RETRIES = 10;

    // Kept raw, as in the original declaration; the unchecked uses are confined to small helpers.
    private final PCModule module;
    private final Timer offsetEncodingTimer;
    /** Lazily populated cache of one usage counter per encoding. */
    private final Map<OffsetEncoding, Counter> encodingCounters = new HashMap<OffsetEncoding, Counter>();
    private final PCMetrics pcMetrics;

    /**
     * Policy handed to the decoder in {@link #decodeCompressedOffsets(long, byte[])}.
     * Static: each constructor call overwrites it for all instances.
     */
    private static ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy errorPolicy = ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.FAIL;

    /**
     * When present, {@link #encodeOffsetsCompressed(long, PartitionState)} uses this encoding
     * instead of the smallest one. The accompanying log message describes this as a testing aid.
     */
    public static Optional<OffsetEncoding> forcedCodec = Optional.empty();

    /**
     * Creates a manager bound to the given module, adopting the module's invalid-metadata policy
     * and resolving the encoding timer from the module's metrics.
     *
     * @param module the owning module; must not be {@code null}
     * @throws NullPointerException if {@code module} is {@code null}
     */
    public OffsetMapCodecManager(PCModule<K, V> module) {
        // The module is dereferenced unconditionally below, so reject null up front with a clear message.
        Objects.requireNonNull(module, "module");
        this.module = module;
        errorPolicy = module.options().getInvalidOffsetMetadataPolicy();
        this.pcMetrics = module.pcMetrics();
        this.offsetEncodingTimer = this.pcMetrics.getTimerFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_TIME, new Tag[0]);
    }

    /**
     * Builds the initial {@link PartitionState} for every partition in {@code assignment}.
     *
     * <p>Committed offsets are fetched from the module's consumer. Where committed metadata exists
     * and decodes successfully, the decoded state is used. Partitions with no committed entry, or
     * whose metadata fails to decode (logged as an error), receive a default state built from an
     * empty {@link HighestOffsetAndIncompletes}.
     *
     * @param assignment the partitions to load state for
     * @return a mutable map containing a state for each assigned partition (and for any other
     *         partition the consumer returned committed metadata for)
     * @throws InternalRuntimeException if committed offsets could not be obtained after all attempts
     */
    public Map<TopicPartition, PartitionState<K, V>> loadPartitionStateForAssignment(Collection<TopicPartition> assignment) {
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = this.fetchCommittedOffsets(assignment);

        Map<TopicPartition, PartitionState<K, V>> partitionStates = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : committedOffsets.entrySet()) {
            TopicPartition tp = committed.getKey();
            OffsetAndMetadata offsetAndMeta = committed.getValue();
            if (offsetAndMeta == null) {
                // Nothing committed for this partition; it is given a default state below.
                continue;
            }
            try {
                partitionStates.put(tp, this.decodePartitionState(tp, offsetAndMeta));
            }
            catch (OffsetDecodingError offsetDecodingError) {
                // Deliberately best-effort: the partition falls through to the default state below.
                log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})", tp, offsetAndMeta, offsetDecodingError);
            }
        }

        for (TopicPartition topicPartition : assignment) {
            if (!partitionStates.containsKey(topicPartition)) {
                partitionStates.put(topicPartition, this.newPartitionState(topicPartition, HighestOffsetAndIncompletes.of()));
            }
        }
        return partitionStates;
    }

    /**
     * Fetches the committed offsets for the assignment, retrying while the consumer is woken up
     * (or returns {@code null}). A result obtained on any attempt, including the last, is returned.
     *
     * @throws InternalRuntimeException when every attempt failed; the cause is the last
     *         {@link WakeupException} seen, or {@code null} if none was thrown
     */
    @SuppressWarnings("unchecked") // the raw module field erases the generic type of committed()'s result
    private Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Collection<TopicPartition> assignment) {
        WakeupException lastWakeupException = null;
        for (int attempt = 0; attempt <= MAX_COMMITTED_FETCH_RETRIES; attempt++) {
            try {
                Map<TopicPartition, OffsetAndMetadata> committed = this.module.consumer().committed(new HashSet<TopicPartition>(assignment));
                if (committed != null) {
                    return committed;
                }
            }
            catch (WakeupException exception) {
                log.debug("Woken up trying to get assignment", (Throwable)exception);
                lastWakeupException = exception;
            }
        }
        throw new InternalRuntimeException("Failed to get partition assignment - continuously woken up.", lastWakeupException);
    }

    /**
     * Looks up the partition's current epoch from the partition state manager and builds a
     * {@link PartitionState} for it from the given offset data.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw module field is passed to a raw constructor call
    private PartitionState<K, V> newPartitionState(TopicPartition tp, HighestOffsetAndIncompletes offsetData) {
        PartitionStateManager psm = this.module.workManager().getPm();
        Long epoch = psm.getEpochOfPartition(tp);
        return new PartitionState(epoch, this.module, tp, offsetData);
    }

    /** Decodes the metadata carried by {@code offsetData}, using its offset as the committed offset. */
    private HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(OffsetAndMetadata offsetData) throws OffsetDecodingError {
        return OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(offsetData.offset(), offsetData.metadata());
    }

    /**
     * Base64-decodes the payload and then decodes the resulting bytes into the highest seen
     * offset and the set of incomplete offsets.
     *
     * @param committedOffsetForPartition the committed offset the payload was stored with
     * @param base64EncodedOffsetPayload the Base64 metadata string
     * @return the decoded offsets
     * @throws OffsetDecodingError if the payload is rejected by the Base64 decoder with an
     *         {@link IllegalArgumentException}, or if decoding the bytes fails
     */
    public static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(long committedOffsetForPartition, String base64EncodedOffsetPayload) throws OffsetDecodingError {
        byte[] decodedBytes;
        try {
            decodedBytes = OffsetSimpleSerialisation.decodeBase64(base64EncodedOffsetPayload);
        }
        catch (IllegalArgumentException a) {
            throw new OffsetDecodingError(StringUtils.msg("Error decoding offset metadata, input was: {}", base64EncodedOffsetPayload), a);
        }
        return OffsetMapCodecManager.decodeCompressedOffsets(committedOffsetForPartition, decodedBytes);
    }

    /**
     * Decodes committed metadata into a {@link PartitionState} for the partition's current epoch.
     *
     * @param tp the partition the metadata belongs to
     * @param offsetData the committed offset and metadata
     * @return the reconstructed partition state
     * @throws OffsetDecodingError if the metadata cannot be decoded
     */
    PartitionState<K, V> decodePartitionState(TopicPartition tp, OffsetAndMetadata offsetData) throws OffsetDecodingError {
        HighestOffsetAndIncompletes incompletes = this.deserialiseIncompleteOffsetMapFromBase64(offsetData);
        log.debug("Loaded incomplete offsets from offset payload {}", (Object)incompletes);
        return this.newPartitionState(tp, incompletes);
    }

    /**
     * Produces the metadata string to commit for a partition.
     *
     * @param baseOffsetForPartition the base offset the encoding is relative to
     * @param state the partition state to encode
     * @return the Base64-encoded offset map
     * @throws NoEncodingPossibleException if no encoding could be produced
     */
    public String makeOffsetMetadataPayload(long baseOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
        return this.serialiseIncompleteOffsetMapToBase64(baseOffsetForPartition, state);
    }

    /**
     * Encodes the partition's incomplete offsets and Base64-encodes the resulting bytes.
     *
     * @throws NoEncodingPossibleException if no encoding could be produced
     */
    String serialiseIncompleteOffsetMapToBase64(long baseOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
        byte[] compressedEncoding = this.encodeOffsetsCompressed(baseOffsetForPartition, state);
        return OffsetSimpleSerialisation.base64(compressedEncoding);
    }

    /**
     * Runs the simultaneous encoder (timed by the encoding timer) over the partition's incomplete
     * offsets below its highest succeeded offset, then packs either the {@link #forcedCodec}
     * encoding, when one is set, or otherwise the smallest encoding.
     *
     * <p>The usage counter for the chosen encoding is incremented only once the encoded payload
     * has actually been obtained.
     *
     * @param baseOffsetForPartition the base offset the encoding is relative to
     * @param partitionState the state supplying the incomplete offsets and highest succeeded offset
     * @return the packed encoding
     * @throws NoEncodingPossibleException if a forced encoding was not produced by the encoder,
     *         or if packing reports that no encoding is possible
     * @throws InternalRuntimeException if constructing or running the encoder fails
     */
    byte[] encodeOffsetsCompressed(long baseOffsetForPartition, PartitionState<K, V> partitionState) throws NoEncodingPossibleException {
        SortedSet<Long> incompleteOffsets = partitionState.getIncompleteOffsetsBelowHighestSucceeded();
        long highestSucceeded = partitionState.getOffsetHighestSucceeded();
        if (log.isDebugEnabled()) {
            log.debug("Encoding partition {}, highest succeeded {}, incomplete offsets to encode {}", partitionState.getTp(), highestSucceeded, incompleteOffsets);
        }

        OffsetSimultaneousEncoder simultaneousEncoder;
        try {
            simultaneousEncoder = new OffsetSimultaneousEncoder(baseOffsetForPartition, highestSucceeded, incompleteOffsets);
            this.offsetEncodingTimer.recordCallable(simultaneousEncoder::invoke);
        }
        catch (Exception e) {
            throw new InternalRuntimeException("Error encoding offsets", e);
        }

        if (forcedCodec.isPresent()) {
            OffsetEncoding forcedOffsetEncoding = forcedCodec.get();
            log.debug("Forcing use of {}, for testing", (Object)forcedOffsetEncoding);
            Map<OffsetEncoding, byte[]> encodingMap = simultaneousEncoder.getEncodingMap();
            byte[] bytes = encodingMap.get((Object)forcedOffsetEncoding);
            if (bytes == null) {
                throw new NoEncodingPossibleException(StringUtils.msg("Can't force an encoding that hasn't been run: {}", new Object[]{forcedOffsetEncoding}));
            }
            // Count the forced encoding only now that it is known to be available.
            this.getCounterMeterForEncoding(forcedOffsetEncoding).increment();
            return simultaneousEncoder.packEncoding(new EncodedOffsetPair(forcedOffsetEncoding, ByteBuffer.wrap(bytes)));
        }

        // Pack first so that packSmallest() gets the chance to report its own failure before
        // sortedEncodings.first() is consulted for the metric.
        // NOTE(review): assumes packSmallest() does not alter sortedEncodings - confirm.
        byte[] smallest = simultaneousEncoder.packSmallest();
        this.getCounterMeterForEncoding(simultaneousEncoder.sortedEncodings.first().getEncoding()).increment();
        return smallest;
    }

    /**
     * Returns the usage counter for the given encoding, creating and caching it on first use.
     * The counter is tagged with {@code encoding=<encoding name>}.
     */
    private Counter getCounterMeterForEncoding(OffsetEncoding encoding) {
        return this.encodingCounters.computeIfAbsent(encoding, enc -> this.pcMetrics.getCounterFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_USAGE, Tag.of("encoding", enc.name())));
    }

    /**
     * Decodes raw (already Base64-decoded) offset metadata.
     *
     * <p>An empty payload yields a highest seen offset of {@code nextExpectedOffset - 1} and no
     * incomplete offsets. Otherwise the bytes are unwrapped into an {@link EncodedOffsetPair} and
     * decoded using the current static error policy.
     *
     * @param nextExpectedOffset the committed offset the payload was stored with
     * @param decodedBytes the raw payload; must not be {@code null}
     * @return the decoded offsets
     */
    static HighestOffsetAndIncompletes decodeCompressedOffsets(long nextExpectedOffset, byte[] decodedBytes) {
        if (decodedBytes.length == 0) {
            long highestSeenOffsetIsThen = nextExpectedOffset - 1L;
            return HighestOffsetAndIncompletes.of(highestSeenOffsetIsThen);
        }
        EncodedOffsetPair result = EncodedOffsetPair.unwrap(decodedBytes);
        return result.getDecodedIncompletes(nextExpectedOffset, errorPolicy);
    }

    /**
     * Value holder pairing an optional highest seen offset with a sorted set of incomplete
     * offsets. The set is stored and returned by reference, without copying.
     */
    public static final class HighestOffsetAndIncompletes {
        private final Optional<Long> highestSeenOffset;
        private final SortedSet<Long> incompleteOffsets;

        /** Creates an instance with the given highest seen offset and a new, empty incomplete set. */
        public static HighestOffsetAndIncompletes of(long highestSeenOffset) {
            return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new TreeSet<Long>());
        }

        /** Creates an instance with the given highest seen offset and the given incomplete set. */
        public static HighestOffsetAndIncompletes of(long highestSeenOffset, SortedSet<Long> incompleteOffsets) {
            return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), incompleteOffsets);
        }

        /** Creates an instance with no highest seen offset and a new, empty incomplete set. */
        public static HighestOffsetAndIncompletes of() {
            return new HighestOffsetAndIncompletes(Optional.empty(), new TreeSet<Long>());
        }

        /**
         * @param highestSeenOffset the highest seen offset, or empty when there is none
         * @param incompleteOffsets the incomplete offsets; stored as-is
         */
        public HighestOffsetAndIncompletes(Optional<Long> highestSeenOffset, SortedSet<Long> incompleteOffsets) {
            this.highestSeenOffset = highestSeenOffset;
            this.incompleteOffsets = incompleteOffsets;
        }

        /** @return the highest seen offset, as supplied at construction */
        public Optional<Long> getHighestSeenOffset() {
            return this.highestSeenOffset;
        }

        /** @return the incomplete offsets, as supplied at construction (not a copy) */
        public SortedSet<Long> getIncompleteOffsets() {
            return this.incompleteOffsets;
        }

        /** Two instances are equal when both components are equal; {@code null} components are tolerated. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof HighestOffsetAndIncompletes)) {
                return false;
            }
            HighestOffsetAndIncompletes other = (HighestOffsetAndIncompletes)o;
            return Objects.equals(this.highestSeenOffset, other.highestSeenOffset)
                    && Objects.equals(this.incompleteOffsets, other.incompleteOffsets);
        }

        /** Combines both components with multiplier 59, substituting 43 for a {@code null} component. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + (this.highestSeenOffset == null ? 43 : this.highestSeenOffset.hashCode());
            result = result * 59 + (this.incompleteOffsets == null ? 43 : this.incompleteOffsets.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "OffsetMapCodecManager.HighestOffsetAndIncompletes(highestSeenOffset=" + this.getHighestSeenOffset() + ", incompleteOffsets=" + this.getIncompleteOffsets() + ")";
        }
    }
}

