/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AbstractTaskTest {
    private final TaskId id = new TaskId(0, 0);
    private StateDirectory stateDirectory = (StateDirectory)EasyMock.createMock(StateDirectory.class);

    @Before
    public void before() {
        EasyMock.expect((Object)this.stateDirectory.directoryForTask(this.id)).andReturn((Object)TestUtils.tempDirectory());
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() throws Exception {
        Consumer consumer = this.mockConsumer((RuntimeException)new AuthorizationException("blah"));
        AbstractTask task = this.createTask(consumer, Collections.emptyList());
        task.updateOffsetLimits();
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() throws Exception {
        Consumer consumer = this.mockConsumer((RuntimeException)new KafkaException("blah"));
        AbstractTask task = this.createTask(consumer, Collections.emptyList());
        task.updateOffsetLimits();
    }

    @Test(expected=WakeupException.class)
    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() throws Exception {
        Consumer consumer = this.mockConsumer((RuntimeException)new WakeupException());
        AbstractTask task = this.createTask(consumer, Collections.emptyList());
        task.updateOffsetLimits();
    }

    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        StateStore store = (StateStore)EasyMock.createNiceMock(StateStore.class);
        EasyMock.expect((Object)this.stateDirectory.lock(this.id, 5)).andReturn((Object)false);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});
        AbstractTask task = this.createTask(consumer, Collections.singletonList(store));
        try {
            task.initStateStores();
            Assert.fail((String)"Should have thrown LockException");
        }
        catch (LockException e) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotAttemptToLockIfNoStores() throws IOException {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});
        AbstractTask task = this.createTask(consumer, Collections.emptyList());
        task.initStateStores();
        EasyMock.verify((Object[])new Object[]{this.stateDirectory});
    }

    private AbstractTask createTask(Consumer consumer, List<StateStore> stateStores) {
        Properties properties = new Properties();
        properties.put("application.id", "app-id");
        properties.put("bootstrap.servers", "dummyhost:9092");
        StreamsConfig config = new StreamsConfig((Map)properties);
        return new AbstractTask(this.id, "app", Collections.singletonList(new TopicPartition("t", 0)), new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), stateStores, Collections.emptyMap(), Collections.emptyList()), consumer, (ChangelogReader)new StoreChangelogReader(consumer), false, this.stateDirectory, config){

            public void resume() {
            }

            public void commit() {
            }

            public void suspend() {
            }

            public void close(boolean clean, boolean isZombie) {
            }

            public boolean initializeStateStores() {
                return false;
            }

            boolean process() {
                return false;
            }

            boolean maybePunctuate() {
                return false;
            }

            boolean commitNeeded() {
                return false;
            }

            public void initializeTopology() {
            }
        };
    }

    private Consumer mockConsumer(final RuntimeException toThrow) {
        return new MockConsumer(OffsetResetStrategy.EARLIEST){

            public OffsetAndMetadata committed(TopicPartition partition) {
                throw toThrow;
            }
        };
    }
}

