/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogTopics;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Test;

public class ChangelogTopicsTest {
    private static final String SOURCE_TOPIC_NAME = "source";
    private static final String SINK_TOPIC_NAME = "sink";
    private static final String REPARTITION_TOPIC_NAME = "repartition";
    private static final String CHANGELOG_TOPIC_NAME1 = "changelog1";
    private static final Map<String, String> TOPIC_CONFIG = Collections.singletonMap("config1", "val1");
    private static final RepartitionTopicConfig REPARTITION_TOPIC_CONFIG = new RepartitionTopicConfig("repartition", TOPIC_CONFIG);
    private static final UnwindowedChangelogTopicConfig CHANGELOG_TOPIC_CONFIG = new UnwindowedChangelogTopicConfig("changelog1", TOPIC_CONFIG);
    private static final InternalTopologyBuilder.TopicsInfo TOPICS_INFO1 = new InternalTopologyBuilder.TopicsInfo(Utils.mkSet((Object[])new String[]{"sink"}), Utils.mkSet((Object[])new String[]{"source"}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"repartition", (Object)REPARTITION_TOPIC_CONFIG)}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"changelog1", (Object)CHANGELOG_TOPIC_CONFIG)}));
    private static final InternalTopologyBuilder.TopicsInfo TOPICS_INFO2 = new InternalTopologyBuilder.TopicsInfo(Utils.mkSet((Object[])new String[]{"sink"}), Utils.mkSet((Object[])new String[]{"source"}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"repartition", (Object)REPARTITION_TOPIC_CONFIG)}), Utils.mkMap((Map.Entry[])new Map.Entry[0]));
    private static final InternalTopologyBuilder.TopicsInfo TOPICS_INFO3 = new InternalTopologyBuilder.TopicsInfo(Utils.mkSet((Object[])new String[]{"sink"}), Utils.mkSet((Object[])new String[]{"source"}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"repartition", (Object)REPARTITION_TOPIC_CONFIG)}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"source", (Object)CHANGELOG_TOPIC_CONFIG)}));
    private static final InternalTopologyBuilder.TopicsInfo TOPICS_INFO4 = new InternalTopologyBuilder.TopicsInfo(Utils.mkSet((Object[])new String[]{"sink"}), Utils.mkSet((Object[])new String[]{"source"}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"repartition", (Object)REPARTITION_TOPIC_CONFIG)}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"source", null), Utils.mkEntry((Object)"changelog1", (Object)CHANGELOG_TOPIC_CONFIG)}));
    private static final TaskId TASK_0_0 = new TaskId(0, 0);
    private static final TaskId TASK_0_1 = new TaskId(0, 1);
    private static final TaskId TASK_0_2 = new TaskId(0, 2);
    final InternalTopicManager internalTopicManager = (InternalTopicManager)EasyMock.mock(InternalTopicManager.class);

    @Test
    public void shouldNotContainChangelogsForStatelessTasks() {
        EasyMock.expect((Object)this.internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
        Map topicGroups = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)TOPICS_INFO2)});
        Map tasksForTopicGroup = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)Utils.mkSet((Object[])new TaskId[]{TASK_0_0, TASK_0_1, TASK_0_2}))});
        EasyMock.replay((Object[])new Object[]{this.internalTopicManager});
        ChangelogTopics changelogTopics = new ChangelogTopics(this.internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
        changelogTopics.setup();
        EasyMock.verify((Object[])new Object[]{this.internalTopicManager});
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_0), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_1), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_2), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingNonSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() {
        EasyMock.expect((Object)this.internalTopicManager.makeReady(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)CHANGELOG_TOPIC_NAME1, (Object)CHANGELOG_TOPIC_CONFIG)}))).andStubReturn((Object)Utils.mkSet((Object[])new String[]{CHANGELOG_TOPIC_NAME1}));
        Map topicGroups = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)TOPICS_INFO1)});
        Set tasks = Utils.mkSet((Object[])new TaskId[]{TASK_0_0, TASK_0_1, TASK_0_2});
        Map tasksForTopicGroup = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)tasks)});
        EasyMock.replay((Object[])new Object[]{this.internalTopicManager});
        ChangelogTopics changelogTopics = new ChangelogTopics(this.internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
        changelogTopics.setup();
        EasyMock.verify((Object[])new Object[]{this.internalTopicManager});
        MatcherAssert.assertThat((Object)CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE), (Matcher)CoreMatchers.is((Object)3));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_0), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_1), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_2), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingNonSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
        EasyMock.expect((Object)this.internalTopicManager.makeReady(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)CHANGELOG_TOPIC_NAME1, (Object)CHANGELOG_TOPIC_CONFIG)}))).andStubReturn(Collections.emptySet());
        Map topicGroups = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)TOPICS_INFO1)});
        Set tasks = Utils.mkSet((Object[])new TaskId[]{TASK_0_0, TASK_0_1, TASK_0_2});
        Map tasksForTopicGroup = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)tasks)});
        EasyMock.replay((Object[])new Object[]{this.internalTopicManager});
        ChangelogTopics changelogTopics = new ChangelogTopics(this.internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
        changelogTopics.setup();
        EasyMock.verify((Object[])new Object[]{this.internalTopicManager});
        MatcherAssert.assertThat((Object)CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE), (Matcher)CoreMatchers.is((Object)3));
        TopicPartition changelogPartition0 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
        TopicPartition changelogPartition1 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
        TopicPartition changelogPartition2 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 2);
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_0), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition0})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_1), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition1})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_2), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition2})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is(Collections.emptySet()));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingNonSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition0, changelogPartition1, changelogPartition2})));
    }

    @Test
    public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
        EasyMock.expect((Object)this.internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
        Map topicGroups = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)TOPICS_INFO3)});
        Set tasks = Utils.mkSet((Object[])new TaskId[]{TASK_0_0, TASK_0_1, TASK_0_2});
        Map tasksForTopicGroup = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)tasks)});
        EasyMock.replay((Object[])new Object[]{this.internalTopicManager});
        ChangelogTopics changelogTopics = new ChangelogTopics(this.internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
        changelogTopics.setup();
        EasyMock.verify((Object[])new Object[]{this.internalTopicManager});
        TopicPartition changelogPartition0 = new TopicPartition(SOURCE_TOPIC_NAME, 0);
        TopicPartition changelogPartition1 = new TopicPartition(SOURCE_TOPIC_NAME, 1);
        TopicPartition changelogPartition2 = new TopicPartition(SOURCE_TOPIC_NAME, 2);
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_0), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition0})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_1), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition1})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_2), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition2})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition0, changelogPartition1, changelogPartition2})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingNonSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is(Collections.emptySet()));
    }

    @Test
    public void shouldContainBothTypesOfPreExistingChangelogs() {
        EasyMock.expect((Object)this.internalTopicManager.makeReady(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)CHANGELOG_TOPIC_NAME1, (Object)CHANGELOG_TOPIC_CONFIG)}))).andStubReturn(Collections.emptySet());
        Map topicGroups = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)TOPICS_INFO4)});
        Set tasks = Utils.mkSet((Object[])new TaskId[]{TASK_0_0, TASK_0_1, TASK_0_2});
        Map tasksForTopicGroup = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)AssignmentTestUtils.SUBTOPOLOGY_0, (Object)tasks)});
        EasyMock.replay((Object[])new Object[]{this.internalTopicManager});
        ChangelogTopics changelogTopics = new ChangelogTopics(this.internalTopicManager, topicGroups, tasksForTopicGroup, "[test] ");
        changelogTopics.setup();
        EasyMock.verify((Object[])new Object[]{this.internalTopicManager});
        MatcherAssert.assertThat((Object)CHANGELOG_TOPIC_CONFIG.numberOfPartitions().orElse(Integer.MIN_VALUE), (Matcher)CoreMatchers.is((Object)3));
        TopicPartition changelogPartition0 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 0);
        TopicPartition changelogPartition1 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 1);
        TopicPartition changelogPartition2 = new TopicPartition(CHANGELOG_TOPIC_NAME1, 2);
        TopicPartition sourcePartition0 = new TopicPartition(SOURCE_TOPIC_NAME, 0);
        TopicPartition sourcePartition1 = new TopicPartition(SOURCE_TOPIC_NAME, 1);
        TopicPartition sourcePartition2 = new TopicPartition(SOURCE_TOPIC_NAME, 2);
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_0), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{sourcePartition0, changelogPartition0})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_1), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{sourcePartition1, changelogPartition1})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingPartitionsFor(TASK_0_2), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{sourcePartition2, changelogPartition2})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{sourcePartition0, sourcePartition1, sourcePartition2})));
        MatcherAssert.assertThat((Object)changelogTopics.preExistingNonSourceTopicBasedPartitions(), (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new TopicPartition[]{changelogPartition0, changelogPartition1, changelogPartition2})));
    }
}

