/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.EncodedOffsetPair;
import io.confluent.parallelconsumer.EncodingNotSupportedException;
import io.confluent.parallelconsumer.OffsetDecodingError;
import io.confluent.parallelconsumer.OffsetEncoding;
import io.confluent.parallelconsumer.OffsetSimpleSerialisation;
import io.confluent.parallelconsumer.OffsetSimultaneousEncoder;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.WorkManager;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Encodes and decodes the set of incomplete offsets that travels in the metadata string of a
 * committed Kafka offset.
 *
 * <p>On load, the metadata of each committed offset is Base64-decoded and unpacked into a high
 * water mark plus a set of incomplete offsets, which are handed to the {@link WorkManager}. On
 * save, the incomplete offsets of a partition are encoded by an {@link OffsetSimultaneousEncoder}
 * and Base64-encoded into a string.
 *
 * @param <K> key type of the wrapped consumer
 * @param <V> value type of the wrapped consumer
 */
public class OffsetMapCodecManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(OffsetMapCodecManager.class);

    /**
     * Default upper bound for the metadata payload size. Not referenced inside this class;
     * presumably mirrors the broker's offset metadata limit - TODO confirm against callers.
     */
    public static final int DefaultMaxMetadataSize = 4096;

    /** Character set exposed for users of the encoded payload. */
    public static final Charset CHARSET_TO_USE = StandardCharsets.UTF_8;

    private final WorkManager<K, V> wm;

    Consumer<K, V> consumer;

    /**
     * When present, {@link #encodeOffsetsCompressed} packs this encoding instead of the smallest
     * one. The accompanying log message states this is for testing.
     */
    static Optional<OffsetEncoding> forcedCodec = Optional.empty();

    /**
     * @param wm work manager whose high water marks and incomplete offsets are read and updated
     * @param consumer consumer used to look up assignments and committed offsets
     */
    public OffsetMapCodecManager(WorkManager<K, V> wm, Consumer<K, V> consumer) {
        this.wm = wm;
        this.consumer = consumer;
    }

    /** Loads the offset map for every partition currently assigned to the consumer. */
    void loadAllAssignedOffsetMap() {
        Set<TopicPartition> assignment = consumer.assignment();
        loadOffsetMapForPartition(assignment);
    }

    /**
     * Fetches the committed offsets of the given partitions and loads the offset map stored in
     * each one's metadata. Partitions without a committed offset are skipped. A payload that
     * cannot be decoded is logged and dropped rather than propagated.
     *
     * @param assignment partitions to load
     */
    void loadOffsetMapForPartition(Set<TopicPartition> assignment) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(assignment);
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
            TopicPartition tp = entry.getKey();
            OffsetAndMetadata offsetAndMeta = entry.getValue();
            if (offsetAndMeta == null) {
                // nothing has been committed for this partition
                continue;
            }
            try {
                loadOffsetMetadataPayload(offsetAndMeta.offset(), tp, offsetAndMeta.metadata());
            } catch (OffsetDecodingError offsetDecodingError) {
                // trailing throwable argument is logged as the exception by SLF4J
                log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})", tp, offsetAndMeta, offsetDecodingError);
            }
        }
    }

    /**
     * Decodes a Base64 offset map into a high water mark and a sorted set of incomplete offsets.
     *
     * @param finalBaseComittedOffsetForPartition committed offset the encoded map is relative to
     * @param incompleteOffsetMap Base64 text taken from the commit metadata
     * @return tuple of (high water mark, incomplete offsets in ascending order)
     * @throws OffsetDecodingError if the input is not valid Base64
     */
    static ParallelConsumer.Tuple<Long, TreeSet<Long>> deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String incompleteOffsetMap) throws OffsetDecodingError {
        byte[] decoded;
        try {
            decoded = OffsetSimpleSerialisation.decodeBase64(incompleteOffsetMap);
        } catch (IllegalArgumentException a) {
            throw new OffsetDecodingError(StringUtils.msg("Error decoding offset metadata, input was: {}", incompleteOffsetMap), a);
        }
        ParallelConsumer.Tuple<Long, Set<Long>> incompleteOffsets = decodeCompressedOffsets(finalBaseComittedOffsetForPartition, decoded);
        TreeSet<Long> sortedIncompletes = new TreeSet<>(incompleteOffsets.getRight());
        return ParallelConsumer.Tuple.pairOf(incompleteOffsets.getLeft(), sortedIncompletes);
    }

    /**
     * Decodes the payload and stores its contents in the work manager: the decoded high water
     * mark is passed to {@code raisePartitionHighWaterMark} and the incomplete offsets replace
     * the partition's entry in {@code partitionIncompleteOffsets}.
     *
     * @param startOffset committed offset of the partition
     * @param tp partition the payload belongs to
     * @param offsetMetadataPayload Base64 metadata string of the committed offset
     * @throws OffsetDecodingError if the payload cannot be decoded
     */
    void loadOffsetMetadataPayload(long startOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
        ParallelConsumer.Tuple<Long, TreeSet<Long>> incompletes = deserialiseIncompleteOffsetMapFromBase64(startOffset, offsetMetadataPayload);
        wm.raisePartitionHighWaterMark(incompletes.getLeft(), tp);
        wm.partitionIncompleteOffsets.put(tp, incompletes.getRight());
    }

    /**
     * Builds the metadata string to commit alongside the given offset.
     *
     * @param finalOffsetForPartition offset being committed for the partition
     * @param tp partition being committed
     * @param incompleteOffsets offsets that are not yet complete
     * @return Base64 encoded offset map
     * @throws EncodingNotSupportedException propagated from the encoder
     */
    String makeOffsetMetadataPayload(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        return serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, tp, incompleteOffsets);
    }

    /**
     * Encodes the incomplete offsets and converts the resulting bytes to Base64 text.
     *
     * @param finalOffsetForPartition offset being committed for the partition
     * @param tp partition being committed
     * @param incompleteOffsets offsets that are not yet complete
     * @return Base64 encoded offset map
     * @throws EncodingNotSupportedException propagated from the encoder
     */
    String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        byte[] compressedEncoding = encodeOffsetsCompressed(finalOffsetForPartition, tp, incompleteOffsets);
        return OffsetSimpleSerialisation.base64(compressedEncoding);
    }

    /**
     * Runs the simultaneous encoder over the incomplete offsets and returns the packed bytes:
     * the encoding selected by {@link #forcedCodec} when one is set, otherwise whatever
     * {@code packSmallest()} yields.
     *
     * @param finalOffsetForPartition offset being committed for the partition
     * @param tp partition being committed; must have a recorded high water mark
     * @param incompleteOffsets offsets that are not yet complete
     * @return packed encoding bytes
     * @throws EncodingNotSupportedException if the forced encoding produced no output, or as
     *     propagated from the encoder
     * @throws NullPointerException if no high water mark is recorded for {@code tp}
     */
    byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        long nextExpectedOffset = requireHighWaterMark(tp) + 1L;
        OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, nextExpectedOffset, incompleteOffsets).invoke();
        if (!forcedCodec.isPresent()) {
            return simultaneousEncoder.packSmallest();
        }
        OffsetEncoding forcedOffsetEncoding = forcedCodec.get();
        log.warn("Forcing use of {}, for testing", forcedOffsetEncoding);
        Map<OffsetEncoding, byte[]> encodingMap = simultaneousEncoder.getEncodingMap();
        byte[] bytes = encodingMap.get(forcedOffsetEncoding);
        if (bytes == null) {
            throw new EncodingNotSupportedException(StringUtils.msg("Can't force an encoding that hasn't been run: {}", new Object[]{forcedOffsetEncoding}));
        }
        return simultaneousEncoder.packEncoding(new EncodedOffsetPair(forcedOffsetEncoding, ByteBuffer.wrap(bytes)));
    }

    /**
     * Unpacks encoded bytes into a high water mark and the set of incomplete offsets.
     *
     * @param finalOffsetForPartition committed offset the encoding is relative to
     * @param s packed encoding bytes; an empty array means no offset map was stored
     * @return tuple of (high water mark, incomplete offsets); for empty input the given offset
     *     paired with an empty set
     */
    static ParallelConsumer.Tuple<Long, Set<Long>> decodeCompressedOffsets(long finalOffsetForPartition, byte[] s) {
        if (s.length == 0) {
            return ParallelConsumer.Tuple.pairOf(finalOffsetForPartition, UniSets.of());
        }
        EncodedOffsetPair result = EncodedOffsetPair.unwrap(s);
        return result.getDecodedIncompletes(finalOffsetForPartition);
    }

    /**
     * Renders the offsets from {@code finalOffsetForPartition} up to, but excluding, the
     * partition's high water mark as a string with one character per offset: {@code 'o'} for an
     * incomplete offset and {@code 'x'} for any other.
     *
     * @param finalOffsetForPartition offset represented by the first character
     * @param tp partition whose high water mark bounds the string
     * @param incompleteOffsets offsets to mark as incomplete
     * @return the bitmap string
     * @throws NullPointerException if no high water mark is recorded for {@code tp}
     */
    String incompletesToBitmapString(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
        long length = requireHighWaterMark(tp) - finalOffsetForPartition;
        StringBuilder bitmap = new StringBuilder();
        for (Integer relativeOffset : Range.range(length)) {
            long offset = finalOffsetForPartition + relativeOffset;
            bitmap.append(incompleteOffsets.contains(offset) ? "o" : "x");
        }
        return bitmap.toString();
    }

    /**
     * Parses a bitmap string as produced by {@link #incompletesToBitmapString} back into the set
     * of incomplete offsets.
     *
     * @param baseOffset offset represented by the first character
     * @param inputBitmapString string consisting only of {@code 'o'} and {@code 'x'}
     * @return offsets whose character is {@code 'o'}
     * @throws IllegalArgumentException if any other character is encountered
     */
    static Set<Long> bitmapStringToIncomplete(long baseOffset, String inputBitmapString) {
        Set<Long> incompleteOffsets = new HashSet<>();
        long length = inputBitmapString.length();
        for (Integer index : Range.range(length)) {
            char bit = inputBitmapString.charAt(index);
            if (bit == 'o') {
                incompleteOffsets.add(baseOffset + index);
            } else if (bit == 'x') {
                log.trace("Dropping completed offset");
            } else {
                throw new IllegalArgumentException("Invalid encoding - unexpected char: " + bit);
            }
        }
        return incompleteOffsets;
    }

    /**
     * Looks up the recorded high water mark of a partition, failing with a descriptive message
     * instead of a bare unboxing error when none exists.
     */
    private long requireHighWaterMark(TopicPartition tp) {
        Long highWaterMark = wm.partitionOffsetHighWaterMarks.get(tp);
        return Objects.requireNonNull(highWaterMark, () -> "No offset high water mark recorded for partition " + tp);
    }
}

