/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.server.starter.helix;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.server.starter.helix.SegmentLocks;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class HelixInstanceDataManager
implements InstanceDataManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HelixInstanceDataManager.class);
    private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap();
    private HelixInstanceDataManagerConfig _instanceDataManagerConfig;
    private String _instanceId;
    private HelixManager _helixManager;
    private ServerMetrics _serverMetrics;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private String _authToken;
    private SegmentUploader _segmentUploader;
    private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;

    public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics) throws ConfigurationException {
        File instanceSegmentTarDir;
        LOGGER.info("Initializing Helix instance data manager");
        this._instanceDataManagerConfig = new HelixInstanceDataManagerConfig(config);
        LOGGER.info("HelixInstanceDataManagerConfig: {}", (Object)this._instanceDataManagerConfig);
        this._instanceId = this._instanceDataManagerConfig.getInstanceId();
        this._helixManager = helixManager;
        this._serverMetrics = serverMetrics;
        this._authToken = config.getProperty("auth.token");
        this._segmentUploader = new PinotFSSegmentUploader(this._instanceDataManagerConfig.getSegmentStoreUri(), 10000);
        File instanceDataDir = new File(this._instanceDataManagerConfig.getInstanceDataDir());
        if (!instanceDataDir.exists()) {
            Preconditions.checkState((boolean)instanceDataDir.mkdirs());
        }
        if (!(instanceSegmentTarDir = new File(this._instanceDataManagerConfig.getInstanceSegmentTarDir())).exists()) {
            Preconditions.checkState((boolean)instanceSegmentTarDir.mkdirs());
        }
        SegmentBuildTimeLeaseExtender.initExecutor();
        TableDataManagerProvider.init((InstanceDataManagerConfig)this._instanceDataManagerConfig);
        LOGGER.info("Initialized Helix instance data manager");
        this._errorCache = CacheBuilder.newBuilder().maximumSize(this._instanceDataManagerConfig.getErrorCacheSize()).build((CacheLoader)new CacheLoader<Pair<String, String>, SegmentErrorInfo>(){

            public SegmentErrorInfo load(Pair<String, String> tableNameWithTypeSegmentNamePair) {
                return null;
            }
        });
    }

    public String getInstanceId() {
        return this._instanceId;
    }

    public synchronized void start() {
        this._propertyStore = this._helixManager.getHelixPropertyStore();
        LOGGER.info("Helix instance data manager started");
    }

    public synchronized void shutDown() {
        for (TableDataManager tableDataManager : this._tableDataManagerMap.values()) {
            tableDataManager.shutDown();
        }
        SegmentBuildTimeLeaseExtender.shutdownExecutor();
        LOGGER.info("Helix instance data manager shut down");
    }

    public void addOfflineSegment(String offlineTableName, String segmentName, File indexDir) throws Exception {
        LOGGER.info("Adding segment: {} to table: {}", (Object)segmentName, (Object)offlineTableName);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)offlineTableName);
        Preconditions.checkNotNull((Object)tableConfig);
        this._tableDataManagerMap.computeIfAbsent(offlineTableName, k -> this.createTableDataManager((String)k, tableConfig)).addSegment(indexDir, new IndexLoadingConfig((InstanceDataManagerConfig)this._instanceDataManagerConfig, tableConfig));
        LOGGER.info("Added segment: {} to table: {}", (Object)segmentName, (Object)offlineTableName);
    }

    public void addRealtimeSegment(String realtimeTableName, String segmentName) throws Exception {
        LOGGER.info("Adding segment: {} to table: {}", (Object)segmentName, (Object)realtimeTableName);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)realtimeTableName);
        Preconditions.checkNotNull((Object)tableConfig);
        this._tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> this.createTableDataManager((String)k, tableConfig)).addSegment(segmentName, tableConfig, new IndexLoadingConfig((InstanceDataManagerConfig)this._instanceDataManagerConfig, tableConfig));
        LOGGER.info("Added segment: {} to table: {}", (Object)segmentName, (Object)realtimeTableName);
    }

    private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
        LOGGER.info("Creating table data manager for table: {}", (Object)tableNameWithType);
        TableDataManagerConfig tableDataManagerConfig = TableDataManagerConfig.getDefaultHelixTableDataManagerConfig((InstanceDataManagerConfig)this._instanceDataManagerConfig, (String)tableNameWithType);
        tableDataManagerConfig.overrideConfigs(tableConfig, this._authToken);
        TableDataManager tableDataManager = TableDataManagerProvider.getTableDataManager((TableDataManagerConfig)tableDataManagerConfig, (String)this._instanceId, this._propertyStore, (ServerMetrics)this._serverMetrics, (HelixManager)this._helixManager, this._errorCache);
        tableDataManager.start();
        LOGGER.info("Created table data manager for table: {}", (Object)tableNameWithType);
        return tableDataManager;
    }

    public void removeSegment(String tableNameWithType, String segmentName) {
        LOGGER.info("Removing segment: {} from table: {}", (Object)segmentName, (Object)tableNameWithType);
        this._tableDataManagerMap.computeIfPresent(tableNameWithType, (k, v) -> {
            v.removeSegment(segmentName);
            LOGGER.info("Removed segment: {} from table: {}", (Object)segmentName, k);
            if (v.getNumSegments() == 0) {
                v.shutDown();
                return null;
            }
            return v;
        });
    }

    public void reloadSegment(String tableNameWithType, String segmentName, boolean forceDownload) throws Exception {
        LOGGER.info("Reloading single segment: {} in table: {}", (Object)segmentName, (Object)tableNameWithType);
        SegmentMetadata segmentMetadata = this.getSegmentMetadata(tableNameWithType, segmentName);
        if (segmentMetadata == null) {
            LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", (Object)segmentName, (Object)tableNameWithType);
            return;
        }
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)tableNameWithType);
        Preconditions.checkNotNull((Object)tableConfig);
        Schema schema = ZKMetadataProvider.getTableSchema(this._propertyStore, (String)tableNameWithType);
        this.reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
        LOGGER.info("Reloaded single segment: {} in table: {}", (Object)segmentName, (Object)tableNameWithType);
    }

    public void reloadAllSegments(String tableNameWithType, boolean forceDownload) {
        LOGGER.info("Reloading all segments in table: {}", (Object)tableNameWithType);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)tableNameWithType);
        Preconditions.checkNotNull((Object)tableConfig);
        Schema schema = ZKMetadataProvider.getTableSchema(this._propertyStore, (String)tableNameWithType);
        ArrayList<String> failedSegments = new ArrayList<String>();
        Exception sampleException = null;
        List<SegmentMetadata> segmentsMetadata = this.getAllSegmentsMetadata(tableNameWithType);
        for (SegmentMetadata segmentMetadata : segmentsMetadata) {
            try {
                this.reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
            }
            catch (Exception e) {
                String segmentName = segmentMetadata.getName();
                LOGGER.error("Caught exception while reloading segment: {} in table: {}", new Object[]{segmentName, tableNameWithType, e});
                failedSegments.add(segmentName);
                sampleException = e;
            }
        }
        if (sampleException != null) {
            throw new RuntimeException(String.format("Failed to reload %d/%d segments: %s in table: %s", failedSegments.size(), segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException);
        }
        LOGGER.info("Reloaded all segments in table: {}", (Object)tableNameWithType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reloadSegment(String tableNameWithType, SegmentMetadata segmentMetadata, TableConfig tableConfig, @Nullable Schema schema, boolean forceDownload) throws Exception {
        String segmentName = segmentMetadata.getName();
        LOGGER.info("Reloading segment: {} in table: {} with forceDownload: {}", new Object[]{segmentName, tableNameWithType, forceDownload});
        TableDataManager tableDataManager = this._tableDataManagerMap.get(tableNameWithType);
        if (tableDataManager == null) {
            LOGGER.warn("Failed to find table data manager for table: {}, skipping reloading segment", (Object)tableNameWithType);
            return;
        }
        File indexDir = segmentMetadata.getIndexDir();
        if (indexDir == null) {
            if (!this._instanceDataManagerConfig.shouldReloadConsumingSegment()) {
                LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: {}", (Object)segmentName, (Object)tableNameWithType);
                return;
            }
            Preconditions.checkState((schema != null ? 1 : 0) != 0, (String)"Failed to find schema for table: {}", (Object)tableNameWithType);
            LOGGER.info("Try reloading REALTIME consuming segment: {} in table: {}", (Object)segmentName, (Object)tableNameWithType);
            SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
            if (segmentDataManager != null) {
                try {
                    MutableSegmentImpl mutableSegment = (MutableSegmentImpl)segmentDataManager.getSegment();
                    mutableSegment.addExtraColumns(schema);
                }
                finally {
                    tableDataManager.releaseSegment(segmentDataManager);
                }
            }
            return;
        }
        SegmentZKMetadata zkMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, (String)tableNameWithType, (String)segmentName);
        Preconditions.checkNotNull((Object)zkMetadata);
        Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
        try {
            segmentLock.lock();
            tableDataManager.reloadSegment(segmentName, new IndexLoadingConfig((InstanceDataManagerConfig)this._instanceDataManagerConfig, tableConfig), zkMetadata, segmentMetadata, schema, forceDownload);
            LOGGER.info("Reloaded segment: {} of table: {}", (Object)segmentName, (Object)tableNameWithType);
        }
        finally {
            segmentLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addOrReplaceSegment(String tableNameWithType, String segmentName) throws Exception {
        LOGGER.info("Adding or replacing segment: {} for table: {}", (Object)segmentName, (Object)tableNameWithType);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, (String)tableNameWithType);
        Preconditions.checkNotNull((Object)tableConfig);
        SegmentZKMetadata zkMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, (String)tableNameWithType, (String)segmentName);
        Preconditions.checkNotNull((Object)zkMetadata);
        Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
        try {
            segmentLock.lock();
            SegmentMetadata localMetadata = this.getSegmentMetadata(tableNameWithType, segmentName);
            this._tableDataManagerMap.computeIfAbsent(tableNameWithType, k -> this.createTableDataManager((String)k, tableConfig)).addOrReplaceSegment(segmentName, new IndexLoadingConfig((InstanceDataManagerConfig)this._instanceDataManagerConfig, tableConfig), zkMetadata, localMetadata);
            LOGGER.info("Added or replaced segment: {} of table: {}", (Object)segmentName, (Object)tableNameWithType);
        }
        finally {
            segmentLock.unlock();
        }
    }

    public Set<String> getAllTables() {
        return this._tableDataManagerMap.keySet();
    }

    @Nullable
    public TableDataManager getTableDataManager(String tableNameWithType) {
        return this._tableDataManagerMap.get(tableNameWithType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public SegmentMetadata getSegmentMetadata(String tableNameWithType, String segmentName) {
        TableDataManager tableDataManager = this._tableDataManagerMap.get(tableNameWithType);
        if (tableDataManager != null) {
            SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
            if (segmentDataManager == null) {
                return null;
            }
            try {
                SegmentMetadata segmentMetadata = segmentDataManager.getSegment().getSegmentMetadata();
                return segmentMetadata;
            }
            finally {
                tableDataManager.releaseSegment(segmentDataManager);
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SegmentMetadata> getAllSegmentsMetadata(String tableNameWithType) {
        TableDataManager tableDataManager = this._tableDataManagerMap.get(tableNameWithType);
        if (tableDataManager == null) {
            return Collections.emptyList();
        }
        List segmentDataManagers = tableDataManager.acquireAllSegments();
        try {
            ArrayList<SegmentMetadata> segmentsMetadata = new ArrayList<SegmentMetadata>(segmentDataManagers.size());
            for (SegmentDataManager segmentDataManager : segmentDataManagers) {
                segmentsMetadata.add(segmentDataManager.getSegment().getSegmentMetadata());
            }
            ArrayList<SegmentMetadata> arrayList = segmentsMetadata;
            return arrayList;
        }
        finally {
            for (SegmentDataManager segmentDataManager : segmentDataManagers) {
                tableDataManager.releaseSegment(segmentDataManager);
            }
        }
    }

    public File getSegmentDataDirectory(String tableNameWithType, String segmentName) {
        return new File(new File(this._instanceDataManagerConfig.getInstanceDataDir(), tableNameWithType), segmentName);
    }

    public String getSegmentFileDirectory() {
        return this._instanceDataManagerConfig.getInstanceSegmentTarDir();
    }

    public int getMaxParallelRefreshThreads() {
        return this._instanceDataManagerConfig.getMaxParallelRefreshThreads();
    }

    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return this._propertyStore;
    }

    public SegmentUploader getSegmentUploader() {
        return this._segmentUploader;
    }
}

