/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job_monitor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for job monitors that consume {@link JobSpec} change events from a Kafka topic and
 * apply them to a {@link MutableJobCatalog}.
 *
 * <p>Every consumed record is handed to {@link #parseJobSpec(byte[])}; each resulting spec is then
 * applied to the catalog according to the {@link SpecExecutor.Verb} found under the
 * {@code "Verb"} key of the spec's metadata. Meters count how many specs were seen per verb.
 */
public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> implements JobSpecMonitor {
    private static final Logger log = LoggerFactory.getLogger(KafkaJobMonitor.class);

    /** Config prefix whose sub-config is passed to the underlying consumer. */
    public static final String KAFKA_JOB_MONITOR_PREFIX = "jobSpecMonitor.kafka";
    public static final String KAFKA_AUTO_OFFSET_RESET_KEY = "jobSpecMonitor.kafka.auto.offset.reset";
    public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
    public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";

    /** Metadata key under which the verb of a job spec is stored. */
    private static final String VERB_KEY = "Verb";

    /**
     * Number of tokens expected when a job spec URI path is split on {@code "/"}. A path such as
     * {@code /group/name} yields an empty leading token, so three tokens means two path segments.
     */
    private static final int EXPECTED_NUM_URI_TOKENS = 3;

    /** State store cleaned up on DELETE; stays {@code null} when it could not be built. */
    protected DatasetStateStore datasetStateStore;
    protected final MutableJobCatalog jobCatalog;
    protected ContextAwareMeter newSpecs;
    protected ContextAwareMeter updatedSpecs;
    protected ContextAwareMeter cancelledSpecs;
    protected ContextAwareMeter removedSpecs;
    protected ContextAwareMeter totalSpecs;

    /**
     * Converts the raw value of a Kafka record into the job specs it describes.
     *
     * @param var1 the value bytes of the consumed record
     * @return the job specs contained in the record
     * @throws IOException if the bytes cannot be parsed
     */
    public abstract Collection<JobSpec> parseJobSpec(byte[] var1) throws IOException;

    /**
     * Creates a monitor for the given topic.
     *
     * <p>A failure to build the {@link DatasetStateStore} is logged and tolerated; in that case
     * state store cleanup on DELETE is skipped.
     *
     * @param topic the Kafka topic to consume
     * @param catalog the catalog that receives the spec changes
     * @param config the configuration; the {@link #KAFKA_JOB_MONITOR_PREFIX} sub-config (or an
     *     empty config) is passed to the consumer, the full config to the state store factory
     */
    public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config config) {
        // NOTE(review): the literal 1 is presumably the consumer thread count — confirm against HighLevelConsumer.
        super(topic, ConfigUtils.getConfigOrEmpty(config, KAFKA_JOB_MONITOR_PREFIX), 1);
        this.jobCatalog = catalog;
        try {
            this.datasetStateStore = DatasetStateStore.buildDatasetStateStore(config);
        } catch (Exception e) {
            // Best effort: the monitor still works without a state store, only DELETE cleanup is skipped.
            log.warn("DatasetStateStore could not be created.", e);
        }
    }

    /** Registers the per-verb and total spec meters in addition to the metrics of the superclass. */
    @Override
    protected void createMetrics() {
        super.createMetrics();
        this.newSpecs = this.getMetricContext().contextAwareMeter("gobblin.jobMonitor.kafka.newSpecs");
        this.updatedSpecs = this.getMetricContext().contextAwareMeter("gobblin.jobMonitor.kafka.updatedSpecs");
        this.removedSpecs = this.getMetricContext().contextAwareMeter("gobblin.jobMonitor.kafka.removedSpecs");
        this.cancelledSpecs = this.getMetricContext().contextAwareMeter("gobblin.jobMonitor.kafka.cancelledSpecs");
        this.totalSpecs = this.getMetricContext().contextAwareMeter("gobblin.jobMonitor.kafka.totalSpecs");
    }

    /** Delegates to the superclass; overridden only so that tests can call it. */
    @Override
    @VisibleForTesting
    protected void buildMetricsContextAndMetrics() {
        super.buildMetricsContextAndMetrics();
    }

    /** Delegates to the superclass; overridden only so that tests can call it. */
    @Override
    @VisibleForTesting
    protected void shutdownMetrics() throws IOException {
        super.shutdownMetrics();
    }

    /**
     * Parses one Kafka record and applies every job spec it contains to the catalog.
     *
     * <p>A record that cannot be parsed is logged together with its offset and UTF-8 decoded
     * payload and is then dropped.
     *
     * @param message the consumed Kafka record
     */
    @Override
    protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
        Collection<JobSpec> parsedCollection;
        try {
            parsedCollection = this.parseJobSpec(message.getValue());
        } catch (IOException ioe) {
            String messageStr = new String(message.getValue(), Charsets.UTF_8);
            log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.getOffset(), messageStr), ioe);
            return;
        }
        for (JobSpec parsedMessage : parsedCollection) {
            this.applySpec(parsedMessage);
        }
    }

    /**
     * Applies a single job spec to the catalog according to its verb and marks the matching meters.
     * Specs without a recognizable verb are logged and skipped without touching any meter.
     */
    private void applySpec(JobSpec spec) {
        // Read the verb name once, tolerating absent metadata, so that the error path below
        // cannot itself fail while building its log message.
        String verbName = spec.getMetadata() == null ? null : spec.getMetadata().get(VERB_KEY);
        SpecExecutor.Verb verb = parseVerb(verbName);
        if (verb == null) {
            log.error("Unknown verb {} for spec {}", verbName, spec.getUri());
            return;
        }

        this.totalSpecs.mark();
        switch (verb) {
            case ADD:
                this.newSpecs.mark();
                this.jobCatalog.put(spec);
                break;
            case UPDATE:
                this.updatedSpecs.mark();
                this.jobCatalog.put(spec);
                break;
            case UNKNOWN:
                log.warn("Job Spec Verb is 'UNKNOWN', putting this spec in job catalog anyway.");
                this.jobCatalog.put(spec);
                break;
            case DELETE: {
                this.removedSpecs.mark();
                URI jobSpecUri = spec.getUri();
                this.jobCatalog.remove(jobSpecUri);
                try {
                    this.deleteStateStore(jobSpecUri);
                } catch (IOException ioe) {
                    // Handled here so a state store failure is not reported as a message parse
                    // failure and does not drop the remaining specs of the same message.
                    log.error(String.format("Failed to delete state store for job spec %s.", jobSpecUri), ioe);
                }
                break;
            }
            case CANCEL:
                this.cancelledSpecs.mark();
                // NOTE(review): meaning of the boolean flag is defined by MutableJobCatalog — confirm there.
                this.jobCatalog.remove(spec.getUri(), true);
                break;
            default:
                log.error("Cannot process spec {} with verb {}", spec.getUri(), verb);
        }
    }

    /**
     * Resolves a verb name to its enum constant.
     *
     * @return the matching verb, or {@code null} if the name is {@code null} or not a verb
     */
    private static SpecExecutor.Verb parseVerb(String verbName) {
        if (verbName == null) {
            return null;
        }
        try {
            return SpecExecutor.Verb.valueOf(verbName);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    /**
     * Deletes the state store entry of the job identified by the given spec URI.
     *
     * <p>The job name is the last token of the URI path, which must split into exactly
     * {@link #EXPECTED_NUM_URI_TOKENS} tokens. Nothing is deleted (only a log entry is written)
     * when no state store is available or the URI does not have that shape.
     *
     * @param jobSpecUri the URI of the deleted job spec
     * @throws IOException if the state store fails to delete the entry
     */
    private void deleteStateStore(URI jobSpecUri) throws IOException {
        if (null == this.datasetStateStore) {
            log.warn("Job state store deletion failed as datasetstore is not initialized.");
            return;
        }
        // URI.getPath() is null for opaque URIs; treat that like any other malformed URI.
        String path = jobSpecUri.getPath();
        String[] uriTokens = path == null ? new String[0] : path.split("/");
        if (uriTokens.length != EXPECTED_NUM_URI_TOKENS) {
            log.error("Invalid URI {}.", jobSpecUri);
            return;
        }
        String jobName = uriTokens[EXPECTED_NUM_URI_TOKENS - 1];
        this.datasetStateStore.delete(jobName);
        log.info("JobSpec {} deleted with statestore.", jobSpecUri);
    }

    /** @return the meter marked for every spec with verb ADD */
    public ContextAwareMeter getNewSpecs() {
        return this.newSpecs;
    }

    /** @return the meter marked for every spec with verb UPDATE */
    public ContextAwareMeter getUpdatedSpecs() {
        return this.updatedSpecs;
    }

    /** @return the meter marked for every spec with verb CANCEL */
    public ContextAwareMeter getCancelledSpecs() {
        return this.cancelledSpecs;
    }

    /** @return the meter marked for every spec with verb DELETE */
    public ContextAwareMeter getRemovedSpecs() {
        return this.removedSpecs;
    }

    /** @return the meter marked for every spec that carries a recognized verb */
    public ContextAwareMeter getTotalSpecs() {
        return this.totalSpecs;
    }
}

