/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.EncodedOffsetPair;
import io.confluent.parallelconsumer.EncodingNotSupportedException;
import io.confluent.parallelconsumer.InternalRuntimeError;
import io.confluent.parallelconsumer.OffsetDecodingError;
import io.confluent.parallelconsumer.OffsetEncoding;
import io.confluent.parallelconsumer.OffsetSimpleSerialisation;
import io.confluent.parallelconsumer.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.WorkManager;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniSets;

public class OffsetMapCodecManager<K, V> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(OffsetMapCodecManager.class);
    // Not referenced within this class. NOTE(review): presumably the size budget for the
    // commit metadata payload - confirm against the callers that read it.
    public static int DefaultMaxMetadataSize = 4096;
    // Not referenced within this class. NOTE(review): presumably the charset used when
    // turning encoded offset payloads into strings - confirm against usages.
    public static final Charset CHARSET_TO_USE = StandardCharsets.UTF_8;
    /** Work manager whose per-partition high water marks and incomplete-offset sets are read and updated here. */
    private final WorkManager<K, V> wm;
    /** Consumer queried for the current assignment and the committed offsets. */
    Consumer<K, V> consumer;
    // When present, encodeOffsetsCompressed packs this specific encoding instead of the
    // smallest one; the log message there describes it as being for testing.
    public static Optional<OffsetEncoding> forcedCodec = Optional.empty();

    /**
     * Creates a codec manager that works against the given work manager and consumer.
     *
     * @param wm       work manager holding the per-partition offset state
     * @param consumer consumer used to look up assignments and committed offsets
     */
    public OffsetMapCodecManager(WorkManager<K, V> wm, Consumer<K, V> consumer) {
        this.consumer = consumer;
        this.wm = wm;
    }

    /**
     * Loads the offset maps for every partition currently assigned to the consumer.
     */
    void loadAllAssignedOffsetMap() {
        this.loadOffsetMapForPartition(this.consumer.assignment());
    }

    /**
     * Fetches the committed offsets for the given partitions and loads the offset map carried in
     * each commit's metadata string.
     *
     * <p>The fetch is retried when the consumer call is interrupted by a {@link WakeupException}.
     * After more than ten attempts without a result an {@link InternalRuntimeError} is thrown,
     * carrying the wakeup from the final attempt (if any) as its cause. A fetch that succeeds on
     * the final attempt is used rather than discarded.
     *
     * <p>Partitions whose committed entry is {@code null} are skipped. If a partition's metadata
     * cannot be decoded the error is logged and that partition's offset map is dropped; the
     * remaining partitions are still processed.
     *
     * @param assignment partitions whose committed offsets should be loaded
     * @throws InternalRuntimeError if the committed offsets could not be fetched within the retry budget
     */
    void loadOffsetMapForPartition(Set<TopicPartition> assignment) {
        final int maxAttempts = 10;
        Map<TopicPartition, OffsetAndMetadata> committed = null;
        int attempts = 0;
        while (committed == null) {
            WakeupException lastWakeupException = null;
            try {
                committed = this.consumer.committed(assignment);
            } catch (WakeupException exception) {
                log.warn("Woken up trying to get assignment", exception);
                lastWakeupException = exception;
            }
            attempts++;
            // Give up only if this attempt produced nothing; previously a successful fetch on the
            // last permitted attempt was thrown away and reported as a failure.
            if (committed == null && attempts > maxAttempts) {
                throw new InternalRuntimeError("Failed to get partition assignment - continuously woken up.", lastWakeupException);
            }
        }
        committed.forEach((tp, offsetAndMeta) -> {
            if (offsetAndMeta == null) {
                // Nothing recorded for this partition in the returned map.
                return;
            }
            try {
                this.loadOffsetMetadataPayload(offsetAndMeta.offset(), tp, offsetAndMeta.metadata());
            } catch (OffsetDecodingError offsetDecodingError) {
                // Deliberate best-effort: drop this partition's map and carry on with the others.
                log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})", tp, offsetAndMeta, offsetDecodingError);
            }
        });
    }

    /**
     * Decodes a base64 offset payload and turns it into the highest seen offset plus the set of
     * incomplete offsets.
     *
     * @param finalBaseComittedOffsetForPartition committed offset passed on to the decoder
     * @param base64EncodedOffsetPayload          base64 text taken from the commit metadata
     * @return the decoded highest offset and incomplete offsets
     * @throws OffsetDecodingError if the payload is not valid base64
     */
    static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String base64EncodedOffsetPayload) throws OffsetDecodingError {
        byte[] rawPayload = decodeBase64OrThrow(base64EncodedOffsetPayload);
        return OffsetMapCodecManager.decodeCompressedOffsets(finalBaseComittedOffsetForPartition, rawPayload);
    }

    /** Base64-decodes the payload, translating an {@link IllegalArgumentException} into an {@link OffsetDecodingError}. */
    private static byte[] decodeBase64OrThrow(String base64EncodedOffsetPayload) throws OffsetDecodingError {
        try {
            return OffsetSimpleSerialisation.decodeBase64(base64EncodedOffsetPayload);
        } catch (IllegalArgumentException cause) {
            String message = StringUtils.msg("Error decoding offset metadata, input was: {}", base64EncodedOffsetPayload);
            throw new OffsetDecodingError(message, cause);
        }
    }

    /**
     * Decodes the offset metadata payload of one partition and stores the result in the work
     * manager: the partition's high water mark is raised and its incomplete-offset set is replaced.
     *
     * @param startOffset           committed offset the payload is decoded against
     * @param tp                    partition the payload belongs to
     * @param offsetMetadataPayload base64 payload from the commit metadata
     * @throws OffsetDecodingError if the payload cannot be decoded
     */
    void loadOffsetMetadataPayload(long startOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
        HighestOffsetAndIncompletes decoded = OffsetMapCodecManager.deserialiseIncompleteOffsetMapFromBase64(startOffset, offsetMetadataPayload);
        long highestSeen = decoded.getHighestSeenOffset();
        this.wm.raisePartitionHighWaterMark(highestSeen, tp);
        this.wm.partitionIncompleteOffsets.put(tp, decoded.getIncompleteOffsets());
        log.debug("Loaded incomplete offsets from offset payload {}", decoded);
    }

    /**
     * Builds the metadata payload string for a partition; delegates entirely to
     * {@link #serialiseIncompleteOffsetMapToBase64(long, TopicPartition, Set)}.
     *
     * @param finalOffsetForPartition offset being committed for the partition
     * @param tp                      partition the payload is built for
     * @param incompleteOffsets       offsets that have not completed yet
     * @return the base64 payload
     * @throws EncodingNotSupportedException propagated from the encoding step
     */
    String makeOffsetMetadataPayload(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        return this.serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, tp, incompleteOffsets);
    }

    /**
     * Encodes the incomplete offsets of a partition and returns the packed bytes as base64 text.
     *
     * @param finalOffsetForPartition offset being committed for the partition
     * @param tp                      partition the offsets belong to
     * @param incompleteOffsets       offsets that have not completed yet
     * @return base64 form of the packed encoding
     * @throws EncodingNotSupportedException propagated from {@link #encodeOffsetsCompressed}
     */
    String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        byte[] packed = this.encodeOffsetsCompressed(finalOffsetForPartition, tp, incompleteOffsets);
        return OffsetSimpleSerialisation.base64(packed);
    }

    /**
     * Runs the simultaneous encoder over the incomplete offsets and returns the packed bytes.
     *
     * <p>Normally the smallest encoding is packed. When {@link #forcedCodec} is set, that encoding
     * is packed instead, provided the encoder produced it.
     *
     * @param finalOffsetForPartition offset being committed for the partition
     * @param tp                      partition whose high water mark is looked up in the work manager
     * @param incompleteOffsets       offsets that have not completed yet
     * @return the packed encoding
     * @throws EncodingNotSupportedException if a forced encoding is set but has no entry in the encoder's results
     */
    byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        // One past the partition's recorded high water mark.
        Long nextExpectedOffset = this.wm.partitionOffsetHighWaterMarks.get(tp) + 1L;
        OffsetSimultaneousEncoder encoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, nextExpectedOffset, incompleteOffsets).invoke();

        if (!forcedCodec.isPresent()) {
            return encoder.packSmallest();
        }

        OffsetEncoding forced = forcedCodec.get();
        log.debug("Forcing use of {}, for testing", forced);
        byte[] forcedBytes = encoder.getEncodingMap().get(forced);
        if (forcedBytes == null) {
            throw new EncodingNotSupportedException(StringUtils.msg("Can't force an encoding that hasn't been run: {}", forced));
        }
        EncodedOffsetPair forcedPair = new EncodedOffsetPair(forced, ByteBuffer.wrap(forcedBytes));
        return encoder.packEncoding(forcedPair);
    }

    /**
     * Decodes packed offset bytes into the highest seen offset and the incomplete offsets.
     *
     * <p>An empty byte array yields the given offset together with an empty set.
     *
     * @param finalOffsetForPartition committed offset handed to the decoder
     * @param decodedBytes            packed encoding, already base64-decoded
     * @return the decoded highest offset and incomplete offsets
     */
    static HighestOffsetAndIncompletes decodeCompressedOffsets(long finalOffsetForPartition, byte[] decodedBytes) {
        boolean hasPayload = decodedBytes.length > 0;
        if (hasPayload) {
            HighestOffsetAndIncompletes decoded = EncodedOffsetPair.unwrap(decodedBytes).getDecodedIncompletes(finalOffsetForPartition);
            return HighestOffsetAndIncompletes.of(decoded.getHighestSeenOffset(), decoded.getIncompleteOffsets());
        }
        return HighestOffsetAndIncompletes.of(finalOffsetForPartition, UniSets.of());
    }

    /**
     * Renders the offsets between the given offset and the partition's high water mark as a
     * string with one character per offset: {@code 'o'} if the offset is in
     * {@code incompleteOffsets}, {@code 'x'} otherwise.
     *
     * @param finalOffsetForPartition offset the first character corresponds to
     * @param tp                      partition whose high water mark is looked up in the work manager
     * @param incompleteOffsets       offsets that have not completed yet
     * @return the bitmap string
     */
    String incompletesToBitmapString(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
        long lowWaterMark = finalOffsetForPartition;
        long highWaterMark = this.wm.partitionOffsetHighWaterMarks.get(tp);
        StringBuilder bitmap = new StringBuilder();
        for (Integer relativeOffset : Range.range(highWaterMark - lowWaterMark)) {
            long absoluteOffset = lowWaterMark + relativeOffset;
            bitmap.append(incompleteOffsets.contains(absoluteOffset) ? "o" : "x");
        }
        return bitmap.toString();
    }

    /**
     * Parses a bitmap string back into the set of incomplete offsets.
     *
     * <p>Each {@code 'o'} contributes {@code baseOffset} plus its position to the result; each
     * {@code 'x'} is skipped.
     *
     * @param baseOffset        offset corresponding to the first character
     * @param inputBitmapString string made up of {@code 'o'} and {@code 'x'} characters
     * @return the incomplete offsets
     * @throws IllegalArgumentException if the string contains any other character
     */
    static Set<Long> bitmapStringToIncomplete(long baseOffset, String inputBitmapString) {
        Set<Long> incompleteOffsets = new HashSet<>();
        long length = inputBitmapString.length();
        for (Integer position : Range.range(length)) {
            char bit = inputBitmapString.charAt(position);
            switch (bit) {
                case 'o':
                    incompleteOffsets.add(baseOffset + position);
                    break;
                case 'x':
                    log.trace("Dropping completed offset");
                    break;
                default:
                    throw new IllegalArgumentException("Invalid encoding - unexpected char: " + bit);
            }
        }
        return incompleteOffsets;
    }

    /**
     * Value holder pairing the highest seen offset with a set of incomplete offsets.
     *
     * <p>The supplied set is stored by reference; no copy is made. Equality, hash code and string
     * form are all derived from the two fields.
     */
    static final class HighestOffsetAndIncompletes {
        private final long highestSeenOffset;
        private final Set<Long> incompleteOffsets;

        public HighestOffsetAndIncompletes(long highestSeenOffset, Set<Long> incompleteOffsets) {
            this.highestSeenOffset = highestSeenOffset;
            this.incompleteOffsets = incompleteOffsets;
        }

        /** Static factory equivalent to calling the constructor. */
        public static HighestOffsetAndIncompletes of(long highestSeenOffset, Set<Long> incompleteOffsets) {
            return new HighestOffsetAndIncompletes(highestSeenOffset, incompleteOffsets);
        }

        public long getHighestSeenOffset() {
            return this.highestSeenOffset;
        }

        public Set<Long> getIncompleteOffsets() {
            return this.incompleteOffsets;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof HighestOffsetAndIncompletes)) {
                return false;
            }
            HighestOffsetAndIncompletes that = (HighestOffsetAndIncompletes) o;
            if (this.highestSeenOffset != that.highestSeenOffset) {
                return false;
            }
            if (this.incompleteOffsets == null) {
                return that.incompleteOffsets == null;
            }
            return this.incompleteOffsets.equals(that.incompleteOffsets);
        }

        @Override
        public int hashCode() {
            // Same arithmetic as before: seed 1, multiplier 59, 43 standing in for a null set.
            int offsetHash = Long.hashCode(this.highestSeenOffset);
            int setHash = this.incompleteOffsets == null ? 43 : this.incompleteOffsets.hashCode();
            return (59 + offsetHash) * 59 + setHash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("OffsetMapCodecManager.HighestOffsetAndIncompletes(highestSeenOffset=");
            text.append(this.highestSeenOffset);
            text.append(", incompleteOffsets=").append(this.incompleteOffsets);
            text.append(")");
            return text.toString();
        }
    }
}

