/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.TestStreamTranslation;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.SamzaTestStreamSystemFactory;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.operators.KV;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;

public class SamzaTestStreamTranslator<T>
implements TransformTranslator<TestStream<T>> {
    public static final String ENCODED_TEST_STREAM = "encodedTestStream";
    public static final String TEST_STREAM_DECODER = "testStreamDecoder";

    @Override
    public void translate(TestStream<T> testStream, TransformHierarchy.Node node, TranslationContext ctx) {
        String encodedTestStream;
        PCollection output = (PCollection)ctx.getOutput(testStream);
        String outputId = ctx.getIdForPValue((PValue)output);
        Coder valueCoder = testStream.getValueCoder();
        TestStream.TestStreamCoder testStreamCoder = TestStream.TestStreamCoder.of((Coder)valueCoder);
        try {
            encodedTestStream = CoderUtils.encodeToBase64((Coder)testStreamCoder, testStream);
        }
        catch (CoderException e) {
            throw new RuntimeException("Could not encode TestStream.", e);
        }
        SerializableFunction & Serializable testStreamDecoder = (SerializableFunction & Serializable)string -> {
            try {
                return (TestStream)CoderUtils.decodeFromBase64((Coder)TestStream.TestStreamCoder.of((Coder)valueCoder), (String)string);
            }
            catch (CoderException e) {
                throw new RuntimeException("Could not decode TestStream.", e);
            }
        };
        ctx.registerInputMessageStream((PValue)output, SamzaTestStreamTranslator.createInputDescriptor(outputId, encodedTestStream, testStreamDecoder));
    }

    @Override
    public void translatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        ByteString bytes = transform.getTransform().getSpec().getPayload();
        SerializableFunction<String, TestStream<T>> testStreamDecoder = SamzaTestStreamTranslator.createTestStreamDecoder(pipeline.getComponents(), bytes);
        String outputId = ctx.getOutputId(transform);
        String escapedOutputId = SamzaPipelineTranslatorUtils.escape(outputId);
        ctx.registerInputMessageStream(outputId, SamzaTestStreamTranslator.createInputDescriptor(escapedOutputId, Base64Serializer.serializeUnchecked((Serializable)bytes), testStreamDecoder));
    }

    private static <T> GenericInputDescriptor<KV<?, OpMessage<T>>> createInputDescriptor(String id, String encodedTestStream, SerializableFunction<String, TestStream<T>> testStreamDecoder) {
        ImmutableMap systemConfig = ImmutableMap.of((Object)ENCODED_TEST_STREAM, (Object)encodedTestStream, (Object)TEST_STREAM_DECODER, (Object)Base64Serializer.serializeUnchecked(testStreamDecoder));
        GenericSystemDescriptor systemDescriptor = (GenericSystemDescriptor)new GenericSystemDescriptor(id, SamzaTestStreamSystemFactory.class.getName()).withSystemConfigs((Map)systemConfig);
        KVSerde kvSerde = KVSerde.of((Serde)new NoOpSerde(), (Serde)new NoOpSerde());
        return systemDescriptor.getInputDescriptor(id, (Serde)kvSerde);
    }

    private static <T> SerializableFunction<String, TestStream<T>> createTestStreamDecoder(RunnerApi.Components components, ByteString payload) {
        Coder coder;
        try {
            coder = RehydratedComponents.forComponents((RunnerApi.Components)components).getCoder(RunnerApi.TestStreamPayload.parseFrom((ByteString)payload).getCoderId());
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return (SerializableFunction & Serializable)encodedTestStream -> {
            try {
                return TestStreamTranslation.testStreamFromProtoPayload((RunnerApi.TestStreamPayload)RunnerApi.TestStreamPayload.parseFrom((ByteString)((ByteString)Base64Serializer.deserializeUnchecked((String)encodedTestStream, ByteString.class))), (Coder)coder);
            }
            catch (IOException e) {
                throw new RuntimeException("Could not decode TestStream.", e);
            }
        };
    }
}

