/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.registration;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
    private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100L;
    private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000L;
    private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000L;
    private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000L;
    private final Logger log;
    private final RpcService rpcService;
    private final String targetName;
    private final Class<Gateway> targetType;
    private final String targetAddress;
    private final UUID leaderId;
    private final CompletableFuture<Tuple2<Gateway, Success>> completionFuture;
    private final long initialRegistrationTimeout;
    private final long maxRegistrationTimeout;
    private final long delayOnError;
    private final long delayOnRefusedRegistration;
    private volatile boolean canceled;

    public RetryingRegistration(Logger log, RpcService rpcService, String targetName, Class<Gateway> targetType, String targetAddress, UUID leaderId) {
        this(log, rpcService, targetName, targetType, targetAddress, leaderId, 100L, 30000L, 10000L, 30000L);
    }

    public RetryingRegistration(Logger log, RpcService rpcService, String targetName, Class<Gateway> targetType, String targetAddress, UUID leaderId, long initialRegistrationTimeout, long maxRegistrationTimeout, long delayOnError, long delayOnRefusedRegistration) {
        Preconditions.checkArgument((initialRegistrationTimeout > 0L ? 1 : 0) != 0, (Object)"initial registration timeout must be greater than zero");
        Preconditions.checkArgument((maxRegistrationTimeout > 0L ? 1 : 0) != 0, (Object)"maximum registration timeout must be greater than zero");
        Preconditions.checkArgument((delayOnError >= 0L ? 1 : 0) != 0, (Object)"delay on error must be non-negative");
        Preconditions.checkArgument((delayOnRefusedRegistration >= 0L ? 1 : 0) != 0, (Object)"delay on refused registration must be non-negative");
        this.log = (Logger)Preconditions.checkNotNull((Object)log);
        this.rpcService = (RpcService)Preconditions.checkNotNull((Object)rpcService);
        this.targetName = (String)Preconditions.checkNotNull((Object)targetName);
        this.targetType = (Class)Preconditions.checkNotNull(targetType);
        this.targetAddress = (String)Preconditions.checkNotNull((Object)targetAddress);
        this.leaderId = (UUID)Preconditions.checkNotNull((Object)leaderId);
        this.initialRegistrationTimeout = initialRegistrationTimeout;
        this.maxRegistrationTimeout = maxRegistrationTimeout;
        this.delayOnError = delayOnError;
        this.delayOnRefusedRegistration = delayOnRefusedRegistration;
        this.completionFuture = new FlinkCompletableFuture<Tuple2<Gateway, Success>>();
    }

    public Future<Tuple2<Gateway, Success>> getFuture() {
        return this.completionFuture;
    }

    public void cancel() {
        this.canceled = true;
    }

    public boolean isCanceled() {
        return this.canceled;
    }

    protected abstract Future<RegistrationResponse> invokeRegistration(Gateway var1, UUID var2, long var3) throws Exception;

    public void startRegistration() {
        try {
            Future<Gateway> resourceManagerFuture = this.rpcService.connect(this.targetAddress, this.targetType);
            Future<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>(){

                @Override
                public void accept(Gateway result) {
                    RetryingRegistration.this.log.info("Resolved {} address, beginning registration", (Object)RetryingRegistration.this.targetName);
                    RetryingRegistration.this.register(result, 1, RetryingRegistration.this.initialRegistrationTimeout);
                }
            }, this.rpcService.getExecutor());
            resourceManagerAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable failure) {
                    if (!RetryingRegistration.this.isCanceled()) {
                        RetryingRegistration.this.log.warn("Could not resolve {} address {}, retrying...", new Object[]{RetryingRegistration.this.targetName, RetryingRegistration.this.targetAddress, failure});
                        RetryingRegistration.this.startRegistration();
                    }
                    return null;
                }
            }, this.rpcService.getExecutor());
        }
        catch (Throwable t) {
            this.cancel();
            this.completionFuture.completeExceptionally(t);
        }
    }

    private void register(final Gateway gateway, final int attempt, final long timeoutMillis) {
        if (this.canceled) {
            return;
        }
        try {
            this.log.info("Registration at {} attempt {} (timeout={}ms)", new Object[]{this.targetName, attempt, timeoutMillis});
            Future<RegistrationResponse> registrationFuture = this.invokeRegistration(gateway, this.leaderId, timeoutMillis);
            Future<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>(){

                @Override
                public void accept(RegistrationResponse result) {
                    if (!RetryingRegistration.this.isCanceled()) {
                        if (result instanceof RegistrationResponse.Success) {
                            RegistrationResponse.Success success = (RegistrationResponse.Success)result;
                            RetryingRegistration.this.completionFuture.complete(Tuple2.of((Object)gateway, (Object)success));
                        } else {
                            if (result instanceof RegistrationResponse.Decline) {
                                RegistrationResponse.Decline decline = (RegistrationResponse.Decline)result;
                                RetryingRegistration.this.log.info("Registration at {} was declined: {}", (Object)RetryingRegistration.this.targetName, (Object)decline.getReason());
                            } else {
                                RetryingRegistration.this.log.error("Received unknown response to registration attempt: {}", (Object)result);
                            }
                            RetryingRegistration.this.log.info("Pausing and re-attempting registration in {} ms", (Object)RetryingRegistration.this.delayOnRefusedRegistration);
                            RetryingRegistration.this.registerLater(gateway, 1, RetryingRegistration.this.initialRegistrationTimeout, RetryingRegistration.this.delayOnRefusedRegistration);
                        }
                    }
                }
            }, this.rpcService.getExecutor());
            registrationAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>(){

                @Override
                public Void apply(Throwable failure) {
                    if (!RetryingRegistration.this.isCanceled()) {
                        if (failure instanceof TimeoutException) {
                            if (RetryingRegistration.this.log.isDebugEnabled()) {
                                RetryingRegistration.this.log.debug("Registration at {} ({}) attempt {} timed out after {} ms", new Object[]{RetryingRegistration.this.targetName, RetryingRegistration.this.targetAddress, attempt, timeoutMillis});
                            }
                            long newTimeoutMillis = Math.min(2L * timeoutMillis, RetryingRegistration.this.maxRegistrationTimeout);
                            RetryingRegistration.this.register(gateway, attempt + 1, newTimeoutMillis);
                        } else {
                            RetryingRegistration.this.log.error("Registration at {} failed due to an error", (Object)RetryingRegistration.this.targetName, (Object)failure);
                            RetryingRegistration.this.log.info("Pausing and re-attempting registration in {} ms", (Object)RetryingRegistration.this.delayOnError);
                            RetryingRegistration.this.registerLater(gateway, 1, RetryingRegistration.this.initialRegistrationTimeout, RetryingRegistration.this.delayOnError);
                        }
                    }
                    return null;
                }
            }, this.rpcService.getExecutor());
        }
        catch (Throwable t) {
            this.cancel();
            this.completionFuture.completeExceptionally(t);
        }
    }

    private void registerLater(Gateway gateway, int attempt, long timeoutMillis, long delay) {
        this.rpcService.scheduleRunnable(new Runnable((RpcGateway)gateway, attempt, timeoutMillis){
            final /* synthetic */ RpcGateway val$gateway;
            final /* synthetic */ int val$attempt;
            final /* synthetic */ long val$timeoutMillis;
            {
                this.val$gateway = rpcGateway;
                this.val$attempt = n;
                this.val$timeoutMillis = l;
            }

            @Override
            public void run() {
                RetryingRegistration.this.register(this.val$gateway, this.val$attempt, this.val$timeoutMillis);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }
}

