/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2.adaptor;

import java.io.IOException;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.StateDescriptorUtils;
import org.apache.flink.runtime.state.v2.adaptor.AggregatingStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.ReducingStateAdaptor;
import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;

public class AsyncKeyedStateBackendAdaptor<K>
implements AsyncKeyedStateBackend<K> {
    private final CheckpointableKeyedStateBackend<K> keyedStateBackend;

    public AsyncKeyedStateBackendAdaptor(CheckpointableKeyedStateBackend<K> keyedStateBackend) {
        this.keyedStateBackend = keyedStateBackend;
    }

    @Override
    public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
    }

    @Override
    @Nonnull
    public <N, S extends State, SV> S createState(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<SV> stateDesc) throws Exception {
        org.apache.flink.api.common.state.StateDescriptor rawStateDesc = StateDescriptorUtils.transformFromV2ToV1(stateDesc);
        Object rawState = this.keyedStateBackend.getPartitionedState(defaultNamespace, namespaceSerializer, rawStateDesc);
        switch (rawStateDesc.getType()) {
            case VALUE: {
                return (S)new ValueStateAdaptor((InternalValueState)rawState);
            }
            case LIST: {
                return (S)new ListStateAdaptor((InternalListState)rawState);
            }
            case REDUCING: {
                return (S)new ReducingStateAdaptor((InternalReducingState)rawState);
            }
            case AGGREGATING: {
                return (S)new AggregatingStateAdaptor((InternalAggregatingState)rawState);
            }
            case MAP: {
                return (S)new MapStateAdaptor((InternalMapState)rawState);
            }
        }
        throw new UnsupportedOperationException(String.format("Unsupported state type: %s", rawStateDesc.getType()));
    }

    @Override
    @Nonnull
    public StateExecutor createStateExecutor() {
        return null;
    }

    @Override
    public void switchContext(RecordContext<K> context) {
        this.keyedStateBackend.setCurrentKeyAndKeyGroup(context.getKey(), context.getKeyGroup());
    }

    @Override
    public void dispose() {
    }

    @Override
    public void close() throws IOException {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof CheckpointListener) {
            ((CheckpointListener)this.keyedStateBackend).notifyCheckpointComplete(checkpointId);
        }
    }

    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof CheckpointListener) {
            ((CheckpointListener)this.keyedStateBackend).notifyCheckpointAborted(checkpointId);
        }
    }

    public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof InternalCheckpointListener) {
            ((InternalCheckpointListener)this.keyedStateBackend).notifyCheckpointSubsumed(checkpointId);
        }
    }

    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        return this.keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }
}

