/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.client;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState;
import org.apache.flink.queryablestate.client.state.ImmutableFoldingState;
import org.apache.flink.queryablestate.client.state.ImmutableListState;
import org.apache.flink.queryablestate.client.state.ImmutableMapState;
import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
import org.apache.flink.queryablestate.client.state.ImmutableValueState;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class QueryableStateClient {
    private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of(ValueStateDescriptor.class, ImmutableValueState::createState), Tuple2.of(ListStateDescriptor.class, ImmutableListState::createState), Tuple2.of(MapStateDescriptor.class, ImmutableMapState::createState), Tuple2.of(AggregatingStateDescriptor.class, ImmutableAggregatingState::createState), Tuple2.of(ReducingStateDescriptor.class, ImmutableReducingState::createState), Tuple2.of(FoldingStateDescriptor.class, ImmutableFoldingState::createState)).collect(Collectors.toMap(t -> (Class)t.f0, t -> (StateFactory)t.f1));
    private final Client<KvStateRequest, KvStateResponse> client;
    private final InetSocketAddress remoteAddress;
    private ExecutionConfig executionConfig;

    public QueryableStateClient(String remoteHostname, int remotePort) throws UnknownHostException {
        this(InetAddress.getByName((String)Preconditions.checkNotNull((Object)remoteHostname)), remotePort);
    }

    public QueryableStateClient(InetAddress remoteAddress, int remotePort) {
        Preconditions.checkArgument((remotePort >= 0 && remotePort <= 65536 ? 1 : 0) != 0, (Object)("Remote Port " + remotePort + " is out of valid port range (0-65536)."));
        this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort);
        MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer = new MessageSerializer<KvStateRequest, KvStateResponse>(new KvStateRequest.KvStateRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
        this.client = new Client<KvStateRequest, KvStateResponse>("Queryable State Client", 1, messageSerializer, new DisabledKvStateRequestStats());
    }

    public CompletableFuture<?> shutdownAndHandle() {
        return this.client.shutdown();
    }

    public void shutdownAndWait() {
        try {
            this.client.shutdown().get();
            LOG.info("The Queryable State Client was shutdown successfully.");
        }
        catch (Exception e) {
            LOG.warn("The Queryable State Client shutdown failed: ", (Throwable)e);
        }
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
        ExecutionConfig prev = this.executionConfig;
        this.executionConfig = config;
        return prev;
    }

    @PublicEvolving
    public <K, S extends State, V> CompletableFuture<S> getKvState(JobID jobId, String queryableStateName, K key, TypeHint<K> keyTypeHint, StateDescriptor<S, V> stateDescriptor) {
        Preconditions.checkNotNull(keyTypeHint);
        TypeInformation keyTypeInfo = keyTypeHint.getTypeInfo();
        return this.getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);
    }

    @PublicEvolving
    public <K, S extends State, V> CompletableFuture<S> getKvState(JobID jobId, String queryableStateName, K key, TypeInformation<K> keyTypeInfo, StateDescriptor<S, V> stateDescriptor) {
        return this.getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
    }

    private <K, N, S extends State, V> CompletableFuture<S> getKvState(JobID jobId, String queryableStateName, K key, N namespace, TypeInformation<K> keyTypeInfo, TypeInformation<N> namespaceTypeInfo, StateDescriptor<S, V> stateDescriptor) {
        byte[] serializedKeyAndNamespace;
        Preconditions.checkNotNull((Object)jobId);
        Preconditions.checkNotNull((Object)queryableStateName);
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        Preconditions.checkNotNull(keyTypeInfo);
        Preconditions.checkNotNull(namespaceTypeInfo);
        Preconditions.checkNotNull(stateDescriptor);
        TypeSerializer keySerializer = keyTypeInfo.createSerializer(this.executionConfig);
        TypeSerializer namespaceSerializer = namespaceTypeInfo.createSerializer(this.executionConfig);
        stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
        try {
            serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        }
        catch (IOException e) {
            return FutureUtils.getFailedFuture(e);
        }
        return this.getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(stateResponse -> this.createState((KvStateResponse)stateResponse, (StateDescriptor)stateDescriptor));
    }

    private <T, S extends State> S createState(KvStateResponse stateResponse, StateDescriptor<S, T> stateDescriptor) {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDescriptor.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        try {
            return stateFactory.createState(stateDescriptor, stateResponse.getContent());
        }
        catch (Exception e) {
            throw new FlinkRuntimeException((Throwable)e);
        }
    }

    private CompletableFuture<KvStateResponse> getKvState(JobID jobId, String queryableStateName, int keyHashCode, byte[] serializedKeyAndNamespace) {
        LOG.debug("Sending State Request to {}.", (Object)this.remoteAddress);
        try {
            KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
            return this.client.sendRequest(this.remoteAddress, request);
        }
        catch (Exception e) {
            LOG.error("Unable to send KVStateRequest: ", (Throwable)e);
            return FutureUtils.getFailedFuture(e);
        }
    }

    private static interface StateFactory {
        public <T, S extends State> S createState(StateDescriptor<S, T> var1, byte[] var2) throws Exception;
    }
}

