/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitor;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

@ThreadSafe
class HeartbeatManagerImpl<I, O>
implements HeartbeatManager<I, O> {
    private final long heartbeatTimeoutIntervalMs;
    private final int failedRpcRequestsUntilUnreachable;
    private final ResourceID ownResourceID;
    private final HeartbeatListener<I, O> heartbeatListener;
    private final ScheduledExecutor mainThreadExecutor;
    protected final Logger log;
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;
    private final HeartbeatMonitor.Factory<O> heartbeatMonitorFactory;
    protected volatile boolean stopped;

    public HeartbeatManagerImpl(long heartbeatTimeoutIntervalMs, int failedRpcRequestsUntilUnreachable, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor mainThreadExecutor, Logger log) {
        this(heartbeatTimeoutIntervalMs, failedRpcRequestsUntilUnreachable, ownResourceID, heartbeatListener, mainThreadExecutor, log, new DefaultHeartbeatMonitor.Factory());
    }

    public HeartbeatManagerImpl(long heartbeatTimeoutIntervalMs, int failedRpcRequestsUntilUnreachable, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor mainThreadExecutor, Logger log, HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
        Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;
        this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
        this.log = Preconditions.checkNotNull(log);
        this.heartbeatMonitorFactory = heartbeatMonitorFactory;
        this.heartbeatTargets = new ConcurrentHashMap(16);
        this.stopped = false;
    }

    ResourceID getOwnResourceID() {
        return this.ownResourceID;
    }

    HeartbeatListener<I, O> getHeartbeatListener() {
        return this.heartbeatListener;
    }

    Map<ResourceID, HeartbeatMonitor<O>> getHeartbeatTargets() {
        return this.heartbeatTargets;
    }

    @Override
    public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
        if (!this.stopped) {
            if (this.heartbeatTargets.containsKey(resourceID)) {
                this.log.debug("The target with resource ID {} is already been monitored.", (Object)resourceID.getStringWithMetadata());
            } else {
                HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatMonitorFactory.createHeartbeatMonitor(resourceID, heartbeatTarget, this.mainThreadExecutor, this.heartbeatListener, this.heartbeatTimeoutIntervalMs, this.failedRpcRequestsUntilUnreachable);
                this.heartbeatTargets.put(resourceID, heartbeatMonitor);
                if (this.stopped) {
                    heartbeatMonitor.cancel();
                    this.heartbeatTargets.remove(resourceID);
                }
            }
        }
    }

    @Override
    public void unmonitorTarget(ResourceID resourceID) {
        HeartbeatMonitor<O> heartbeatMonitor;
        if (!this.stopped && (heartbeatMonitor = this.heartbeatTargets.remove(resourceID)) != null) {
            heartbeatMonitor.cancel();
        }
    }

    @Override
    public void stop() {
        this.stopped = true;
        for (HeartbeatMonitor<O> heartbeatMonitor : this.heartbeatTargets.values()) {
            heartbeatMonitor.cancel();
        }
        this.heartbeatTargets.clear();
    }

    @Override
    public long getLastHeartbeatFrom(ResourceID resourceId) {
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceId);
        if (heartbeatMonitor != null) {
            return heartbeatMonitor.getLastHeartbeat();
        }
        return -1L;
    }

    ScheduledExecutor getMainThreadExecutor() {
        return this.mainThreadExecutor;
    }

    @Override
    public CompletableFuture<Void> receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {
        if (!this.stopped) {
            this.log.debug("Received heartbeat from {}.", (Object)heartbeatOrigin);
            this.reportHeartbeat(heartbeatOrigin);
            if (heartbeatPayload != null) {
                this.heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
            }
        }
        return FutureUtils.completedVoidFuture();
    }

    @Override
    public CompletableFuture<Void> requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload) {
        if (!this.stopped) {
            this.log.debug("Received heartbeat request from {}.", (Object)requestOrigin);
            HeartbeatTarget<O> heartbeatTarget = this.reportHeartbeat(requestOrigin);
            if (heartbeatTarget != null) {
                if (heartbeatPayload != null) {
                    this.heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }
                heartbeatTarget.receiveHeartbeat(this.getOwnResourceID(), this.heartbeatListener.retrievePayload(requestOrigin)).whenCompleteAsync((BiConsumer)this.handleHeartbeatRpc(requestOrigin), (Executor)this.mainThreadExecutor);
            }
        }
        return FutureUtils.completedVoidFuture();
    }

    protected BiConsumer<Void, Throwable> handleHeartbeatRpc(ResourceID heartbeatTarget) {
        return (unused, failure) -> {
            if (failure != null) {
                this.handleHeartbeatRpcFailure(heartbeatTarget, ExceptionUtils.stripCompletionException(failure));
            } else {
                this.handleHeartbeatRpcSuccess(heartbeatTarget);
            }
        };
    }

    private void handleHeartbeatRpcSuccess(ResourceID heartbeatTarget) {
        this.runIfHeartbeatMonitorExists(heartbeatTarget, HeartbeatMonitor::reportHeartbeatRpcSuccess);
    }

    private void handleHeartbeatRpcFailure(ResourceID heartbeatTarget, Throwable failure) {
        if (failure instanceof RecipientUnreachableException) {
            this.reportHeartbeatTargetUnreachable(heartbeatTarget);
        }
    }

    private void reportHeartbeatTargetUnreachable(ResourceID heartbeatTarget) {
        this.runIfHeartbeatMonitorExists(heartbeatTarget, HeartbeatMonitor::reportHeartbeatRpcFailure);
    }

    private void runIfHeartbeatMonitorExists(ResourceID heartbeatTarget, Consumer<? super HeartbeatMonitor<?>> action) {
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(heartbeatTarget);
        if (heartbeatMonitor != null) {
            action.accept(heartbeatMonitor);
        }
    }

    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        if (this.heartbeatTargets.containsKey(resourceID)) {
            HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
            heartbeatMonitor.reportHeartbeat();
            return heartbeatMonitor.getHeartbeatTarget();
        }
        return null;
    }
}

