/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.LegacySubscriptionInfoSerde;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class StreamsUpgradeTest {
    private static final ConsumerPartitionAssignor.RebalanceProtocol REBALANCE_PROTOCOL = ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: ");
        }
        String propFileName = args.length > 0 ? args[0] : null;
        Properties streamsProperties = Utils.loadProps((String)propFileName);
        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
        System.out.println("props=" + streamsProperties);
        KafkaStreams streams = StreamsUpgradeTest.buildStreams(streamsProperties);
        streams.start();
        Exit.addShutdownHook((String)"streams-shutdown-hook", () -> {
            System.out.println("closing Kafka Streams instance");
            System.out.flush();
            streams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        });
    }

    public static KafkaStreams buildStreams(Properties streamsProperties) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream dataStream = builder.stream("data");
        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"), new String[0]);
        dataStream.to("echo");
        Properties config = new Properties();
        config.setProperty("application.id", "StreamsUpgradeTest");
        config.put("commit.interval.ms", (Object)1000);
        DefaultKafkaClientSupplier kafkaClientSupplier = streamsProperties.containsKey("test.future.metadata") ? new FutureKafkaClientSupplier() : new DefaultKafkaClientSupplier();
        config.putAll((Map<?, ?>)streamsProperties);
        return new KafkaStreams(builder.build(), config, (KafkaClientSupplier)kafkaClientSupplier);
    }

    private static class FutureAssignmentInfo
    extends AssignmentInfo {
        private final boolean bumpUsedVersion;
        private final boolean bumpSupportedVersion;
        final ByteBuffer originalUserMetadata;

        private FutureAssignmentInfo(boolean bumpUsedVersion, boolean bumpSupportedVersion, ByteBuffer bytes) {
            super(6, 6);
            this.bumpUsedVersion = bumpUsedVersion;
            this.bumpSupportedVersion = bumpSupportedVersion;
            this.originalUserMetadata = bytes;
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public ByteBuffer encode() {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            this.originalUserMetadata.rewind();
            try (DataOutputStream out = new DataOutputStream(baos);){
                if (this.bumpUsedVersion) {
                    this.originalUserMetadata.getInt();
                    out.writeInt(7);
                } else {
                    out.writeInt(this.originalUserMetadata.getInt());
                }
                if (this.bumpSupportedVersion) {
                    this.originalUserMetadata.getInt();
                    out.writeInt(7);
                }
                try {
                    while (true) {
                        out.write(this.originalUserMetadata.get());
                    }
                }
                catch (BufferUnderflowException bufferUnderflowException) {
                    out.flush();
                    out.close();
                    ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
                    return byteBuffer;
                }
            }
            catch (IOException ex) {
                throw new TaskAssignmentException("Failed to encode AssignmentInfo", (Throwable)ex);
            }
        }
    }

    private static class FutureSubscriptionInfo {
        private final int version;
        private final UUID processId;
        private final Set<TaskId> prevTasks;
        private final Set<TaskId> standbyTasks;
        private final String userEndPoint;

        FutureSubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
            this.version = version;
            this.processId = processId;
            this.prevTasks = prevTasks;
            this.standbyTasks = standbyTasks;
            this.userEndPoint = userEndPoint;
            if (version <= 6) {
                throw new IllegalArgumentException("this class can't be used with version " + version);
            }
        }

        public ByteBuffer encode() {
            ByteBuffer buf = this.encodeFutureVersion();
            buf.rewind();
            return buf;
        }

        private ByteBuffer encodeFutureVersion() {
            byte[] endPointBytes = LegacySubscriptionInfoSerde.prepareUserEndPoint(this.userEndPoint);
            ByteBuffer buf = ByteBuffer.allocate(28 + this.prevTasks.size() * 8 + 4 + this.standbyTasks.size() * 8 + 4 + endPointBytes.length);
            buf.putInt(this.version);
            buf.putInt(this.version);
            LegacySubscriptionInfoSerde.encodeClientUUID(buf, this.processId);
            LegacySubscriptionInfoSerde.encodeTasks(buf, this.prevTasks);
            LegacySubscriptionInfoSerde.encodeTasks(buf, this.standbyTasks);
            LegacySubscriptionInfoSerde.encodeUserEndPoint(buf, endPointBytes);
            buf.rewind();
            return buf;
        }
    }

    public static class FutureStreamsPartitionAssignor
    extends StreamsPartitionAssignor {
        private AtomicInteger usedSubscriptionMetadataVersionPeek;

        public FutureStreamsPartitionAssignor() {
            this.usedSubscriptionMetadataVersion = 7;
        }

        public void configure(Map<String, ?> configs) {
            Object o = configs.get("test.future.metadata");
            this.usedSubscriptionMetadataVersionPeek = o instanceof AtomicInteger ? (AtomicInteger)o : new AtomicInteger();
            configs.remove("test.future.metadata");
            super.configure(configs);
        }

        public ByteBuffer subscriptionUserData(Set<String> topics) {
            TaskManager taskManager = this.taskManger();
            Set standbyTasks = taskManager.cachedTasksIds();
            Set activeTasks = FutureStreamsPartitionAssignor.prepareForSubscription((TaskManager)taskManager, topics, (Set)standbyTasks, (ConsumerPartitionAssignor.RebalanceProtocol)REBALANCE_PROTOCOL);
            if (this.usedSubscriptionMetadataVersion <= 6) {
                return new SubscriptionInfo(this.usedSubscriptionMetadataVersion, 7, taskManager.processId(), activeTasks, standbyTasks, this.userEndPoint()).encode();
            }
            return new FutureSubscriptionInfo(this.usedSubscriptionMetadataVersion, taskManager.processId(), activeTasks, standbyTasks, this.userEndPoint()).encode();
        }

        public void onAssignment(ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata metadata) {
            try {
                super.onAssignment(assignment, metadata);
                this.usedSubscriptionMetadataVersionPeek.set(this.usedSubscriptionMetadataVersion);
                return;
            }
            catch (TaskAssignmentException taskAssignmentException) {
                int usedVersion;
                ByteBuffer data = assignment.userData();
                data.rewind();
                try (DataInputStream in = new DataInputStream((InputStream)new ByteBufferInputStream(data));){
                    usedVersion = in.readInt();
                }
                catch (IOException ex) {
                    throw new TaskAssignmentException("Failed to decode AssignmentInfo", (Throwable)ex);
                }
                if (usedVersion > 7) {
                    throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + 6 + 1);
                }
                AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)assignment.userData().putInt(0, 6));
                if (this.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
                    this.setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
                    this.usedSubscriptionMetadataVersionPeek.set(this.usedSubscriptionMetadataVersion);
                }
                ArrayList partitions = new ArrayList(assignment.partitions());
                partitions.sort(PARTITION_COMPARATOR);
                HashMap activeTasks = new HashMap();
                HashMap topicToPartitionInfo = new HashMap();
                HashMap partitionsToTaskId = new HashMap();
                FutureStreamsPartitionAssignor.processVersionTwoAssignment((String)"test ", (AssignmentInfo)info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
                Map partitionsByHost = info.partitionsByHost();
                TaskManager taskManager = this.taskManger();
                taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
                taskManager.setHostPartitionMappings(partitionsByHost, info.standbyPartitionByHost());
                taskManager.setPartitionsToTaskId(partitionsToTaskId);
                taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
                taskManager.updateSubscriptionsFromAssignment(partitions);
                taskManager.setRebalanceInProgress(false);
                this.usedSubscriptionMetadataVersionPeek.set(this.usedSubscriptionMetadataVersion);
                return;
            }
        }

        public ConsumerPartitionAssignor.GroupAssignment assign(Cluster metadata, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
            boolean bumpSupportedVersion;
            Map subscriptions = groupSubscription.groupSubscription();
            HashSet<Integer> supportedVersions = new HashSet<Integer>();
            for (Map.Entry entry : subscriptions.entrySet()) {
                ConsumerPartitionAssignor.Subscription subscription = (ConsumerPartitionAssignor.Subscription)entry.getValue();
                SubscriptionInfo info = SubscriptionInfo.decode((ByteBuffer)subscription.userData());
                supportedVersions.add(info.latestSupportedVersion());
            }
            Map assignment = null;
            HashMap downgradedSubscriptions = new HashMap();
            for (ConsumerPartitionAssignor.Subscription subscription : subscriptions.values()) {
                SubscriptionInfo info = SubscriptionInfo.decode((ByteBuffer)subscription.userData());
                if (info.version() >= 7) continue;
                assignment = super.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();
                break;
            }
            boolean bumpUsedVersion = false;
            if (assignment != null) {
                bumpSupportedVersion = supportedVersions.size() == 1 && (Integer)supportedVersions.iterator().next() == 7;
            } else {
                for (Map.Entry entry : subscriptions.entrySet()) {
                    ConsumerPartitionAssignor.Subscription subscription = (ConsumerPartitionAssignor.Subscription)entry.getValue();
                    SubscriptionInfo info = SubscriptionInfo.decode((ByteBuffer)subscription.userData().putInt(0, 6).putInt(4, 6));
                    downgradedSubscriptions.put(entry.getKey(), new ConsumerPartitionAssignor.Subscription(subscription.topics(), new SubscriptionInfo(6, 6, info.processId(), info.prevTasks(), info.standbyTasks(), info.userEndPoint()).encode(), subscription.ownedPartitions()));
                }
                assignment = super.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(downgradedSubscriptions)).groupAssignment();
                bumpUsedVersion = true;
                bumpSupportedVersion = true;
            }
            HashMap newAssignment = new HashMap();
            for (Map.Entry entry : assignment.entrySet()) {
                ConsumerPartitionAssignor.Assignment singleAssignment = (ConsumerPartitionAssignor.Assignment)entry.getValue();
                newAssignment.put(entry.getKey(), new ConsumerPartitionAssignor.Assignment(singleAssignment.partitions(), new FutureAssignmentInfo(bumpUsedVersion, bumpSupportedVersion, singleAssignment.userData()).encode()));
            }
            return new ConsumerPartitionAssignor.GroupAssignment(newAssignment);
        }
    }

    private static class FutureKafkaClientSupplier
    extends DefaultKafkaClientSupplier {
        private FutureKafkaClientSupplier() {
        }

        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            config.put("partition.assignment.strategy", FutureStreamsPartitionAssignor.class.getName());
            return new KafkaConsumer(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        }
    }
}

