/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.RunnerPCollectionView;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.runtime.SamzaDoFnInvokerRegistrar;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.runners.samza.translation.ConfigContext;
import org.apache.beam.runners.samza.translation.GroupByKeyTranslator;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.SamzaPublishViewTranslator;
import org.apache.beam.runners.samza.translation.TransformConfigGenerator;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
import org.apache.beam.runners.samza.util.StateUtils;
import org.apache.beam.runners.samza.util.WindowUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.joda.time.Instant;

class ParDoBoundMultiTranslator<InT, OutT>
implements TransformTranslator<ParDo.MultiOutput<InT, OutT>>,
TransformConfigGenerator<ParDo.MultiOutput<InT, OutT>> {
    private final SamzaDoFnInvokerRegistrar doFnInvokerRegistrar;
    private static final ViewFn<Iterable<WindowedValue<?>>, ?> VIEW_FN = new PCollectionViews.MultimapViewFn((PCollectionViews.TypeDescriptorSupplier & Serializable)() -> TypeDescriptors.iterables((TypeDescriptor)new TypeDescriptor<WindowedValue<Void>>(){}), TypeDescriptors::voids);

    ParDoBoundMultiTranslator() {
        Iterator<SamzaDoFnInvokerRegistrar> invokerReg = ServiceLoader.load(SamzaDoFnInvokerRegistrar.class).iterator();
        this.doFnInvokerRegistrar = invokerReg.hasNext() ? (SamzaDoFnInvokerRegistrar)Iterators.getOnlyElement(invokerReg) : null;
    }

    @Override
    public void translate(ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        ParDoBoundMultiTranslator.doTranslate(transform, node, ctx);
    }

    private static <InT, OutT> void doTranslate(ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        MessageStream mergedStreams;
        Coder keyCoder;
        PCollection input = (PCollection)ctx.getInput(transform);
        Map<TupleTag<?>, Coder<?>> outputCoders = ctx.getCurrentTransform().getOutputs().entrySet().stream().filter(e -> e.getValue() instanceof PCollection).collect(Collectors.toMap(e -> (TupleTag)e.getKey(), e -> ((PCollection)e.getValue()).getCoder()));
        Coder coder = keyCoder = StateUtils.isStateful(transform.getFn()) ? ((KvCoder)input.getCoder()).getKeyCoder() : null;
        if (DoFnSignatures.isSplittable((DoFn)transform.getFn())) {
            throw new UnsupportedOperationException("Splittable DoFn is not currently supported");
        }
        if (DoFnSignatures.requiresTimeSortedInput((DoFn)transform.getFn())) {
            throw new UnsupportedOperationException("@RequiresTimeSortedInput annotation is not currently supported");
        }
        MessageStream inputStream = ctx.getMessageStream((PValue)input);
        List sideInputStreams = transform.getSideInputs().values().stream().map(ctx::getViewStream).collect(Collectors.toList());
        ArrayList outputs = new ArrayList(node.getOutputs().entrySet());
        HashMap tagToIndexMap = new HashMap();
        HashMap<Integer, PCollection> indexToPCollectionMap = new HashMap<Integer, PCollection>();
        for (int index = 0; index < outputs.size(); ++index) {
            Map.Entry taggedOutput = outputs.get(index);
            tagToIndexMap.put((TupleTag)taggedOutput.getKey(), index);
            if (!(taggedOutput.getValue() instanceof PCollection)) {
                throw new IllegalArgumentException("Expected side output to be PCollection, but was: " + taggedOutput.getValue());
            }
            PCollection sideOutputCollection = (PCollection)taggedOutput.getValue();
            indexToPCollectionMap.put(index, sideOutputCollection);
        }
        HashMap idToPValueMap = new HashMap();
        for (PCollectionView view : transform.getSideInputs().values()) {
            idToPValueMap.put(ctx.getViewId(view), view);
        }
        DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(ctx.getCurrentTransform());
        Map sideInputMapping = ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform());
        DoFnOp op = new DoFnOp(transform.getMainOutputTag(), transform.getFn(), keyCoder, input.getCoder(), null, outputCoders, transform.getSideInputs().values(), transform.getAdditionalOutputTags().getAll(), input.getWindowingStrategy(), idToPValueMap, new DoFnOp.MultiOutputManagerFactory(tagToIndexMap), ctx.getTransformFullName(), ctx.getTransformId(), input.isBounded(), false, null, null, Collections.emptyMap(), doFnSchemaInformation, sideInputMapping);
        if (sideInputStreams.isEmpty()) {
            mergedStreams = inputStream;
        } else {
            MessageStream mergedSideInputStreams = MessageStream.mergeAll(sideInputStreams).flatMap(new SideInputWatermarkFn());
            mergedStreams = inputStream.merge(Collections.singletonList(mergedSideInputStreams));
        }
        MessageStream taggedOutputStream = mergedStreams.flatMapAsync(OpAdapter.adapt(op));
        Iterator iterator = tagToIndexMap.values().iterator();
        while (iterator.hasNext()) {
            int outputIndex = (Integer)iterator.next();
            MessageStream outputStream = taggedOutputStream.filter((FilterFunction & Serializable)message -> message.getType() != OpMessage.Type.ELEMENT || ((RawUnionValue)message.getElement().getValue()).getUnionTag() == outputIndex).flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
            ctx.registerMessageStream((PValue)indexToPCollectionMap.get(outputIndex), outputStream);
        }
    }

    @Override
    public void translatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        ParDoBoundMultiTranslator.doTranslatePortable(transform, pipeline, ctx);
    }

    private static <InT, OutT> void doTranslatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        MessageStream mergedStreams;
        RunnerApi.ExecutableStagePayload stagePayload;
        Map outputs = transform.getTransform().getOutputsMap();
        try {
            stagePayload = RunnerApi.ExecutableStagePayload.parseFrom((ByteString)transform.getTransform().getSpec().getPayload());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        String inputId = stagePayload.getInput();
        MessageStream inputStream = ctx.getMessageStreamById(inputId);
        ArrayList sideInputStreams = new ArrayList();
        HashMap sideInputMapping = new HashMap();
        HashMap idToViewMapping = new HashMap();
        RunnerApi.Components components = stagePayload.getComponents();
        for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId : stagePayload.getSideInputsList()) {
            String sideInputCollectionId = components.getTransformsOrThrow(sideInputId.getTransformId()).getInputsOrThrow(sideInputId.getLocalName());
            WindowingStrategy<?, BoundedWindow> windowingStrategy = WindowUtils.getWindowStrategy(sideInputCollectionId, components);
            WindowedValue.WindowedValueCoder coder = (WindowedValue.WindowedValueCoder)PipelineTranslatorUtils.instantiateCoder((String)sideInputCollectionId, (RunnerApi.Components)components);
            PCollectionView<?> view = ParDoBoundMultiTranslator.createPCollectionView(sideInputId, coder, windowingStrategy);
            MessageStream<OpMessage<Iterable<?>>> broadcastSideInput = ParDoBoundMultiTranslator.groupAndBroadcastSideInput(sideInputId, sideInputCollectionId, components.getPcollectionsOrThrow(sideInputCollectionId), windowingStrategy, coder, ctx);
            sideInputStreams.add(broadcastSideInput);
            sideInputMapping.put(sideInputId, view);
            idToViewMapping.put(ParDoBoundMultiTranslator.getSideInputUniqueId(sideInputId), view);
        }
        HashMap tagToIndexMap = new HashMap();
        HashMap indexToIdMap = new HashMap();
        HashMap idToTupleTagMap = new HashMap();
        TupleTag mainOutputTag = outputs.isEmpty() ? null : new TupleTag((String)outputs.keySet().iterator().next());
        AtomicInteger index = new AtomicInteger(0);
        outputs.keySet().iterator().forEachRemaining(outputName -> {
            TupleTag tupleTag = new TupleTag(outputName);
            tagToIndexMap.put(tupleTag, index.get());
            String collectionId = (String)outputs.get(outputName);
            indexToIdMap.put(index.get(), collectionId);
            idToTupleTagMap.put(collectionId, tupleTag);
            index.incrementAndGet();
        });
        WindowedValue.WindowedValueCoder windowedInputCoder = WindowUtils.instantiateWindowedCoder(inputId, pipeline.getComponents());
        DoFnSchemaInformation doFnSchemaInformation = null;
        RunnerApi.PCollection input = pipeline.getComponents().getPcollectionsOrThrow(inputId);
        PCollection.IsBounded isBounded = SamzaPipelineTranslatorUtils.isBounded(input);
        Coder keyCoder = StateUtils.isStateful(stagePayload) ? ((KvCoder)((WindowedValue.FullWindowedValueCoder)windowedInputCoder).getValueCoder()).getKeyCoder() : null;
        DoFnOp op = new DoFnOp(mainOutputTag, new NoOpDoFn(), (Coder<?>)keyCoder, windowedInputCoder.getValueCoder(), windowedInputCoder, Collections.emptyMap(), new ArrayList(sideInputMapping.values()), new ArrayList(idToTupleTagMap.values()), (WindowingStrategy)WindowUtils.getWindowStrategy(inputId, stagePayload.getComponents()), idToViewMapping, new DoFnOp.MultiOutputManagerFactory(tagToIndexMap), ctx.getTransformFullName(), ctx.getTransformId(), isBounded, true, stagePayload, ctx.getJobInfo(), idToTupleTagMap, doFnSchemaInformation, sideInputMapping);
        if (sideInputStreams.isEmpty()) {
            mergedStreams = inputStream;
        } else {
            MessageStream mergedSideInputStreams = MessageStream.mergeAll(sideInputStreams).flatMap(new SideInputWatermarkFn());
            mergedStreams = inputStream.merge(Collections.singletonList(mergedSideInputStreams));
        }
        MessageStream taggedOutputStream = mergedStreams.flatMapAsync(OpAdapter.adapt(op));
        Iterator iterator = tagToIndexMap.values().iterator();
        while (iterator.hasNext()) {
            int outputIndex = (Integer)iterator.next();
            MessageStream outputStream = taggedOutputStream.filter((FilterFunction & Serializable)message -> message.getType() != OpMessage.Type.ELEMENT || ((RawUnionValue)message.getElement().getValue()).getUnionTag() == outputIndex).flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
            ctx.registerMessageStream((String)indexToIdMap.get(outputIndex), outputStream);
        }
    }

    @Override
    public Map<String, String> createConfig(ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, ConfigContext ctx) {
        HashMap<String, String> config = new HashMap<String, String>();
        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
        SamzaPipelineOptions options = ctx.getPipelineOptions();
        if (signature.processElement().observesWindow()) {
            config.putAll(ConfigBuilder.createRocksDBStoreConfig(options));
        }
        if (signature.usesState()) {
            for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
                String storeId = state.id();
                if (!ctx.addStateId(storeId)) {
                    throw new IllegalStateException("Duplicate StateId " + storeId + " found in multiple ParDo.");
                }
                config.put("stores." + storeId + ".factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
                config.put("stores." + storeId + ".key.serde", "byteArraySerde");
                config.put("stores." + storeId + ".msg.serde", "stateValueSerde");
                config.put("stores." + storeId + ".rocksdb.compression", "lz4");
                if (!options.getStateDurable().booleanValue()) continue;
                config.put("stores." + storeId + ".changelog", ConfigBuilder.getChangelogTopic(options, storeId));
            }
        }
        if (this.doFnInvokerRegistrar != null) {
            config.putAll(this.doFnInvokerRegistrar.configFor(transform.getFn()));
        }
        return config;
    }

    @Override
    public Map<String, String> createPortableConfig(PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
        RunnerApi.ExecutableStagePayload stagePayload;
        try {
            stagePayload = RunnerApi.ExecutableStagePayload.parseFrom((ByteString)transform.getTransform().getSpec().getPayload());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (!StateUtils.isStateful(stagePayload)) {
            return Collections.emptyMap();
        }
        HashMap<String, String> config = new HashMap<String, String>(ConfigBuilder.createRocksDBStoreConfig(options));
        for (RunnerApi.ExecutableStagePayload.UserStateId stateId : stagePayload.getUserStatesList()) {
            String storeId = stateId.getLocalName();
            config.put("stores." + storeId + ".factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
            config.put("stores." + storeId + ".key.serde", "byteArraySerde");
            config.put("stores." + storeId + ".msg.serde", "stateValueSerde");
            config.put("stores." + storeId + ".rocksdb.compression", "lz4");
            if (!options.getStateDurable().booleanValue()) continue;
            config.put("stores." + storeId + ".changelog", ConfigBuilder.getChangelogTopic(options, storeId));
        }
        return config;
    }

    private static PCollectionView<?> createPCollectionView(RunnerApi.ExecutableStagePayload.SideInputId sideInputId, WindowedValue.WindowedValueCoder<?> coder, WindowingStrategy<?, BoundedWindow> windowingStrategy) {
        return new RunnerPCollectionView(null, new TupleTag(sideInputId.getLocalName()), VIEW_FN, windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), windowingStrategy, coder.getValueCoder());
    }

    private static <SideInputT> MessageStream<OpMessage<Iterable<SideInputT>>> groupAndBroadcastSideInput(RunnerApi.ExecutableStagePayload.SideInputId sideInputId, String sideInputCollectionId, RunnerApi.PCollection sideInputPCollection, WindowingStrategy<SideInputT, BoundedWindow> windowingStrategy, WindowedValue.WindowedValueCoder<SideInputT> coder, PortableTranslationContext ctx) {
        MessageStream sideInput = ctx.getMessageStreamById(sideInputCollectionId);
        MessageStream keyedSideInput = sideInput.map((MapFunction & Serializable)opMessage -> {
            WindowedValue wv = opMessage.getElement();
            return OpMessage.ofElement(wv.withValue((Object)KV.of(null, (Object)wv.getValue())));
        });
        WindowedValue.WindowedValueCoder kvCoder = coder.withValueCoder((Coder)KvCoder.of((Coder)VoidCoder.of(), (Coder)coder.getValueCoder()));
        MessageStream groupedSideInput = GroupByKeyTranslator.doTranslatePortable(sideInputPCollection, keyedSideInput, windowingStrategy, kvCoder, new TupleTag("main output"), ctx);
        MessageStream nonkeyGroupedSideInput = groupedSideInput.map((MapFunction & Serializable)opMessage -> {
            WindowedValue wv = opMessage.getElement();
            return OpMessage.ofElement(wv.withValue((Object)((Iterable)((KV)wv.getValue()).getValue())));
        });
        MessageStream broadcastSideInput = SamzaPublishViewTranslator.doTranslate(nonkeyGroupedSideInput, coder.withValueCoder((Coder)IterableCoder.of((Coder)coder.getValueCoder())), ctx.getTransformId(), ParDoBoundMultiTranslator.getSideInputUniqueId(sideInputId), ctx.getSamzaPipelineOptions());
        return broadcastSideInput;
    }

    private static String getSideInputUniqueId(RunnerApi.ExecutableStagePayload.SideInputId sideInputId) {
        return sideInputId.getTransformId() + "-" + sideInputId.getLocalName();
    }

    private static class NoOpDoFn<InT, OutT>
    extends DoFn<InT, OutT> {
        private NoOpDoFn() {
        }

        @DoFn.ProcessElement
        public void doNothing(DoFn.ProcessContext context) {
        }
    }

    static class RawUnionValueToValue<OutT>
    implements Op<RawUnionValue, OutT, Void> {
        RawUnionValueToValue() {
        }

        @Override
        public void processElement(WindowedValue<RawUnionValue> inputElement, OpEmitter<OutT> emitter) {
            Object value = ((RawUnionValue)inputElement.getValue()).getValue();
            emitter.emitElement(inputElement.withValue(value));
        }
    }

    static class SideInputWatermarkFn<InT>
    implements FlatMapFunction<OpMessage<InT>, OpMessage<InT>>,
    WatermarkFunction<OpMessage<InT>> {
        SideInputWatermarkFn() {
        }

        public Collection<OpMessage<InT>> apply(OpMessage<InT> message) {
            return Collections.singletonList(message);
        }

        public Collection<OpMessage<InT>> processWatermark(long watermark) {
            return Collections.singletonList(OpMessage.ofSideInputWatermark(new Instant(watermark)));
        }

        public Long getOutputWatermark() {
            return Long.MAX_VALUE;
        }
    }
}

