/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.spi.exception.TargetNotMemberException;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.Nonnull;

public abstract class AbstractJobProxy<T>
implements Job {
    private final long jobId;
    private final ILogger logger;
    private final T container;
    private final NonCompletableFuture future = new NonCompletableFuture();
    private final AtomicBoolean joinedJob = new AtomicBoolean();
    private final BiConsumer<Void, Throwable> joinJobCallback = new JoinJobCallback();
    private volatile JobConfig jobConfig;
    private final Supplier<Long> submissionTimeSup = Util.memoizeConcurrent(this::doGetJobSubmissionTime);

    AbstractJobProxy(T container, long jobId) {
        this.jobId = jobId;
        this.container = container;
        this.logger = this.loggingService().getLogger(Job.class);
    }

    AbstractJobProxy(T container, long jobId, DAG dag, JobConfig config) {
        this.jobId = jobId;
        this.container = container;
        this.logger = this.loggingService().getLogger(Job.class);
        try {
            this.doSubmitJob(dag, config);
            this.joinedJob.set(true);
            this.doInvokeJoinJob();
        }
        catch (Throwable t) {
            throw ExceptionUtil.rethrow(t);
        }
    }

    @Override
    public long getId() {
        return this.jobId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nonnull
    public JobConfig getConfig() {
        JobConfig loadResult = this.jobConfig;
        if (loadResult != null) {
            return loadResult;
        }
        AbstractJobProxy abstractJobProxy = this;
        synchronized (abstractJobProxy) {
            if (this.jobConfig != null) {
                return this.jobConfig;
            }
            this.jobConfig = this.doGetJobConfig();
            if (this.jobConfig == null) {
                throw new NullPointerException("Supplier returned null");
            }
            return this.jobConfig;
        }
    }

    private String idAndName() {
        JobConfig config = this.jobConfig;
        return this.getIdString() + " (name " + (config != null ? "'" + (config.getName() != null ? config.getName() : "") + "'" : "??") + ')';
    }

    @Override
    @Nonnull
    public CompletableFuture<Void> getFuture() {
        if (this.joinedJob.compareAndSet(false, true)) {
            this.doInvokeJoinJob();
        }
        return this.future;
    }

    @Override
    public long getSubmissionTime() {
        return this.submissionTimeSup.get();
    }

    @Override
    public void cancel() {
        this.terminate(TerminationMode.CANCEL_FORCEFUL);
    }

    @Override
    public void restart() {
        this.terminate(TerminationMode.RESTART_GRACEFUL);
    }

    public void restart(boolean graceful) {
        this.terminate(graceful ? TerminationMode.RESTART_GRACEFUL : TerminationMode.RESTART_FORCEFUL);
    }

    @Override
    public void suspend() {
        this.terminate(TerminationMode.SUSPEND_GRACEFUL);
    }

    private void terminate(TerminationMode mode) {
        this.logger.fine("Sending " + (Object)((Object)mode) + " request for job " + this.idAndName());
        while (true) {
            try {
                this.invokeTerminateJob(mode).get();
            }
            catch (Exception e) {
                if (!this.isRestartable(e)) {
                    throw ExceptionUtil.rethrow(e);
                }
                this.logger.fine("Re-sending " + (Object)((Object)mode) + " request for job " + this.idAndName());
                continue;
            }
            break;
        }
    }

    public String toString() {
        return "Job{id=" + this.getIdString() + ", name=" + this.getName() + ", submissionTime=" + Util.toLocalDateTime(this.getSubmissionTime()) + ", status=" + (Object)((Object)this.getStatus()) + "}";
    }

    protected abstract CompletableFuture<Void> invokeSubmitJob(Data var1, JobConfig var2);

    protected abstract CompletableFuture<Void> invokeJoinJob();

    protected abstract CompletableFuture<Void> invokeTerminateJob(TerminationMode var1);

    protected abstract long doGetJobSubmissionTime();

    protected abstract JobConfig doGetJobConfig();

    protected abstract UUID masterUuid();

    protected abstract SerializationService serializationService();

    protected abstract LoggingService loggingService();

    protected T container() {
        return this.container;
    }

    private void doSubmitJob(DAG dag, JobConfig config) {
        CompletableFuture<Void> submitFuture = new CompletableFuture<Void>();
        SubmitJobCallback callback = new SubmitJobCallback(submitFuture, dag, config);
        this.invokeSubmitJob(this.serializationService().toData((Object)dag), config).whenCompleteAsync((BiConsumer)callback);
        submitFuture.join();
    }

    private boolean isRestartable(Throwable t) {
        return t instanceof MemberLeftException || t instanceof TargetDisconnectedException || t instanceof TargetNotMemberException;
    }

    private void doInvokeJoinJob() {
        this.invokeJoinJob().whenCompleteAsync((BiConsumer)this.joinJobCallback);
    }

    private class JoinJobCallback
    implements BiConsumer<Void, Throwable> {
        private JoinJobCallback() {
        }

        @Override
        public void accept(Void aVoid, Throwable t) {
            if (t != null) {
                Throwable ex = ExceptionUtil.peel(t);
                if (ex instanceof LocalMemberResetException) {
                    String msg = "Job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing a split-brain merge";
                    AbstractJobProxy.this.logger.warning(msg, ex);
                    AbstractJobProxy.this.future.internalCompleteExceptionally(new CancellationException(msg));
                } else if (!AbstractJobProxy.this.isRestartable(ex)) {
                    AbstractJobProxy.this.future.internalCompleteExceptionally(ex);
                } else {
                    try {
                        this.rejoinJob(t);
                    }
                    catch (Exception e) {
                        AbstractJobProxy.this.future.internalCompleteExceptionally(ExceptionUtil.peel(e));
                    }
                }
            } else {
                AbstractJobProxy.this.future.internalComplete();
            }
        }

        private void rejoinJob(Throwable t) {
            if (AbstractJobProxy.this.masterUuid() != null) {
                AbstractJobProxy.this.logger.fine("Rejoining to job " + AbstractJobProxy.this.idAndName() + " after " + t.getClass().getSimpleName());
                AbstractJobProxy.this.doInvokeJoinJob();
                return;
            }
            String msg = "Job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing  split-brain merge and coordinator is not known";
            AbstractJobProxy.this.logger.warning(msg, t);
            AbstractJobProxy.this.future.internalCompleteExceptionally(new CancellationException(msg));
        }
    }

    private class SubmitJobCallback
    implements BiConsumer<Void, Throwable> {
        private final CompletableFuture<Void> future;
        private final DAG dag;
        private final JobConfig config;

        SubmitJobCallback(CompletableFuture<Void> future, DAG dag, JobConfig config) {
            this.future = future;
            this.dag = dag;
            this.config = config;
        }

        @Override
        public void accept(Void aVoid, Throwable t) {
            if (t != null) {
                Throwable ex = ExceptionUtil.peel(t);
                if (ex instanceof LocalMemberResetException) {
                    String msg = "Submission of job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing split-brain merge";
                    AbstractJobProxy.this.logger.warning(msg, ex);
                    this.future.completeExceptionally(new CancellationException(msg));
                } else if (!AbstractJobProxy.this.isRestartable(ex)) {
                    this.future.completeExceptionally(ex);
                } else {
                    try {
                        this.resubmitJob(t);
                    }
                    catch (Exception e) {
                        this.future.completeExceptionally(ExceptionUtil.peel(e));
                    }
                }
            } else {
                this.future.complete(null);
            }
        }

        private void resubmitJob(Throwable t) {
            if (AbstractJobProxy.this.masterUuid() != null) {
                AbstractJobProxy.this.logger.fine("Resubmitting job " + AbstractJobProxy.this.idAndName() + " after " + t.getClass().getSimpleName());
                AbstractJobProxy.this.invokeSubmitJob(AbstractJobProxy.this.serializationService().toData((Object)this.dag), this.config).whenCompleteAsync((BiConsumer)this);
                return;
            }
            String msg = "Job " + AbstractJobProxy.this.idAndName() + " failed because the cluster is performing  split-brain merge and coordinator is not known";
            AbstractJobProxy.this.logger.warning(msg, t);
            this.future.completeExceptionally(new CancellationException(msg));
        }
    }
}

