/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.test.MockRestoreCallback;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Test;

public class StoreChangelogReaderTest {
    private final MockRestoreCallback callback = new MockRestoreCallback();
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.consumer);
    private final TopicPartition topicPartition = new TopicPartition("topic", 0);

    @Test
    public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
        final AtomicBoolean functionCalled = new AtomicBoolean(false);
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public Map<String, List<PartitionInfo>> listTopics() {
                functionCalled.set(true);
                throw new TimeoutException("KABOOM!");
            }
        };
        StoreChangelogReader changelogReader = new StoreChangelogReader((Consumer)consumer);
        changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true));
        changelogReader.restore();
        Assert.assertTrue((boolean)functionCalled.get());
    }

    @Test
    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
        this.consumer.subscribe(Collections.singleton("sometopic"));
        try {
            this.changelogReader.restore();
            Assert.fail((String)"Should have thrown IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
        int messages = 10;
        this.setupConsumer(10L, this.topicPartition);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true));
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)10));
    }

    @Test
    public void shouldRestoreMessagesFromCheckpoint() throws Exception {
        int messages = 10;
        this.setupConsumer(10L, this.topicPartition);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, Long.valueOf(5L), Long.MAX_VALUE, true));
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)5));
    }

    @Test
    public void shouldClearAssignmentAtEndOfRestore() throws Exception {
        boolean messages = true;
        this.setupConsumer(1L, this.topicPartition);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true));
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.consumer.assignment(), (Matcher)IsEqual.equalTo(Collections.emptySet()));
    }

    @Test
    public void shouldRestoreToLimitWhenSupplied() throws Exception {
        this.setupConsumer(10L, this.topicPartition);
        StateRestorer restorer = new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, 3L, true);
        this.changelogReader.register(restorer);
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)3));
        MatcherAssert.assertThat((Object)restorer.restoredOffset(), (Matcher)IsEqual.equalTo((Object)3L));
    }

    @Test
    public void shouldRestoreMultipleStores() throws Exception {
        TopicPartition one = new TopicPartition("one", 0);
        TopicPartition two = new TopicPartition("two", 0);
        MockRestoreCallback callbackOne = new MockRestoreCallback();
        MockRestoreCallback callbackTwo = new MockRestoreCallback();
        this.setupConsumer(10L, this.topicPartition);
        this.setupConsumer(5L, one);
        this.setupConsumer(3L, two);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true));
        this.changelogReader.register(new StateRestorer(one, (StateRestoreCallback)callbackOne, null, Long.MAX_VALUE, true));
        this.changelogReader.register(new StateRestorer(two, (StateRestoreCallback)callbackTwo, null, Long.MAX_VALUE, true));
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)10));
        MatcherAssert.assertThat((Object)callbackOne.restored.size(), (Matcher)IsEqual.equalTo((Object)5));
        MatcherAssert.assertThat((Object)callbackTwo.restored.size(), (Matcher)IsEqual.equalTo((Object)3));
    }

    @Test
    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
        StateRestorer restorer = new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true);
        this.setupConsumer(0L, this.topicPartition);
        this.changelogReader.register(restorer);
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)0));
        MatcherAssert.assertThat((Object)restorer.restoredOffset(), (Matcher)IsEqual.equalTo((Object)0L));
    }

    @Test
    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
        Long endOffset = 10L;
        this.setupConsumer(endOffset, this.topicPartition);
        StateRestorer restorer = new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, endOffset, Long.MAX_VALUE, true);
        this.changelogReader.register(restorer);
        this.changelogReader.restore();
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)0));
        MatcherAssert.assertThat((Object)restorer.restoredOffset(), (Matcher)IsEqual.equalTo((Object)endOffset));
    }

    @Test
    public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
        this.setupConsumer(10L, this.topicPartition);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true));
        this.changelogReader.restore();
        Map restoredOffsets = this.changelogReader.restoredOffsets();
        MatcherAssert.assertThat((Object)restoredOffsets, (Matcher)IsEqual.equalTo(Collections.singletonMap(this.topicPartition, 10L)));
    }

    @Test
    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
        this.setupConsumer(10L, this.topicPartition);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, false));
        this.changelogReader.restore();
        Map restoredOffsets = this.changelogReader.restoredOffsets();
        MatcherAssert.assertThat((Object)restoredOffsets, (Matcher)IsEqual.equalTo(Collections.emptyMap()));
    }

    @Test
    public void shouldIgnoreNullKeysWhenRestoring() throws Exception {
        this.assignPartition(3L, this.topicPartition);
        byte[] bytes = new byte[]{};
        this.consumer.addRecord(new ConsumerRecord(this.topicPartition.topic(), this.topicPartition.partition(), 0L, (Object)bytes, (Object)bytes));
        this.consumer.addRecord(new ConsumerRecord(this.topicPartition.topic(), this.topicPartition.partition(), 1L, (Object)null, (Object)bytes));
        this.consumer.addRecord(new ConsumerRecord(this.topicPartition.topic(), this.topicPartition.partition(), 2L, (Object)bytes, (Object)bytes));
        this.consumer.assign(Collections.singletonList(this.topicPartition));
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, false));
        this.changelogReader.restore();
        MatcherAssert.assertThat(this.callback.restored, (Matcher)CoreMatchers.equalTo((Object)Utils.mkList((Object[])new KeyValue[]{KeyValue.pair((Object)bytes, (Object)bytes), KeyValue.pair((Object)bytes, (Object)bytes)})));
    }

    @Test
    public void shouldReturnCompletedPartitionsOnEachRestoreCall() {
        this.assignPartition(10L, this.topicPartition);
        byte[] bytes = new byte[]{};
        for (int i = 0; i < 5; ++i) {
            this.consumer.addRecord(new ConsumerRecord(this.topicPartition.topic(), this.topicPartition.partition(), (long)i, (Object)bytes, (Object)bytes));
        }
        this.consumer.assign(Collections.singletonList(this.topicPartition));
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, false));
        Collection completedFirstTime = this.changelogReader.restore();
        Assert.assertTrue((boolean)completedFirstTime.isEmpty());
        for (int i = 5; i < 10; ++i) {
            this.consumer.addRecord(new ConsumerRecord(this.topicPartition.topic(), this.topicPartition.partition(), (long)i, (Object)bytes, (Object)bytes));
        }
        Set<TopicPartition> expected = Collections.singleton(this.topicPartition);
        MatcherAssert.assertThat((Object)this.changelogReader.restore(), (Matcher)IsEqual.equalTo(expected));
    }

    private void setupConsumer(long messages, TopicPartition topicPartition) {
        this.assignPartition(messages, topicPartition);
        int i = 0;
        while ((long)i < messages) {
            this.consumer.addRecord(new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), (long)i, (Object)new byte[0], (Object)new byte[0]));
            ++i;
        }
        this.consumer.assign(Collections.emptyList());
    }

    private void assignPartition(long messages, TopicPartition topicPartition) {
        this.consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0L, messages)));
        this.consumer.assign(Collections.singletonList(topicPartition));
    }

    @Test
    public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
        Set<TopicPartition> expected = Collections.singleton(this.topicPartition);
        this.setupConsumer(0L, this.topicPartition);
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, true));
        Collection restored = this.changelogReader.restore();
        MatcherAssert.assertThat((Object)restored, (Matcher)IsEqual.equalTo(expected));
    }

    @Test
    public void shouldRestorePartitionsRegisteredPostInitialization() {
        MockRestoreCallback callbackTwo = new MockRestoreCallback();
        this.setupConsumer(1L, this.topicPartition);
        this.consumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 10L));
        this.changelogReader.register(new StateRestorer(this.topicPartition, (StateRestoreCallback)this.callback, null, Long.MAX_VALUE, false));
        Assert.assertTrue((boolean)this.changelogReader.restore().isEmpty());
        this.addRecords(9L, this.topicPartition, 1);
        TopicPartition postInitialization = new TopicPartition("other", 0);
        this.setupConsumer(3L, postInitialization);
        this.consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
        this.changelogReader.register(new StateRestorer(postInitialization, (StateRestoreCallback)callbackTwo, null, Long.MAX_VALUE, false));
        Set expected = Utils.mkSet((Object[])new TopicPartition[]{this.topicPartition, postInitialization});
        this.consumer.assign((Collection)expected);
        MatcherAssert.assertThat((Object)this.changelogReader.restore(), (Matcher)IsEqual.equalTo((Object)expected));
        MatcherAssert.assertThat((Object)this.callback.restored.size(), (Matcher)IsEqual.equalTo((Object)10));
        MatcherAssert.assertThat((Object)callbackTwo.restored.size(), (Matcher)IsEqual.equalTo((Object)3));
    }

    private void addRecords(long messages, TopicPartition topicPartition, int startingOffset) {
        int i = 0;
        while ((long)i < messages) {
            this.consumer.addRecord(new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), (long)(startingOffset + i), (Object)new byte[0], (Object)new byte[0]));
            ++i;
        }
    }
}

