/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitor;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultHeartbeatMonitor<O>
implements HeartbeatMonitor<O>,
Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultHeartbeatMonitor.class);
    private final ResourceID resourceID;
    private final HeartbeatTarget<O> heartbeatTarget;
    private final ScheduledExecutor scheduledExecutor;
    private final HeartbeatListener<?, ?> heartbeatListener;
    private final long heartbeatTimeoutIntervalMs;
    private final int failedRpcRequestsUntilUnreachable;
    private volatile ScheduledFuture<?> futureTimeout;
    private final AtomicReference<State> state = new AtomicReference<State>(State.RUNNING);
    private final AtomicInteger numberFailedRpcRequestsSinceLastSuccess = new AtomicInteger(0);
    private volatile long lastHeartbeat;

    DefaultHeartbeatMonitor(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget, ScheduledExecutor scheduledExecutor, HeartbeatListener<?, O> heartbeatListener, long heartbeatTimeoutIntervalMs, int failedRpcRequestsUntilUnreachable) {
        this.resourceID = (ResourceID)Preconditions.checkNotNull((Object)resourceID);
        this.heartbeatTarget = (HeartbeatTarget)Preconditions.checkNotNull(heartbeatTarget);
        this.scheduledExecutor = (ScheduledExecutor)Preconditions.checkNotNull((Object)scheduledExecutor);
        this.heartbeatListener = (HeartbeatListener)Preconditions.checkNotNull(heartbeatListener);
        Preconditions.checkArgument((heartbeatTimeoutIntervalMs > 0L ? 1 : 0) != 0, (Object)"The heartbeat timeout interval has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        Preconditions.checkArgument((failedRpcRequestsUntilUnreachable > 0 || failedRpcRequestsUntilUnreachable == -1 ? 1 : 0) != 0, (Object)"The number of failed heartbeat RPC requests has to be larger than 0 or -1 (deactivated).");
        this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;
        this.lastHeartbeat = 0L;
        this.resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
    }

    @Override
    public HeartbeatTarget<O> getHeartbeatTarget() {
        return this.heartbeatTarget;
    }

    @Override
    public ResourceID getHeartbeatTargetId() {
        return this.resourceID;
    }

    @Override
    public long getLastHeartbeat() {
        return this.lastHeartbeat;
    }

    @Override
    public void reportHeartbeatRpcFailure() {
        int failedRpcRequestsSinceLastSuccess = this.numberFailedRpcRequestsSinceLastSuccess.incrementAndGet();
        if (this.isHeartbeatRpcFailureDetectionEnabled() && failedRpcRequestsSinceLastSuccess >= this.failedRpcRequestsUntilUnreachable && this.state.compareAndSet(State.RUNNING, State.UNREACHABLE)) {
            LOG.debug("Mark heartbeat target {} as unreachable because {} consecutive heartbeat RPCs have failed.", (Object)this.resourceID, (Object)failedRpcRequestsSinceLastSuccess);
            this.cancelTimeout();
            this.heartbeatListener.notifyTargetUnreachable(this.resourceID);
        }
    }

    private boolean isHeartbeatRpcFailureDetectionEnabled() {
        return this.failedRpcRequestsUntilUnreachable > 0;
    }

    @Override
    public void reportHeartbeatRpcSuccess() {
        this.numberFailedRpcRequestsSinceLastSuccess.set(0);
    }

    @Override
    public void reportHeartbeat() {
        this.lastHeartbeat = System.currentTimeMillis();
        this.resetHeartbeatTimeout(this.heartbeatTimeoutIntervalMs);
    }

    @Override
    public void cancel() {
        if (this.state.compareAndSet(State.RUNNING, State.CANCELED)) {
            this.cancelTimeout();
        }
    }

    @Override
    public void run() {
        if (this.state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
            this.heartbeatListener.notifyHeartbeatTimeout(this.resourceID);
        }
    }

    public boolean isCanceled() {
        return this.state.get() == State.CANCELED;
    }

    void resetHeartbeatTimeout(long heartbeatTimeout) {
        if (this.state.get() == State.RUNNING) {
            this.cancelTimeout();
            this.futureTimeout = this.scheduledExecutor.schedule((Runnable)this, heartbeatTimeout, TimeUnit.MILLISECONDS);
            if (this.state.get() != State.RUNNING) {
                this.cancelTimeout();
            }
        }
    }

    private void cancelTimeout() {
        if (this.futureTimeout != null) {
            this.futureTimeout.cancel(true);
        }
    }

    static class Factory<O>
    implements HeartbeatMonitor.Factory<O> {
        Factory() {
        }

        @Override
        public HeartbeatMonitor<O> createHeartbeatMonitor(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget, ScheduledExecutor mainThreadExecutor, HeartbeatListener<?, O> heartbeatListener, long heartbeatTimeoutIntervalMs, int failedRpcRequestsUntilUnreachable) {
            return new DefaultHeartbeatMonitor<O>(resourceID, heartbeatTarget, mainThreadExecutor, heartbeatListener, heartbeatTimeoutIntervalMs, failedRpcRequestsUntilUnreachable);
        }
    }

    private static enum State {
        RUNNING,
        TIMEOUT,
        UNREACHABLE,
        CANCELED;

    }
}

