/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.loading;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DataSegmentChangeResponse;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.SegmentChangeStatus;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.server.coordinator.loading.LoadPeonCallback;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadingRateTracker;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentHolder;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.http.SegmentLoadingCapabilities;
import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

public class HttpLoadQueuePeon
implements LoadQueuePeon {
    /** Jackson type token for the request body: a list of change requests (used to build {@code requestBodyWriter}). */
    public static final TypeReference<List<DataSegmentChangeRequest>> REQUEST_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeRequest>>(){};
    /** Jackson type token used to deserialize the server's response: a list of per-request statuses. */
    public static final TypeReference<List<DataSegmentChangeResponse>> RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeResponse>>(){};
    private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class);
    // NOTE(review): not referenced by name in this file as shown; the capabilities request uses the
    // same literal value (10000 ms), so this is presumably that timeout inlined by the compiler - confirm.
    private static final long DEFAULT_TIMEOUT = 10000L;
    /**
     * Running sum of {@code DataSegment.getSize()} over queued load requests. Incremented when a load is
     * queued, decremented when a load request completes, and reset to zero on {@code stop()}.
     */
    private final AtomicLong queuedSize = new AtomicLong(0L);
    /** Stats accumulated since the last {@code getAndResetStats()} call, which swaps in a fresh instance. */
    private final AtomicReference<CoordinatorRunStats> stats = new AtomicReference<CoordinatorRunStats>(new CoordinatorRunStats());
    /** Pending load requests, keyed by segment. */
    private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentHashMap<DataSegment, SegmentHolder>();
    /** Pending drop requests, keyed by segment. */
    private final ConcurrentMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentHashMap<DataSegment, SegmentHolder>();
    /** Segments flagged via {@code markSegmentToDrop}; this class only adds, removes and exposes them. */
    private final Set<DataSegment> segmentsMarkedToDrop = ConcurrentHashMap.newKeySet();
    private final LoadingRateTracker loadingRateTracker = new LoadingRateTracker();
    /**
     * All queued load and drop requests, iterated in the natural ordering of {@code SegmentHolder}
     * (no comparator is supplied). Not a concurrent collection; every access in this class happens
     * while holding {@code lock}.
     */
    private final Set<SegmentHolder> queuedSegments = new TreeSet<SegmentHolder>();
    /**
     * Segments whose change request has been put into a batch for the server and not yet resolved.
     * {@code cancelOperation} refuses to cancel these. Every access in this class happens while
     * holding {@code lock}.
     */
    private final Set<DataSegment> activeRequestSegments = new HashSet<DataSegment>();
    /** Runs {@code doSegmentManagement()} (periodically and on demand) and the HTTP response callbacks. */
    private final ScheduledExecutorService processingExecutor;
    /** Set once by {@code stop()} and never reset; volatile because it is also read without {@code lock}. */
    private volatile boolean stopped = false;
    /** Guards {@code queuedSegments}, {@code activeRequestSegments} and the start/stop/queue operations. */
    private final Object lock = new Object();
    private final HttpLoadQueuePeonConfig config;
    private final ObjectMapper jsonMapper;
    private final HttpClient httpClient;
    /** The base URL passed to the constructor; used both to build request URLs and as the server name in logs. */
    private final String serverId;
    /** True from the moment a management run claims the loop until its HTTP round trip (or early exit) finishes. */
    private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
    /** Executor on which {@code LoadPeonCallback}s are invoked. */
    private final ExecutorService callBackExecutor;
    /** Consulted at the start of every management run to pick the loading mode for that batch. */
    private final Supplier<SegmentLoadingMode> loadingModeSupplier;
    /** Writer pre-configured for {@code REQUEST_ENTITY_TYPE_REF}; serializes each batch of change requests. */
    private final ObjectWriter requestBodyWriter;
    /** Capabilities fetched from the server once, during construction; used to size batches. */
    private final SegmentLoadingCapabilities serverCapabilities;

    /**
     * Creates a peon that manages the load/drop queue of one server over HTTP.
     *
     * <p>Construction performs a blocking HTTP request to fetch the server's segment loading
     * capabilities and fails with an exception if that request fails.
     *
     * @param baseUrl             base URL of the server; also used as the server id in log messages
     * @param jsonMapper          mapper used to serialize requests and deserialize responses
     * @param httpClient          client used for all calls to the server
     * @param config              peon configuration (batch size, timeouts, repeat delay)
     * @param loadingModeSupplier supplies the loading mode to use for each batch
     * @param processingExecutor  executor that runs the queue management loop and response callbacks
     * @param callBackExecutor    executor on which load/drop callbacks are invoked
     */
    public HttpLoadQueuePeon(
        String baseUrl,
        ObjectMapper jsonMapper,
        HttpClient httpClient,
        HttpLoadQueuePeonConfig config,
        Supplier<SegmentLoadingMode> loadingModeSupplier,
        ScheduledExecutorService processingExecutor,
        ExecutorService callBackExecutor
    ) {
        this.serverId = baseUrl;
        this.config = config;
        this.httpClient = httpClient;
        this.jsonMapper = jsonMapper;
        this.requestBodyWriter = jsonMapper.writerFor(REQUEST_ENTITY_TYPE_REF);
        this.loadingModeSupplier = loadingModeSupplier;
        this.processingExecutor = processingExecutor;
        this.callBackExecutor = callBackExecutor;

        // Must stay last: the fetch reads serverId, httpClient, config and jsonMapper.
        this.serverCapabilities = fetchSegmentLoadingCapabilities();
    }

    /**
     * Fetches the segment loading capabilities of the server with a blocking GET on
     * {@code druid-internal/v1/segments/loadCapabilities}.
     *
     * <ul>
     *   <li>HTTP 404: the endpoint does not exist on the server, so defaults are built from the
     *       configured batch size (or 1 if none is configured).</li>
     *   <li>HTTP 200: the response body is deserialized into {@link SegmentLoadingCapabilities}.</li>
     *   <li>Any other status: an alert is emitted and the call fails.</li>
     * </ul>
     *
     * @return the capabilities reported by the server, or the defaults described above
     * @throws RE if the request fails, is interrupted, returns an unexpected status, or the
     *            response cannot be parsed; the underlying failure is attached as the cause
     */
    private SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() {
        try {
            final URL capabilitiesUrl = new URL(new URL(serverId), "druid-internal/v1/segments/loadCapabilities");
            final BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
            final InputStream stream = httpClient.go(
                new Request(HttpMethod.GET, capabilitiesUrl).addHeader("Accept", "application/json"),
                responseHandler,
                new Duration(DEFAULT_TIMEOUT)
            ).get();

            final int status = responseHandler.getStatus();
            if (status == 404) {
                // Same null-defaulting helper that calculateBatchSize() uses.
                final int batchSize = Configs.valueOrDefault(config.getBatchSize(), 1);
                final SegmentLoadingCapabilities defaultCapabilities = new SegmentLoadingCapabilities(batchSize, batchSize);
                log.warn(
                    "Historical capabilities endpoint not found at URL[%s]. Using default values[%s].",
                    capabilitiesUrl,
                    defaultCapabilities
                );
                return defaultCapabilities;
            }
            if (status != 200) {
                // makeAlert() only builds the alert; it is not sent until emit() is called.
                log.makeAlert(
                    "Received status[%s] when fetching loading capabilities from server[%s]",
                    status,
                    serverId
                ).emit();
                // This RE is caught below and wrapped with the generic fetch-failure message.
                throw new RE(
                    "Received status[%s] when fetching loading capabilities from server[%s]",
                    status,
                    serverId
                );
            }
            return jsonMapper.readValue(stream, SegmentLoadingCapabilities.class);
        }
        catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Do not swallow the interruption: restore the flag for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RE(th, "Received error while fetching historical capabilities from Server[%s].", serverId);
        }
    }

    /**
     * Runs one iteration of the queue management loop.
     *
     * <p>At most one iteration is in flight at a time (guarded by {@code mainLoopInProgress});
     * a tick that arrives while another is in flight, or after the peon is stopped, is ignored.
     * An iteration:
     * <ol>
     *   <li>walks {@code queuedSegments} under {@code lock}, failing requests that have timed out
     *       and collecting up to one batch of the remaining requests;</li>
     *   <li>POSTs the batch to {@code druid-internal/v1/segments/changeRequests};</li>
     *   <li>on response, applies the per-request statuses and, unless something went wrong or the
     *       peon was stopped, immediately schedules the next iteration.</li>
     * </ol>
     * {@code mainLoopInProgress} is released on every exit path: empty batch, send failure,
     * request failure and response handled.
     */
    private void doSegmentManagement() {
        if (stopped || !mainLoopInProgress.compareAndSet(false, true)) {
            log.trace("[%s]Ignoring tick. Either in-progress already or stopped.", serverId);
            return;
        }

        final SegmentLoadingMode loadingMode = loadingModeSupplier.get();
        final int batchSize = calculateBatchSize(loadingMode);
        final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);

        synchronized (lock) {
            final Iterator<SegmentHolder> queueIterator = queuedSegments.iterator();
            while (newRequests.size() < batchSize && queueIterator.hasNext()) {
                final SegmentHolder holder = queueIterator.next();
                final DataSegment segment = holder.getSegment();
                if (!holder.hasRequestTimedOut()) {
                    newRequests.add(holder.getChangeRequest());
                    holder.markRequestSentToServer();
                    activeRequestSegments.add(segment);
                } else {
                    // Timed out: report the failure and forget the request entirely.
                    onRequestFailed(holder, SegmentChangeStatus.failed("timed out"));
                    queueIterator.remove();
                    (holder.isLoad() ? segmentsToLoad : segmentsToDrop).remove(segment);
                    activeRequestSegments.remove(segment);
                }
            }

            if (segmentsToLoad.isEmpty()) {
                loadingRateTracker.markBatchLoadingFinished();
            }
        }

        if (newRequests.isEmpty()) {
            log.trace(
                "[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].",
                serverId,
                segmentsToLoad.size(),
                segmentsToDrop.size(),
                batchSize
            );
            mainLoopInProgress.set(false);
            return;
        }

        try {
            log.trace(
                "Sending [%d] load/drop requests to Server[%s] in loadingMode[%s].",
                newRequests.size(),
                serverId,
                loadingMode
            );

            final boolean hasLoadRequests = newRequests.stream().anyMatch(SegmentChangeRequestLoad.class::isInstance);
            if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) {
                loadingRateTracker.markBatchLoadingStarted();
            }

            final URL changeRequestURL = new URL(
                new URL(serverId),
                StringUtils.nonStrictFormat(
                    "druid-internal/v1/segments/changeRequests?timeout=%d&loadingMode=%s",
                    config.getHostTimeout().getMillis(),
                    loadingMode
                )
            );
            final BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
            final Request request = new Request(HttpMethod.POST, changeRequestURL)
                .addHeader("Accept", "application/json")
                .addHeader("Content-Type", "application/json")
                .setContent(requestBodyWriter.writeValueAsBytes(newRequests));

            // The client-side read timeout is the host timeout plus 5 seconds.
            final ListenableFuture<InputStream> future = httpClient.go(
                request,
                responseHandler,
                new Duration(config.getHostTimeout().getMillis() + 5000L)
            );

            Futures.addCallback(
                future,
                new FutureCallback<InputStream>() {
                    @Override
                    public void onSuccess(InputStream result) {
                        boolean scheduleNextRunImmediately = true;
                        try {
                            if (responseHandler.getStatus() == 204) {
                                log.trace("Received NO CONTENT reseponse from [%s]", serverId);
                            } else if (200 == responseHandler.getStatus()) {
                                try {
                                    final List<DataSegmentChangeResponse> statuses =
                                        jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF);
                                    log.trace("Server[%s] returned status response [%s].", serverId, statuses);

                                    synchronized (lock) {
                                        if (stopped) {
                                            log.trace("Ignoring response from Server[%s]. We are already stopped.", serverId);
                                            scheduleNextRunImmediately = false;
                                            return;
                                        }

                                        int numSuccessfulLoads = 0;
                                        long successfulLoadSize = 0L;
                                        for (DataSegmentChangeResponse response : statuses) {
                                            switch (response.getStatus().getState()) {
                                                case SUCCESS:
                                                    if (response.getRequest() instanceof SegmentChangeRequestLoad) {
                                                        ++numSuccessfulLoads;
                                                        successfulLoadSize +=
                                                            ((SegmentChangeRequestLoad) response.getRequest()).getSegment().getSize();
                                                    }
                                                    // Intentional fall through: SUCCESS and FAILED are both
                                                    // terminal and share the same bookkeeping.
                                                case FAILED:
                                                    handleResponseStatus(response.getRequest(), response.getStatus());
                                                    break;
                                                case PENDING:
                                                    log.trace(
                                                        "Request[%s] is still pending on server[%s].",
                                                        response.getRequest(),
                                                        serverId
                                                    );
                                                    break;
                                                default:
                                                    scheduleNextRunImmediately = false;
                                                    log.error(
                                                        "Server[%s] returned unknown state in status[%s].",
                                                        serverId,
                                                        response.getStatus()
                                                    );
                                            }
                                        }

                                        if (numSuccessfulLoads > 0) {
                                            loadingRateTracker.incrementBytesLoadedInBatch(successfulLoadSize);
                                        }
                                    }
                                }
                                catch (Exception ex) {
                                    scheduleNextRunImmediately = false;
                                    logRequestFailure(ex);
                                }
                            } else {
                                scheduleNextRunImmediately = false;
                                logRequestFailure(new RE("Unexpected Response Status."));
                            }
                        }
                        finally {
                            // Release the loop before (possibly) queueing the next iteration.
                            mainLoopInProgress.set(false);
                            if (scheduleNextRunImmediately) {
                                processingExecutor.execute(HttpLoadQueuePeon.this::doSegmentManagement);
                            }
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        try {
                            logRequestFailure(t);
                        }
                        finally {
                            mainLoopInProgress.set(false);
                        }
                    }

                    private void logRequestFailure(Throwable t) {
                        log.error(
                            t,
                            "Request[%s] Failed with status[%s]. Reason[%s].",
                            changeRequestURL,
                            responseHandler.getStatus(),
                            responseHandler.getDescription()
                        );
                    }
                },
                processingExecutor
            );
        }
        catch (Throwable th) {
            log.error(th, "Error sending load/drop request to [%s].", serverId);
            mainLoopInProgress.set(false);
        }
    }

    /**
     * Determines how many change requests may be sent to the server in one batch.
     *
     * <p>In {@code TURBO} mode this is the server's number of turbo loading threads; otherwise it
     * is the configured batch size, falling back to the server's number of loading threads when
     * no batch size is configured. The result is never less than 1.
     *
     * @param loadingMode loading mode that will be used for the batch
     * @return the batch size, at least 1
     */
    @VisibleForTesting
    int calculateBatchSize(SegmentLoadingMode loadingMode) {
        final int requestedSize;
        if (loadingMode == SegmentLoadingMode.TURBO) {
            requestedSize = serverCapabilities.getNumTurboLoadingThreads();
        } else {
            requestedSize = Configs.valueOrDefault(config.getBatchSize(), serverCapabilities.getNumLoadingThreads());
        }
        return Math.max(requestedSize, 1);
    }

    /**
     * Applies the terminal status reported by the server for one change request.
     *
     * <p>The request is dispatched through {@link DataSegmentChangeHandler} to find out whether it
     * was a load or a drop; the matching holder is then removed from the corresponding map, from
     * {@code queuedSegments} and from {@code activeRequestSegments}, and completed as failed or
     * successful. A request with no matching holder is ignored.
     *
     * @param changeRequest request the server responded to
     * @param status        status reported for that request
     */
    private void handleResponseStatus(DataSegmentChangeRequest changeRequest, final SegmentChangeStatus status) {
        final DataSegmentChangeHandler outcomeRecorder = new DataSegmentChangeHandler() {
            @Override
            public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                finishRequest(segmentsToLoad.remove(segment));
            }

            @Override
            public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                finishRequest(segmentsToDrop.remove(segment));
            }

            /** Clears queue bookkeeping for the holder and reports the outcome carried by {@code status}. */
            private void finishRequest(SegmentHolder holder) {
                if (holder == null) {
                    return;
                }
                queuedSegments.remove(holder);
                activeRequestSegments.remove(holder.getSegment());

                final boolean failed = status.getState() == SegmentChangeStatus.State.FAILED;
                if (failed) {
                    onRequestFailed(holder, status);
                } else {
                    onRequestCompleted(holder, RequestStatus.SUCCESS, status);
                }
            }
        };
        changeRequest.go(outcomeRecorder, null);
    }

    /**
     * Schedules the queue management loop on {@code processingExecutor} at the configured repeat
     * delay. The scheduled task signals the scheduler to stop once the peon has been stopped.
     *
     * @throws ISE if the peon has already been stopped
     */
    @Override
    public void start() {
        synchronized (lock) {
            if (stopped) {
                throw new ISE("Can't start.");
            }
            ScheduledExecutors.scheduleAtFixedRate(
                processingExecutor,
                config.getRepeatDelay(),
                () -> {
                    if (!stopped) {
                        doSegmentManagement();
                    }
                    // Re-read the flag: stop() may have been called while the run was in progress.
                    return stopped ? ScheduledExecutors.Signal.STOP : ScheduledExecutors.Signal.REPEAT;
                }
            );
        }
    }

    /**
     * Stops the peon. Every queued request is completed as cancelled, then all queues, the queued
     * size, the loading rate tracker and the current stats are cleared. Calling this on a peon
     * that is already stopped does nothing.
     */
    @Override
    public void stop() {
        synchronized (lock) {
            if (stopped) {
                return;
            }
            stopped = true;

            for (SegmentHolder holder : queuedSegments) {
                onRequestCompleted(holder, RequestStatus.CANCELLED, SegmentChangeStatus.failed("cancelled"));
            }

            segmentsToDrop.clear();
            segmentsToLoad.clear();
            queuedSegments.clear();
            activeRequestSegments.clear();
            queuedSize.set(0L);
            loadingRateTracker.stop();
            stats.get().clear();
        }
    }

    /**
     * Queues a load of the given segment on this server.
     *
     * <p>If a load of the same segment is already queued, the callback is attached to the existing
     * request instead of queueing a new one. If the peon is stopped, nothing is queued and the
     * callback (when non-null) is invoked with {@code false}.
     *
     * @param segment  segment to load
     * @param action   load action to perform; a non-load action is logged and ignored
     * @param callback callback for the outcome of the request, may be null
     */
    @Override
    public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallback callback) {
        if (!action.isLoad()) {
            log.warn("Invalid load action[%s] for segment[%s] on server[%s].", action, segment.getId(), serverId);
            return;
        }

        synchronized (lock) {
            if (stopped) {
                log.warn(
                    "Server[%s] cannot load segment[%s] because load queue peon is stopped.",
                    serverId,
                    segment.getId()
                );
                if (callback != null) {
                    callback.execute(false);
                }
                return;
            }

            final SegmentHolder existing = segmentsToLoad.get(segment);
            if (existing != null) {
                existing.addCallback(callback);
                return;
            }

            queuedSize.addAndGet(segment.getSize());
            final SegmentHolder created = new SegmentHolder(segment, action, config.getLoadTimeout(), callback);
            segmentsToLoad.put(segment, created);
            queuedSegments.add(created);
            // Ask for a management run right away rather than waiting for the next scheduled tick.
            processingExecutor.execute(this::doSegmentManagement);
            incrementStat(created, RequestStatus.ASSIGNED, null);
        }
    }

    /**
     * Queues a drop of the given segment on this server.
     *
     * <p>If a drop of the same segment is already queued, the callback is attached to the existing
     * request instead of queueing a new one. If the peon is stopped, nothing is queued and the
     * callback (when non-null) is invoked with {@code false}.
     *
     * @param segment  segment to drop
     * @param callback callback for the outcome of the request, may be null
     */
    @Override
    public void dropSegment(DataSegment segment, LoadPeonCallback callback) {
        synchronized (lock) {
            if (stopped) {
                log.warn(
                    "Server[%s] cannot drop segment[%s] because load queue peon is stopped.",
                    serverId,
                    segment.getId()
                );
                if (callback != null) {
                    callback.execute(false);
                }
                return;
            }

            final SegmentHolder existing = segmentsToDrop.get(segment);
            if (existing != null) {
                existing.addCallback(callback);
                return;
            }

            log.trace("Server[%s] to drop segment[%s] queued.", serverId, segment.getId());
            // Drop requests are created with the same timeout value as loads (config.getLoadTimeout()).
            final SegmentHolder created = new SegmentHolder(segment, SegmentAction.DROP, config.getLoadTimeout(), callback);
            segmentsToDrop.put(segment, created);
            queuedSegments.add(created);
            // Ask for a management run right away rather than waiting for the next scheduled tick.
            processingExecutor.execute(this::doSegmentManagement);
            incrementStat(created, RequestStatus.ASSIGNED, null);
        }
    }

    /**
     * @return a read-only view (not a copy) of the segments currently queued for load
     */
    @Override
    public Set<DataSegment> getSegmentsToLoad() {
        final Set<DataSegment> queuedLoads = segmentsToLoad.keySet();
        return Collections.unmodifiableSet(queuedLoads);
    }

    /**
     * @return a read-only view (not a copy) of the segments currently queued for drop
     */
    @Override
    public Set<DataSegment> getSegmentsToDrop() {
        final Set<DataSegment> queuedDrops = segmentsToDrop.keySet();
        return Collections.unmodifiableSet(queuedDrops);
    }

    /**
     * This implementation does not track timed out segments separately: requests that time out
     * are failed and removed from the queue during the management loop.
     *
     * @return always an empty set
     */
    @Override
    public Set<DataSegment> getTimedOutSegments() {
        return Collections.<DataSegment>emptySet();
    }

    /**
     * @return a snapshot copy, taken under {@code lock}, of all queued load and drop requests
     */
    @Override
    public Set<SegmentHolder> getSegmentsInQueue() {
        synchronized (lock) {
            return new HashSet<>(queuedSegments);
        }
    }

    /**
     * @return the current value of the queued-size counter, i.e. the summed size of the segments
     *         whose load requests have been queued and not yet completed
     */
    @Override
    public long getSizeOfSegmentsToLoad() {
        final long totalQueuedSize = queuedSize.get();
        return totalQueuedSize;
    }

    /**
     * @return the moving average load rate reported by the loading rate tracker, in kbps
     */
    @Override
    public long getLoadRateKbps() {
        final long movingAverageKbps = loadingRateTracker.getMovingAverageLoadRateKbps();
        return movingAverageKbps;
    }

    /**
     * Atomically replaces the accumulated stats with a fresh, empty instance.
     *
     * @return the stats accumulated since the previous call
     */
    @Override
    public CoordinatorRunStats getAndResetStats() {
        final CoordinatorRunStats freshStats = new CoordinatorRunStats();
        return stats.getAndSet(freshStats);
    }

    /**
     * Adds the segment to the set of segments marked to drop. This does not queue a drop request.
     *
     * @param dataSegment segment to mark
     */
    @Override
    public void markSegmentToDrop(DataSegment dataSegment) {
        segmentsMarkedToDrop.add(dataSegment);
    }

    /**
     * Removes the segment from the set of segments marked to drop, if present.
     *
     * @param dataSegment segment to unmark
     */
    @Override
    public void unmarkSegmentToDrop(DataSegment dataSegment) {
        segmentsMarkedToDrop.remove(dataSegment);
    }

    /**
     * @return a read-only view (not a copy) of the segments currently marked to drop
     */
    @Override
    public Set<DataSegment> getSegmentsMarkedToDrop() {
        final Set<DataSegment> readOnlyView = Collections.unmodifiableSet(segmentsMarkedToDrop);
        return readOnlyView;
    }

    /**
     * Logs the failure of a request at error level and completes it with {@code FAILED} status.
     *
     * @param holder holder of the failed request
     * @param status status carrying the failure cause
     */
    private void onRequestFailed(SegmentHolder holder, SegmentChangeStatus status) {
        log.error(
            "Server[%s] failed segment[%s] request[%s] with cause [%s].",
            serverId,
            holder.getSegment().getId(),
            holder.getAction(),
            status.getFailureCause()
        );
        onRequestCompleted(holder, RequestStatus.FAILED, status);
    }

    /**
     * Finishes a request: for loads, subtracts the segment size from the queued size; then records
     * the stat for the final status and submits the holder's callbacks for execution.
     *
     * <p>Does not remove the holder from any queue; callers do that themselves.
     *
     * @param holder       holder of the completed request
     * @param status       final status of the request
     * @param changeStatus status used to describe the request in the recorded stat
     */
    private void onRequestCompleted(SegmentHolder holder, RequestStatus status, SegmentChangeStatus changeStatus) {
        log.trace(
            "Server[%s] completed request[%s] on segment[%s] with status[%s].",
            serverId,
            holder.getAction(),
            holder.getSegment().getId(),
            status
        );

        if (holder.isLoad()) {
            queuedSize.addAndGet(-holder.getSegment().getSize());
        }
        incrementStat(holder, status, changeStatus);

        final boolean succeeded = status == RequestStatus.SUCCESS;
        executeCallbacks(holder, succeeded);
    }

    /**
     * Adds 1 to the stat associated with the given request status, keyed by the segment's
     * datasource and a description. The description is the action name, suffixed with
     * {@code ": <loadingMode>"} when the change status carries a loading mode.
     *
     * @param holder       holder of the request being counted
     * @param status       request status whose stat is incremented
     * @param changeStatus optional change status supplying the loading mode, may be null
     */
    private void incrementStat(SegmentHolder holder, RequestStatus status, SegmentChangeStatus changeStatus) {
        final String actionName = holder.getAction().name();
        final String description;
        if (changeStatus == null || changeStatus.getLoadingMode() == null) {
            description = actionName;
        } else {
            description = actionName + ": " + changeStatus.getLoadingMode().name();
        }

        final RowKey rowKey = RowKey
            .with(Dimension.DATASOURCE, holder.getSegment().getDataSource())
            .and(Dimension.DESCRIPTION, description);
        stats.get().add(status.datasourceStat, rowKey, 1L);
    }

    /**
     * Submits a task to {@code callBackExecutor} that invokes every callback of the holder with
     * the given outcome. The callback list is read when the task runs, not when it is submitted.
     *
     * @param holder  holder whose callbacks are invoked
     * @param success outcome passed to each callback
     */
    private void executeCallbacks(SegmentHolder holder, boolean success) {
        final Runnable notifyCallbacks = () -> {
            for (LoadPeonCallback registered : holder.getCallbacks()) {
                registered.execute(success);
            }
        };
        callBackExecutor.execute(notifyCallbacks);
    }

    /**
     * Tries to cancel the queued load or drop of the given segment.
     *
     * <p>A request that is already part of a batch sent to the server cannot be cancelled. If both
     * a load and a drop are queued for the segment, only the load is cancelled.
     *
     * @param segment segment whose queued operation should be cancelled
     * @return true if a queued request was found and cancelled; false if the request is already
     *         active on the server or no request is queued for the segment
     */
    @Override
    public boolean cancelOperation(DataSegment segment) {
        synchronized (lock) {
            if (activeRequestSegments.contains(segment)) {
                return false;
            }

            final SegmentHolder holder;
            if (segmentsToLoad.containsKey(segment)) {
                holder = segmentsToLoad.remove(segment);
            } else {
                holder = segmentsToDrop.remove(segment);
            }
            if (holder == null) {
                return false;
            }

            queuedSegments.remove(holder);
            onRequestCompleted(holder, RequestStatus.CANCELLED, SegmentChangeStatus.failed("cancelled"));
            return true;
        }
    }

    private static enum RequestStatus {
        ASSIGNED(Stats.SegmentQueue.ASSIGNED_ACTIONS),
        SUCCESS(Stats.SegmentQueue.COMPLETED_ACTIONS),
        FAILED(Stats.SegmentQueue.FAILED_ACTIONS),
        CANCELLED(Stats.SegmentQueue.CANCELLED_ACTIONS);

        final CoordinatorStat datasourceStat;

        private RequestStatus(CoordinatorStat datasourceStat) {
            this.datasourceStat = datasourceStat;
        }
    }
}

