/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.server.KafkaConfig$;
import kafka.tools.StreamsResetter;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.AbstractResetIntegrationTest;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.IntegrationTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class ResetIntegrationTest
extends AbstractResetIntegrationTest {
    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;

    @Override
    Map<String, Object> getClientSslConfig() {
        return null;
    }

    @Before
    public void before() throws Exception {
        cluster = CLUSTER;
        this.prepareTest();
    }

    @After
    public void after() throws Exception {
        this.cleanupTest();
    }

    @Test
    public void shouldNotAllowToResetWhileStreamsIsRunning() {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)1L, (long)exitCode);
        streams.close();
    }

    @Test
    public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)1L, (long)exitCode);
    }

    @Test
    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        String[] parameters = new String[]{"--application-id", appID, "--bootstrap-servers", cluster.bootstrapServers(), "--intermediate-topics", NON_EXISTING_TOPIC, "--execute"};
        Properties cleanUpConfig = new Properties();
        cleanUpConfig.put("heartbeat.interval.ms", (Object)100);
        cleanUpConfig.put("session.timeout.ms", Integer.toString(2000));
        int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
        Assert.assertEquals((long)1L, (long)exitCode);
    }

    @Test
    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfig.put("application.id", appID);
        this.streamsConfig.put("session.timeout.ms", Integer.toString(200000));
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        boolean cleanResult = this.tryCleanGlobal(false, null, null, appID);
        Assert.assertFalse((boolean)cleanResult);
        this.cleanGlobal(false, "--force", null, appID);
        MatcherAssert.assertThat((String)"Group is not empty after cleanGlobal", (boolean)IntegrationTestUtils.isEmptyConsumerGroup(adminClient, appID));
        this.assertInternalTopicsGotDeleted(null);
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result));
        this.cleanGlobal(false, "--force", null, appID);
    }

    @Test
    public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        File resetFile = File.createTempFile("reset", ".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        this.cleanGlobal(false, "--from-file", resetFile.getAbsolutePath(), appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 5);
        streams.close();
        result.remove(0);
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result));
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        this.cleanGlobal(false, null, null, appID);
    }

    @Test
    public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        File resetFile = File.createTempFile("reset", ".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        Calendar calendar = Calendar.getInstance();
        calendar.add(5, -1);
        this.cleanGlobal(false, "--to-datetime", format.format(calendar.getTime()), appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result));
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        this.cleanGlobal(false, null, null, appID);
    }

    @Test
    public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
        String appID = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.streamsConfig.put("application.id", appID);
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.start();
        List result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        File resetFile = File.createTempFile("reset", ".csv");
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile));){
            writer.write("inputTopic,0,1");
        }
        streams = new KafkaStreams(this.setupTopologyWithoutIntermediateUserTopic(), this.streamsConfig);
        streams.cleanUp();
        this.cleanGlobal(false, "--by-duration", "PT1M", appID);
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        this.assertInternalTopicsGotDeleted(null);
        resetFile.deleteOnExit();
        streams.start();
        List resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(this.resultConsumerConfig, "outputTopic", 10);
        streams.close();
        MatcherAssert.assertThat(resultRerun, (Matcher)CoreMatchers.equalTo(result));
        IntegrationTestUtils.waitForEmptyConsumerGroup(adminClient, appID, 30000L);
        this.cleanGlobal(false, null, null, appID);
    }

    static {
        Properties brokerProps = new Properties();
        brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), (Object)-1L);
        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
    }
}

