/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
import org.apache.beam.runners.samza.translation.ConfigContext;
import org.apache.beam.runners.samza.translation.TransformConfigGenerator;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;

/**
 * Translator and config generator for root read transforms ({@link Read.Bounded} and
 * {@link Read.Unbounded}).
 *
 * <p>{@link #translate} registers the transform's output as an input message stream on the
 * translation context; {@link #createConfig} emits the {@code systems.<id>.*} and
 * {@code streams.<id>.*} entries describing the source behind that stream.
 *
 * @param <T> element type of the {@link PCollection} produced by the read
 */
public class ReadTranslator<T>
        implements TransformTranslator<PTransform<PBegin, PCollection<T>>>,
                TransformConfigGenerator<PTransform<PBegin, PCollection<T>>> {

    /**
     * Registers the output collection of {@code transform} as an input message stream.
     *
     * @param transform the read transform being translated
     * @param node the hierarchy node of the transform (not used here)
     * @param ctx translation context on which the input stream is registered
     */
    @Override
    public void translate(PTransform<PBegin, PCollection<T>> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        final PCollection<T> output = ctx.getOutput(transform);
        ctx.registerInputMessageStream(output);
    }

    /**
     * Builds the configuration entries for the system/stream backing this read.
     *
     * <p>The serialized source, the serialized windowed-value coder and the step name are stored
     * under {@code systems.<id>.*}; the stream is bound to that system via
     * {@code streams.<id>.samza.system}. A {@link BoundedSource} additionally marks the stream as
     * bounded and selects {@link BoundedSourceSystem.Factory}; any other source selects
     * {@link UnboundedSourceSystem.Factory}.
     *
     * @param transform the read transform; expected to be a {@link Read.Unbounded} or a
     *     {@link Read.Bounded}
     * @param node the hierarchy node, used for the output id and the full step name
     * @param ctx config context used to resolve the output id and output collection
     * @return a new mutable map holding the generated configuration entries
     * @throws ClassCastException if {@code transform} is neither a {@link Read.Unbounded} nor a
     *     {@link Read.Bounded}
     */
    @Override
    public Map<String, String> createConfig(PTransform<PBegin, PCollection<T>> transform, TransformHierarchy.Node node, ConfigContext ctx) {
        final String id = ctx.getOutputId(node);
        final PCollection<T> output = ctx.getOutput(transform);
        final Coder<WindowedValue<T>> coder = SamzaCoders.of(output);

        // The source may be bounded or unbounded, so it must not be typed as UnboundedSource.
        // Serializable is all the serializer needs and is common to both kinds of source.
        final Serializable source;
        if (transform instanceof Read.Unbounded) {
            source = ((Read.Unbounded<T>) transform).getSource();
        } else {
            source = ((Read.Bounded<T>) transform).getSource();
        }

        final String systemPrefix = "systems." + id;
        final String streamPrefix = "streams." + id;

        final Map<String, String> config = new HashMap<>();
        config.put(systemPrefix + ".source", Base64Serializer.serializeUnchecked(source));
        config.put(systemPrefix + ".coder", Base64Serializer.serializeUnchecked(coder));
        config.put(systemPrefix + ".stepName", node.getFullName());
        config.put(streamPrefix + ".samza.system", id);

        if (source instanceof BoundedSource) {
            config.put(streamPrefix + ".samza.bounded", "true");
            config.put(systemPrefix + ".samza.factory", BoundedSourceSystem.Factory.class.getName());
        } else {
            config.put(systemPrefix + ".samza.factory", UnboundedSourceSystem.Factory.class.getName());
        }
        return config;
    }
}

