/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.spec_catalog;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.MutableSpecCatalog;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalog;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.SpecCatalogListenersList;
import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.callbacks.CallbackResult;
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class FlowCatalog
extends AbstractIdleService
implements SpecCatalog,
MutableSpecCatalog {
    public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
    public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
    public static final String DEFAULT_FLOWSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
    public static final String FLOWSPEC_SERDE_CLASS_KEY = "flowSpec.serde.class";
    public static final String DEFAULT_FLOWSPEC_SERDE_CLASS = JavaSpecSerDe.class.getCanonicalName();
    protected final SpecCatalogListenersList listeners;
    protected final Logger log;
    protected final MetricContext metricContext;
    protected final MutableSpecCatalog.MutableStandardMetrics metrics;
    protected final SpecStore specStore;
    private final Map<String, Object> specSyncObjects = new HashMap<String, Object>();
    private final ClassAliasResolver<SpecStore> aliasResolver;

    public FlowCatalog(Config config) {
        this(config, (Optional<Logger>)Optional.absent());
    }

    public FlowCatalog(Config config, Optional<Logger> log) {
        this(config, log, (Optional<MetricContext>)Optional.absent(), true);
    }

    @Inject
    public FlowCatalog(Config config, GobblinInstanceEnvironment env) {
        this(config, (Optional<Logger>)Optional.of((Object)env.getLog()), (Optional<MetricContext>)Optional.of((Object)env.getMetricContext()), env.isInstrumentationEnabled());
    }

    public FlowCatalog(Config config, Optional<Logger> log, Optional<MetricContext> parentMetricContext, boolean instrumentationEnabled) {
        this.log = log.isPresent() ? (Logger)log.get() : LoggerFactory.getLogger(this.getClass());
        this.listeners = new SpecCatalogListenersList(log);
        if (instrumentationEnabled) {
            MetricContext realParentCtx = (MetricContext)parentMetricContext.or((Object)Instrumented.getMetricContext((State)new State(), this.getClass()));
            this.metricContext = realParentCtx.childBuilder(FlowCatalog.class.getSimpleName()).build();
            this.metrics = new MutableSpecCatalog.MutableStandardMetrics(this, (Optional<Config>)Optional.of((Object)config));
            this.addListener(this.metrics);
        } else {
            this.metricContext = null;
            this.metrics = null;
        }
        this.aliasResolver = new ClassAliasResolver(SpecStore.class);
        try {
            Config newConfig = config;
            if (config.hasPath(FLOWSPEC_STORE_DIR_KEY)) {
                newConfig = config.withValue("specStore.fs.dir", config.getValue(FLOWSPEC_STORE_DIR_KEY));
            }
            String specStoreClassName = ConfigUtils.getString((Config)config, (String)FLOWSPEC_STORE_CLASS_KEY, (String)DEFAULT_FLOWSPEC_STORE_CLASS);
            this.log.info(String.format("Using class name/alias [%s] for specstore", specStoreClassName));
            String specSerDeClassName = ConfigUtils.getString((Config)config, (String)FLOWSPEC_SERDE_CLASS_KEY, (String)DEFAULT_FLOWSPEC_SERDE_CLASS);
            this.log.info(String.format("Using class name/alias [%s] for spec serde", specSerDeClassName));
            SpecSerDe specSerDe = (SpecSerDe)ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver(SpecSerDe.class).resolve(specSerDeClassName)), (Object[])new Object[0]);
            this.specStore = (SpecStore)ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specStoreClassName)), (Object[])new Object[]{newConfig, specSerDe});
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    protected void startUp() throws Exception {
    }

    protected void shutDown() throws Exception {
        this.listeners.close();
    }

    protected void notifyAllListeners() {
        try {
            Iterator<URI> uriIterator = this.getSpecURIs();
            while (uriIterator.hasNext()) {
                this.listeners.onAddSpec(this.getSpecWrapper(uriIterator.next()));
            }
        }
        catch (IOException e) {
            this.log.error("Cannot retrieve specs from catalog:", (Throwable)e);
        }
    }

    @Override
    public void addListener(SpecCatalogListener specListener) {
        Preconditions.checkNotNull((Object)specListener);
        this.listeners.addListener(specListener);
        if (this.state() == Service.State.RUNNING) {
            try {
                Iterator<URI> uriIterator = this.getSpecURIs();
                while (uriIterator.hasNext()) {
                    SpecCatalogListener.AddSpecCallback addJobCallback = new SpecCatalogListener.AddSpecCallback(this.getSpecWrapper(uriIterator.next()));
                    this.listeners.callbackOneListener((Function<SpecCatalogListener, AddSpecResponse>)addJobCallback, specListener);
                }
            }
            catch (IOException e) {
                this.log.error("Cannot retrieve specs from catalog:", (Throwable)e);
            }
        }
    }

    @Override
    public void removeListener(SpecCatalogListener specCatalogListener) {
        this.listeners.removeListener(specCatalogListener);
    }

    @Override
    public void registerWeakSpecCatalogListener(SpecCatalogListener specCatalogListener) {
        this.listeners.registerWeakSpecCatalogListener(specCatalogListener);
    }

    @Nonnull
    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    public boolean isInstrumentationEnabled() {
        return null != this.metricContext;
    }

    public List<Tag<?>> generateTags(State state) {
        return Collections.emptyList();
    }

    public void switchMetricContext(List<Tag<?>> tags) {
        throw new UnsupportedOperationException();
    }

    public void switchMetricContext(MetricContext context) {
        throw new UnsupportedOperationException();
    }

    @Override
    public SpecCatalog.StandardMetrics getMetrics() {
        return this.metrics;
    }

    public Iterator<URI> getSpecURIs() throws IOException {
        return this.specStore.getSpecURIs();
    }

    public Iterator<URI> getSpecURISWithTag(String tag) throws IOException {
        return this.specStore.getSpecURIsWithTag(tag);
    }

    @Override
    @Deprecated
    public Collection<Spec> getSpecs() {
        try {
            return this.specStore.getSpecs();
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
        }
    }

    public boolean exists(URI uri) {
        try {
            return this.specStore.exists(uri);
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
        }
    }

    @Override
    public Spec getSpecs(URI uri) throws SpecNotFoundException {
        try {
            return this.specStore.getSpec(uri);
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
        }
    }

    @Override
    public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) {
        try {
            return this.specStore.getSpecs(specSearchObject);
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + specSearchObject, e);
        }
    }

    public Collection<Spec> getAllSpecs() {
        try {
            return this.specStore.getSpecs();
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot retrieve all specs from Spec stores", e);
        }
    }

    public Spec getSpecWrapper(URI uri) {
        Spec spec = null;
        try {
            spec = this.getSpecs(uri);
        }
        catch (SpecNotFoundException snfe) {
            this.log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog, suspecting current modification on SpecStore", uri), (Throwable)snfe);
        }
        return spec;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
        HashMap<String, AddSpecResponse> responseMap = new HashMap<String, AddSpecResponse>();
        FlowSpec flowSpec = (FlowSpec)spec;
        Preconditions.checkState((this.state() == Service.State.RUNNING ? 1 : 0) != 0, (Object)String.format("%s is not running.", this.getClass().getName()));
        Preconditions.checkNotNull((Object)flowSpec);
        this.log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
        Object syncObject = new Object();
        this.specSyncObjects.put(flowSpec.getUri().toString(), syncObject);
        if (triggerListener) {
            AddSpecResponse response = this.listeners.onAddSpec(flowSpec);
            for (Map.Entry entry : ((CallbacksDispatcher.CallbackResults)response.getValue()).getSuccesses().entrySet()) {
                responseMap.put(((SpecCatalogListener)entry.getKey()).getName(), (AddSpecResponse)((CallbackResult)entry.getValue()).getResult());
            }
        }
        if (FlowCatalog.isCompileSuccessful(responseMap)) {
            Object object = syncObject;
            synchronized (object) {
                try {
                    if (!flowSpec.isExplain().booleanValue()) {
                        long startTime = System.currentTimeMillis();
                        this.specStore.addSpec(spec);
                        this.metrics.updatePutSpecTime(startTime);
                    }
                    responseMap.put("compilation.successful", new AddSpecResponse<String>("true"));
                }
                catch (IOException e) {
                    throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
                }
                finally {
                    syncObject.notifyAll();
                    this.specSyncObjects.remove(flowSpec.getUri().toString());
                }
            }
        }
        responseMap.put("compilation.successful", new AddSpecResponse<String>("false"));
        return responseMap;
    }

    public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
        AddSpecResponse<Object> addSpecResponse = responseMap.getOrDefault("org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler", new AddSpecResponse<Object>(null));
        return FlowCatalog.isCompileSuccessful(addSpecResponse.getValue());
    }

    public static boolean isCompileSuccessful(String dag) {
        return dag != null && !dag.contains(ConfigException.class.getSimpleName());
    }

    @Override
    public Map<String, AddSpecResponse> put(Spec spec) {
        return this.put(spec, true);
    }

    public void remove(URI uri) {
        this.remove(uri, new Properties());
    }

    @Override
    public void remove(URI uri, Properties headers) {
        this.remove(uri, headers, true);
    }

    public void remove(URI uri, Properties headers, boolean triggerListener) {
        try {
            Preconditions.checkState((this.state() == Service.State.RUNNING ? 1 : 0) != 0, (Object)String.format("%s is not running.", this.getClass().getName()));
            Preconditions.checkNotNull((Object)uri);
            long startTime = System.currentTimeMillis();
            this.log.info(String.format("Removing FlowSpec with URI: %s", uri));
            this.specStore.deleteSpec(uri);
            this.metrics.updateRemoveSpecTime(startTime);
            if (triggerListener) {
                this.listeners.onDeleteSpec(uri, "", headers);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
        }
    }

    public Object getSyncObject(String specUri) {
        return this.specSyncObjects.getOrDefault(specUri, null);
    }

    public SpecStore getSpecStore() {
        return this.specStore;
    }
}

