/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TranslationContext {
    public static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
    private final StreamGraph streamGraph;
    private final Map<PValue, MessageStream<?>> messsageStreams = new HashMap();
    private final Map<PCollectionView<?>, MessageStream<?>> viewStreams = new HashMap();
    private final Map<PValue, String> idMap;
    private final Map<String, MessageStream> registeredInputStreams = new HashMap<String, MessageStream>();
    private final Map<String, Table> registeredTables = new HashMap<String, Table>();
    private final PValue dummySource;
    private final SamzaPipelineOptions options;
    private AppliedPTransform<?, ?, ?> currentTransform;
    private int topologicalId;

    public TranslationContext(StreamGraph streamGraph, Map<PValue, String> idMap, SamzaPipelineOptions options, PValue dummySource) {
        this.streamGraph = streamGraph;
        this.idMap = idMap;
        this.options = options;
        this.dummySource = dummySource;
    }

    public <OutT> void registerInputMessageStream(PValue pvalue) {
        if (!pvalue.equals(this.dummySource)) {
            this.doRegisterInputMessageStream(pvalue, this.getIdForPValue(pvalue));
        }
    }

    public <OutT> void registerInputMessageStreamById(PValue pvalue, String streamId) {
        if (!pvalue.equals(this.dummySource)) {
            this.doRegisterInputMessageStream(pvalue, streamId);
        }
    }

    public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
        if (this.messsageStreams.containsKey(pvalue)) {
            throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
        }
        this.messsageStreams.put(pvalue, stream);
    }

    public MessageStream<OpMessage<String>> getDummyStream() {
        if (!this.messsageStreams.containsKey(this.dummySource)) {
            this.doRegisterInputMessageStream(this.dummySource, this.getIdForPValue(this.dummySource));
        }
        return this.getMessageStream(this.dummySource);
    }

    public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
        MessageStream<?> stream = this.messsageStreams.get(pvalue);
        if (stream == null) {
            throw new IllegalArgumentException("No stream registered for pvalue: " + pvalue);
        }
        return stream;
    }

    public <ElemT, ViewT> void registerViewStream(PCollectionView<ViewT> view, MessageStream<OpMessage<Iterable<ElemT>>> stream) {
        if (this.viewStreams.containsKey(view)) {
            throw new IllegalArgumentException("Stream already registered for view: " + view);
        }
        this.viewStreams.put(view, stream);
    }

    public <InT> MessageStream<OpMessage<InT>> getViewStream(PCollectionView<?> view) {
        MessageStream<?> stream = this.viewStreams.get(view);
        if (stream == null) {
            throw new IllegalArgumentException("No stream registered for view: " + view);
        }
        return stream;
    }

    public <ViewT> String getViewId(PCollectionView<ViewT> view) {
        return this.getIdForPValue((PValue)view);
    }

    public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
        this.currentTransform = currentTransform;
    }

    public void clearCurrentTransform() {
        this.currentTransform = null;
    }

    public AppliedPTransform<?, ?, ?> getCurrentTransform() {
        return this.currentTransform;
    }

    public void setCurrentTopologicalId(int id) {
        this.topologicalId = id;
    }

    public int getCurrentTopologicalId() {
        return this.topologicalId;
    }

    public <InT extends PValue> InT getInput(PTransform<InT, ?> transform) {
        return (InT)((PValue)Iterables.getOnlyElement((Iterable)TransformInputs.nonAdditionalInputs(this.currentTransform)));
    }

    public <OutT extends PValue> OutT getOutput(PTransform<?, OutT> transform) {
        return (OutT)((PValue)Iterables.getOnlyElement(this.currentTransform.getOutputs().values()));
    }

    public <OutT> TupleTag<OutT> getOutputTag(PTransform<?, ? extends PCollection<OutT>> transform) {
        return (TupleTag)Iterables.getOnlyElement(this.currentTransform.getOutputs().keySet());
    }

    public SamzaPipelineOptions getPipelineOptions() {
        return this.options;
    }

    public <OutT> OutputStream<OutT> getOutputStreamById(String outputStreamId) {
        return this.streamGraph.getOutputStream(outputStreamId);
    }

    public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
        return this.registeredTables.computeIfAbsent(tableDesc.getTableId(), id -> this.streamGraph.getTable(tableDesc));
    }

    private <OutT> void doRegisterInputMessageStream(PValue pvalue, String streamId) {
        if (this.registeredInputStreams.containsKey(streamId)) {
            MessageStream messageStream = this.registeredInputStreams.get(streamId);
            LOG.info(String.format("Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.", streamId, messageStream, pvalue));
            this.registerMessageStream(pvalue, messageStream);
            return;
        }
        MessageStream typedStream = this.streamGraph.getInputStream(streamId).map(KV::getValue);
        this.registerMessageStream(pvalue, typedStream);
        this.registeredInputStreams.put(streamId, typedStream);
    }

    private String getIdForPValue(PValue pvalue) {
        String id = this.idMap.get(pvalue);
        if (id == null) {
            throw new IllegalArgumentException("No id mapping for value: " + pvalue);
        }
        return id;
    }
}

