/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.server.starter.helix;

import java.util.concurrent.Semaphore;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentMessageHandlerFactory
implements MessageHandlerFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMessageHandlerFactory.class);
    private final Semaphore _refreshThreadSemaphore;
    private final InstanceDataManager _instanceDataManager;
    private final ServerMetrics _metrics;

    public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager, ServerMetrics metrics) {
        this._instanceDataManager = instanceDataManager;
        this._metrics = metrics;
        int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads();
        this._refreshThreadSemaphore = maxParallelRefreshThreads > 0 ? new Semaphore(maxParallelRefreshThreads, true) : null;
    }

    private void acquireSema(String context, Logger logger) throws InterruptedException {
        if (this._refreshThreadSemaphore != null) {
            long startTime = System.currentTimeMillis();
            logger.info("Waiting for lock to refresh : {}, queue-length: {}", (Object)context, (Object)this._refreshThreadSemaphore.getQueueLength());
            this._refreshThreadSemaphore.acquire();
            logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", new Object[]{context, System.currentTimeMillis() - startTime, this._refreshThreadSemaphore.getQueueLength()});
        } else {
            LOGGER.info("Locking of refresh threads disabled (segment: {})", (Object)context);
        }
    }

    private void releaseSema() {
        if (this._refreshThreadSemaphore != null) {
            this._refreshThreadSemaphore.release();
        }
    }

    public MessageHandler createHandler(Message message, NotificationContext context) {
        String msgSubType;
        switch (msgSubType = message.getMsgSubType()) {
            case "REFRESH_SEGMENT": {
                return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), this._metrics, context);
            }
            case "RELOAD_SEGMENT": {
                return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), this._metrics, context);
            }
        }
        LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", (Object)msgSubType, (Object)message.getPartitionName());
        return new DefaultMessageHandler(message, this._metrics, context);
    }

    public String getMessageType() {
        return Message.MessageType.USER_DEFINE_MSG.toString();
    }

    public void reset() {
        LOGGER.info("Reset called");
    }

    private static class DefaultMessageHandler
    extends MessageHandler {
        final String _segmentName;
        final String _tableNameWithType;
        final ServerMetrics _metrics;
        final Logger _logger;

        DefaultMessageHandler(Message message, ServerMetrics metrics, NotificationContext context) {
            super(message, context);
            this._segmentName = message.getPartitionName();
            this._tableNameWithType = message.getResourceName();
            this._metrics = metrics;
            this._logger = LoggerFactory.getLogger((String)(this._tableNameWithType + "-" + ((Object)((Object)this)).getClass().getSimpleName()));
        }

        public HelixTaskResult handleMessage() throws InterruptedException {
            HelixTaskResult helixTaskResult = new HelixTaskResult();
            helixTaskResult.setSuccess(true);
            return helixTaskResult;
        }

        public void onError(Exception e, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            this._logger.error("onError: {}, {}", new Object[]{errorType, errorCode, e});
        }
    }

    private class SegmentReloadMessageHandler
    extends DefaultMessageHandler {
        private final boolean _forceDownload;

        SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage, ServerMetrics metrics, NotificationContext context) {
            super((Message)segmentReloadMessage, metrics, context);
            this._forceDownload = segmentReloadMessage.shouldForceDownload();
        }

        @Override
        public HelixTaskResult handleMessage() throws InterruptedException {
            HelixTaskResult helixTaskResult = new HelixTaskResult();
            this._logger.info("Handling message: {}", (Object)this._message);
            try {
                if (this._segmentName.equals("")) {
                    SegmentMessageHandlerFactory.this.acquireSema("ALL", this._logger);
                    SegmentMessageHandlerFactory.this._instanceDataManager.reloadAllSegments(this._tableNameWithType, this._forceDownload);
                } else {
                    SegmentMessageHandlerFactory.this.acquireSema(this._segmentName, this._logger);
                    SegmentMessageHandlerFactory.this._instanceDataManager.reloadSegment(this._tableNameWithType, this._segmentName, this._forceDownload);
                }
                helixTaskResult.setSuccess(true);
            }
            catch (Throwable e) {
                this._metrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.RELOAD_FAILURES, 1L);
                throw new RuntimeException("Caught exception while reloading segment: " + this._segmentName + " in table: " + this._tableNameWithType, e);
            }
            finally {
                SegmentMessageHandlerFactory.this.releaseSema();
            }
            return helixTaskResult;
        }
    }

    private class SegmentRefreshMessageHandler
    extends DefaultMessageHandler {
        SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, ServerMetrics metrics, NotificationContext context) {
            super((Message)refreshMessage, metrics, context);
        }

        @Override
        public HelixTaskResult handleMessage() throws InterruptedException {
            HelixTaskResult result = new HelixTaskResult();
            this._logger.info("Handling message: {}", (Object)this._message);
            try {
                SegmentMessageHandlerFactory.this.acquireSema(this._segmentName, this._logger);
                SegmentMessageHandlerFactory.this._instanceDataManager.addOrReplaceSegment(this._tableNameWithType, this._segmentName);
                result.setSuccess(true);
            }
            catch (Exception e) {
                this._metrics.addMeteredTableValue(this._tableNameWithType, (AbstractMetrics.Meter)ServerMeter.REFRESH_FAILURES, 1L);
                Utils.rethrowException((Throwable)e);
            }
            finally {
                SegmentMessageHandlerFactory.this.releaseSema();
            }
            return result;
        }
    }
}

