/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job_monitor;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroJobSpecKafkaJobMonitor
extends KafkaAvroJobMonitor<AvroJobSpec> {
    private static final Logger log = LoggerFactory.getLogger(AvroJobSpecKafkaJobMonitor.class);
    public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
    public static final String TOPIC_KEY = "topic";
    public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass";
    protected static final String VERB_KEY = "Verb";
    private static final Config DEFAULTS = ConfigFactory.parseMap((Map)ImmutableMap.of((Object)"versionReaderClass", (Object)FixedSchemaVersionWriter.class.getName()));

    protected AvroJobSpecKafkaJobMonitor(String topic, MutableJobCatalog catalog, Config limitedScopeConfig, SchemaVersionWriter<?> versionWriter) throws IOException {
        super(topic, catalog, limitedScopeConfig, AvroJobSpec.SCHEMA$, versionWriter);
    }

    @Override
    protected void createMetrics() {
        super.createMetrics();
    }

    @Override
    public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
        JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
        Properties props = new Properties();
        props.putAll(record.getProperties());
        jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion()).withDescription(record.getDescription()).withConfigAsProperties(props).withMetadata(record.getMetadata());
        if (!record.getTemplateUri().isEmpty()) {
            try {
                jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
            }
            catch (URISyntaxException e) {
                log.error("could not parse template URI " + record.getTemplateUri());
            }
        }
        String verbName = record.getMetadata().get(VERB_KEY);
        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf((String)verbName);
        JobSpec jobSpec = jobSpecBuilder.build();
        log.info("Parsed job spec " + jobSpec.toString());
        if (verb == SpecExecutor.Verb.ADD || verb == SpecExecutor.Verb.UPDATE) {
            return Lists.newArrayList((Object[])new Either[]{Either.left((Object)jobSpec)});
        }
        return Lists.newArrayList((Object[])new Either[]{Either.right((Object)jobSpec.getUri())});
    }

    public static class Factory
    implements JobSpecMonitorFactory {
        @Override
        public JobSpecMonitor forJobCatalog(GobblinInstanceDriver instanceDriver, MutableJobCatalog jobCatalog) throws IOException {
            Config config = instanceDriver.getSysConfig().getConfig().getConfig(AvroJobSpecKafkaJobMonitor.CONFIG_PREFIX).withFallback((ConfigMergeable)DEFAULTS);
            return this.forConfig(config, jobCatalog);
        }

        public JobSpecMonitor forConfig(Config localScopeConfig, MutableJobCatalog jobCatalog) throws IOException {
            SchemaVersionWriter versionWriter;
            Preconditions.checkArgument((boolean)localScopeConfig.hasPath(AvroJobSpecKafkaJobMonitor.TOPIC_KEY));
            Config config = localScopeConfig.withFallback((ConfigMergeable)DEFAULTS);
            String topic = config.getString(AvroJobSpecKafkaJobMonitor.TOPIC_KEY);
            try {
                versionWriter = (SchemaVersionWriter)GobblinConstructorUtils.invokeLongestConstructor(Class.forName(config.getString(AvroJobSpecKafkaJobMonitor.SCHEMA_VERSION_READER_CLASS)), (Object[])new Object[]{config});
            }
            catch (ReflectiveOperationException roe) {
                throw new IllegalArgumentException(roe);
            }
            return new AvroJobSpecKafkaJobMonitor(topic, jobCatalog, config, versionWriter);
        }
    }
}

