/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.test.MockStandbyUpdateListener;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.log4j.Level;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
@MockitoSettings(strictness=Strictness.STRICT_STUBS)
public class StoreChangelogReaderTest {
    @Mock
    private ProcessorStateManager stateManager;
    @Mock
    private ProcessorStateManager activeStateManager;
    @Mock
    private ProcessorStateManager standbyStateManager;
    @Mock
    private ProcessorStateManager.StateStoreMetadata storeMetadata;
    @Mock
    private ProcessorStateManager.StateStoreMetadata storeMetadataOne;
    @Mock
    private ProcessorStateManager.StateStoreMetadata storeMetadataTwo;
    @Mock
    private StateStore store;
    private final String storeName = "store";
    private final String topicName = "topic";
    private final LogContext logContext = new LogContext("test-reader ");
    private final TopicPartition tp = new TopicPartition("topic", 0);
    private final TopicPartition tp1 = new TopicPartition("one", 0);
    private final TopicPartition tp2 = new TopicPartition("two", 0);
    private final StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader"));
    private final MockTime time = new MockTime();
    private final MockStateRestoreListener callback = new MockStateRestoreListener();
    private final KafkaException kaboom = new KafkaException("KABOOM!");
    private final MockStandbyUpdateListener standbyListener = new MockStandbyUpdateListener();
    private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener(){

        @Override
        public void onRestoreStart(TopicPartition tp, String store, long stOffset, long edOffset) {
            throw StoreChangelogReaderTest.this.kaboom;
        }

        @Override
        public void onBatchRestored(TopicPartition tp, String store, long bedOffset, long numRestored) {
            throw StoreChangelogReaderTest.this.kaboom;
        }

        @Override
        public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
            throw StoreChangelogReaderTest.this.kaboom;
        }
    };
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final MockAdminClient adminClient = new MockAdminClient();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);

    private void setupStateManagerMock(Task.TaskType type) {
        Mockito.when((Object)this.stateManager.storeMetadata(this.tp)).thenReturn((Object)this.storeMetadata);
        Mockito.when((Object)this.stateManager.taskType()).thenReturn((Object)type);
    }

    private void setupActiveStateManager() {
        Mockito.when((Object)this.activeStateManager.storeMetadata(this.tp)).thenReturn((Object)this.storeMetadata);
        Mockito.when((Object)this.activeStateManager.taskType()).thenReturn((Object)Task.TaskType.ACTIVE);
    }

    private void setupStandbyStateManager() {
        Mockito.when((Object)this.standbyStateManager.storeMetadata(this.tp)).thenReturn((Object)this.storeMetadata);
        Mockito.when((Object)this.standbyStateManager.taskType()).thenReturn((Object)Task.TaskType.STANDBY);
    }

    private void setupStoreMetadata() {
        Mockito.when((Object)this.storeMetadata.changelogPartition()).thenReturn((Object)this.tp);
        Mockito.when((Object)this.storeMetadata.store()).thenReturn((Object)this.store);
    }

    private void setupStore() {
        Mockito.when((Object)this.store.name()).thenReturn((Object)"store");
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldNotRegisterSameStoreMultipleTimes(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.changelogReader.register(this.tp, this.stateManager);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertNull((Object)this.changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertThrows(IllegalStateException.class, () -> this.changelogReader.register(this.tp, this.stateManager));
    }

    @Test
    public void shouldNotRegisterStoreWithoutMetadata() {
        Assertions.assertThrows(IllegalStateException.class, () -> this.changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), this.stateManager));
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldSupportUnregisterChangelogBeforeInitialization(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 100L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        if (type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.unregister(Collections.singleton(this.tp));
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        Assertions.assertNull((Object)this.callback.restoreTopicPartition);
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_suspended"));
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        Assertions.assertNull((Object)this.standbyListener.capturedStore("update_suspended"));
        Assertions.assertNull((Object)this.standbyListener.capturedStore("update_start"));
        Assertions.assertNull((Object)this.standbyListener.capturedStore("update_start"));
        Assertions.assertNull((Object)this.standbyListener.capturedStore("update_batch"));
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldSupportUnregisterChangelogBeforeCompletion(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)9L);
        if (type == Task.TaskType.STANDBY) {
            Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)10L);
            Mockito.when((Object)this.stateManager.changelogAsSource(this.tp)).thenReturn((Object)true);
        }
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 100L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        if (type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
        Assertions.assertEquals((long)10L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.assignment());
        changelogReader.unregister(Collections.singleton(this.tp));
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_suspended"));
        } else {
            Assertions.assertNull((Object)this.callback.restoreTopicPartition);
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_suspended"));
            Assertions.assertEquals((Object)"store", (Object)this.standbyListener.capturedStore("update_start"));
            Assertions.assertEquals((Object)this.tp, (Object)this.standbyListener.updatePartition);
        }
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldSupportUnregisterChangelogAfterCompletion(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)9L);
        if (type == Task.TaskType.STANDBY) {
            Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)10L);
            Mockito.when((Object)this.stateManager.changelogAsSource(this.tp)).thenReturn((Object)true);
        }
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        if (type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((long)10L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.assignment());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
            Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        } else {
            Assertions.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
            Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        }
        changelogReader.unregister(Collections.singleton(this.tp));
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_suspended"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        } else {
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("update_suspended"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("update_batch"));
            Assertions.assertEquals((Object)"store", (Object)this.standbyListener.capturedStore("update_start"));
            Assertions.assertEquals((Object)this.tp, (Object)this.standbyListener.updatePartition);
        }
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldInitializeChangelogAndCheckForCompletion(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)9L);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)(type == Task.TaskType.ACTIVE ? StoreChangelogReader.ChangelogState.COMPLETED : StoreChangelogReader.ChangelogState.RESTORING), (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((Long)(type == Task.TaskType.ACTIVE ? Long.valueOf(10L) : null), (Long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals(type == Task.TaskType.ACTIVE ? Collections.singleton(this.tp) : Collections.emptySet(), (Object)changelogReader.completedChangelogs());
        Assertions.assertEquals((long)10L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        }
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException(Task.TaskType type) {
        if (type == Task.TaskType.ACTIVE) {
            this.setupStateManagerMock(type);
            this.setupStoreMetadata();
            Map mockTasks = (Map)Mockito.mock(Map.class);
            Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
            Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
            Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.tp, 5L));
            this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
            MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

                public long position(TopicPartition partition) {
                    throw new TimeoutException("KABOOM!");
                }
            };
            consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
            StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
            changelogReader.register(this.tp, this.stateManager);
            changelogReader.restore(mockTasks);
            MatcherAssert.assertThat((Object)this.callback.restoreStartOffset, (Matcher)Matchers.equalTo((Object)0L));
        }
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldPollWithRightTimeoutWithStateUpdater(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        this.shouldPollWithRightTimeout(true, type);
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldPollWithRightTimeoutWithoutStateUpdater(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        this.shouldPollWithRightTimeout(false, type);
    }

    private void shouldPollWithRightTimeout(boolean stateUpdaterEnabled, Task.TaskType type) {
        Properties properties = new Properties();
        properties.put("__state.updater.enabled__", (Object)stateUpdaterEnabled);
        this.shouldPollWithRightTimeout(properties, type);
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldPollWithRightTimeoutWithStateUpdaterDefault(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        Properties properties = new Properties();
        this.shouldPollWithRightTimeout(properties, type);
    }

    private void shouldPollWithRightTimeout(Properties properties, Task.TaskType type) {
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn(null).thenReturn((Object)9L);
        Mockito.when((Object)this.stateManager.taskId()).thenReturn((Object)taskId);
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 11L));
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader", properties));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        if (type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((Object)Duration.ofMillis(config.getLong("poll.ms")), (Object)this.consumer.lastPollTimeout());
        } else if (!properties.containsKey("__state.updater.enabled__") || ((Boolean)properties.get("__state.updater.enabled__")).booleanValue()) {
            Assertions.assertEquals((Object)Duration.ofMillis(config.getLong("poll.ms")), (Object)this.consumer.lastPollTimeout());
        } else {
            Assertions.assertEquals((Object)Duration.ZERO, (Object)this.consumer.lastPollTimeout());
        }
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldRestoreFromPositionAndCheckForCompletion(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)5L);
        if (type == Task.TaskType.STANDBY) {
            Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)10L);
        }
        Mockito.when((Object)this.stateManager.taskId()).thenReturn((Object)taskId);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        if (type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertTrue((boolean)changelogReader.completedChangelogs().isEmpty());
        Assertions.assertEquals((long)6L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
            Assertions.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        } else {
            Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        }
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        Assertions.assertEquals((long)12L, (long)this.consumer.position(this.tp));
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assertions.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
            Assertions.assertEquals((int)1, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
            Assertions.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
            Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_batch"));
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
        } else {
            Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assertions.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
            Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
            Assertions.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
            Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        }
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldRestoreFromBeginningAndCheckCompletion(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        if (type == Task.TaskType.STANDBY && this.logContext.logger(StoreChangelogReader.class).isDebugEnabled()) {
            Mockito.when((Object)this.storeMetadata.offset()).thenReturn(null).thenReturn(null).thenReturn((Object)9L);
            Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)10L);
        } else {
            Mockito.when((Object)this.storeMetadata.offset()).thenReturn(null).thenReturn((Object)9L);
        }
        Mockito.when((Object)this.stateManager.taskId()).thenReturn((Object)taskId);
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 11L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        if (type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((long)5L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((long)11L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
            Assertions.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        } else {
            Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        }
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.consumer.seek(this.tp, 11L);
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        Assertions.assertEquals((long)11L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assertions.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
            Assertions.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
            Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_batch"));
            Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
        } else {
            Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assertions.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
            Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        }
    }

    @Test
    public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)5L);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 0L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
        Assertions.assertEquals((long)6L, (long)this.consumer.position(this.tp));
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        Assertions.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
        Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assertions.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @Test
    public void shouldRequestPositionAndHandleTimeoutException() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        Task mockTask = (Task)Mockito.mock(Task.class);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)10L);
        Mockito.when((Object)this.activeStateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.tp, 10L));
        Mockito.when((Object)this.activeStateManager.taskId()).thenReturn((Object)taskId);
        final AtomicBoolean clearException = new AtomicBoolean(false);
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public long position(TopicPartition partition) {
                if (clearException.get()) {
                    return 10L;
                }
                throw new TimeoutException("KABOOM!");
            }
        };
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertTrue((boolean)changelogReader.completedChangelogs().isEmpty());
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        ((Task)Mockito.verify((Object)mockTask)).clearTaskTimeout();
        ((Task)Mockito.verify((Object)mockTask)).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), (Exception)ArgumentMatchers.any());
        ((Task)Mockito.verify((Object)mockTask)).recordRestoration((Time)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        clearException.set(true);
        Mockito.reset((Object[])new Task[]{mockTask});
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
        Assertions.assertEquals((long)10L, (long)consumer.position(this.tp));
        ((Task)Mockito.verify((Object)mockTask)).clearTaskTimeout();
    }

    @Test
    public void shouldThrowIfPositionFail() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.activeStateManager.taskId()).thenReturn((Object)taskId);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)10L);
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public long position(TopicPartition partition) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.activeStateManager);
        StreamsException thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldRequestEndOffsetsAndHandleTimeoutException() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        Task mockTask = (Task)Mockito.mock(Task.class);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)5L);
        Mockito.when((Object)this.activeStateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.tp, 5L));
        Mockito.when((Object)this.activeStateManager.taskId()).thenReturn((Object)taskId);
        final AtomicBoolean functionCalled = new AtomicBoolean(false);
        MockAdminClient adminClient = new MockAdminClient(){

            public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
                if (functionCalled.get()) {
                    return super.listOffsets(topicPartitionOffsets, options);
                }
                functionCalled.set(true);
                throw new TimeoutException("KABOOM!");
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new AssertionError((Object)"Should not trigger this function");
            }
        };
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, (Consumer)consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertTrue((boolean)functionCalled.get());
        ((Task)Mockito.verify((Object)mockTask)).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), (Exception)ArgumentMatchers.any());
        Mockito.reset((Object[])new Task[]{mockTask});
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)6L, (long)consumer.position(this.tp));
        ((Task)Mockito.verify((Object)mockTask)).clearTaskTimeout();
        ((Task)Mockito.verify((Object)mockTask)).recordRestoration((Time)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void shouldThrowIfEndOffsetsFail() {
        this.setupActiveStateManager();
        Mockito.when((Object)this.storeMetadata.changelogPartition()).thenReturn((Object)this.tp);
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.activeStateManager.taskId()).thenReturn((Object)taskId);
        MockAdminClient adminClient = new MockAdminClient(){

            public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 0L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.activeStateManager);
        StreamsException thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class, names={"ACTIVE", "STANDBY"})
    public void shouldRequestCommittedOffsetsAndHandleTimeoutException(Task.TaskType type) {
        this.setupStateManagerMock(type);
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        Task mockTask = (Task)Mockito.mock(Task.class);
        if (type == Task.TaskType.ACTIVE) {
            mockTask.clearTaskTimeout();
        }
        Mockito.when((Object)this.stateManager.changelogAsSource(this.tp)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)5L);
        Mockito.when((Object)this.stateManager.taskId()).thenReturn((Object)taskId);
        final AtomicBoolean functionCalled = new AtomicBoolean(false);
        MockAdminClient adminClient = new MockAdminClient(){

            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
                if (functionCalled.get()) {
                    return super.listConsumerGroupOffsets(groupSpecs, options);
                }
                functionCalled.set(true);
                return AdminClientTestUtils.listConsumerGroupOffsetsResult((String)groupSpecs.keySet().iterator().next(), (KafkaException)new TimeoutException("KABOOM!"));
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 20L));
        adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assertions.assertEquals((Object)(type == Task.TaskType.ACTIVE ? StoreChangelogReader.ChangelogState.REGISTERED : StoreChangelogReader.ChangelogState.RESTORING), (Object)changelogReader.changelogMetadata(this.tp).state());
        if (type == Task.TaskType.ACTIVE) {
            Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        } else {
            Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        }
        Assertions.assertTrue((boolean)functionCalled.get());
        ((Task)Mockito.verify((Object)mockTask)).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), (Exception)ArgumentMatchers.any());
        Mockito.reset((Object[])new Task[]{mockTask});
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)(type == Task.TaskType.ACTIVE ? 10L : 0L), (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)6L, (long)this.consumer.position(this.tp));
        if (type == Task.TaskType.ACTIVE) {
            ((Task)Mockito.verify((Object)mockTask, (VerificationMode)Mockito.times((int)2))).clearTaskTimeout();
            ((Task)Mockito.verify((Object)mockTask)).recordRestoration((Time)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean());
        }
    }

    @ParameterizedTest
    @EnumSource(value=Task.TaskType.class)
    public void shouldThrowIfCommittedOffsetsFail(Task.TaskType type) {
        this.setupStateManagerMock(type);
        Mockito.when((Object)this.storeMetadata.changelogPartition()).thenReturn((Object)this.tp);
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.stateManager.taskId()).thenReturn((Object)taskId);
        Mockito.when((Object)this.stateManager.changelogAsSource(this.tp)).thenReturn((Object)true);
        MockAdminClient adminClient = new MockAdminClient(){

            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.stateManager);
        StreamsException thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldThrowIfUnsubscribeFail() {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public void unsubscribe() {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        StreamsException thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> ((StoreChangelogReader)changelogReader).clear());
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
        this.setupStandbyStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)3L);
        Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)20L);
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 0L));
        this.changelogReader.register(this.tp, this.standbyStateManager);
        this.changelogReader.restore(mockTasks);
        Assertions.assertNull((Object)this.callback.restoreTopicPartition);
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertNull((Object)this.changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 10L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)0L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertTrue((boolean)this.changelogReader.changelogMetadata(this.tp).bufferedRecords().isEmpty());
        Assertions.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        this.changelogReader.transitToUpdateStandby();
        this.changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)5L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertTrue((boolean)this.changelogReader.changelogMetadata(this.tp).bufferedRecords().isEmpty());
    }

    @Test
    public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
        this.setupStandbyStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)3L);
        Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)20L);
        Mockito.when((Object)this.standbyStateManager.changelogAsSource(this.tp)).thenReturn((Object)false);
        MockAdminClient adminClient = new MockAdminClient(){

            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
                throw new AssertionError((Object)"Should not try to fetch committed offsets");
            }
        };
        Properties properties = new Properties();
        properties.put("commit.interval.ms", (Object)100L);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader", properties));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.transitToUpdateStandby();
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 0L));
        changelogReader.register(this.tp, this.standbyStateManager);
        Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        changelogReader.restore(mockTasks);
        Assertions.assertNull((Object)this.callback.restoreTopicPartition);
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 5L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 10L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)6L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @Test
    public void shouldRestoreToLimitInStandbyState() {
        this.setupStandbyStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.standbyStateManager.changelogAsSource(this.tp)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)3L);
        Mockito.when((Object)this.storeMetadata.endOffset()).thenReturn((Object)20L);
        long now = this.time.milliseconds();
        Properties properties = new Properties();
        properties.put("commit.interval.ms", (Object)100L);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader", properties));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.transitToUpdateStandby();
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 0L));
        this.adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 7L));
        changelogReader.register(this.tp, this.standbyStateManager);
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        changelogReader.restore(mockTasks);
        Assertions.assertNull((Object)this.callback.restoreTopicPartition);
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)7L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 5L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 10L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((long)7L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)4, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
        Assertions.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        this.adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 10L));
        this.time.setCurrentTimeMs(now + 100L);
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)7L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)4, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.time.setCurrentTimeMs(now + 101L);
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)4, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)2, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)2, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 15L));
        this.time.setCurrentTimeMs(now + 201L);
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)2, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.time.setCurrentTimeMs(now + 202L);
        changelogReader.enforceRestoreActive();
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)2, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        changelogReader.transitToUpdateStandby();
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)15L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)2, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)2, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)15L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)6L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 12L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 13L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 14L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 15L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(mockTasks);
        Assertions.assertEquals((long)15L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assertions.assertEquals((long)9L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assertions.assertEquals((int)1, (int)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assertions.assertEquals((int)0, (int)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
    }

    @Test
    public void shouldRestoreMultipleChangelogs() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        Map mockTasks = (Map)Mockito.mock(Map.class);
        Mockito.when(mockTasks.get(null)).thenReturn(Mockito.mock(Task.class));
        Mockito.when((Object)mockTasks.containsKey(null)).thenReturn((Object)true);
        Mockito.when((Object)this.storeMetadataOne.changelogPartition()).thenReturn((Object)this.tp1);
        Mockito.when((Object)this.storeMetadataOne.store()).thenReturn((Object)this.store);
        Mockito.when((Object)this.storeMetadataTwo.changelogPartition()).thenReturn((Object)this.tp2);
        Mockito.when((Object)this.storeMetadataTwo.store()).thenReturn((Object)this.store);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)0L);
        Mockito.when((Object)this.storeMetadataOne.offset()).thenReturn((Object)0L);
        Mockito.when((Object)this.storeMetadataTwo.offset()).thenReturn((Object)0L);
        Mockito.when((Object)this.activeStateManager.storeMetadata(this.tp1)).thenReturn((Object)this.storeMetadataOne);
        Mockito.when((Object)this.activeStateManager.storeMetadata(this.tp2)).thenReturn((Object)this.storeMetadataTwo);
        Mockito.when((Object)this.activeStateManager.changelogOffsets()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.tp, (Object)5L), Utils.mkEntry((Object)this.tp1, (Object)5L), Utils.mkEntry((Object)this.tp2, (Object)5L)}));
        this.setupConsumer(10L, this.tp);
        this.setupConsumer(5L, this.tp1);
        this.setupConsumer(3L, this.tp2);
        this.changelogReader.register(this.tp, this.activeStateManager);
        this.changelogReader.register(this.tp1, this.activeStateManager);
        this.changelogReader.register(this.tp2, this.activeStateManager);
        this.changelogReader.restore(mockTasks);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp1).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp2).state());
        this.changelogReader.unregister(Collections.singletonList(this.tp));
        Assertions.assertNull((Object)this.changelogReader.changelogMetadata(this.tp));
        Assertions.assertFalse((boolean)this.changelogReader.isEmpty());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp1).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp2).state());
        this.changelogReader.clear();
        Assertions.assertTrue((boolean)this.changelogReader.isEmpty());
        Assertions.assertNull((Object)this.changelogReader.changelogMetadata(this.tp1));
        Assertions.assertNull((Object)this.changelogReader.changelogMetadata(this.tp2));
        Assertions.assertEquals((Object)this.changelogReader.state(), (Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING);
    }

    @Test
    public void shouldTransitState() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        Mockito.when((Object)this.standbyStateManager.taskType()).thenReturn((Object)Task.TaskType.STANDBY);
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.storeMetadataOne.changelogPartition()).thenReturn((Object)this.tp1);
        Mockito.when((Object)this.storeMetadataOne.store()).thenReturn((Object)this.store);
        Mockito.when((Object)this.storeMetadataTwo.changelogPartition()).thenReturn((Object)this.tp2);
        Mockito.when((Object)this.storeMetadataTwo.store()).thenReturn((Object)this.store);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)5L);
        Mockito.when((Object)this.storeMetadataOne.offset()).thenReturn((Object)5L);
        Mockito.when((Object)this.storeMetadataTwo.offset()).thenReturn((Object)5L);
        Mockito.when((Object)this.standbyStateManager.storeMetadata(this.tp1)).thenReturn((Object)this.storeMetadataOne);
        Mockito.when((Object)this.standbyStateManager.storeMetadata(this.tp2)).thenReturn((Object)this.storeMetadataTwo);
        Mockito.when((Object)this.activeStateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.tp, 5L));
        Mockito.when((Object)this.activeStateManager.taskId()).thenReturn((Object)taskId);
        Mockito.when((Object)this.standbyStateManager.taskId()).thenReturn((Object)taskId);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp1, 10L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp2, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.register(this.tp1, this.standbyStateManager);
        changelogReader.register(this.tp2, this.standbyStateManager);
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class)));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp1, this.tp2}), (Object)this.consumer.paused());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        changelogReader.enforceRestoreActive();
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        changelogReader.transitToUpdateStandby();
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING, (Object)changelogReader.state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        Assertions.assertThrows(IllegalStateException.class, () -> ((StoreChangelogReader)changelogReader).transitToUpdateStandby());
        changelogReader.unregister(Collections.singletonList(this.tp));
        changelogReader.register(this.tp, this.activeStateManager);
        Assertions.assertThrows(IllegalStateException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assertions.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING, (Object)changelogReader.state());
        changelogReader.enforceRestoreActive();
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assertions.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp1, this.tp2}), (Object)this.consumer.paused());
    }

    @Test
    public void shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() {
        Mockito.when((Object)this.standbyStateManager.taskType()).thenReturn((Object)Task.TaskType.STANDBY);
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback, (StandbyUpdateListener)this.standbyListener);
        Mockito.when((Object)this.standbyStateManager.storeMetadata(this.tp1)).thenReturn((Object)this.storeMetadataOne);
        changelogReader.register(this.tp1, this.standbyStateManager);
        changelogReader.transitToUpdateStandby();
        changelogReader.unregister((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.tp1}));
        Assertions.assertTrue((boolean)changelogReader.isEmpty());
        Assertions.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
    }

    @Test
    public void shouldThrowIfRestoreCallbackThrows() {
        this.setupActiveStateManager();
        this.setupStoreMetadata();
        this.setupStore();
        TaskId taskId = new TaskId(0, 0);
        Mockito.when((Object)this.storeMetadata.offset()).thenReturn((Object)5L);
        Mockito.when((Object)this.activeStateManager.taskId()).thenReturn((Object)taskId);
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.exceptionCallback, (StandbyUpdateListener)this.standbyListener);
        changelogReader.register(this.tp, this.activeStateManager);
        StreamsException thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
        this.consumer.seek(this.tp, 10L);
        thrown = (StreamsException)Assertions.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, Mockito.mock(Task.class))));
        Assertions.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldNotThrowOnUnknownRevokedPartition() {
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class);){
            appender.setClassLogger(StoreChangelogReader.class, Level.DEBUG);
            this.changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)));
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)Matchers.hasItem((Object)"test-reader Changelog partition unknown-0 could not be found, it could be already cleaned up during the handling of task corruption and never restore again"));
        }
    }

    private void setupConsumer(long messages, TopicPartition topicPartition) {
        this.assignPartition(messages, topicPartition);
        this.addRecords(messages, topicPartition);
        this.consumer.assign(Collections.emptyList());
    }

    private void addRecords(long messages, TopicPartition topicPartition) {
        int i = 0;
        while ((long)i < messages) {
            this.consumer.addRecord(new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), (long)i, (Object)new byte[0], (Object)new byte[0]));
            ++i;
        }
    }

    private void assignPartition(long messages, TopicPartition topicPartition) {
        this.consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0L, messages) + 1L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0L, messages) + 1L));
        this.consumer.assign(Collections.singletonList(topicPartition));
    }
}

