/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;

@Category(value={IntegrationTest.class})
public class TaskMetadataIntegrationTest {
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
    @Rule
    public TestName testName = new TestName();
    private String inputTopic;
    private static StreamsBuilder builder;
    private static Properties properties;
    private static String appIdPrefix;
    private static String appId;
    private AtomicBoolean process;
    private AtomicBoolean commit;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void setup() {
        String testId = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        appId = appIdPrefix + testId;
        this.inputTopic = "input" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, this.inputTopic);
        builder = new StreamsBuilder();
        this.process = new AtomicBoolean(true);
        this.commit = new AtomicBoolean(true);
        KStream stream = builder.stream(this.inputTopic);
        stream.process(() -> new PauseProcessor(), new String[0]);
        properties = Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"num.stream.threads", (Object)2), Utils.mkEntry((Object)"default.key.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"default.value.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"commit.interval.ms", (Object)1L)}));
    }

    @Test
    public void shouldReportCorrectCommittedOffsetInformation() {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
            TaskMetadata taskMetadata = this.getTaskMetadata(kafkaStreams);
            MatcherAssert.assertThat((Object)taskMetadata.committedOffsets().size(), (Matcher)CoreMatchers.equalTo((Object)1));
            TopicPartition topicPartition = new TopicPartition(this.inputTopic, 0);
            this.produceMessages(0L, this.inputTopic, "test");
            TestUtils.waitForCondition(() -> !this.process.get(), (String)"The record was not processed");
            TestUtils.waitForCondition(() -> (Long)taskMetadata.committedOffsets().get(topicPartition) == 1L, (String)"the record was processed");
            this.process.set(true);
            this.produceMessages(0L, this.inputTopic, "test1");
            TestUtils.waitForCondition(() -> !this.process.get(), (String)"The record was not processed");
            TestUtils.waitForCondition(() -> (Long)taskMetadata.committedOffsets().get(topicPartition) == 2L, (String)"the record was processed");
            this.process.set(true);
            this.produceMessages(0L, this.inputTopic, "test1");
            TestUtils.waitForCondition(() -> !this.process.get(), (String)"The record was not processed");
            TestUtils.waitForCondition(() -> (Long)taskMetadata.committedOffsets().get(topicPartition) == 3L, (String)"the record was processed");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void shouldReportCorrectEndOffsetInformation() {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
            TaskMetadata taskMetadata = this.getTaskMetadata(kafkaStreams);
            MatcherAssert.assertThat((Object)taskMetadata.endOffsets().size(), (Matcher)CoreMatchers.equalTo((Object)1));
            TopicPartition topicPartition = new TopicPartition(this.inputTopic, 0);
            this.commit.set(false);
            for (int i = 0; i < 10; ++i) {
                this.produceMessages(0L, this.inputTopic, "test");
                TestUtils.waitForCondition(() -> !this.process.get(), (String)"The record was not processed");
                this.process.set(true);
            }
            MatcherAssert.assertThat(taskMetadata.endOffsets().get(topicPartition), (Matcher)CoreMatchers.equalTo((Object)9L));
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private TaskMetadata getTaskMetadata(KafkaStreams kafkaStreams) throws InterruptedException {
        AtomicReference taskMetadataList = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            taskMetadataList.set(kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList()));
            return ((List)taskMetadataList.get()).size() == 1;
        }, (String)"The number of active tasks returned in the allotted time was not one.");
        return (TaskMetadata)((List)taskMetadataList.get()).get(0);
    }

    @After
    public void teardown() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(properties);
    }

    private void produceMessages(long timestamp, String streamOneInput, String msg) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(streamOneInput, Collections.singletonList(new KeyValue((Object)"1", (Object)msg)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    static {
        appIdPrefix = "TaskMetadataTest_";
    }

    private class PauseProcessor
    extends ContextualProcessor<String, String, Void, Void> {
        private PauseProcessor() {
        }

        public void process(Record<String, String> record) {
            while (!TaskMetadataIntegrationTest.this.process.get()) {
                try {
                    ((Object)((Object)this)).wait(100L);
                }
                catch (InterruptedException interruptedException) {}
            }
            if (TaskMetadataIntegrationTest.this.commit.get()) {
                this.context().commit();
            }
            TaskMetadataIntegrationTest.this.process.set(false);
        }
    }
}

