/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.internals.RecordConverters;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=EasyMockRunner.class)
public class StoreChangelogReaderTest {
    /** Nice mock passed to {@code changelogReader.restore(...)}; tests stub {@code restoringTaskFor} on it. */
    @Mock(type = MockType.NICE)
    private RestoringTasks active;
    /** Nice mock returned by the stubbed {@code active.restoringTaskFor(...)} calls. */
    @Mock(type = MockType.NICE)
    private StreamTask task;
    /** Callback whose {@code restored} list and reported offsets the tests assert on. */
    private final MockStateRestoreListener callback = new MockStateRestoreListener();
    /** Composite listener wrapping {@link #callback}; handed to the restorers of the default partition. */
    private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
    // Parameterized instead of raw so records and offsets are type-checked at compile time.
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    /** Default changelog partition used by most tests. */
    private final TopicPartition topicPartition = new TopicPartition("topic", 0);
    private final LogContext logContext = new LogContext("test-reader ");
    /** Reader under test, backed by {@link #consumer}. */
    private final StoreChangelogReader changelogReader =
        new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);

    /** Attaches the user-level restore listener to the composite listener before each test. */
    @Before
    public void setUp() {
        restoreListener.setUserRestoreListener(stateRestoreListener);
    }

    /**
     * Verifies that {@code restore} calls {@code listTopics()} on the consumer and that a
     * {@link TimeoutException} thrown from it does not escape {@code restore}.
     */
    @Test
    public void shouldRequestTopicsAndHandleTimeoutException() {
        final AtomicBoolean functionCalled = new AtomicBoolean(false);
        // Distinct local names so the class-level consumer/changelogReader fields are not shadowed.
        final MockConsumer<byte[], byte[]> timeoutConsumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            @Override
            public Map<String, List<PartitionInfo>> listTopics() {
                functionCalled.set(true);
                throw new TimeoutException("KABOOM!");
            }
        };
        final StoreChangelogReader timeoutReader =
            new StoreChangelogReader(timeoutConsumer, Duration.ZERO, stateRestoreListener, logContext);
        timeoutReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));

        timeoutReader.restore(active);

        Assert.assertTrue(functionCalled.get());
    }

    /**
     * Verifies that {@code restore} throws a {@link StreamsException} when the consumer already
     * holds a topic subscription.
     */
    @Test
    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
        final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
        mockRestorer.setUserRestoreListener(stateRestoreListener);
        EasyMock.expect(mockRestorer.partition())
            .andReturn(new TopicPartition("sometopic", 0))
            .andReturn(new TopicPartition("sometopic", 0))
            .andReturn(new TopicPartition("sometopic", 0))
            .andReturn(new TopicPartition("sometopic", 0));
        EasyMock.replay(mockRestorer);
        changelogReader.register(mockRestorer);
        consumer.subscribe(Collections.singleton("sometopic"));

        try {
            changelogReader.restore(active);
            // The message names the exception type this test actually catches.
            Assert.fail("Should have thrown StreamsException");
        } catch (final StreamsException expected) {
            // Expected: restoring with an existing subscription must be rejected.
        }
    }

    /** With a {@code null} checkpoint, all records added to the partition are restored. */
    @Test
    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
        final int messages = 10;
        setupConsumer(messages, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(messages));
    }

    /**
     * The first {@code restore} call hits an {@link InvalidOffsetException} from {@code poll} and
     * completes nothing; after re-registering, the second call restores all records.
     */
    @Test
    public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
        final int messages = 10;
        setupConsumer(messages, topicPartition);
        consumer.setPollException(new InvalidOffsetException("Try Again!") {
            @Override
            public Set<TopicPartition> partitions() {
                return Collections.singleton(topicPartition);
            }
        });
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        // First attempt: the poll exception is raised, so no partition is reported as restored.
        Assert.assertEquals(0L, changelogReader.restore(active).size());

        // Second attempt with a freshly registered restorer completes the partition.
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        Assert.assertEquals(1L, changelogReader.restore(active).size());
        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(messages));
    }

    /**
     * The restorer's checkpoint (1) lies before the partition's beginning offset (5). The first
     * {@code restore} call completes nothing and resets the checkpoint to -1; the second call
     * restores all records.
     */
    @Test
    public void shouldRecoverFromOffsetOutOfRangeExceptionAndRestoreFromStart() {
        final int messages = 10;
        final int startOffset = 5;
        final long expiredCheckpoint = 1L;
        assignPartition(messages, topicPartition);
        // Shift the partition so it spans [startOffset, startOffset + messages).
        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, (long) startOffset));
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
        addRecords(messages, topicPartition, startOffset);
        consumer.assign(Collections.emptyList());

        final StateRestorer stateRestorer = new StateRestorer(topicPartition, restoreListener, expiredCheckpoint, Long.MAX_VALUE, true, "storeName", RecordConverters.identity());
        changelogReader.register(stateRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        // First attempt fails to restore and leaves the checkpoint reset.
        Assert.assertEquals(0L, changelogReader.restore(active).size());
        MatcherAssert.assertThat(stateRestorer.checkpoint(), IsEqual.equalTo(-1L));

        // Second attempt, after re-registering the same restorer, restores everything.
        changelogReader.register(stateRestorer);
        Assert.assertEquals(1L, changelogReader.restore(active).size());
        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(messages));
    }

    /** With 10 records and a checkpoint of 5, only 5 records are restored. */
    @Test
    public void shouldRestoreMessagesFromCheckpoint() {
        final int messages = 10;
        final long checkpoint = 5L;
        setupConsumer(messages, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, checkpoint, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
    }

    /** After a completed restore the consumer must have no partitions assigned. */
    @Test
    public void shouldClearAssignmentAtEndOfRestore() {
        // A record count, not a flag: a single record is enough to drive one restore pass.
        final int messages = 1;
        setupConsumer(messages, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(consumer.assignment(), IsEqual.equalTo(Collections.<TopicPartition>emptySet()));
    }

    /** With 10 records available and an offset limit of 3, exactly 3 records are restored. */
    @Test
    public void shouldRestoreToLimitWhenSupplied() {
        setupConsumer(10L, topicPartition);
        final StateRestorer limitedRestorer = new StateRestorer(topicPartition, restoreListener, null, 3L, true, "storeName", RecordConverters.identity());
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        final int restoredCount = callback.restored.size();
        MatcherAssert.assertThat(restoredCount, IsEqual.equalTo(3));
        MatcherAssert.assertThat(limitedRestorer.restoredOffset(), IsEqual.equalTo(3L));
    }

    /** Three partitions with 10, 5 and 3 records are each restored into their own callback. */
    @Test
    public void shouldRestoreMultipleStores() {
        final TopicPartition one = new TopicPartition("one", 0);
        final TopicPartition two = new TopicPartition("two", 0);
        final MockRestoreCallback callbackOne = new MockRestoreCallback();
        final MockRestoreCallback callbackTwo = new MockRestoreCallback();
        final CompositeRestoreListener listenerOne = new CompositeRestoreListener(callbackOne);
        final CompositeRestoreListener listenerTwo = new CompositeRestoreListener(callbackTwo);

        setupConsumer(10L, topicPartition);
        setupConsumer(5L, one);
        setupConsumer(3L, two);

        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1", RecordConverters.identity()));
        changelogReader.register(new StateRestorer(one, listenerOne, null, Long.MAX_VALUE, true, "storeName2", RecordConverters.identity()));
        changelogReader.register(new StateRestorer(two, listenerTwo, null, Long.MAX_VALUE, true, "storeName3", RecordConverters.identity()));

        EasyMock.expect(active.restoringTaskFor(one)).andStubReturn(task);
        EasyMock.expect(active.restoringTaskFor(two)).andStubReturn(task);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
        MatcherAssert.assertThat(callbackOne.restored.size(), IsEqual.equalTo(5));
        MatcherAssert.assertThat(callbackTwo.restored.size(), IsEqual.equalTo(3));
    }

    /**
     * Restores three partitions (10, 5 and 3 records) and checks that each listener saw the
     * start, batch and end notifications with the expected store name and offsets.
     */
    @Test
    public void shouldRestoreAndNotifyMultipleStores() {
        final TopicPartition one = new TopicPartition("one", 0);
        final TopicPartition two = new TopicPartition("two", 0);
        final MockStateRestoreListener callbackOne = new MockStateRestoreListener();
        final MockStateRestoreListener callbackTwo = new MockStateRestoreListener();
        final CompositeRestoreListener listenerOne = new CompositeRestoreListener(callbackOne);
        final CompositeRestoreListener listenerTwo = new CompositeRestoreListener(callbackTwo);

        setupConsumer(10L, topicPartition);
        setupConsumer(5L, one);
        setupConsumer(3L, two);

        changelogReader.register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1", RecordConverters.identity()));
        changelogReader.register(new StateRestorer(one, listenerOne, 0L, Long.MAX_VALUE, true, "storeName2", RecordConverters.identity()));
        changelogReader.register(new StateRestorer(two, listenerTwo, 0L, Long.MAX_VALUE, true, "storeName3", RecordConverters.identity()));

        EasyMock.expect(active.restoringTaskFor(one)).andReturn(task);
        EasyMock.expect(active.restoringTaskFor(two)).andReturn(task);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        // restore is deliberately invoked twice, as in the original test.
        changelogReader.restore(active);
        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
        MatcherAssert.assertThat(callbackOne.restored.size(), IsEqual.equalTo(5));
        MatcherAssert.assertThat(callbackTwo.restored.size(), IsEqual.equalTo(3));

        assertAllCallbackStatesExecuted(callback, "storeName1");
        assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);

        assertAllCallbackStatesExecuted(callbackOne, "storeName2");
        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);

        assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L);
    }

    /**
     * With an offset limit of 5 on a 10-record partition, 5 records are restored and the listener
     * reports batch offset 4 and a total of 5.
     */
    @Test
    public void shouldOnlyReportTheLastRestoredOffset() {
        setupConsumer(10L, topicPartition);
        final StateRestorer limitedRestorer = new StateRestorer(topicPartition, restoreListener, 0L, 5L, true, "storeName1", RecordConverters.identity());
        changelogReader.register(limitedRestorer);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
        assertAllCallbackStatesExecuted(callback, "storeName1");
        assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
    }

    /**
     * Asserts that the listener recorded {@code storeName} under each of the
     * {@code restore_start}, {@code restore_batch} and {@code restore_end} keys.
     *
     * @param restoreListener listener whose {@code storeNameCalledStates} map is inspected
     * @param storeName       store name expected for every state
     */
    private void assertAllCallbackStatesExecuted(MockStateRestoreListener restoreListener, String storeName) {
        final String startStore = restoreListener.storeNameCalledStates.get("restore_start");
        MatcherAssert.assertThat(startStore, IsEqual.equalTo(storeName));
        final String batchStore = restoreListener.storeNameCalledStates.get("restore_batch");
        MatcherAssert.assertThat(batchStore, IsEqual.equalTo(storeName));
        final String endStore = restoreListener.storeNameCalledStates.get("restore_end");
        MatcherAssert.assertThat(endStore, IsEqual.equalTo(storeName));
    }

    /**
     * Asserts the start offset, last batch offset and total restored count recorded by the listener.
     *
     * @param restoreListener listener whose recorded values are checked
     * @param startOffset     expected {@code restoreStartOffset}
     * @param batchOffset     expected {@code restoredBatchOffset}
     * @param totalRestored   expected {@code totalNumRestored}
     */
    private void assertCorrectOffsetsReportedByListener(MockStateRestoreListener restoreListener, long startOffset, long batchOffset, long totalRestored) {
        MatcherAssert.assertThat(restoreListener.restoreStartOffset, IsEqual.equalTo(startOffset));
        MatcherAssert.assertThat(restoreListener.restoredBatchOffset, IsEqual.equalTo(batchOffset));
        MatcherAssert.assertThat(restoreListener.totalNumRestored, IsEqual.equalTo(totalRestored));
    }

    /** An empty partition yields no restored records and a restored offset of 0. */
    @Test
    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() {
        final StateRestorer emptyPartitionRestorer = new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity());
        setupConsumer(0L, topicPartition);
        changelogReader.register(emptyPartitionRestorer);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(0));
        MatcherAssert.assertThat(emptyPartitionRestorer.restoredOffset(), IsEqual.equalTo(0L));
    }

    /**
     * When the checkpoint equals the end offset, nothing is restored and the restored offset
     * equals that end offset.
     */
    @Test
    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
        final long endOffset = 10L;
        setupConsumer(endOffset, topicPartition);
        final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, endOffset, Long.MAX_VALUE, true, "storeName", RecordConverters.identity());
        changelogReader.register(restorer);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(0));
        MatcherAssert.assertThat(restorer.restoredOffset(), IsEqual.equalTo(endOffset));
    }

    /** A restorer registered with {@code persistent = true} is reported in {@code restoredOffsets()}. */
    @Test
    public void shouldReturnRestoredOffsetsForPersistentStores() {
        setupConsumer(10L, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        // Typed map instead of a raw Map so the comparison is checked at compile time.
        final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
        MatcherAssert.assertThat(restoredOffsets, IsEqual.equalTo(Collections.singletonMap(topicPartition, 10L)));
    }

    /** A restorer registered with {@code persistent = false} is absent from {@code restoredOffsets()}. */
    @Test
    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
        setupConsumer(10L, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        // Typed map instead of a raw Map so the comparison is checked at compile time.
        final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
        MatcherAssert.assertThat(restoredOffsets, IsEqual.equalTo(Collections.<TopicPartition, Long>emptyMap()));
    }

    /**
     * Of three records, the one with a {@code null} key is skipped; only the two keyed records
     * reach the restore callback.
     */
    @Test
    public void shouldIgnoreNullKeysWhenRestoring() {
        assignPartition(3L, topicPartition);
        final byte[] bytes = new byte[0];
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        consumer.addRecord(new ConsumerRecord<>(topic, partition, 0L, bytes, bytes));
        // Offset 1 carries a null key and is expected to be dropped during restore.
        consumer.addRecord(new ConsumerRecord<>(topic, partition, 1L, (byte[]) null, bytes));
        consumer.addRecord(new ConsumerRecord<>(topic, partition, 2L, bytes, bytes));
        consumer.assign(Collections.singletonList(topicPartition));
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored, CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
    }

    /** With an end offset of 0 the partition is reported as restored by the first {@code restore} call. */
    @Test
    public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
        final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
        setupConsumer(0L, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.replay(active, task);

        // Typed collection instead of a raw Collection.
        final Collection<TopicPartition> restored = changelogReader.restore(active);

        MatcherAssert.assertThat(restored, IsEqual.equalTo(expected));
    }

    /**
     * A partition registered after the first {@code restore} call is picked up by the next call:
     * the first call completes nothing, the second completes both partitions.
     */
    @Test
    public void shouldRestorePartitionsRegisteredPostInitialization() {
        final MockRestoreCallback callbackTwo = new MockRestoreCallback();
        final CompositeRestoreListener restoreListener2 = new CompositeRestoreListener(callbackTwo);

        // Only 1 of 10 records is available at first, so the first restore cannot finish.
        setupConsumer(1L, topicPartition);
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName", RecordConverters.identity()));

        final TopicPartition postInitialization = new TopicPartition("other", 0);
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
        EasyMock.expect(active.restoringTaskFor(postInitialization)).andStubReturn(task);
        EasyMock.replay(active, task);

        Assert.assertTrue(changelogReader.restore(active).isEmpty());

        // Supply the remaining 9 records and register the second partition.
        addRecords(9L, topicPartition, 1);
        setupConsumer(3L, postInitialization);
        consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
        consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
        changelogReader.register(new StateRestorer(postInitialization, restoreListener2, null, Long.MAX_VALUE, false, "otherStore", RecordConverters.identity()));

        // Typed collection instead of a raw Set, so no raw Collection cast is needed for assign().
        final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
        consumer.assign(expected);

        MatcherAssert.assertThat(changelogReader.restore(active), IsEqual.equalTo(expected));
        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(10));
        MatcherAssert.assertThat(callbackTwo.restored.size(), IsEqual.equalTo(3));
    }

    /**
     * Restoring with an offset limit of 9 must not throw even though an additional end-offset
     * entry of 15 is queued on the consumer. The test passes if {@code restore} returns normally.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
        final int messages = 10;
        setupConsumer(messages, topicPartition);
        // NOTE(review): presumably simulates the topic growing after the first end-offset lookup — confirm against MockConsumer.addEndOffsets.
        consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);
    }

    /**
     * With 10 records (offsets 0..9) and an end offset of 11, restore completes without throwing
     * and restores all 10 records.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
        final int totalMessages = 10;
        setupConsumer(totalMessages, topicPartition);
        // NOTE(review): the end offset is one past the record count; presumably offset 10 stands for a commit marker — confirm.
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(totalMessages));
    }

    /** With 10 records and no offset limit, restore completes without throwing and restores all 10. */
    @Test
    public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
        final int totalMessages = 10;
        setupConsumer(totalMessages, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(totalMessages));
    }

    /**
     * With 10 records available and an offset limit of 5, restore completes without throwing and
     * restores only 5 records.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
        final int messages = 10;
        setupConsumer(messages, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5L, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
    }

    /**
     * With 10 records available and an offset limit of 10, restore completes without throwing and
     * restores all 10 records.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopic() {
        final int messages = 10;
        setupConsumer(messages, topicPartition);
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 10L, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(messages));
    }

    /**
     * Records occupy offsets 0..4 and 6..10 (offset 5 is left empty), the end offset is 12 and the
     * offset limit is 6. Restore completes without throwing and restores 5 records.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopicEOSEnabled() {
        final int totalMessages = 10;
        assignPartition(totalMessages, topicPartition);
        // First half: offsets 0..4.
        addRecords(5L, topicPartition, 0);
        // Second half starts at 6, leaving a gap at offset 5.
        // NOTE(review): presumably the gap stands for a transaction commit marker — confirm.
        addRecords(5L, topicPartition, 6);
        consumer.assign(Collections.emptyList());
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 12L));
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 6L, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(5));
    }

    /**
     * With 10 records, an end offset of 11 and an offset limit of 11, restore completes without
     * throwing and restores all 10 records.
     */
    @Test
    public void shouldNotThrowTaskMigratedExceptionIfEndOffsetNotExceededDuringRestoreForSourceTopicEOSEnabled() {
        final int totalMessages = 10;
        setupConsumer(totalMessages, topicPartition);
        // NOTE(review): the end offset is one past the record count; presumably offset 10 stands for a commit marker — confirm.
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 11L, true, "storeName", RecordConverters.identity()));
        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
        EasyMock.replay(active);

        changelogReader.restore(active);

        MatcherAssert.assertThat(callback.restored.size(), IsEqual.equalTo(totalMessages));
    }

    /**
     * Prepares the mock consumer with {@code messages} records for the partition, starting at
     * offset 0, and then clears the consumer's assignment.
     *
     * @param messages       number of records to add; also used as the partition's end offset
     * @param topicPartition partition to populate
     */
    private void setupConsumer(long messages, TopicPartition topicPartition) {
        assignPartition(messages, topicPartition);
        addRecords(messages, topicPartition, 0);
        // Leave the consumer with no assignment once the records are in place.
        consumer.assign(Collections.emptyList());
    }

    /**
     * Adds {@code messages} records with empty key and value to the mock consumer, at consecutive
     * offsets beginning with {@code startingOffset}.
     *
     * @param messages       number of records to add; nothing is added when it is zero or negative
     * @param topicPartition partition the records belong to
     * @param startingOffset offset of the first record
     */
    private void addRecords(long messages, TopicPartition topicPartition, int startingOffset) {
        for (int i = 0; i < messages; i++) {
            final long offset = startingOffset + i;
            // Parameterized record type instead of the raw ConsumerRecord.
            consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, new byte[0], new byte[0]));
        }
    }

    /**
     * Registers the partition with the mock consumer, sets its beginning offset to 0 and its end
     * offset to {@code messages} (clamped to be non-negative), and assigns it to the consumer.
     *
     * @param messages       end offset to advertise for the partition
     * @param topicPartition partition to register and assign
     */
    private void assignPartition(long messages, TopicPartition topicPartition) {
        final String topic = topicPartition.topic();
        final PartitionInfo partitionInfo = new PartitionInfo(topic, topicPartition.partition(), null, null, null);
        consumer.updatePartitions(topic, Collections.singletonList(partitionInfo));

        consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        final long endOffset = Math.max(0L, messages);
        consumer.updateEndOffsets(Collections.singletonMap(topicPartition, endOffset));

        consumer.assign(Collections.singletonList(topicPartition));
    }
}

