/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag(value="integration")
@Timeout(value=600L)
public class TaskAssignorIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Test
    public void shouldProperlyConfigureTheAssignor(TestInfo testInfo) throws NoSuchFieldException, IllegalAccessException {
        String testId = IntegrationTestUtils.safeUniqueTestName(testInfo);
        String appId = "appId_" + testId;
        String inputTopic = "input" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
        AtomicInteger compilerDefeatingReference = new AtomicInteger(0);
        AssignorConfiguration.AssignmentListener configuredAssignmentListener = stable -> compilerDefeatingReference.incrementAndGet();
        Properties properties = Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"num.standby.replicas", (Object)"5"), Utils.mkEntry((Object)"acceptable.recovery.lag", (Object)"6"), Utils.mkEntry((Object)"max.warmup.replicas", (Object)"7"), Utils.mkEntry((Object)"probing.rebalance.interval.ms", (Object)"480000"), Utils.mkEntry((Object)"__assignment.listener__", (Object)configuredAssignmentListener), Utils.mkEntry((Object)"internal.task.assignor.class", (Object)MyLegacyTaskAssignor.class.getName())}));
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.start();
            Field threads = KafkaStreams.class.getDeclaredField("threads");
            threads.setAccessible(true);
            List streamThreads = (List)threads.get(kafkaStreams);
            StreamThread streamThread = (StreamThread)streamThreads.get(0);
            Field mainConsumer = StreamThread.class.getDeclaredField("mainConsumer");
            mainConsumer.setAccessible(true);
            KafkaConsumer parentConsumer = (KafkaConsumer)mainConsumer.get(streamThread);
            Field delegate = KafkaConsumer.class.getDeclaredField("delegate");
            delegate.setAccessible(true);
            Consumer consumer = (Consumer)delegate.get(parentConsumer);
            MatcherAssert.assertThat((Object)consumer, (Matcher)Matchers.instanceOf(ClassicKafkaConsumer.class));
            Field assignors = ClassicKafkaConsumer.class.getDeclaredField("assignors");
            assignors.setAccessible(true);
            List consumerPartitionAssignors = (List)assignors.get(consumer);
            StreamsPartitionAssignor streamsPartitionAssignor = (StreamsPartitionAssignor)consumerPartitionAssignors.get(0);
            Field assignmentConfigs = StreamsPartitionAssignor.class.getDeclaredField("assignmentConfigs");
            assignmentConfigs.setAccessible(true);
            AssignmentConfigs configs = (AssignmentConfigs)assignmentConfigs.get(streamsPartitionAssignor);
            Field assignmentListenerField = StreamsPartitionAssignor.class.getDeclaredField("assignmentListener");
            assignmentListenerField.setAccessible(true);
            AssignorConfiguration.AssignmentListener actualAssignmentListener = (AssignorConfiguration.AssignmentListener)assignmentListenerField.get(streamsPartitionAssignor);
            Field taskAssignorSupplierField = StreamsPartitionAssignor.class.getDeclaredField("legacyTaskAssignorSupplier");
            taskAssignorSupplierField.setAccessible(true);
            Supplier taskAssignorSupplier = (Supplier)taskAssignorSupplierField.get(streamsPartitionAssignor);
            LegacyTaskAssignor taskAssignor = (LegacyTaskAssignor)taskAssignorSupplier.get();
            MatcherAssert.assertThat((Object)configs.numStandbyReplicas(), (Matcher)Matchers.is((Object)5));
            MatcherAssert.assertThat((Object)configs.acceptableRecoveryLag(), (Matcher)Matchers.is((Object)6L));
            MatcherAssert.assertThat((Object)configs.maxWarmupReplicas(), (Matcher)Matchers.is((Object)7));
            MatcherAssert.assertThat((Object)configs.probingRebalanceIntervalMs(), (Matcher)Matchers.is((Object)480000L));
            MatcherAssert.assertThat((Object)actualAssignmentListener, (Matcher)Matchers.sameInstance((Object)configuredAssignmentListener));
            MatcherAssert.assertThat((Object)taskAssignor, (Matcher)Matchers.instanceOf(MyLegacyTaskAssignor.class));
        }
    }

    public static final class MyLegacyTaskAssignor
    extends HighAvailabilityTaskAssignor
    implements LegacyTaskAssignor {
    }
}

