/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.runtime;

import java.io.IOException;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.StreamingSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals;
import org.apache.beam.runners.samza.util.StateUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.samza.context.TaskContext;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class SamzaStateRequestHandlers {
    public static @UnknownKeyFor @NonNull @Initialized StateRequestHandler of(@UnknownKeyFor @NonNull @Initialized String transformId, @UnknownKeyFor @NonNull @Initialized TaskContext context, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized ExecutableStage executableStage, @UnknownKeyFor @NonNull @Initialized StageBundleFactory stageBundleFactory, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload.SideInputId, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputIds, @UnknownKeyFor @NonNull @Initialized SideInputHandler sideInputHandler) {
        StateRequestHandler sideInputStateHandler = SamzaStateRequestHandlers.createSideInputStateHandler(executableStage, sideInputIds, sideInputHandler);
        StateRequestHandler userStateRequestHandler = SamzaStateRequestHandlers.createUserStateRequestHandler(transformId, executableStage, context, pipelineOptions, stageBundleFactory);
        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap = new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(BeamFnApi.StateKey.TypeCase.class);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.ITERABLE_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_KEYS_SIDE_INPUT, sideInputStateHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateRequestHandler);
        return StateRequestHandlers.delegateBasedUponType(handlerMap);
    }

    private static @UnknownKeyFor @NonNull @Initialized StateRequestHandler createSideInputStateHandler(@UnknownKeyFor @NonNull @Initialized ExecutableStage executableStage, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized RunnerApi.ExecutableStagePayload.SideInputId, @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputIds, @UnknownKeyFor @NonNull @Initialized SideInputHandler sideInputHandler) {
        if (executableStage.getSideInputs().size() <= 0) {
            return StateRequestHandler.unsupported();
        }
        StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory = (StateRequestHandlers.SideInputHandlerFactory)Preconditions.checkNotNull((Object)StreamingSideInputHandlerFactory.forStage((ExecutableStage)executableStage, sideInputIds, (SideInputHandler)sideInputHandler));
        try {
            return StateRequestHandlers.forSideInputHandlerFactory((Map)ProcessBundleDescriptors.getSideInputs((ExecutableStage)executableStage), (StateRequestHandlers.SideInputHandlerFactory)sideInputHandlerFactory);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to initialize SideInputHandler", e);
        }
    }

    private static @UnknownKeyFor @NonNull @Initialized StateRequestHandler createUserStateRequestHandler(@UnknownKeyFor @NonNull @Initialized String transformId, @UnknownKeyFor @NonNull @Initialized ExecutableStage executableStage, @UnknownKeyFor @NonNull @Initialized TaskContext context, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions pipelineOptions, @UnknownKeyFor @NonNull @Initialized StageBundleFactory stageBundleFactory) {
        if (!StateUtils.isStateful(executableStage)) {
            return StateRequestHandler.unsupported();
        }
        SamzaStoreStateInternals.Factory stateInternalsFactory = SamzaStoreStateInternals.createStateInternalsFactory(transformId, ByteStringCoder.of(), context, pipelineOptions, executableStage);
        return StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)stageBundleFactory.getProcessBundleDescriptor(), new BagUserStateFactory(stateInternalsFactory));
    }

    static class BagUserStateFactory<@UnknownKeyFor K extends @UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor V extends @UnknownKeyFor @NonNull @Initialized ByteString, @UnknownKeyFor W extends @UnknownKeyFor @NonNull @Initialized BoundedWindow>
    implements StateRequestHandlers.BagUserStateHandlerFactory<K, V, W> {
        private final @UnknownKeyFor @NonNull @Initialized SamzaStoreStateInternals.Factory<K> stateInternalsFactory;

        BagUserStateFactory(@UnknownKeyFor @NonNull @Initialized SamzaStoreStateInternals.Factory<K> stateInternalsFactory) {
            this.stateInternalsFactory = stateInternalsFactory;
        }

        public // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(@UnknownKeyFor @NonNull @Initialized String pTransformId, final @UnknownKeyFor @NonNull @Initialized String userStateId, @UnknownKeyFor @NonNull @Initialized Coder<K> keyCoder, final @UnknownKeyFor @NonNull @Initialized Coder<V> valueCoder, final @UnknownKeyFor @NonNull @Initialized Coder<W> windowCoder) {
            return new StateRequestHandlers.BagUserStateHandler<K, V, W>(){

                public @UnknownKeyFor @NonNull @Initialized Iterable<V> get(K key, W window) {
                    StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                    BagState bagState = (BagState)stateInternalsFactory.stateInternalsForKey(key).state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                    return bagState.read();
                }

                public void append(K key, W window, @UnknownKeyFor @NonNull @Initialized Iterator<V> values) {
                    StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                    BagState bagState = (BagState)stateInternalsFactory.stateInternalsForKey(key).state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                    while (values.hasNext()) {
                        bagState.add((Object)((ByteString)values.next()));
                    }
                }

                public void clear(K key, W window) {
                    StateNamespace namespace = StateNamespaces.window((Coder)windowCoder, window);
                    BagState bagState = (BagState)stateInternalsFactory.stateInternalsForKey(key).state(namespace, StateTags.bag((String)userStateId, (Coder)valueCoder));
                    bagState.clear();
                }
            };
        }
    }
}

