/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventprocessorhost.BaseLease;
import com.microsoft.azure.eventprocessorhost.Closable;
import com.microsoft.azure.eventprocessorhost.CompleteLease;
import com.microsoft.azure.eventprocessorhost.HostContext;
import com.microsoft.azure.eventprocessorhost.LoggingUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionScanner
extends Closable {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionScanner.class);
    private static final Random RANDOMIZER = new Random();
    private final HostContext hostContext;
    private final Consumer<CompleteLease> addPump;
    private List<BaseLease> allLeaseStates = null;
    private int desiredCount;
    private int unownedCount;
    private final ConcurrentHashMap<String, BaseLease> leasesOwnedByOthers;

    PartitionScanner(HostContext hostContext, Consumer<CompleteLease> addPump, Closable parent) {
        super(parent);
        this.hostContext = hostContext;
        this.addPump = addPump;
        this.desiredCount = 0;
        this.unownedCount = 0;
        this.leasesOwnedByOthers = new ConcurrentHashMap();
    }

    public CompletableFuture<Boolean> scan(boolean isFirst) {
        return ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.getAllLeaseStates().thenComposeAsync(unused -> {
            this.throwIfClosingOrClosed("PartitionScanner is shutting down");
            int ourLeasesCount = this.sortLeasesAndCalculateDesiredCount(isFirst);
            return this.acquireExpiredInChunksParallel(0, this.desiredCount - ourLeasesCount);
        }, (Executor)this.hostContext.getExecutor())).thenApplyAsync(remainingNeeded -> {
            this.throwIfClosingOrClosed("PartitionScanner is shutting down");
            ArrayList<Object> stealThese = new ArrayList();
            if (remainingNeeded > 0) {
                TRACE_LOGGER.debug(this.hostContext.withHost("Looking to steal: " + remainingNeeded));
                stealThese = this.findLeasesToSteal((int)remainingNeeded);
            }
            return stealThese;
        }, (Executor)this.hostContext.getExecutor())).thenComposeAsync(stealThese -> {
            this.throwIfClosingOrClosed("PartitionScanner is shutting down");
            return this.stealLeases((List<BaseLease>)stealThese);
        }, (Executor)this.hostContext.getExecutor())).handleAsync((didSteal, e) -> {
            if (e != null && !(e instanceof Closable.ClosingException)) {
                StringBuilder outAction = new StringBuilder();
                Exception notifyWith = (Exception)LoggingUtils.unwrapException(e, outAction);
                TRACE_LOGGER.warn(this.hostContext.withHost("Exception scanning leases"), (Throwable)notifyWith);
                this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), notifyWith, outAction.toString(), "N/A");
                didSteal = false;
            }
            return didSteal;
        }, (Executor)this.hostContext.getExecutor());
    }

    private CompletableFuture<Void> getAllLeaseStates() {
        this.throwIfClosingOrClosed("PartitionScanner is shutting down");
        return this.hostContext.getLeaseManager().getAllLeases().thenAcceptAsync(states -> {
            this.throwIfClosingOrClosed("PartitionScanner is shutting down");
            this.allLeaseStates = states;
            Collections.sort(this.allLeaseStates);
        }, (Executor)this.hostContext.getExecutor());
    }

    private int sortLeasesAndCalculateDesiredCount(boolean isFirst) {
        TRACE_LOGGER.debug(this.hostContext.withHost("Accounting input: allLeaseStates size is " + this.allLeaseStates.size()));
        HashSet<String> uniqueOwners = new HashSet<String>();
        uniqueOwners.add(this.hostContext.getHostName());
        int ourLeasesCount = 0;
        this.unownedCount = 0;
        for (BaseLease info : this.allLeaseStates) {
            boolean ownedByUs;
            boolean bl = ownedByUs = info.getIsOwned() && info.getOwner() != null && info.getOwner().compareTo(this.hostContext.getHostName()) == 0;
            if (info.getIsOwned() && info.getOwner() != null) {
                uniqueOwners.add(info.getOwner());
            } else {
                ++this.unownedCount;
            }
            if (ownedByUs) {
                ++ourLeasesCount;
                continue;
            }
            if (!info.getIsOwned()) continue;
            this.leasesOwnedByOthers.put(info.getPartitionId(), info);
        }
        int hostCount = uniqueOwners.size();
        int countPerHost = this.allLeaseStates.size() / hostCount;
        int n = this.desiredCount = isFirst ? 1 : countPerHost;
        if (!isFirst && this.unownedCount > 0 && this.unownedCount < hostCount && this.allLeaseStates.size() % hostCount != 0) {
            ++this.desiredCount;
        }
        ArrayList sortedHosts = new ArrayList(uniqueOwners);
        Collections.sort(sortedHosts);
        int hostOrdinal = -1;
        int startingPoint = 0;
        if (isFirst) {
            startingPoint = RANDOMIZER.nextInt(this.allLeaseStates.size());
        } else {
            for (hostOrdinal = 0; hostOrdinal < sortedHosts.size() && ((String)sortedHosts.get(hostOrdinal)).compareTo(this.hostContext.getHostName()) != 0; ++hostOrdinal) {
            }
            startingPoint = countPerHost * hostOrdinal;
        }
        TRACE_LOGGER.debug(this.hostContext.withHost("Host ordinal: " + hostOrdinal + "  Rotating leases to start at " + startingPoint));
        if (startingPoint != 0) {
            ArrayList<BaseLease> rotatedList = new ArrayList<BaseLease>(this.allLeaseStates.size());
            for (int j = 0; j < this.allLeaseStates.size(); ++j) {
                rotatedList.add(this.allLeaseStates.get((j + startingPoint) % this.allLeaseStates.size()));
            }
            this.allLeaseStates = rotatedList;
        }
        TRACE_LOGGER.debug(this.hostContext.withHost("Host count is " + hostCount + "  Desired owned count is " + this.desiredCount));
        TRACE_LOGGER.debug(this.hostContext.withHost("ourLeasesCount " + ourLeasesCount + "  leasesOwnedByOthers " + this.leasesOwnedByOthers.size() + " unowned " + this.unownedCount));
        return ourLeasesCount;
    }

    private CompletableFuture<List<BaseLease>> findExpiredLeases(int startAt, int endAt) {
        ArrayList<BaseLease> expiredLeases = new ArrayList<BaseLease>();
        TRACE_LOGGER.debug(this.hostContext.withHost("Finding expired leases from '" + this.allLeaseStates.get(startAt).getPartitionId() + "'[" + startAt + "] up to '" + (endAt < this.allLeaseStates.size() ? this.allLeaseStates.get(endAt).getPartitionId() : "end") + "'[" + endAt + "]"));
        for (BaseLease info : this.allLeaseStates.subList(startAt, endAt)) {
            if (info.getIsOwned()) continue;
            expiredLeases.add(info);
        }
        TRACE_LOGGER.debug(this.hostContext.withHost("Found in range: " + expiredLeases.size()));
        return CompletableFuture.completedFuture(expiredLeases);
    }

    private CompletableFuture<Integer> acquireExpiredInChunksParallel(int startAt, int needed) {
        this.throwIfClosingOrClosed("PartitionScanner is shutting down");
        CompletionStage<Integer> resultFuture = CompletableFuture.completedFuture(needed);
        if (startAt < this.allLeaseStates.size()) {
            TRACE_LOGGER.debug(this.hostContext.withHost("Examining chunk at '" + this.allLeaseStates.get(startAt).getPartitionId() + "'[" + startAt + "] need " + needed));
        } else {
            TRACE_LOGGER.debug(this.hostContext.withHost("Examining chunk skipping, startAt is off end: " + startAt));
        }
        if (needed > 0 && this.unownedCount > 0 && startAt < this.allLeaseStates.size()) {
            AtomicInteger runningNeeded = new AtomicInteger(needed);
            int endAt = Math.min(startAt + needed, this.allLeaseStates.size());
            resultFuture = ((CompletableFuture)((CompletableFuture)this.findExpiredLeases(startAt, endAt).thenComposeAsync(getThese -> {
                this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                CompletableFuture<Object> acquireFuture = CompletableFuture.completedFuture(null);
                if (getThese.size() > 0) {
                    ArrayList<CompletionStage> getFutures = new ArrayList<CompletionStage>();
                    for (BaseLease info : getThese) {
                        this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                        AcquisitionHolder holder = new AcquisitionHolder();
                        CompletionStage getOneFuture = ((CompletableFuture)this.hostContext.getLeaseManager().getLease(info.getPartitionId()).thenComposeAsync(lease -> {
                            this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                            holder.setAcquiredLease((CompleteLease)lease);
                            return this.hostContext.getLeaseManager().acquireLease((CompleteLease)lease);
                        }, (Executor)this.hostContext.getExecutor())).thenAcceptAsync(acquired -> {
                            this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                            if (acquired.booleanValue()) {
                                runningNeeded.decrementAndGet();
                                TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(holder.getAcquiredLease().getPartitionId(), "Acquired unowned/expired"));
                                if (this.leasesOwnedByOthers.containsKey(holder.getAcquiredLease().getPartitionId())) {
                                    this.leasesOwnedByOthers.remove(holder.getAcquiredLease().getPartitionId());
                                    --this.unownedCount;
                                }
                                this.addPump.accept(holder.getAcquiredLease());
                            } else {
                                this.leasesOwnedByOthers.put(holder.getAcquiredLease().getPartitionId(), holder.getAcquiredLease());
                            }
                        }, (Executor)this.hostContext.getExecutor());
                        getFutures.add(getOneFuture);
                    }
                    CompletableFuture[] dummy = new CompletableFuture[getFutures.size()];
                    acquireFuture = CompletableFuture.allOf(getFutures.toArray(dummy));
                }
                return acquireFuture;
            }, (Executor)this.hostContext.getExecutor())).handleAsync((empty, e) -> {
                if (e != null && !(e instanceof Closable.ClosingException)) {
                    Exception notifyWith = (Exception)LoggingUtils.unwrapException(e, null);
                    TRACE_LOGGER.warn(this.hostContext.withHost("Failure getting/acquiring lease, continuing"), (Throwable)notifyWith);
                    this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), notifyWith, "Checking Leases", "N/A");
                }
                return null;
            }, (Executor)this.hostContext.getExecutor())).thenComposeAsync(unused -> this.acquireExpiredInChunksParallel(endAt, runningNeeded.get()), (Executor)this.hostContext.getExecutor());
        } else {
            TRACE_LOGGER.debug(this.hostContext.withHost("Short circuit: needed is 0, unowned is 0, or off end"));
        }
        return resultFuture;
    }

    private ArrayList<BaseLease> findLeasesToSteal(int stealAsk) {
        HashMap<String, Integer> hostOwns = new HashMap<String, Integer>();
        for (BaseLease baseLease : this.leasesOwnedByOthers.values()) {
            if (hostOwns.containsKey(baseLease.getOwner())) {
                int n = (Integer)hostOwns.get(baseLease.getOwner()) + 1;
                hostOwns.put(baseLease.getOwner(), n);
                continue;
            }
            hostOwns.put(baseLease.getOwner(), 1);
        }
        ArrayList bigOwners = new ArrayList();
        for (Map.Entry entry : hostOwns.entrySet()) {
            if ((Integer)entry.getValue() <= this.desiredCount) continue;
            bigOwners.add(entry.getKey());
            TRACE_LOGGER.debug(this.hostContext.withHost("Big owner " + (String)entry.getKey() + " has " + entry.getValue()));
        }
        ArrayList<BaseLease> arrayList = new ArrayList<BaseLease>();
        if (bigOwners.size() > 0) {
            String string = (String)bigOwners.get(RANDOMIZER.nextInt(bigOwners.size()));
            int victimExtra = (Integer)hostOwns.get(string) - this.desiredCount - 1;
            int stealCount = Math.min(victimExtra, stealAsk);
            TRACE_LOGGER.debug(this.hostContext.withHost("Stealing " + stealCount + " from " + string));
            for (BaseLease candidate : this.allLeaseStates) {
                if (candidate.getOwner() == null || candidate.getOwner().compareTo(string) != 0) continue;
                arrayList.add(candidate);
                if (arrayList.size() < stealCount) continue;
                break;
            }
        } else {
            TRACE_LOGGER.debug(this.hostContext.withHost("No big owners found, skipping steal"));
        }
        return arrayList;
    }

    private CompletableFuture<Boolean> stealLeases(List<BaseLease> stealThese) {
        CompletionStage<Boolean> allSteals = CompletableFuture.completedFuture(false);
        if (stealThese.size() > 0) {
            ArrayList<CompletionStage> steals = new ArrayList<CompletionStage>();
            for (BaseLease info : stealThese) {
                this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                AcquisitionHolder holder = new AcquisitionHolder();
                CompletionStage oneSteal = ((CompletableFuture)this.hostContext.getLeaseManager().getLease(info.getPartitionId()).thenComposeAsync(lease -> {
                    this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                    holder.setAcquiredLease((CompleteLease)lease);
                    return this.hostContext.getLeaseManager().acquireLease((CompleteLease)lease);
                }, (Executor)this.hostContext.getExecutor())).thenAcceptAsync(acquired -> {
                    this.throwIfClosingOrClosed("PartitionScanner is shutting down");
                    if (acquired.booleanValue()) {
                        TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(holder.getAcquiredLease().getPartitionId(), "Stole lease"));
                        this.addPump.accept(holder.getAcquiredLease());
                    }
                }, (Executor)this.hostContext.getExecutor());
                steals.add(oneSteal);
            }
            CompletableFuture[] dummy = new CompletableFuture[steals.size()];
            allSteals = CompletableFuture.allOf(steals.toArray(dummy)).thenApplyAsync(empty -> true, (Executor)this.hostContext.getExecutor());
        }
        return allSteals;
    }

    private static class AcquisitionHolder {
        private CompleteLease acquiredLease;

        private AcquisitionHolder() {
        }

        void setAcquiredLease(CompleteLease l) {
            this.acquiredLease = l;
        }

        CompleteLease getAcquiredLease() {
            return this.acquiredLease;
        }
    }
}

