/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation;

import java.io.IOException;
import java.io.Serializable;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.InMemoryBagUserStateFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.runners.fnexecution.translation.BatchSideInputHandlerFactory;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.metrics.MetricsContainerStepMapAccumulator;
import org.apache.beam.runners.spark.translation.SparkExecutableStageContextFactory;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

class SparkExecutableStageFunction<InputT, SideInputT>
implements FlatMapFunction<Iterator<WindowedValue<InputT>>, RawUnionValue> {
    private static final Logger LOG = LoggerFactory.getLogger(SparkExecutableStageFunction.class);
    private final RunnerApi.ExecutableStagePayload stagePayload;
    private final Map<String, Integer> outputMap;
    private final SparkExecutableStageContextFactory contextFactory;
    private final Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>> sideInputs;
    private final MetricsContainerStepMapAccumulator metricsAccumulator;
    private final Coder windowCoder;
    private final JobInfo jobInfo;
    private transient InMemoryBagUserStateFactory bagUserStateHandlerFactory;
    private transient Object currentTimerKey;

    SparkExecutableStageFunction(RunnerApi.ExecutableStagePayload stagePayload, JobInfo jobInfo, Map<String, Integer> outputMap, SparkExecutableStageContextFactory contextFactory, Map<String, Tuple2<Broadcast<List<byte[]>>, WindowedValue.WindowedValueCoder<SideInputT>>> sideInputs, MetricsContainerStepMapAccumulator metricsAccumulator, Coder windowCoder) {
        this.stagePayload = stagePayload;
        this.jobInfo = jobInfo;
        this.outputMap = outputMap;
        this.contextFactory = contextFactory;
        this.sideInputs = sideInputs;
        this.metricsAccumulator = metricsAccumulator;
        this.windowCoder = windowCoder;
    }

    FlatMapFunction<Tuple2<ByteArray, Iterable<WindowedValue<InputT>>>, RawUnionValue> forPair() {
        return (FlatMapFunction & Serializable)input -> this.call(((Iterable)input._2).iterator());
    }

    /*
     * Exception decompiling
     */
    public Iterator<RawUnionValue> call(Iterator<WindowedValue<InputT>> inputs) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void processElements(ExecutableStage executableStage, StateRequestHandler stateRequestHandler, ReceiverFactory receiverFactory, StageBundleFactory stageBundleFactory, Iterator<WindowedValue<InputT>> inputs) throws Exception {
        try (RemoteBundle bundle = stageBundleFactory.getBundle((OutputReceiverFactory)receiverFactory, stateRequestHandler, this.getBundleProgressHandler());){
            String inputPCollectionId = executableStage.getInputPCollection().getId();
            FnDataReceiver mainReceiver = (FnDataReceiver)bundle.getInputReceivers().get(inputPCollectionId);
            while (inputs.hasNext()) {
                WindowedValue<InputT> input = inputs.next();
                mainReceiver.accept(input);
            }
        }
    }

    private BundleProgressHandler getBundleProgressHandler() {
        String stageName = this.stagePayload.getInput();
        final MetricsContainerImpl container = this.metricsAccumulator.value().getContainer(stageName);
        return new BundleProgressHandler(){

            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                container.update((Iterable)progress.getMonitoringInfosList());
            }

            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                container.update((Iterable)response.getMonitoringInfosList());
            }
        };
    }

    private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor) {
        StateRequestHandler userStateHandler;
        StateRequestHandler sideInputHandler;
        EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler> handlerMap = new EnumMap<BeamFnApi.StateKey.TypeCase, StateRequestHandler>(BeamFnApi.StateKey.TypeCase.class);
        BatchSideInputHandlerFactory sideInputHandlerFactory = BatchSideInputHandlerFactory.forStage((ExecutableStage)executableStage, (BatchSideInputHandlerFactory.SideInputGetter)new BatchSideInputHandlerFactory.SideInputGetter(){

            public <T> List<T> getSideInput(String pCollectionId) {
                Tuple2 tuple2 = (Tuple2)SparkExecutableStageFunction.this.sideInputs.get(pCollectionId);
                Broadcast broadcast = (Broadcast)tuple2._1;
                WindowedValue.WindowedValueCoder coder = (WindowedValue.WindowedValueCoder)tuple2._2;
                return ((List)broadcast.value()).stream().map(bytes -> (WindowedValue)CoderHelpers.fromByteArray(bytes, coder)).collect(Collectors.toList());
            }
        });
        try {
            sideInputHandler = StateRequestHandlers.forSideInputHandlerFactory((Map)ProcessBundleDescriptors.getSideInputs((ExecutableStage)executableStage), (StateRequestHandlers.SideInputHandlerFactory)sideInputHandlerFactory);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to setup state handler", e);
        }
        if (this.bagUserStateHandlerFactory == null) {
            this.bagUserStateHandlerFactory = new InMemoryBagUserStateFactory();
        }
        if (executableStage.getUserStates().size() > 0) {
            this.bagUserStateHandlerFactory.resetForNewKey();
            userStateHandler = StateRequestHandlers.forBagUserStateHandlerFactory((ProcessBundleDescriptors.ExecutableProcessBundleDescriptor)processBundleDescriptor, (StateRequestHandlers.BagUserStateHandlerFactory)this.bagUserStateHandlerFactory);
        } else {
            userStateHandler = StateRequestHandler.unsupported();
        }
        handlerMap.put(BeamFnApi.StateKey.TypeCase.MULTIMAP_SIDE_INPUT, sideInputHandler);
        handlerMap.put(BeamFnApi.StateKey.TypeCase.BAG_USER_STATE, userStateHandler);
        return StateRequestHandlers.delegateBasedUponType(handlerMap);
    }

    private static /* synthetic */ void lambda$call$1(RemoteBundle bundle, String timerId, WindowedValue timerValue) {
        FnDataReceiver fnTimerReceiver = (FnDataReceiver)bundle.getInputReceivers().get(timerId);
        Preconditions.checkNotNull((Object)fnTimerReceiver, (String)"No FnDataReceiver found for %s", (Object)timerId);
        try {
            fnTimerReceiver.accept((Object)timerValue);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
        }
    }

    private /* synthetic */ void lambda$call$0(InMemoryTimerInternals timerInternals, WindowedValue timerElement, TimerInternals.TimerData timerData) {
        this.currentTimerKey = ((KV)timerElement.getValue()).getKey();
        timerInternals.setTimer(timerData);
    }

    private static class ReceiverFactory
    implements OutputReceiverFactory {
        private final ConcurrentLinkedQueue<RawUnionValue> collector;
        private final Map<String, Integer> outputMap;
        @Nullable
        private final TimerReceiverFactory timerReceiverFactory;

        ReceiverFactory(ConcurrentLinkedQueue<RawUnionValue> collector, Map<String, Integer> outputMap) {
            this(collector, outputMap, null);
        }

        ReceiverFactory(ConcurrentLinkedQueue<RawUnionValue> collector, Map<String, Integer> outputMap, @Nullable TimerReceiverFactory timerReceiverFactory) {
            this.collector = collector;
            this.outputMap = outputMap;
            this.timerReceiverFactory = timerReceiverFactory;
        }

        public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
            Integer unionTag = this.outputMap.get(pCollectionId);
            if (unionTag != null) {
                int tagInt = unionTag;
                return receivedElement -> this.collector.add(new RawUnionValue(tagInt, receivedElement));
            }
            if (this.timerReceiverFactory != null) {
                return this.timerReceiverFactory.create(pCollectionId);
            }
            throw new IllegalStateException(String.format(Locale.ENGLISH, "Unknown PCollectionId %s", pCollectionId));
        }
    }

    static interface JobBundleFactoryCreator
    extends Serializable {
        public JobBundleFactory create();
    }
}

