/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StreamThreadStateStoreProvider}.
 *
 * <p>Each test runs against a topology with one source, one processor, an in-memory
 * key-value store and a persistent window store. Two tasks (partitions 0 and 1) are
 * created and initialized, and are exposed to the provider through a stubbed
 * {@link StreamThread}.
 */
public class StreamThreadStateStoreProviderTest {
    private static final String APPLICATION_ID = "applicationId";
    private static final String KV_STORE_NAME = "kv-store";
    private static final String WINDOW_STORE_NAME = "window-store";

    private StreamTask taskOne;
    private StreamTask taskTwo;
    private StreamThreadStateStoreProvider provider;
    private StateDirectory stateDirectory;
    private File stateDir;
    /** Value reported by the stubbed thread's {@code isInitialized()}; tests flip it to simulate unavailable stores. */
    private boolean storesAvailable;
    private final String topicName = "topic";

    /**
     * Builds the topology, the two stream tasks and the provider under test.
     *
     * @throws IOException declared by the original fixture; propagated from the set-up calls
     */
    @Before
    public void before() throws IOException {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("the-source", topicName);
        builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
        builder.addStateStore(
            Stores.create(KV_STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
            "the-processor");
        builder.addStateStore(
            Stores.create(WINDOW_STORE_NAME).withStringKeys().withStringValues().persistent().windowed(10L, 10L, 2, false).build(),
            "the-processor");

        // The state directory is a fresh temp directory that cleanUp() deletes after each test.
        stateDir = TestUtils.tempDirectory();
        final String stateConfigDir = stateDir.getPath();

        final Properties properties = new Properties();
        properties.put("application.id", APPLICATION_ID);
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("state.dir", stateConfigDir);
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        final MockClientSupplier clientSupplier = new MockClientSupplier();
        configureRestoreConsumer(clientSupplier, changelogTopicFor(KV_STORE_NAME));
        configureRestoreConsumer(clientSupplier, changelogTopicFor(WINDOW_STORE_NAME));

        builder.setApplicationId(APPLICATION_ID);
        final ProcessorTopology topology = builder.build(null);

        // Must be assigned before createStreamsTask(), which hands it to each task.
        stateDirectory = new StateDirectory(APPLICATION_ID, stateConfigDir, new MockTime());

        // Declared final because the anonymous StreamThread below captures it.
        final Map<TaskId, StreamTask> tasks = new HashMap<TaskId, StreamTask>();

        final TaskId firstTaskId = new TaskId(0, 0);
        taskOne = createStreamsTask(APPLICATION_ID, streamsConfig, clientSupplier, topology, firstTaskId);
        taskOne.initialize();
        tasks.put(firstTaskId, taskOne);

        final TaskId secondTaskId = new TaskId(0, 1);
        taskTwo = createStreamsTask(APPLICATION_ID, streamsConfig, clientSupplier, topology, secondTaskId);
        taskTwo.initialize();
        tasks.put(secondTaskId, taskTwo);

        storesAvailable = true;
        provider = new StreamThreadStateStoreProvider(new StreamThread(builder, streamsConfig, clientSupplier, APPLICATION_ID, "clientId", UUID.randomUUID(), new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L, stateDirectory) {

            // Intended to replace StreamThread's own task lookup with the two tasks built above.
            public Map<TaskId, StreamTask> tasks() {
                return tasks;
            }

            // Intended to let each test control whether the thread reports itself as initialized.
            public boolean isInitialized() {
                return storesAvailable;
            }
        });
    }

    /**
     * Deletes the temporary state directory created in {@link #before()}.
     *
     * <p>NOTE(review): the tasks created in {@code before()} are not closed here; confirm
     * that this is intentional.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @After
    public void cleanUp() throws IOException {
        Utils.delete(stateDir);
    }

    /** The provider returns one key-value store per task. */
    @Test
    public void shouldFindKeyValueStores() throws Exception {
        final List<?> kvStores = provider.stores(KV_STORE_NAME, QueryableStoreTypes.keyValueStore());
        Assert.assertEquals(2, kvStores.size());
    }

    /** The provider returns one window store per task. */
    @Test
    public void shouldFindWindowStores() throws Exception {
        final List<?> windowStores = provider.stores(WINDOW_STORE_NAME, QueryableStoreTypes.windowStore());
        Assert.assertEquals(2, windowStores.size());
    }

    /** Looking up window stores fails once one task's window store has been closed. */
    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() throws Exception {
        taskOne.getStore(WINDOW_STORE_NAME).close();
        provider.stores(WINDOW_STORE_NAME, QueryableStoreTypes.windowStore());
    }

    /** Looking up key-value stores fails once one task's key-value store has been closed. */
    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() throws Exception {
        taskOne.getStore(KV_STORE_NAME).close();
        provider.stores(KV_STORE_NAME, QueryableStoreTypes.keyValueStore());
    }

    /** An unknown store name yields an empty list rather than an exception. */
    @Test
    public void shouldReturnEmptyListIfNoStoresFoundWithName() throws Exception {
        Assert.assertEquals(Collections.emptyList(), provider.stores("not-a-store", QueryableStoreTypes.keyValueStore()));
    }

    /** Asking for a key-value view of the window store yields an empty list. */
    @Test
    public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() throws Exception {
        Assert.assertEquals(Collections.emptyList(), provider.stores(WINDOW_STORE_NAME, QueryableStoreTypes.keyValueStore()));
    }

    /** The lookup fails when the stubbed thread reports that it is not initialized. */
    @Test(expected=InvalidStateStoreException.class)
    public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() throws Exception {
        storesAvailable = false;
        provider.stores(KV_STORE_NAME, QueryableStoreTypes.keyValueStore());
    }

    /**
     * Creates a stream task for {@code taskId} that consumes the partition of the test topic
     * matching {@code taskId.partition}.
     *
     * <p>The task is an anonymous subclass whose {@code updateOffsetLimits()} does nothing.
     * The task is not initialized here; callers do that themselves.
     *
     * @param applicationId  application id passed to the task
     * @param streamsConfig  configuration passed to the task
     * @param clientSupplier source of the mock consumer, restore consumer and producer
     * @param topology       processor topology the task runs
     * @param taskId         id of the task; its partition selects the input partition
     * @return the newly constructed task
     */
    private StreamTask createStreamsTask(String applicationId, StreamsConfig streamsConfig, MockClientSupplier clientSupplier, ProcessorTopology topology, TaskId taskId) {
        return new StreamTask(taskId, applicationId, Collections.singletonList(new TopicPartition(topicName, taskId.partition)), topology, clientSupplier.consumer, new StoreChangelogReader(clientSupplier.restoreConsumer), streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), clientSupplier.getProducer(new HashMap<String, Object>())) {

            // Deliberately empty so that constructing the task skips this step in the test.
            protected void updateOffsetLimits() {
            }
        };
    }

    /**
     * Registers partitions 0 and 1 of {@code topic} with the mock restore consumer, assigns
     * them, and sets both their beginning and end offsets to 0.
     *
     * @param clientSupplier supplier whose restore consumer is configured
     * @param topic          changelog topic name to register
     */
    private void configureRestoreConsumer(MockClientSupplier clientSupplier, String topic) {
        clientSupplier.restoreConsumer.updatePartitions(topic, Arrays.asList(new PartitionInfo(topic, 0, null, null, null), new PartitionInfo(topic, 1, null, null, null)));
        final TopicPartition partitionZero = new TopicPartition(topic, 0);
        final TopicPartition partitionOne = new TopicPartition(topic, 1);
        clientSupplier.restoreConsumer.assign(Arrays.asList(partitionZero, partitionOne));
        final Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(partitionZero, 0L);
        offsets.put(partitionOne, 0L);
        clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
        clientSupplier.restoreConsumer.updateEndOffsets(offsets);
    }

    /**
     * Returns the changelog topic name this test registers for {@code storeName}:
     * {@code "<applicationId>-<storeName>-changelog"}.
     */
    private static String changelogTopicFor(String storeName) {
        return APPLICATION_ID + "-" + storeName + "-changelog";
    }
}

