/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.util.HashIdGenerator;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TranslationContext {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
    private final @UnknownKeyFor @NonNull @Initialized StreamApplicationDescriptor appDescriptor;
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PValue, @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> messsageStreams = new HashMap();
    private final /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>, @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> viewStreams = new HashMap();
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PValue, @UnknownKeyFor @NonNull @Initialized String> idMap;
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized MessageStream> registeredInputStreams = new HashMap<String, MessageStream>();
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Table> registeredTables = new HashMap<String, Table>();
    private final @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions options;
    private final @UnknownKeyFor @NonNull @Initialized HashIdGenerator idGenerator = new HashIdGenerator();
    private /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> currentTransform;

    public TranslationContext(@UnknownKeyFor @NonNull @Initialized StreamApplicationDescriptor appDescriptor, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized PValue, @UnknownKeyFor @NonNull @Initialized String> idMap, @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions options) {
        this.appDescriptor = appDescriptor;
        this.idMap = idMap;
        this.options = options;
    }

    public <OutT> void registerInputMessageStream(@UnknownKeyFor @NonNull @Initialized PValue pvalue, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized InputDescriptor<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized OpMessage<OutT>>, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> inputDescriptor) {
        this.registerInputMessageStreams(pvalue, Collections.singletonList(inputDescriptor));
    }

    public <OutT> void registerInputMessageStreams(@UnknownKeyFor @NonNull @Initialized PValue pvalue, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized InputDescriptor<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized OpMessage<OutT>>, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>> inputDescriptors) {
        HashSet<Object> streamsToMerge = new HashSet<Object>();
        for (InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor : inputDescriptors) {
            String streamId = inputDescriptor.getStreamId();
            if (this.registeredInputStreams.containsKey(streamId)) {
                MessageStream messageStream = this.registeredInputStreams.get(streamId);
                LOG.info(String.format("Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.", streamId, messageStream, pvalue));
                streamsToMerge.add(messageStream);
                continue;
            }
            MessageStream typedStream = TranslationContext.getValueStream(this.appDescriptor.getInputStream(inputDescriptor));
            this.registeredInputStreams.put(streamId, typedStream);
            streamsToMerge.add(typedStream);
        }
        this.registerMessageStream(pvalue, MessageStream.mergeAll(streamsToMerge));
    }

    public <OutT> void registerMessageStream(@UnknownKeyFor @NonNull @Initialized PValue pvalue, @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> stream) {
        if (this.messsageStreams.containsKey(pvalue)) {
            throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
        }
        this.messsageStreams.put(pvalue, stream);
    }

    public @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<@UnknownKeyFor @NonNull @Initialized String>> getDummyStream() {
        InputDescriptor<OpMessage<String>, ?> dummyInput = TranslationContext.createDummyStreamDescriptor(UUID.randomUUID().toString());
        return this.appDescriptor.getInputStream(dummyInput);
    }

    public <OutT> @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<OutT>> getMessageStream(@UnknownKeyFor @NonNull @Initialized PValue pvalue) {
        MessageStream<?> stream = this.messsageStreams.get(pvalue);
        if (stream == null) {
            throw new IllegalArgumentException("No stream registered for pvalue: " + pvalue);
        }
        return stream;
    }

    public <ElemT, ViewT> void registerViewStream(@UnknownKeyFor @NonNull @Initialized PCollectionView<ViewT> view, @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<@UnknownKeyFor @NonNull @Initialized Iterable<ElemT>>> stream) {
        if (this.viewStreams.containsKey(view)) {
            throw new IllegalArgumentException("Stream already registered for view: " + view);
        }
        this.viewStreams.put(view, stream);
    }

    public <InT> @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized OpMessage<InT>> getViewStream(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PCollectionView<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> view) {
        MessageStream<?> stream = this.viewStreams.get(view);
        if (stream == null) {
            throw new IllegalArgumentException("No stream registered for view: " + view);
        }
        return stream;
    }

    public <ViewT> @UnknownKeyFor @NonNull @Initialized String getViewId(@UnknownKeyFor @NonNull @Initialized PCollectionView<ViewT> view) {
        return this.getIdForPValue((PValue)view);
    }

    public void setCurrentTransform(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> currentTransform) {
        this.currentTransform = currentTransform;
    }

    public void clearCurrentTransform() {
        this.currentTransform = null;
    }

    public /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized ?> getCurrentTransform() {
        return this.currentTransform;
    }

    public <InT extends PValue> InT getInput(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PTransform<InT, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> transform) {
        return (InT)((PValue)Iterables.getOnlyElement((Iterable)TransformInputs.nonAdditionalInputs(this.currentTransform)));
    }

    public <OutT extends PValue> OutT getOutput(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, OutT> transform) {
        return (OutT)((PValue)Iterables.getOnlyElement(this.currentTransform.getOutputs().values()));
    }

    public <OutT> @UnknownKeyFor @NonNull @Initialized TupleTag<OutT> getOutputTag(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized PCollection<OutT>> transform) {
        return (TupleTag)Iterables.getOnlyElement(this.currentTransform.getOutputs().keySet());
    }

    public @UnknownKeyFor @NonNull @Initialized SamzaPipelineOptions getPipelineOptions() {
        return this.options;
    }

    public <OutT> @UnknownKeyFor @NonNull @Initialized OutputStream<OutT> getOutputStream(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized OutputDescriptor<OutT, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> outputDescriptor) {
        return this.appDescriptor.getOutputStream(outputDescriptor);
    }

    public <K, V> @UnknownKeyFor @NonNull @Initialized Table<@UnknownKeyFor @NonNull @Initialized KV<K, V>> getTable(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized TableDescriptor<K, V, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> tableDesc) {
        return this.registeredTables.computeIfAbsent(tableDesc.getTableId(), id -> this.appDescriptor.getTable(tableDesc));
    }

    private static <T> @UnknownKeyFor @NonNull @Initialized MessageStream<T> getValueStream(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized MessageStream<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, T>> input) {
        return input.map(KV::getValue);
    }

    public @UnknownKeyFor @NonNull @Initialized String getIdForPValue(@UnknownKeyFor @NonNull @Initialized PValue pvalue) {
        String id = this.idMap.get(pvalue);
        if (id == null) {
            throw new IllegalArgumentException("No id mapping for value: " + pvalue);
        }
        return id;
    }

    public @UnknownKeyFor @NonNull @Initialized String getTransformFullName() {
        return this.currentTransform.getFullName();
    }

    public @UnknownKeyFor @NonNull @Initialized String getTransformId() {
        return this.idGenerator.getId(this.currentTransform.getFullName());
    }

    private static /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized InputDescriptor<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized OpMessage<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized String>, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> createDummyStreamDescriptor(@UnknownKeyFor @NonNull @Initialized String id) {
        GenericSystemDescriptor dummySystem = new GenericSystemDescriptor(id, InMemorySystemFactory.class.getName());
        GenericInputDescriptor dummyInput = dummySystem.getInputDescriptor(id, (Serde)new NoOpSerde());
        dummyInput.withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
        MapConfig config = new MapConfig(new Map[]{dummyInput.toConfig(), dummySystem.toConfig()});
        InMemorySystemFactory factory = new InMemorySystemFactory();
        StreamSpec dummyStreamSpec = new StreamSpec(id, id, id, 1);
        factory.getAdmin(id, (Config)config).createStream(dummyStreamSpec);
        SystemProducer producer = factory.getProducer(id, (Config)config, null);
        SystemStream sysStream = new SystemStream(id, id);
        Consumer<Object> sendFn = msg -> producer.send(id, new OutgoingMessageEnvelope(sysStream, (Object)0, null, msg));
        WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)"dummy", (Instant)new Instant());
        sendFn.accept(OpMessage.ofElement(windowedValue));
        sendFn.accept(new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
        sendFn.accept(new EndOfStreamMessage(null));
        return dummyInput;
    }
}

