/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.Op;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpEmitter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.runtime.SamzaDoFnInvokerRegistrar;
import org.apache.beam.runners.samza.translation.ConfigContext;
import org.apache.beam.runners.samza.translation.TransformConfigGenerator;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.joda.time.Instant;

class ParDoBoundMultiTranslator<InT, OutT>
implements TransformTranslator<ParDo.MultiOutput<InT, OutT>>,
TransformConfigGenerator<ParDo.MultiOutput<InT, OutT>> {
    private final SamzaDoFnInvokerRegistrar doFnInvokerRegistrar;

    ParDoBoundMultiTranslator() {
        Iterator<SamzaDoFnInvokerRegistrar> invokerReg = ServiceLoader.load(SamzaDoFnInvokerRegistrar.class).iterator();
        this.doFnInvokerRegistrar = invokerReg.hasNext() ? (SamzaDoFnInvokerRegistrar)Iterators.getOnlyElement(invokerReg) : null;
    }

    @Override
    public void translate(ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        MessageStream mergedStreams;
        Coder keyCoder;
        PCollection input = (PCollection)ctx.getInput(transform);
        Map<TupleTag<?>, Coder<?>> outputCoders = ctx.getCurrentTransform().getOutputs().entrySet().stream().filter(e -> e.getValue() instanceof PCollection).collect(Collectors.toMap(e -> (TupleTag)e.getKey(), e -> ((PCollection)e.getValue()).getCoder()));
        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
        Coder coder = keyCoder = signature.usesState() ? ((KvCoder)input.getCoder()).getKeyCoder() : null;
        if (signature.usesTimers()) {
            throw new UnsupportedOperationException("DoFn with timers is not currently supported");
        }
        if (signature.processElement().isSplittable()) {
            throw new UnsupportedOperationException("Splittable DoFn is not currently supported");
        }
        MessageStream inputStream = ctx.getMessageStream((PValue)input);
        List sideInputStreams = transform.getSideInputs().stream().map(ctx::getViewStream).collect(Collectors.toList());
        HashMap tagToIdMap = new HashMap();
        HashMap<Integer, PCollection> idToPCollectionMap = new HashMap<Integer, PCollection>();
        ArrayList outputs = new ArrayList(node.getOutputs().entrySet());
        for (int id = 0; id < outputs.size(); ++id) {
            Map.Entry taggedOutput = outputs.get(id);
            tagToIdMap.put((TupleTag)taggedOutput.getKey(), id);
            if (!(taggedOutput.getValue() instanceof PCollection)) {
                throw new IllegalArgumentException("Expected side output to be PCollection, but was: " + taggedOutput.getValue());
            }
            PCollection sideOutputCollection = (PCollection)taggedOutput.getValue();
            idToPCollectionMap.put(id, sideOutputCollection);
        }
        HashMap idToPValueMap = new HashMap();
        for (PCollectionView view : transform.getSideInputs()) {
            idToPValueMap.put(ctx.getViewId(view), view);
        }
        DoFnOp op = new DoFnOp(transform.getMainOutputTag(), transform.getFn(), keyCoder, input.getCoder(), outputCoders, transform.getSideInputs(), transform.getAdditionalOutputTags().getAll(), input.getWindowingStrategy(), idToPValueMap, new DoFnOp.MultiOutputManagerFactory(tagToIdMap), node.getFullName());
        if (sideInputStreams.isEmpty()) {
            mergedStreams = inputStream;
        } else {
            MessageStream mergedSideInputStreams = MessageStream.mergeAll(sideInputStreams).flatMap((FlatMapFunction)new SideInputWatermarkFn());
            mergedStreams = inputStream.merge(Collections.singletonList(mergedSideInputStreams));
        }
        MessageStream taggedOutputStream = mergedStreams.flatMap(OpAdapter.adapt(op));
        Iterator iterator = tagToIdMap.values().iterator();
        while (iterator.hasNext()) {
            int outputIndex = (Integer)iterator.next();
            this.registerSideOutputStream((MessageStream<OpMessage<RawUnionValue>>)taggedOutputStream, (PValue)idToPCollectionMap.get(outputIndex), outputIndex, ctx);
        }
    }

    @Override
    public Map<String, String> createConfig(ParDo.MultiOutput<InT, OutT> transform, TransformHierarchy.Node node, ConfigContext ctx) {
        HashMap<String, String> config = new HashMap<String, String>();
        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
        if (signature.usesState()) {
            for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
                String storeId = state.id();
                config.put("stores." + storeId + ".factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
                config.put("stores." + storeId + ".key.serde", "byteSerde");
                config.put("stores." + storeId + ".msg.serde", "byteSerde");
            }
        }
        if (this.doFnInvokerRegistrar != null) {
            config.putAll(this.doFnInvokerRegistrar.configFor(transform.getFn()));
        }
        return config;
    }

    private <T> void registerSideOutputStream(MessageStream<OpMessage<RawUnionValue>> inputStream, PValue outputPValue, int outputIndex, TranslationContext ctx) {
        MessageStream outputStream = inputStream.filter((FilterFunction)new FilterByUnionId(outputIndex)).flatMap(OpAdapter.adapt(new RawUnionValueToValue()));
        ctx.registerMessageStream(outputPValue, outputStream);
    }

    private static class RawUnionValueToValue<OutT>
    implements Op<RawUnionValue, OutT, Void> {
        private RawUnionValueToValue() {
        }

        @Override
        public void processElement(WindowedValue<RawUnionValue> inputElement, OpEmitter<OutT> emitter) {
            Object value = ((RawUnionValue)inputElement.getValue()).getValue();
            emitter.emitElement(inputElement.withValue(value));
        }
    }

    private static class FilterByUnionId
    implements FilterFunction<OpMessage<RawUnionValue>> {
        private final int id;

        public FilterByUnionId(int id) {
            this.id = id;
        }

        public boolean apply(OpMessage<RawUnionValue> message) {
            return message.getType() != OpMessage.Type.ELEMENT || ((RawUnionValue)message.getElement().getValue()).getUnionTag() == this.id;
        }
    }

    private class SideInputWatermarkFn
    implements FlatMapFunction<OpMessage<InT>, OpMessage<InT>>,
    WatermarkFunction<OpMessage<InT>> {
        private SideInputWatermarkFn() {
        }

        public Collection<OpMessage<InT>> apply(OpMessage<InT> message) {
            return Collections.singletonList(message);
        }

        public Collection<OpMessage<InT>> processWatermark(long watermark) {
            return Collections.singletonList(OpMessage.ofSideInputWatermark(new Instant(watermark)));
        }

        public Long getOutputWatermark() {
            return Long.MAX_VALUE;
        }
    }
}

