/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests for per-source auto-offset-reset policies in a Kafka Streams topology.
 *
 * <p>Each read test builds a topology with three sources writing to one output topic:
 * <ul>
 *   <li>a pattern source {@code topic-\d<suffix>} registered with {@code EARLIEST},</li>
 *   <li>a pattern source {@code topic-[A-D]<suffix>} registered with {@code LATEST},</li>
 *   <li>two named topics (Y and Z) registered without an explicit reset policy.</li>
 * </ul>
 * One record is produced to every input topic before the application starts, and the
 * test then checks which of those records show up on the output topic.
 *
 * <p>Every read test uses its own topic suffix ({@code _0}, {@code _1}, {@code _2}) and its
 * own output topic, so the tests do not see each other's records.
 */
@Category({IntegrationTest.class})
public class KStreamsFineGrainedAutoResetIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
    private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
    private static final String OUTPUT_TOPIC_2 = "outputTopic_2";

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private final MockTime mockTime = CLUSTER.time;

    private static final String TOPIC_1_0 = "topic-1_0";
    private static final String TOPIC_2_0 = "topic-2_0";
    private static final String TOPIC_A_0 = "topic-A_0";
    private static final String TOPIC_C_0 = "topic-C_0";
    private static final String TOPIC_Y_0 = "topic-Y_0";
    private static final String TOPIC_Z_0 = "topic-Z_0";
    private static final String TOPIC_1_1 = "topic-1_1";
    private static final String TOPIC_2_1 = "topic-2_1";
    private static final String TOPIC_A_1 = "topic-A_1";
    private static final String TOPIC_C_1 = "topic-C_1";
    private static final String TOPIC_Y_1 = "topic-Y_1";
    private static final String TOPIC_Z_1 = "topic-Z_1";
    private static final String TOPIC_1_2 = "topic-1_2";
    private static final String TOPIC_2_2 = "topic-2_2";
    private static final String TOPIC_A_2 = "topic-A_2";
    private static final String TOPIC_C_2 = "topic-C_2";
    private static final String TOPIC_Y_2 = "topic-Y_2";
    private static final String TOPIC_Z_2 = "topic-Z_2";
    private static final String NOOP = "noop";

    private final Serde<String> stringSerde = Serdes.String();
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
    private Properties streamsConfiguration;

    // The single record value produced to each input topic. These are initialised exactly
    // once, here; a final field must not be assigned again in a constructor.
    private final String topic1TestMessage = "topic-1 test";
    private final String topic2TestMessage = "topic-2 test";
    private final String topicATestMessage = "topic-A test";
    private final String topicCTestMessage = "topic-C test";
    private final String topicYTestMessage = "topic-Y test";
    private final String topicZTestMessage = "topic-Z test";

    /**
     * Creates every input, output and no-op topic used by the tests on the shared cluster.
     */
    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopics(
            TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0,
            TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1,
            TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2,
            NOOP, DEFAULT_OUTPUT_TOPIC, OUTPUT_TOPIC_0, OUTPUT_TOPIC_1, OUTPUT_TOPIC_2);
    }

    /**
     * Builds a fresh streams configuration (application id {@code testAutoOffsetId}, global
     * {@code auto.offset.reset=earliest}) and purges local streams state before each test.
     */
    @Before
    public void setUp() throws Exception {
        Properties props = new Properties();
        props.put("auto.offset.reset", "earliest");
        props.put("internal.leave.group.on.close", true);
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            "testAutoOffsetId",
            CLUSTER.bootstrapServers(),
            STRING_SERDE_CLASSNAME,
            STRING_SERDE_CLASSNAME,
            props);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * With the consumer-level reset policy overridden to {@code latest}, only the records of
     * the explicitly-{@code EARLIEST} pattern source (topic-1, topic-2) are expected.
     */
    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest() throws Exception {
        streamsConfiguration.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "latest");
        List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage);
        shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0, TOPIC_Z_0, OUTPUT_TOPIC_0, expectedReceivedValues);
    }

    /**
     * With the global {@code earliest} policy configured in {@link #setUp()}, the records of the
     * {@code EARLIEST} pattern source and of the two named topics (Y, Z) are expected.
     */
    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest() throws Exception {
        List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
        shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1, TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
    }

    /**
     * Same expectation as the default-earliest case, but after committing offsets via
     * {@link #commitInvalidOffsets()} for the application's consumer group.
     */
    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets() throws Exception {
        commitInvalidOffsets();
        List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
        shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2, OUTPUT_TOPIC_2, expectedReceivedValues);
    }

    /**
     * Builds the three-source topology, produces one record to each input topic, runs the
     * application and asserts that exactly the expected values reach {@code outputTopic}
     * (compared order-independently).
     *
     * @param topicSuffix            suffix appended to both source patterns
     * @param topic1                 first topic matched by the {@code EARLIEST} pattern
     * @param topic2                 second topic matched by the {@code EARLIEST} pattern
     * @param topicA                 first topic matched by the {@code LATEST} pattern
     * @param topicC                 second topic matched by the {@code LATEST} pattern
     * @param topicY                 first named topic without an explicit reset policy
     * @param topicZ                 second named topic without an explicit reset policy
     * @param outputTopic            topic all three sources write to
     * @param expectedReceivedValues values expected on the output topic; not modified
     */
    private void shouldOnlyReadForEarliest(String topicSuffix, String topic1, String topic2, String topicA, String topicC, String topicY, String topicZ, String outputTopic, List<String> expectedReceivedValues) throws Exception {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> pattern1Stream = builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
        KStream<String, String> pattern2Stream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
        KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);

        pattern1Stream.to(stringSerde, stringSerde, outputTopic);
        pattern2Stream.to(stringSerde, stringSerde, outputTopic);
        namedTopicsStream.to(stringSerde, stringSerde, outputTopic);

        // Records are produced before the application starts, so whether a source sees its
        // record depends on the reset policy that applies to it.
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(topic1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(topic2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(topicA, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(topicC, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(topicY, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(topicZ, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);

        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

        List<String> actualValues = new ArrayList<String>(expectedReceivedValues.size());
        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        try {
            List<KeyValue<String, String>> receivedKeyValues =
                IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedReceivedValues.size());
            for (KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
                actualValues.add(receivedKeyValue.value);
            }
        } finally {
            // Always stop the application, even if waiting for records fails, so its
            // threads do not linger into subsequent tests on the shared cluster.
            streams.close();
        }

        // Sort copies: arrival order on the output topic is not part of the contract, and the
        // caller's list should be left untouched.
        List<String> sortedExpected = new ArrayList<String>(expectedReceivedValues);
        Collections.sort(sortedExpected);
        Collections.sort(actualValues);
        Assert.assertThat(actualValues, CoreMatchers.equalTo(sortedExpected));
    }

    /**
     * Commits offset 5 for partition 0 of every {@code _2} input topic, using the streams
     * application id as the consumer group id. Each of these topics only receives a single
     * record in the test, so the committed position is presumably out of range.
     */
    private void commitInvalidOffsets() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                streamsConfiguration.getProperty("application.id"),
                StringDeserializer.class,
                StringDeserializer.class));
        try {
            HashMap<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
            invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5L, null));
            invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5L, null));
            invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5L, null));
            invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5L, null));
            invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5L, null));
            invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5L, null));
            consumer.commitSync(invalidOffsets);
        } finally {
            // Release the consumer even when the commit fails.
            consumer.close();
        }
    }

    /**
     * Registering the same pattern with both {@code EARLIEST} and {@code LATEST} is expected
     * to raise a {@link TopologyBuilderException}.
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowExceptionOverlappingPattern() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
        builder.earliestResetTopicsPattern();
    }

    /**
     * Registering a named topic with {@code LATEST} that is also matched by an
     * {@code EARLIEST} pattern is expected to raise a {@link TopologyBuilderException}.
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowExceptionOverlappingTopic() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
        builder.latestResetTopicsPattern();
    }

    /**
     * With {@code auto.offset.reset=none} and no per-source policy, the stream thread is
     * expected to die with a {@code StreamsException} caused by a
     * {@code NoOffsetForPartitionException}; the test waits until the uncaught-exception
     * handler has observed exactly that.
     */
    @Test
    public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
        Properties props = new Properties();
        props.put("auto.offset.reset", "none");
        Properties localConfig = StreamsTestUtils.getStreamsConfig(
            "testAutoOffsetWithNone",
            CLUSTER.bootstrapServers(),
            STRING_SERDE_CLASSNAME,
            STRING_SERDE_CLASSNAME,
            props);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> exceptionStream = builder.stream(NOOP);
        exceptionStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        KafkaStreams streams = new KafkaStreams(builder, localConfig);
        final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        TestCondition correctExceptionThrownCondition = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return uncaughtExceptionHandler.correctExceptionThrown;
            }
        };

        streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        streams.start();
        try {
            TestUtils.waitForCondition(correctExceptionThrownCondition, "The expected NoOffsetForPartitionException was never thrown");
        } finally {
            // Close even when the wait times out so the application does not leak.
            streams.close();
        }
    }

    /**
     * Uncaught-exception handler that records whether the stream thread died with the
     * expected exception chain.
     */
    private static final class TestingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        // Written by the dying stream thread and polled by the test thread, so it must be
        // volatile for the write to be guaranteed visible to the reader.
        volatile boolean correctExceptionThrown = false;

        /**
         * Sets {@link #correctExceptionThrown} when {@code e} is a {@code StreamsException}
         * whose cause is a {@code NoOffsetForPartitionException}. If either check fails the
         * flag stays {@code false}; the failure itself is not propagated to the test thread,
         * so the waiting test fails on its timeout instead.
         */
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            Assert.assertThat(e.getClass().getSimpleName(), CoreMatchers.is("StreamsException"));
            Assert.assertThat(e.getCause().getClass().getSimpleName(), CoreMatchers.is("NoOffsetForPartitionException"));
            correctExceptionThrown = true;
        }
    }
}

