/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Test
    public void shouldScaleOutWithWarmupTasksAndInMemoryStores(TestInfo testInfo) throws InterruptedException {
        this.shouldScaleOutWithWarmupTasks(storeName -> Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)storeName)), testInfo);
    }

    @Test
    public void shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo testInfo) throws InterruptedException {
        this.shouldScaleOutWithWarmupTasks(storeName -> Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)storeName)), testInfo);
    }

    private void shouldScaleOutWithWarmupTasks(Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction, TestInfo testInfo) throws InterruptedException {
        String testId = IntegrationTestUtils.safeUniqueTestName(this.getClass(), testInfo);
        String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
        String inputTopic = "input" + testId;
        Set inputTopicPartitions = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(inputTopic, 0), new TopicPartition(inputTopic, 1)});
        String storeName = "store" + testId;
        String storeChangelog = appId + "-store" + testId + "-changelog";
        Set changelogTopicPartitions = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(storeChangelog, 0), new TopicPartition(storeChangelog, 1)});
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog);
        ReentrantLock assignmentLock = new ReentrantLock();
        AtomicInteger assignmentsCompleted = new AtomicInteger(0);
        ConcurrentHashMap assignmentsStable = new ConcurrentHashMap();
        AtomicBoolean assignmentStable = new AtomicBoolean(false);
        AssignorConfiguration.AssignmentListener assignmentListener = stable -> {
            assignmentLock.lock();
            try {
                int thisAssignmentIndex = assignmentsCompleted.incrementAndGet();
                assignmentsStable.put(thisAssignmentIndex, stable);
                assignmentStable.set(stable);
            }
            finally {
                assignmentLock.unlock();
            }
        };
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(inputTopic, materializedFunction.apply(storeName));
        Topology topology = builder.build();
        int numberOfRecords = 500;
        this.produceTestData(inputTopic, 500);
        try (KafkaStreams kafkaStreams0 = new KafkaStreams(topology, HighAvailabilityTaskAssignorIntegrationTest.streamsProperties(appId, assignmentListener));
             KafkaStreams kafkaStreams1 = new KafkaStreams(topology, HighAvailabilityTaskAssignorIntegrationTest.streamsProperties(appId, assignmentListener));
             KafkaConsumer consumer = new KafkaConsumer(HighAvailabilityTaskAssignorIntegrationTest.getConsumerProperties());){
            kafkaStreams0.start();
            TestUtils.waitForCondition(() -> HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$3(inputTopicPartitions, (Consumer)consumer), (long)120000L, () -> HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$4(inputTopicPartitions, (Consumer)consumer));
            TestUtils.waitForCondition(() -> HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$5(changelogTopicPartitions, (Consumer)consumer), (long)120000L, () -> HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$6(changelogTopicPartitions, (Consumer)consumer));
            final AtomicLong instance1TotalRestored = new AtomicLong(-1L);
            final AtomicLong instance1NumRestored = new AtomicLong(-1L);
            final CountDownLatch restoreCompleteLatch = new CountDownLatch(1);
            kafkaStreams1.setGlobalStateRestoreListener(new StateRestoreListener(){

                public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
                }

                public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
                    instance1NumRestored.accumulateAndGet(numRestored, (prev, restored) -> prev == -1L ? restored : prev + restored);
                }

                public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
                    instance1TotalRestored.accumulateAndGet(totalRestored, (prev, restored) -> prev == -1L ? restored : prev + restored);
                    restoreCompleteLatch.countDown();
                }
            });
            int assignmentsBeforeScaleOut = assignmentsCompleted.get();
            kafkaStreams1.start();
            TestUtils.waitForCondition(() -> {
                assignmentLock.lock();
                try {
                    if (assignmentsCompleted.get() > assignmentsBeforeScaleOut) {
                        HighAvailabilityTaskAssignorIntegrationTest.assertFalseNoRetry((Boolean)assignmentsStable.get(assignmentsBeforeScaleOut + 1), "the first assignment after adding a node should be unstable while we warm up the state.");
                        boolean bl = true;
                        return bl;
                    }
                    boolean bl = false;
                    return bl;
                }
                finally {
                    assignmentLock.unlock();
                }
            }, (long)120000L, (String)("Never saw a first assignment after scale out: " + assignmentsCompleted.get()));
            TestUtils.waitForCondition(assignmentStable::get, (long)120000L, (String)("Assignment hasn't become stable: " + assignmentsCompleted.get() + " Note, if this does fail, check and see if the new instance just failed to catch up within the probing rebalance interval. A full minute should be long enough to read ~500 records in any test environment, but you never know..."));
            restoreCompleteLatch.await();
            MatcherAssert.assertThat((Object)instance1TotalRestored.get(), (Matcher)Matchers.is((Object)0L));
            MatcherAssert.assertThat((Object)instance1NumRestored.get(), (Matcher)Matchers.is((Object)-1L));
        }
    }

    private void produceTestData(String inputTopic, int numberOfRecords) {
        String kilo = HighAvailabilityTaskAssignorIntegrationTest.getKiloByteValue();
        Properties producerProperties = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"acks", (Object)"all"), Utils.mkEntry((Object)"key.serializer", (Object)StringSerializer.class.getName()), Utils.mkEntry((Object)"value.serializer", (Object)StringSerializer.class.getName())}));
        try (KafkaProducer producer = new KafkaProducer(producerProperties);){
            for (int i = 0; i < numberOfRecords; ++i) {
                producer.send(new ProducerRecord(inputTopic, (Object)String.valueOf(i), (Object)kilo));
            }
        }
    }

    private static Properties getConsumerProperties() {
        return Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"key.deserializer", (Object)StringDeserializer.class.getName()), Utils.mkEntry((Object)"value.deserializer", (Object)StringDeserializer.class.getName())}));
    }

    private static String getKiloByteValue() {
        StringBuilder kiloBuilder = new StringBuilder(1000);
        for (int i = 0; i < 1000; ++i) {
            kiloBuilder.append('0');
        }
        return kiloBuilder.toString();
    }

    private static void assertFalseNoRetry(boolean assertion, String message) {
        if (assertion) {
            throw new NoRetryException((Throwable)((Object)new AssertionError((Object)message)));
        }
    }

    private static Properties streamsProperties(String appId, AssignorConfiguration.AssignmentListener configuredAssignmentListener) {
        return Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"num.standby.replicas", (Object)"0"), Utils.mkEntry((Object)"acceptable.recovery.lag", (Object)"0"), Utils.mkEntry((Object)"max.warmup.replicas", (Object)"2"), Utils.mkEntry((Object)"probing.rebalance.interval.ms", (Object)"60000"), Utils.mkEntry((Object)"__assignment.listener__", (Object)configuredAssignmentListener), Utils.mkEntry((Object)"commit.interval.ms", (Object)100L), Utils.mkEntry((Object)"internal.task.assignor.class", (Object)HighAvailabilityTaskAssignor.class.getName()), Utils.mkEntry((Object)"num.stream.threads", (Object)40), Utils.mkEntry((Object)"default.key.serde", (Object)Serdes.StringSerde.class.getName()), Utils.mkEntry((Object)"default.value.serde", (Object)Serdes.StringSerde.class.getName())}));
    }

    private static long getEndOffsetSum(Set<TopicPartition> changelogTopicPartitions, Consumer<String, String> consumer) {
        long sum = 0L;
        Collection values = consumer.endOffsets(changelogTopicPartitions).values();
        for (Long value : values) {
            sum += value.longValue();
        }
        return sum;
    }

    private static /* synthetic */ String lambda$shouldScaleOutWithWarmupTasks$6(Set changelogTopicPartitions, Consumer consumer) {
        return "Input records haven't all been written to the changelog: " + HighAvailabilityTaskAssignorIntegrationTest.getEndOffsetSum(changelogTopicPartitions, (Consumer<String, String>)consumer);
    }

    private static /* synthetic */ boolean lambda$shouldScaleOutWithWarmupTasks$5(Set changelogTopicPartitions, Consumer consumer) throws Exception {
        return HighAvailabilityTaskAssignorIntegrationTest.getEndOffsetSum(changelogTopicPartitions, (Consumer<String, String>)consumer) == 500L;
    }

    private static /* synthetic */ String lambda$shouldScaleOutWithWarmupTasks$4(Set inputTopicPartitions, Consumer consumer) {
        return "Input records haven't all been written to the input topic: " + HighAvailabilityTaskAssignorIntegrationTest.getEndOffsetSum(inputTopicPartitions, (Consumer<String, String>)consumer);
    }

    private static /* synthetic */ boolean lambda$shouldScaleOutWithWarmupTasks$3(Set inputTopicPartitions, Consumer consumer) throws Exception {
        return HighAvailabilityTaskAssignorIntegrationTest.getEndOffsetSum(inputTopicPartitions, (Consumer<String, String>)consumer) == 500L;
    }
}

