/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.AkkaException;
import akka.actor.AbstractActor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ChildRestartStats;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.pattern.Patterns;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

class SupervisorActor
extends AbstractActor {
    /** Logger shared by all instances of this actor class. */
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorActor.class);

    /** Executor handed to child registrations when their termination futures are completed. */
    private final Executor terminationFutureExecutor;

    /** Registrations of the child actors started by this supervisor, keyed by their actor reference. */
    private final Map<ActorRef, AkkaRpcActorRegistration> registeredAkkaRpcActors;

    /**
     * Creates a supervisor whose registry of child actors is initially empty.
     *
     * @param terminationFutureExecutor executor used when completing termination futures of children
     */
    SupervisorActor(Executor terminationFutureExecutor) {
        this.registeredAkkaRpcActors = new HashMap<>();
        this.terminationFutureExecutor = terminationFutureExecutor;
    }

    /**
     * Builds the message dispatch of this actor: {@link StartAkkaRpcActor} requests are routed to
     * {@code createStartAkkaRpcActorMessage}, every other message to {@code handleUnknownMessage}.
     *
     * @return the receive behaviour of this actor
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage)
                .matchAny(this::handleUnknownMessage)
                .build();
    }

    /**
     * Stop hook of the supervisor: logs the shutdown, delegates to {@code super.postStop()}, then
     * exceptionally terminates every child registration that is still present and finally empties
     * the registry.
     */
    @Override
    public void postStop() throws Exception {
        LOG.debug("Stopping supervisor actor.");
        super.postStop();

        // Children still registered at this point never reported their termination.
        registeredAkkaRpcActors.values().forEach(this::terminateAkkaRpcActorOnStop);
        registeredAkkaRpcActors.clear();
    }

    /**
     * Supplies the supervision strategy applied to the children of this actor.
     *
     * @return a freshly created {@link SupervisorActorSupervisorStrategy} bound to this supervisor
     */
    @Override
    public SupervisorActorSupervisorStrategy supervisorStrategy() {
        final SupervisorActorSupervisorStrategy strategy = new SupervisorActorSupervisorStrategy();
        return strategy;
    }

    /**
     * Exceptionally terminates the given registration because the supervisor itself is stopping.
     *
     * <p>Note that the first placeholder of the error message is filled with the simple class
     * name of this actor, the second one with the endpoint id of the registration.
     *
     * @param akkaRpcActorRegistration registration whose termination is reported as a failure
     */
    private void terminateAkkaRpcActorOnStop(AkkaRpcActorRegistration akkaRpcActorRegistration) {
        final String message =
                String.format(
                        "Unexpected closing of %s with name %s.",
                        getClass().getSimpleName(),
                        akkaRpcActorRegistration.getEndpointId());
        akkaRpcActorRegistration.terminateExceptionally(
                new AkkaRpcException(message), terminationFutureExecutor);
    }

    /**
     * Handles a {@link StartAkkaRpcActor} request: creates a registration for the endpoint, asks
     * the request's props factory for the actor {@link Props}, starts the child actor under the
     * endpoint id and answers the sender.
     *
     * <p>On success the child is recorded in the registry and the sender receives a successful
     * {@link StartAkkaRpcActorResponse}; if an {@link AkkaException} is thrown while doing so, the
     * sender receives a failure response carrying that exception.
     *
     * @param startAkkaRpcActor the start request to process
     */
    private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
        final String name = startAkkaRpcActor.getEndpointId();
        final AkkaRpcActorRegistration registration = new AkkaRpcActorRegistration(name);
        final Props props =
                startAkkaRpcActor
                        .getPropsFactory()
                        .create(registration.getInternalTerminationFuture());

        LOG.debug("Starting {} with name {}.", props.actorClass().getSimpleName(), name);

        try {
            final ActorRef child = getContext().actorOf(props, name);
            registeredAkkaRpcActors.put(child, registration);

            final ActorRegistration started =
                    ActorRegistration.create(child, registration.getExternalTerminationFuture());
            getSender().tell(StartAkkaRpcActorResponse.success(started), getSelf());
        } catch (AkkaException startFailure) {
            getSender().tell(StartAkkaRpcActorResponse.failure(startFailure), getSelf());
        }
    }

    /**
     * Reacts to the termination of a child actor by removing it from the registry and terminating
     * its registration.
     *
     * @param actorRef reference of the child that has terminated
     * @throws IllegalStateException if the child is not contained in the registry
     */
    private void akkaRpcActorTerminated(ActorRef actorRef) {
        // Removal comes first: an unknown child aborts before anything is logged.
        final AkkaRpcActorRegistration registration = removeAkkaRpcActor(actorRef);
        LOG.debug("AkkaRpcActor {} has terminated.", actorRef.path());
        registration.terminate(terminationFutureExecutor);
    }

    /**
     * Reacts to the failure of a child actor: marks every registered child as failed and then
     * terminates the whole actor system.
     *
     * <p>The failed child is marked with an exception that wraps {@code cause}; all other
     * registered children are marked with an exception that names the failed sibling.
     *
     * @param actorRef reference of the child that failed
     * @param cause the failure reported for that child
     */
    private void akkaRpcActorFailed(ActorRef actorRef, Throwable cause) {
        LOG.warn("AkkaRpcActor {} has failed. Shutting it down now.", actorRef.path(), cause);

        for (Map.Entry<ActorRef, AkkaRpcActorRegistration> entry :
                registeredAkkaRpcActors.entrySet()) {
            final ActorRef candidate = entry.getKey();
            final AkkaRpcException failure;
            if (candidate.equals(actorRef)) {
                failure =
                        new AkkaRpcException(
                                String.format(
                                        "Stopping actor %s because it failed.", actorRef.path()),
                                cause);
            } else {
                failure =
                        new AkkaRpcException(
                                String.format(
                                        "Stopping actor %s because its sibling %s has failed.",
                                        candidate.path(), actorRef.path()));
            }
            entry.getValue().markFailed(failure);
        }

        getContext().getSystem().terminate();
    }

    /**
     * Removes the registration of the given child actor from the registry.
     *
     * @param actorRef reference of the child to remove
     * @return the registration that was stored for the child
     * @throws IllegalStateException if no registration is stored for the child
     */
    private AkkaRpcActorRegistration removeAkkaRpcActor(ActorRef actorRef) {
        final AkkaRpcActorRegistration removed = registeredAkkaRpcActors.remove(actorRef);
        if (removed == null) {
            throw new IllegalStateException(
                    String.format("Could not find actor %s.", actorRef.path()));
        }
        return removed;
    }

    /**
     * Fallback handler for every message that is not a {@link StartAkkaRpcActor} request.
     *
     * <p>The sender is answered with a {@link Status.Failure} wrapping an
     * {@link AkkaUnknownMessageException}, and the same exception is then thrown from the handler.
     *
     * @param msg the message that could not be handled
     */
    private void handleUnknownMessage(Object msg) {
        final AkkaUnknownMessageException unknownMessage =
                new AkkaUnknownMessageException(
                        String.format("Cannot handle unknown message %s.", msg));
        final Status.Failure reply = new Status.Failure(unknownMessage);
        getSender().tell(reply, getSelf());
        throw unknownMessage;
    }

    /**
     * Returns the name under which the supervisor actor is created in the actor system.
     *
     * @return the fixed actor name {@code "rpc"}
     */
    public static String getActorName() {
        final String supervisorName = "rpc";
        return supervisorName;
    }

    /**
     * Starts a {@link SupervisorActor} in the given actor system under the name returned by
     * {@link #getActorName()}, configured to use the dispatcher
     * {@code akka.actor.supervisor-dispatcher}.
     *
     * @param actorSystem actor system in which the supervisor is created
     * @param terminationFutureExecutor executor passed to the supervisor's constructor
     * @return reference to the started supervisor actor
     */
    public static ActorRef startSupervisorActor(ActorSystem actorSystem, Executor terminationFutureExecutor) {
        final Props baseProps = Props.create(SupervisorActor.class, terminationFutureExecutor);
        final Props dispatcherProps = baseProps.withDispatcher("akka.actor.supervisor-dispatcher");
        return actorSystem.actorOf(dispatcherProps, getActorName());
    }

    /**
     * Asks the supervisor to start a child actor for the given endpoint and waits for its answer.
     *
     * <p>The request is sent with {@code Patterns.ask} using {@code RpcUtils.INF_DURATION} as
     * timeout; the calling thread blocks in {@code join()} until the reply has arrived. The reply
     * is cast to {@link StartAkkaRpcActorResponse} inside the future chain, so the generic type is
     * kept throughout and no raw {@code CompletableFuture} or unchecked downcast is required.
     *
     * @param supervisor supervisor actor that receives the start request
     * @param propsFactory factory producing the {@code Props} of the actor to start
     * @param endpointId id of the endpoint, used as the name of the started actor
     * @return the supervisor's response, describing either the started actor or the start failure
     * @throws java.util.concurrent.CompletionException if the ask future completes exceptionally
     *     or the reply is not a {@link StartAkkaRpcActorResponse}
     */
    public static StartAkkaRpcActorResponse startAkkaRpcActor(ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
        return Patterns.ask(
                        supervisor,
                        createStartAkkaRpcActorMessage(propsFactory, endpointId),
                        RpcUtils.INF_DURATION)
                .toCompletableFuture()
                .thenApply(StartAkkaRpcActorResponse.class::cast)
                .join();
    }

    /**
     * Creates the request message that asks a supervisor to start an actor.
     *
     * @param propsFactory factory producing the {@code Props} of the actor to start
     * @param endpointId id of the endpoint the actor is started for
     * @return a new {@link StartAkkaRpcActor} message carrying both arguments
     */
    public static StartAkkaRpcActor createStartAkkaRpcActorMessage(StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
        final StartAkkaRpcActor request = StartAkkaRpcActor.create(propsFactory, endpointId);
        return request;
    }

    /**
     * Answer of the supervisor to a {@link StartAkkaRpcActor} request.
     *
     * <p>Instances are created through {@link #success(ActorRegistration)} or
     * {@link #failure(Throwable)}; the former stores only a registration, the latter only an error.
     */
    static final class StartAkkaRpcActorResponse {
        /** Registration of the started actor; {@code null} for failure responses. */
        @Nullable
        private final ActorRegistration actorRegistration;

        /** Reason why the actor could not be started; {@code null} for successful responses. */
        @Nullable
        private final Throwable error;

        private StartAkkaRpcActorResponse(@Nullable ActorRegistration actorRegistration, @Nullable Throwable error) {
            this.actorRegistration = actorRegistration;
            this.error = error;
        }

        /**
         * Returns the registration of the started actor or throws an exception derived from the
         * stored error.
         *
         * @param throwableFunction maps the stored error to the exception that is thrown
         * @param <X> type of the thrown exception
         * @return the actor registration if one is present
         * @throws X the result of applying {@code throwableFunction} to the stored error, if no
         *     registration is present
         */
        public <X extends Throwable> ActorRegistration orElseThrow(Function<? super Throwable, ? extends X> throwableFunction) throws X {
            if (this.actorRegistration != null) {
                return this.actorRegistration;
            }
            // Throw the mapped exception with its own static type; widening it to Throwable
            // would not be permitted by the "throws X" clause.
            throw throwableFunction.apply(this.error);
        }

        /**
         * Creates a response for a successfully started actor.
         *
         * @param actorRegistration registration of the started actor
         * @return response carrying the registration and no error
         */
        public static StartAkkaRpcActorResponse success(ActorRegistration actorRegistration) {
            return new StartAkkaRpcActorResponse(actorRegistration, null);
        }

        /**
         * Creates a response for an actor that could not be started.
         *
         * @param error reason of the failure
         * @return response carrying the error and no registration
         */
        public static StartAkkaRpcActorResponse failure(Throwable error) {
            return new StartAkkaRpcActorResponse(null, error);
        }
    }

    /** Immutable pair of an actor reference and the termination future that belongs to it. */
    static final class ActorRegistration {
        private final ActorRef actorRef;
        private final CompletableFuture<Void> terminationFuture;

        private ActorRegistration(ActorRef actorRef, CompletableFuture<Void> terminationFuture) {
            this.terminationFuture = terminationFuture;
            this.actorRef = actorRef;
        }

        /**
         * Factory method for registrations.
         *
         * @param actorRef2 reference of the actor
         * @param terminationFuture termination future that belongs to the actor
         * @return a new registration holding both values
         */
        public static ActorRegistration create(ActorRef actorRef2, CompletableFuture<Void> terminationFuture) {
            return new ActorRegistration(actorRef2, terminationFuture);
        }

        /** @return the stored termination future */
        public CompletableFuture<Void> getTerminationFuture() {
            return terminationFuture;
        }

        /** @return the stored actor reference */
        public ActorRef getActorRef() {
            return actorRef;
        }
    }

    /**
     * Request message asking the supervisor to start an actor; it carries the endpoint id and the
     * factory that produces the actor's {@link Props}.
     */
    static final class StartAkkaRpcActor {
        private final PropsFactory propsFactory;
        private final String endpointId;

        private StartAkkaRpcActor(PropsFactory propsFactory, String endpointId) {
            this.endpointId = endpointId;
            this.propsFactory = propsFactory;
        }

        private static StartAkkaRpcActor create(PropsFactory propsFactory, String endpointId) {
            return new StartAkkaRpcActor(propsFactory, endpointId);
        }

        /** @return the factory that produces the {@link Props} of the actor to start */
        public PropsFactory getPropsFactory() {
            return propsFactory;
        }

        /** @return the id of the endpoint the actor is started for */
        public String getEndpointId() {
            return endpointId;
        }

        /** Factory producing the {@link Props} of the actor to start. */
        interface PropsFactory {
            /**
             * Creates the actor props.
             *
             * @param var1 future handed to the factory; the supervisor passes the internal
             *     termination future of the new registration
             * @return the props used to create the actor
             */
            Props create(CompletableFuture<Void> var1);
        }
    }

    /**
     * Supervisor-side bookkeeping for one started child actor.
     *
     * <p>It holds two futures: the internal termination future, which is handed to the props
     * factory of the child, and the external termination future, which is handed out to whoever
     * requested the actor. It also remembers an optional failure cause recorded through
     * {@link #markFailed(Throwable)}.
     */
    private static final class AkkaRpcActorRegistration {
        private final String endpointId;
        private final CompletableFuture<Void> internalTerminationFuture;
        private final CompletableFuture<Void> externalTerminationFuture;

        /** First failure recorded for this actor; later failures are attached as suppressed. */
        @Nullable
        private Throwable errorCause;

        private AkkaRpcActorRegistration(String endpointId) {
            this.endpointId = endpointId;
            this.internalTerminationFuture = new CompletableFuture<>();
            this.externalTerminationFuture = new CompletableFuture<>();
            this.errorCause = null;
        }

        private CompletableFuture<Void> getInternalTerminationFuture() {
            return this.internalTerminationFuture;
        }

        private CompletableFuture<Void> getExternalTerminationFuture() {
            return this.externalTerminationFuture;
        }

        private String getEndpointId() {
            return this.endpointId;
        }

        /**
         * Settles the termination of the actor and forwards the outcome to the external future.
         *
         * <ul>
         *   <li>With a recorded failure cause, the internal future is completed exceptionally
         *       with it. If the internal future was already completed, a derived future is used
         *       instead that always fails with the recorded cause; an exception the internal
         *       future had been completed with is attached to the cause as suppressed.
         *   <li>Without a recorded cause, an attempt is made to fail the internal future with an
         *       {@link AkkaRpcException}; this has no effect if the future was already completed.
         * </ul>
         *
         * <p>The resulting future is finally passed, together with the external future and the
         * executor, to {@code FutureUtils.forwardAsync}.
         *
         * @param terminationFutureExecutor executor passed on to {@code FutureUtils.forwardAsync}
         */
        private void terminate(Executor terminationFutureExecutor) {
            CompletableFuture<Void> terminationFuture = this.internalTerminationFuture;

            if (this.errorCause != null) {
                if (!this.internalTerminationFuture.completeExceptionally(this.errorCause)) {
                    // The internal future was completed before; report the recorded cause anyway
                    // and keep a previous exceptional result as suppressed exception.
                    terminationFuture =
                            this.internalTerminationFuture.handle(
                                    (ignored, throwable) -> {
                                        if (throwable != null) {
                                            this.errorCause.addSuppressed(throwable);
                                        }
                                        throw new CompletionException(this.errorCause);
                                    });
                }
            } else {
                this.internalTerminationFuture.completeExceptionally(
                        new AkkaRpcException(
                                String.format(
                                        "RpcEndpoint %s did not complete the internal termination future.",
                                        this.endpointId)));
            }

            FutureUtils.forwardAsync(
                    terminationFuture, this.externalTerminationFuture, terminationFutureExecutor);
        }

        /**
         * Completes the external termination future exceptionally from within the given executor.
         *
         * @param cause exception the external future is completed with
         * @param terminationFutureExecutor executor that runs the completion
         */
        private void terminateExceptionally(Throwable cause, Executor terminationFutureExecutor) {
            terminationFutureExecutor.execute(
                    () -> this.externalTerminationFuture.completeExceptionally(cause));
        }

        /**
         * Records a failure for this actor. The first recorded cause is kept; every further cause
         * is added to it as a suppressed exception.
         *
         * @param cause the failure to record
         */
        public void markFailed(Throwable cause) {
            if (this.errorCause == null) {
                this.errorCause = cause;
            } else {
                this.errorCause.addSuppressed(cause);
            }
        }
    }

    /**
     * Supervision strategy of the enclosing {@link SupervisorActor}: children that throw an
     * {@link Exception} are stopped, and child terminations and failures are reported back to the
     * enclosing supervisor instance.
     */
    private final class SupervisorActorSupervisorStrategy extends SupervisorStrategy {
        private SupervisorActorSupervisorStrategy() {
        }

        /** Maps every {@link Exception} of a child to the stop directive. */
        @Override
        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder
                    .match(Exception.class, e -> SupervisorStrategy.stop())
                    .build();
        }

        /** Failure logging by the strategy itself is switched off. */
        @Override
        public boolean loggingEnabled() {
            return false;
        }

        /** Forwards the termination of a child to the enclosing supervisor. */
        @Override
        public void handleChildTerminated(ActorContext context, ActorRef child, Iterable<ActorRef> children) {
            SupervisorActor.this.akkaRpcActorTerminated(child);
        }

        /**
         * Forwards the failure of a child to the enclosing supervisor.
         *
         * @throws IllegalArgumentException if {@code restart} is {@code true}, presumably raised
         *     by {@code Preconditions.checkArgument} — confirm against that helper
         */
        @Override
        public void processFailure(ActorContext context, boolean restart, ActorRef child, Throwable cause, ChildRestartStats stats, Iterable<ChildRestartStats> children) {
            final boolean noRestartRequested = !restart;
            Preconditions.checkArgument(noRestartRequested, "The supervisor strategy should never restart an actor.");
            SupervisorActor.this.akkaRpcActorFailed(child, cause);
        }
    }
}

