/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.StateStoreProvider;

/**
 * A {@link StateStoreProvider} that looks up state stores in the tasks of a
 * single {@link StreamThread}.
 */
public class StreamThreadStateStoreProvider
implements StateStoreProvider {
    private final StreamThread streamThread;

    /**
     * Creates a provider bound to the given thread.
     *
     * @param streamThread the thread whose tasks are searched by {@link #stores}
     */
    public StreamThreadStateStoreProvider(StreamThread streamThread) {
        this.streamThread = streamThread;
    }

    /**
     * Collects, from every task of the underlying thread, the store registered
     * under {@code storeName} that {@code queryableStoreType} accepts.
     *
     * <p>Tasks that have no store with that name, or whose store is rejected by
     * {@code queryableStoreType}, are skipped.
     *
     * @param storeName          name used to look the store up in each task
     * @param queryableStoreType filter deciding which stores are returned
     * @param <T>                element type of the returned list
     * @return an empty list when the thread state is {@code DEAD}; otherwise a
     *         new list holding every accepted, open store (possibly empty)
     * @throws InvalidStateStoreException if the thread reports that it is not
     *         running (or is rebalancing), or if an accepted store is not open
     */
    @Override
    public <T> List<T> stores(String storeName, QueryableStoreType<T> queryableStoreType) {
        if (this.streamThread.state() == StreamThread.State.DEAD) {
            return Collections.emptyList();
        }
        if (!this.streamThread.isRunningAndNotRebalancing()) {
            throw new InvalidStateStoreException(migratedMessage(storeName));
        }
        // Typed as List<T> so the collected stores match the declared return type.
        List<T> stores = new ArrayList<T>();
        for (Task streamTask : this.streamThread.tasks().values()) {
            StateStore store = streamTask.getStore(storeName);
            if (store == null || !queryableStoreType.accepts(store)) {
                continue;
            }
            if (!store.isOpen()) {
                throw new InvalidStateStoreException(migratedMessage(storeName));
            }
            // NOTE(review): the cast relies on accepts(store) returning true only
            // for stores usable as T - confirm against the QueryableStoreType contract.
            @SuppressWarnings("unchecked")
            T typedStore = (T) store;
            stores.add(typedStore);
        }
        return stores;
    }

    /** Builds the message shared by both failure paths of {@link #stores}. */
    private static String migratedMessage(String storeName) {
        return "the state store, " + storeName + ", may have migrated to another instance.";
    }
}

