/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.query.materializedview;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.query.materializedview.DerivativeDataSource;
import org.apache.druid.query.materializedview.MaterializedViewConfig;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.tweak.HandleCallback;

/**
 * Periodically reads the metadata store and publishes, per base datasource, the set of
 * derivative (materialized view) datasources built on top of it.
 *
 * <p>The published snapshot lives in a static reference so that the static accessors
 * {@link #getDerivatives(String)} and {@link #getAllDerivatives()} can be called without an
 * instance. Each poll builds a complete new map and swaps it in; a published map is never
 * modified afterwards by this class.
 *
 * <p>{@link #start()} and {@link #stop()} are serialized on an internal lock and are idempotent.
 */
@ManageLifecycle
public class DerivativeDataSourceManager {
    private static final EmittingLogger log = new EmittingLogger(DerivativeDataSourceManager.class);

    /** Latest snapshot: base datasource name -> derivatives of that datasource. */
    private static final AtomicReference<ConcurrentHashMap<String, SortedSet<DerivativeDataSource>>> DERIVATIVES_REF =
        new AtomicReference<>(new ConcurrentHashMap<>());

    private final MaterializedViewConfig config;
    private final Supplier<MetadataStorageTablesConfig> dbTables;
    private final SQLMetadataConnector connector;
    private final ObjectMapper objectMapper;

    /** Guards {@code started}, {@code exec} and {@code future}. */
    private final Object lock = new Object();
    private boolean started = false;
    private ListeningScheduledExecutorService exec = null;
    private ListenableFuture<?> future = null;

    @Inject
    public DerivativeDataSourceManager(MaterializedViewConfig config, Supplier<MetadataStorageTablesConfig> dbTables, ObjectMapper objectMapper, SQLMetadataConnector connector) {
        this.config = config;
        this.dbTables = dbTables;
        this.objectMapper = objectMapper;
        this.connector = connector;
    }

    /**
     * Starts the polling task. The first poll is scheduled with no initial delay; subsequent
     * polls run with a fixed delay of {@code config.getPollDuration()}. Calling this method
     * while already started does nothing.
     */
    @LifecycleStart
    public void start() {
        log.info("starting derivatives manager.");
        synchronized (lock) {
            if (started) {
                return;
            }
            exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DerivativeDataSourceManager-Exec-%d"));
            final Duration delay = config.getPollDuration().toStandardDuration();
            future = exec.scheduleWithFixedDelay(
                () -> {
                    try {
                        updateDerivatives();
                    }
                    catch (Exception e) {
                        // Catch here so the failure is reported as an alert; an exception escaping a
                        // scheduled task would suppress all of its subsequent executions.
                        log.makeAlert(e, "uncaught exception in derivatives manager updating thread").emit();
                    }
                },
                0L,
                delay.getMillis(),
                TimeUnit.MILLISECONDS
            );
            started = true;
        }
        log.info("Derivatives manager started.");
    }

    /**
     * Cancels the polling task, shuts the executor down and resets the published snapshot to an
     * empty map. Calling this method while not started does nothing.
     */
    @LifecycleStop
    public void stop() {
        synchronized (lock) {
            if (!started) {
                return;
            }
            started = false;
            future.cancel(true);
            future = null;
            DERIVATIVES_REF.set(new ConcurrentHashMap<>());
            exec.shutdownNow();
            exec = null;
        }
    }

    /**
     * Returns an immutable copy of the derivatives currently known for {@code datasource}.
     *
     * @param datasource base datasource name
     * @return the derivatives of that datasource, or an empty set if none are known
     */
    public static ImmutableSet<DerivativeDataSource> getDerivatives(String datasource) {
        final SortedSet<DerivativeDataSource> known = DERIVATIVES_REF.get().get(datasource);
        return known == null ? ImmutableSet.of() : ImmutableSet.copyOf(known);
    }

    /**
     * Returns an immutable copy of the current snapshot, keyed by base datasource name.
     */
    public static ImmutableMap<String, Set<DerivativeDataSource>> getAllDerivatives() {
        return ImmutableMap.copyOf(DERIVATIVES_REF.get());
    }

    /**
     * Runs one poll: loads every datasource whose commit metadata is a
     * {@link DerivativeDataSourceMetadata}, computes its average size per interval, drops the ones
     * whose average is not positive, and publishes the result grouped by base datasource.
     */
    private void updateDerivatives() {
        final List<Pair<String, DerivativeDataSourceMetadata>> derivativesInDatabase = connector.retryWithHandle(
            handle -> handle
                .createQuery(
                    StringUtils.format(
                        "SELECT DISTINCT dataSource,commit_metadata_payload FROM %1$s",
                        dbTables.get().getDataSourceTable()
                    )
                )
                .map((index, r, ctx) -> {
                    final String datasourceName = r.getString("dataSource");
                    final DataSourceMetadata payload = JacksonUtils.readValue(
                        objectMapper,
                        r.getBytes("commit_metadata_payload"),
                        DataSourceMetadata.class
                    );
                    if (!(payload instanceof DerivativeDataSourceMetadata)) {
                        // Not a derivative datasource; filtered out below.
                        return null;
                    }
                    return new Pair<String, DerivativeDataSourceMetadata>(
                        datasourceName,
                        (DerivativeDataSourceMetadata) payload
                    );
                })
                .list()
        );

        // NOTE(review): each element issues a blocking metadata query from a parallel stream.
        // Kept as in the original; confirm this is the intended threading before changing it.
        final List<DerivativeDataSource> derivativeDataSources = derivativesInDatabase
            .parallelStream()
            .filter(data -> data != null)
            .map(derivatives -> {
                final String name = derivatives.lhs;
                final DerivativeDataSourceMetadata metadata = derivatives.rhs;
                final String baseDataSource = metadata.getBaseDataSource();
                final long avgSizePerGranularity = getAvgSizePerGranularity(name);
                log.info(
                    "find derivatives: {bases=%s, derivative=%s, dimensions=%s, metrics=%s, avgSize=%s}",
                    baseDataSource,
                    name,
                    metadata.getDimensions(),
                    metadata.getMetrics(),
                    avgSizePerGranularity
                );
                return new DerivativeDataSource(name, baseDataSource, metadata.getColumns(), avgSizePerGranularity);
            })
            .filter(derivatives -> derivatives.getAvgSizeBasedGranularity() > 0L)
            .collect(Collectors.toList());

        final ConcurrentHashMap<String, SortedSet<DerivativeDataSource>> newDerivatives = new ConcurrentHashMap<>();
        for (DerivativeDataSource derivative : derivativeDataSources) {
            newDerivatives.computeIfAbsent(derivative.getBaseDataSource(), ds -> new TreeSet<>()).add(derivative);
        }

        // The new snapshot replaces the old one unconditionally, so a plain set() is sufficient;
        // the previous compare-and-set loop never looked at the value it read.
        DERIVATIVES_REF.set(newDerivatives);
    }

    /**
     * Computes the total size of the used segments of {@code datasource} divided by the number
     * of distinct segment intervals.
     *
     * @param datasource datasource whose used segments are read from the segments table
     * @return the average size per distinct interval, or 0 if the datasource has no used segments
     */
    private long getAvgSizePerGranularity(final String datasource) {
        return connector.retryWithHandle(
            new HandleCallback<Long>() {
                @Override
                public Long withHandle(Handle handle) {
                    // All accumulation state is local to this invocation. The same callback
                    // object may be invoked again on retry, so keeping the running total in
                    // fields would count rows from a failed attempt twice.
                    final List<Pair<Interval, Long>> rows = handle
                        .createQuery(
                            StringUtils.format(
                                "SELECT start,%1$send%1$s,payload FROM %2$s WHERE used = true AND dataSource = :dataSource",
                                DerivativeDataSourceManager.this.connector.getQuoteString(),
                                DerivativeDataSourceManager.this.dbTables.get().getSegmentsTable()
                            )
                        )
                        .bind("dataSource", datasource)
                        .map((index, r, ctx) -> {
                            final Interval interval = Intervals.utc(
                                DateTimes.of(r.getString("start")).getMillis(),
                                DateTimes.of(r.getString("end")).getMillis()
                            );
                            final DataSegment segment = JacksonUtils.readValue(
                                DerivativeDataSourceManager.this.objectMapper,
                                r.getBytes("payload"),
                                DataSegment.class
                            );
                            return new Pair<Interval, Long>(interval, segment.getSize());
                        })
                        .list();

                    final Set<Interval> intervals = new HashSet<>();
                    long totalSize = 0L;
                    for (Pair<Interval, Long> row : rows) {
                        intervals.add(row.lhs);
                        totalSize += row.rhs;
                    }
                    return intervals.isEmpty() ? 0L : totalSize / intervals.size();
                }
            }
        );
    }
}

