/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;

public class AbstractTaskTest {
    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() throws Exception {
        Consumer consumer = this.mockConsumer((RuntimeException)new AuthorizationException("blah"));
        AbstractTask task = this.createTask(consumer);
        task.initializeOffsetLimits();
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() throws Exception {
        Consumer consumer = this.mockConsumer((RuntimeException)new KafkaException("blah"));
        AbstractTask task = this.createTask(consumer);
        task.initializeOffsetLimits();
    }

    @Test(expected=WakeupException.class)
    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() throws Exception {
        Consumer consumer = this.mockConsumer((RuntimeException)new WakeupException());
        AbstractTask task = this.createTask(consumer);
        task.initializeOffsetLimits();
    }

    private AbstractTask createTask(Consumer consumer) {
        return new AbstractTask(new TaskId(0, 0), "app", Collections.singletonList(new TopicPartition("t", 0)), new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList()), consumer, consumer, false, new StateDirectory("app", TestUtils.tempDirectory().getPath()), new ThreadCache("testCache", 0L, (StreamsMetrics)new MockStreamsMetrics(new Metrics()))){

            public void commit() {
            }

            public void close() {
            }

            public void initTopology() {
            }

            public void closeTopology() {
            }

            public void commitOffsets() {
            }
        };
    }

    private Consumer mockConsumer(final RuntimeException toThrow) {
        return new MockConsumer(OffsetResetStrategy.EARLIEST){

            public OffsetAndMetadata committed(TopicPartition partition) {
                throw toThrow;
            }
        };
    }
}

