/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.LeaderElectionEventHandler;
import org.apache.flink.runtime.leaderelection.LeaderElectionException;
import org.apache.flink.runtime.leaderelection.LeaderElectionUtils;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultLeaderElectionService
extends AbstractLeaderElectionService
implements LeaderElectionEventHandler,
AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
    private final Object lock = new Object();
    private final LeaderElectionDriverFactory leaderElectionDriverFactory;
    @GuardedBy(value="lock")
    private LeaderContender leaderContender;
    @Nullable
    @GuardedBy(value="lock")
    private UUID issuedLeaderSessionID;
    @GuardedBy(value="lock")
    private LeaderInformation confirmedLeaderInformation;
    @GuardedBy(value="lock")
    private LeaderElectionDriver leaderElectionDriver;
    @GuardedBy(value="lock")
    private final ExecutorService leadershipOperationExecutor;

    public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) {
        this(leaderElectionDriverFactory, Executors.newSingleThreadExecutor((ThreadFactory)new ExecutorThreadFactory("DefaultLeaderElectionService-leadershipOperationExecutor")));
    }

    @VisibleForTesting
    DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory, ExecutorService leadershipOperationExecutor) {
        this.leaderElectionDriverFactory = (LeaderElectionDriverFactory)Preconditions.checkNotNull((Object)leaderElectionDriverFactory);
        this.leaderContender = null;
        this.issuedLeaderSessionID = null;
        this.leaderElectionDriver = null;
        this.confirmedLeaderInformation = LeaderInformation.empty();
        this.leadershipOperationExecutor = (ExecutorService)Preconditions.checkNotNull((Object)leadershipOperationExecutor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startLeaderElectionBackend() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((this.leaderContender == null ? 1 : 0) != 0, (Object)"No LeaderContender should have been registered, yet.");
            this.leaderElectionDriver = this.leaderElectionDriverFactory.createLeaderElectionDriver(this, new LeaderElectionFatalErrorHandler());
            LOG.info("Instantiating DefaultLeaderElectionService with {}.", (Object)this.leaderElectionDriver);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void register(LeaderContender contender) throws Exception {
        Preconditions.checkNotNull((Object)contender, (String)"Contender must not be null.");
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((this.leaderContender == null ? 1 : 0) != 0, (Object)"Only one LeaderContender is allowed to be registered to this service.");
            Preconditions.checkState((this.leaderElectionDriver != null ? 1 : 0) != 0, (Object)"The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
            this.leaderContender = contender;
            LOG.info("LeaderContender {} has been registered for {}.", (Object)contender.getDescription(), (Object)this.leaderElectionDriver);
            if (this.issuedLeaderSessionID != null) {
                this.runInLeaderEventThread(() -> this.notifyLeaderContenderOfLeadership(this.issuedLeaderSessionID));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected final void remove(LeaderContender contender) {
        Preconditions.checkArgument((contender == this.leaderContender ? 1 : 0) != 0);
        LOG.info("Stopping DefaultLeaderElectionService.");
        Object object = this.lock;
        synchronized (object) {
            if (this.leaderContender == null) {
                LOG.debug("The stop procedure was called on an already stopped DefaultLeaderElectionService instance. No action necessary.");
                return;
            }
            if (this.issuedLeaderSessionID != null) {
                this.notifyLeaderContenderOfLeadershipLoss();
                LOG.debug("DefaultLeaderElectionService is stopping while having the leadership acquired. The revoke event is forwarded to the LeaderContender.");
                if (this.leaderElectionDriver.hasLeadership()) {
                    this.leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty());
                    LOG.debug("Leader information is cleaned up while stopping.");
                }
            } else {
                Preconditions.checkState((boolean)this.confirmedLeaderInformation.isEmpty(), (Object)"The confirmed leader information should have been cleared.");
                LOG.debug("DefaultLeaderElectionService is stopping while not having the leadership acquired. No cleanup necessary.");
            }
            this.leaderContender = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((this.leaderContender == null ? 1 : 0) != 0, (Object)"The DefaultLeaderElectionService should have been stopped before closing the instance.");
            this.issuedLeaderSessionID = null;
            if (this.leaderElectionDriver != null) {
                this.leaderElectionDriver.close();
                this.leaderElectionDriver = null;
                List<Runnable> outstandingEventHandlingCalls = ((ExecutorService)Preconditions.checkNotNull((Object)this.leadershipOperationExecutor)).shutdownNow();
                if (!outstandingEventHandlingCalls.isEmpty()) {
                    LOG.debug("The DefaultLeaderElectionService was closed with {} still not being processed. No further action necessary.", (Object)(outstandingEventHandlingCalls.size() == 1 ? "one event" : outstandingEventHandlingCalls.size() + " events"));
                }
            } else {
                LOG.debug("The HA backend connection isn't established. No actions taken.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        LOG.debug("Confirm leader session ID {} for leader {}.", (Object)leaderSessionID, (Object)leaderAddress);
        Preconditions.checkNotNull((Object)leaderSessionID);
        Object object = this.lock;
        synchronized (object) {
            if (this.hasLeadership(leaderSessionID)) {
                Preconditions.checkState((boolean)this.confirmedLeaderInformation.isEmpty(), (Object)"No confirmation should have happened, yet.");
                this.confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
                this.leaderElectionDriver.writeLeaderInformation(this.confirmedLeaderInformation);
            } else if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                LOG.debug("Receive an old confirmation call of leader session ID {}, current issued session ID is {}", (Object)leaderSessionID, (Object)this.issuedLeaderSessionID);
            } else {
                LOG.warn("The leader session ID {} was confirmed even though the corresponding service was not elected as the leader or has been stopped already.", (Object)leaderSessionID);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected boolean hasLeadership(UUID leaderSessionId) {
        Object object = this.lock;
        synchronized (object) {
            if (this.leaderElectionDriver != null) {
                if (this.leaderContender != null) {
                    return this.leaderElectionDriver.hasLeadership() && leaderSessionId.equals(this.issuedLeaderSessionID);
                }
                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
                return false;
            }
            LOG.debug("hasLeadership is called after the service is closed, returning false.");
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    @VisibleForTesting
    public UUID getLeaderSessionID() {
        Object object = this.lock;
        synchronized (object) {
            return this.confirmedLeaderInformation.getLeaderSessionID();
        }
    }

    @Override
    public void onGrantLeadership(UUID newLeaderSessionId) {
        this.runInLeaderEventThread(() -> this.onGrantLeadershipInternal(newLeaderSessionId));
    }

    @GuardedBy(value="lock")
    private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
        Preconditions.checkNotNull((Object)newLeaderSessionId);
        Preconditions.checkState((this.issuedLeaderSessionID == null ? 1 : 0) != 0, (Object)"The leadership should have been granted while not having the leadership acquired.");
        this.issuedLeaderSessionID = newLeaderSessionId;
        this.notifyLeaderContenderOfLeadership(this.issuedLeaderSessionID);
    }

    @GuardedBy(value="lock")
    private void notifyLeaderContenderOfLeadership(UUID sessionID) {
        if (this.leaderContender == null) {
            LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.", (Object)sessionID, (Object)this.leaderElectionDriver);
            return;
        }
        if (!sessionID.equals(this.issuedLeaderSessionID)) {
            LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.", (Object)sessionID, (Object)this.issuedLeaderSessionID);
            return;
        }
        Preconditions.checkState((boolean)this.confirmedLeaderInformation.isEmpty(), (Object)"The leadership should have been granted while not having the leadership acquired.");
        LOG.debug("Granting leadership to contender {} with session ID {}.", (Object)this.leaderContender.getDescription(), (Object)this.issuedLeaderSessionID);
        this.leaderContender.grantLeadership(this.issuedLeaderSessionID);
    }

    @Override
    public void onRevokeLeadership() {
        this.runInLeaderEventThread(this::onRevokeLeadershipInternal);
    }

    @GuardedBy(value="lock")
    private void onRevokeLeadershipInternal() {
        if (this.leaderContender != null) {
            this.notifyLeaderContenderOfLeadershipLoss();
        } else {
            LOG.debug("The revoke leadership for session {} notification is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.", (Object)this.issuedLeaderSessionID, (Object)this.leaderElectionDriver);
        }
        this.issuedLeaderSessionID = null;
    }

    @GuardedBy(value="lock")
    private void notifyLeaderContenderOfLeadershipLoss() {
        Preconditions.checkState((this.leaderContender != null ? 1 : 0) != 0, (Object)"The LeaderContender should be always set when calling this method.");
        if (this.confirmedLeaderInformation.isEmpty()) {
            LOG.debug("Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.", (Object)this.leaderContender.getDescription());
        } else {
            LOG.debug("Revoking leadership to contender {} for {}.", (Object)this.leaderContender.getDescription(), (Object)LeaderElectionUtils.convertToString(this.confirmedLeaderInformation));
        }
        this.confirmedLeaderInformation = LeaderInformation.empty();
        this.leaderContender.revokeLeadership();
    }

    @Override
    public void onLeaderInformationChange(LeaderInformation leaderInformation) {
        this.runInLeaderEventThread(() -> this.onLeaderInformationChangeInternal(leaderInformation));
    }

    @GuardedBy(value="lock")
    private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
        if (this.leaderContender != null) {
            LOG.trace("Leader node changed while {} is the leader with {}. New leader information {}.", new Object[]{this.leaderContender.getDescription(), LeaderElectionUtils.convertToString(this.confirmedLeaderInformation), LeaderElectionUtils.convertToString(leaderInformation)});
            if (!this.confirmedLeaderInformation.isEmpty()) {
                LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
                if (leaderInformation.isEmpty()) {
                    LOG.debug("Writing leader information by {} since the external storage is empty.", (Object)this.leaderContender.getDescription());
                    this.leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
                    LOG.debug("Correcting leader information by {}.", (Object)this.leaderContender.getDescription());
                    this.leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                }
            }
        } else {
            LOG.debug("Ignoring change notification since the {} has already been stopped.", (Object)this.leaderElectionDriver);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runInLeaderEventThread(Runnable callback) {
        Object object = this.lock;
        synchronized (object) {
            if (!this.leadershipOperationExecutor.isShutdown()) {
                FutureUtils.handleUncaughtException(CompletableFuture.runAsync(() -> {
                    Object object = this.lock;
                    synchronized (object) {
                        callback.run();
                    }
                }, this.leadershipOperationExecutor), (thread, error) -> this.forwardErrorToLeaderContender(error));
            } else {
                LOG.debug("Leader event handling was triggered after the DefaultLeaderElectionService is closed. The event will be ignored.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void forwardErrorToLeaderContender(Throwable t) {
        Object object = this.lock;
        synchronized (object) {
            if (this.leaderContender == null) {
                LOG.debug("Ignoring error notification since there's no contender registered.");
                return;
            }
            if (t instanceof LeaderElectionException) {
                this.leaderContender.handleError((Exception)((Object)((LeaderElectionException)((Object)t))));
            } else {
                this.leaderContender.handleError((Exception)((Object)new LeaderElectionException(t)));
            }
        }
    }

    private class LeaderElectionFatalErrorHandler
    implements FatalErrorHandler {
        private LeaderElectionFatalErrorHandler() {
        }

        public void onFatalError(Throwable throwable) {
            DefaultLeaderElectionService.this.forwardErrorToLeaderContender(throwable);
        }
    }
}

