/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.Serializable;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.repackaged.beam_runners_samza.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_samza.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_samza.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_samza.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
import org.apache.beam.runners.samza.util.Base64Serializer;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.factories.PropertiesConfigFactory;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.ByteSerdeFactory;
import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;

public class ConfigBuilder
extends Pipeline.PipelineVisitor.Defaults {
    private static final String APP_RUNNER_CLASS = "app.runner.class";
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<PValue, String> idMap;
    private final Map<String, String> config = new HashMap<String, String>();
    private final Pipeline pipeline;
    private boolean foundSource = false;

    public static Config buildConfig(Pipeline pipeline, SamzaPipelineOptions options, Map<PValue, String> idMap) {
        try {
            ConfigBuilder builder = new ConfigBuilder(idMap, pipeline);
            pipeline.traverseTopologically((Pipeline.PipelineVisitor)builder);
            builder.checkFoundSource();
            HashMap<String, String> config = new HashMap<String, String>(builder.getConfig());
            ConfigBuilder.createConfigForSystemStore(config);
            config.putAll(ConfigBuilder.createUserConfig(options));
            config.put(JobConfig.JOB_NAME(), options.getJobName());
            config.put("beamPipelineOptions", Base64Serializer.serializeUnchecked((Serializable)new SerializablePipelineOptions((PipelineOptions)options)));
            config.put("app.run.id", String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8));
            return new MapConfig(config);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static Map<String, String> createUserConfig(SamzaPipelineOptions options) {
        String configFilePath = options.getConfigFilePath();
        HashMap<String, String> config = new HashMap<String, String>();
        if (StringUtils.isNoneEmpty((CharSequence[])new CharSequence[]{configFilePath})) {
            File configFile = new File(configFilePath);
            Preconditions.checkArgument(configFile.exists(), "Config file %s does not exist", (Object)configFilePath);
            PropertiesConfigFactory configFactory = new PropertiesConfigFactory();
            URI configUri = configFile.toURI();
            config.putAll((Map<String, String>)configFactory.getConfig(configUri));
        }
        if (options.getConfigOverride() != null) {
            config.putAll(options.getConfigOverride());
        }
        if (config.isEmpty()) {
            config.putAll(ConfigBuilder.localRunConfig());
        }
        return config;
    }

    @VisibleForTesting
    public static Map<String, String> localRunConfig() {
        return ImmutableMap.builder().put(APP_RUNNER_CLASS, LocalApplicationRunner.class.getName()).put("job.coordinator.factory", PassthroughJobCoordinatorFactory.class.getName()).put("job.coordination.utils.factory", PassthroughCoordinationUtilsFactory.class.getName()).put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()).put(TaskConfig.COMMIT_MS(), "-1").put("processor.id", "1").build();
    }

    private static void createConfigForSystemStore(Map<String, String> config) {
        config.put("stores.beamStore.factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
        config.put("stores.beamStore.key.serde", "byteSerde");
        config.put("stores.beamStore.msg.serde", "byteSerde");
        config.put("serializers.registry.byteSerde.class", ByteSerdeFactory.class.getName());
    }

    private ConfigBuilder(Map<PValue, String> idMap, Pipeline pipeline) {
        this.idMap = idMap;
        this.pipeline = pipeline;
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        if (node.getTransform() instanceof Read.Bounded) {
            this.foundSource = true;
            this.processReadBounded(node, (Read.Bounded)node.getTransform());
        } else if (node.getTransform() instanceof Read.Unbounded) {
            this.foundSource = true;
            this.processReadUnbounded(node, (Read.Unbounded)node.getTransform());
        } else if (node.getTransform() instanceof ParDo.MultiOutput) {
            this.processParDo((ParDo.MultiOutput)node.getTransform());
        }
    }

    private <T> void processReadBounded(TransformHierarchy.Node node, Read.Bounded<T> transform) {
        String id = this.getId((PValue)Iterables.getOnlyElement(node.getOutputs().values()));
        BoundedSource source = transform.getSource();
        PCollection output = (PCollection)Iterables.getOnlyElement(node.toAppliedPTransform(this.pipeline).getOutputs().values());
        Coder coder = SamzaCoders.of(output);
        this.config.putAll(BoundedSourceSystem.createConfigFor(id, source, coder, node.getFullName()));
    }

    private <T> void processReadUnbounded(TransformHierarchy.Node node, Read.Unbounded<T> transform) {
        String id = this.getId((PValue)Iterables.getOnlyElement(node.getOutputs().values()));
        UnboundedSource source = transform.getSource();
        PCollection output = (PCollection)Iterables.getOnlyElement(node.toAppliedPTransform(this.pipeline).getOutputs().values());
        Coder coder = SamzaCoders.of(output);
        this.config.putAll(UnboundedSourceSystem.createConfigFor(id, source, coder, node.getFullName()));
    }

    private void processParDo(ParDo.MultiOutput<?, ?> parDo) {
        DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass());
        if (signature.usesState()) {
            for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
                String storeId = state.id();
                this.config.put("stores." + storeId + ".factory", "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
                this.config.put("stores." + storeId + ".key.serde", "byteSerde");
                this.config.put("stores." + storeId + ".msg.serde", "byteSerde");
            }
        }
    }

    private String getId(PValue pvalue) {
        String id = this.idMap.get(pvalue);
        if (id == null) {
            throw new IllegalStateException(String.format("Could not find id for pvalue: %s", pvalue));
        }
        return id;
    }

    private void checkFoundSource() {
        if (!this.foundSource) {
            throw new IllegalStateException("Could not find any sources in pipeline!");
        }
    }

    private Map<String, String> getConfig() {
        return this.config;
    }

    private String writeValueAsJsonString(Object object) {
        try {
            return this.objectMapper.writeValueAsString(object);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

