/*
 * Decompiled with CFR 0.152.
 */
package za.co.absa.hyperdrive.trigger.scheduler.executors;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.stereotype.Component;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableLike;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import za.co.absa.hyperdrive.trigger.configuration.application.SchedulerConfig;
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig;
import za.co.absa.hyperdrive.trigger.models.DagInstance;
import za.co.absa.hyperdrive.trigger.models.JobInstance;
import za.co.absa.hyperdrive.trigger.models.JobInstanceParameters;
import za.co.absa.hyperdrive.trigger.models.ShellInstanceParameters;
import za.co.absa.hyperdrive.trigger.models.SparkInstanceParameters;
import za.co.absa.hyperdrive.trigger.models.enums.DagInstanceStatuses;
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses;
import za.co.absa.hyperdrive.trigger.persistance.DagInstanceRepository;
import za.co.absa.hyperdrive.trigger.persistance.JobInstanceRepository;
import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor$;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkClusterService;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkEmrClusterServiceImpl;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkExecutor$;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkYarnClusterServiceImpl;
import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender;

/*
 * Exception performing whole class analysis ignored.
 */
@Component
@ScalaSignature(bytes="\u0006\u0001\u0005\u0015c\u0001B\t\u0013\u0001\u0005B\u0001\u0002\u000b\u0001\u0003\u0002\u0003\u0006I!\u000b\u0005\t_\u0001\u0011\t\u0011)A\u0005a!A1\u0007\u0001B\u0001B\u0003%A\u0007\u0003\u0005;\u0001\t\u0005\t\u0015!\u0003<\u0011!9\u0005A!b\u0001\n\u0007A\u0005\u0002C)\u0001\u0005\u0003\u0005\u000b\u0011B%\t\u0011I\u0003!\u0011!Q\u0001\nMCQA\u0016\u0001\u0005\u0002]Cq!\u001b\u0001C\u0002\u0013%!\u000e\u0003\u0004r\u0001\u0001\u0006Ia\u001b\u0005\be\u0002\u0011\r\u0011b\u0003t\u0011\u0019Q\b\u0001)A\u0005i\"91\u0010\u0001b\u0001\n\u0013a\bbBA\u0004\u0001\u0001\u0006I! \u0005\b\u0003\u0013\u0001A\u0011AA\u0006\u0011\u001d\tI\u0003\u0001C\u0005\u0003W\u0011\u0011\"\u0012=fGV$xN]:\u000b\u0005M!\u0012!C3yK\u000e,Ho\u001c:t\u0015\t)b#A\u0005tG\",G-\u001e7fe*\u0011q\u0003G\u0001\biJLwmZ3s\u0015\tI\"$\u0001\u0006isB,'\u000f\u001a:jm\u0016T!a\u0007\u000f\u0002\t\u0005\u00147/\u0019\u0006\u0003;y\t!aY8\u000b\u0003}\t!A_1\u0004\u0001M\u0011\u0001A\t\t\u0003G\u0019j\u0011\u0001\n\u0006\u0002K\u0005)1oY1mC&\u0011q\u0005\n\u0002\u0007\u0003:L(+\u001a4\u0002+\u0011\fw-\u00138ti\u0006t7-\u001a*fa>\u001c\u0018\u000e^8ssB\u0011!&L\u0007\u0002W)\u0011AFF\u0001\fa\u0016\u00148/[:uC:\u001cW-\u0003\u0002/W\t)B)Y4J]N$\u0018M\\2f%\u0016\u0004xn]5u_JL\u0018!\u00066pE&s7\u000f^1oG\u0016\u0014V\r]8tSR|'/\u001f\t\u0003UEJ!AM\u0016\u0003+){'-\u00138ti\u0006t7-\u001a*fa>\u001c\u0018\u000e^8ss\u0006\u0011bn\u001c;jM&\u001c\u0017\r^5p]N+g\u000eZ3s!\t)\u0004(D\u00017\u0015\t9D#A\u0007o_RLg-[2bi&|gn]\u0005\u0003sY\u0012!CT8uS\u001aL7-\u0019;j_:\u001cVM\u001c3fe\u0006Y!-Z1o\r\u0006\u001cGo\u001c:z!\taT)D\u0001>\u0015\tqt(A\u0004gC\u000e$xN]=\u000b\u0005\u0001\u000b\u0015!\u00022fC:\u001c(B\u0001\"D\u0003=\u0019\bO]5oO\u001a\u0014\u0018-\\3x_J\\'\"\u0001#\u0002\u0007=\u0014x-\u0003\u0002G{\tY!)Z1o\r\u0006\u001cGo\u001c:z\u0003-\u0019\b/\u0019:l\u0007>tg-[4\u0016\u0003%\u0003\"AS(\u000e\u0003-S!\u0001T'\u0002\u0017\u0005\u0004\b\u000f\\5dCRLwN\u001c\u0006\u0003\u001dZ\tQbY8oM&<WO]1uS>t\u0017B\u0001)L\u0005-\u0019\u0006/\u0019:l\u0007>tg-[4\u0002\u0019M\u0004\u0018M]6D_:4\u0017n\u001a\u0011\u0002\u001fM\u001c\u0007.\u001a3vY\u0016\u00148i\u001c8gS\u001e\u0004\"A\u0013+\n\u0005U[%aD*dQ\u0016$W\u000f\\3s\u0007>tg-[4\u0002\rqJg.\u001b;?)\u001dA&l\u0017/^=~\u0003\"!\u0017\u0001\u000e\u0003IAQ\u0001\u000b\u0005A\u0002%BQa\f\u0005A\u0002ABQa\r\u0005A\u0002QBQA\u000f\u0005A\u0002mBQa\u0012\u0005A\u0004%CQA\u0015\u0005A\u0002MC#\u0001C1\u0011\u0005\t<W\"A2\u000b\u0005\u0011,\u0017AB5oU\u0016\u001cGOC\u0001g\u0003\u0015Q\u0017M^1y\u0013\tA7M\u0001\u0004J]*,7\r^\u0001\u0007Y><w-\u001a:\u0016\u0003-\u0004\"\u0001\\8\u000e\u00035T!A\\\"\u0002\u000bMdg\r\u000e6\n\u0005Al'A\u0002'pO\u001e,'/A\u0004m_\u001e<WM\u001d\u0011\u0002!\u0015DXmY;uS>t7i\u001c8uKb$X#\u0001;\u0011\u0005UDX\"\u0001<\u000b\u0005]$\u0013AC2p]\u000e,(O]3oi&\u0011\u0011P\u001e\u0002\u0019\u000bb,7-\u001e;j_:\u001cuN\u001c;fqR,\u00050Z2vi>\u0014\u0018!E3yK\u000e,H/[8o\u0007>tG/\u001a=uA\u0005\u00192\u000f]1sW\u000ecWo\u001d;feN+'O^5dKV\tQ\u0010E\u0002\u007f\u0003\u0007i\u0011a \u0006\u0004\u0003\u0003\u0011\u0012!B:qCJ\\\u0017bAA\u0003\u007f\n\u00192\u000b]1sW\u000ecWo\u001d;feN+'O^5dK\u0006!2\u000f]1sW\u000ecWo\u001d;feN+'O^5dK\u0002\n!\"\u001a=fGV$X\rR1h)\u0011\ti!!\u0007\u0011\u000bU\fy!a\u0005\n\u0007\u0005EaO\u0001\u0004GkR,(/\u001a\t\u0004G\u0005U\u0011bAA\fI\t!QK\\5u\u0011\u001d\tYb\u0004a\u0001\u0003;\t1\u0002Z1h\u0013:\u001cH/\u00198dKB!\u0011qDA\u0013\u001b\t\t\tCC\u0002\u0002$Y\ta!\\8eK2\u001c\u0018\u0002BA\u0014\u0003C\u00111\u0002R1h\u0013:\u001cH/\u00198dK\u0006IQ\u000f\u001d3bi\u0016TuN\u0019\u000b\u0005\u0003\u001b\ti\u0003C\u0004\u00020A\u0001\r!!\r\u0002\u0017)|'-\u00138ti\u0006t7-\u001a\t\u0005\u0003?\t\u0019$\u0003\u0003\u00026\u0005\u0005\"a\u0003&pE&s7\u000f^1oG\u0016D3\u0001AA\u001d!\u0011\tY$!\u0011\u000e\u0005\u0005u\"bAA \u0003\u0006Q1\u000f^3sK>$\u0018\u0010]3\n\t\u0005\r\u0013Q\b\u0002\n\u0007>l\u0007o\u001c8f]R\u0004")
public class Executors {
    private final DagInstanceRepository dagInstanceRepository;
    private final JobInstanceRepository jobInstanceRepository;
    private final NotificationSender notificationSender;
    private final SparkConfig sparkConfig;
    private final Logger logger;
    private final ExecutionContextExecutor executionContext;
    private final SparkClusterService sparkClusterService;

    public SparkConfig sparkConfig() {
        return this.sparkConfig;
    }

    private Logger logger() {
        return this.logger;
    }

    private ExecutionContextExecutor executionContext() {
        return this.executionContext;
    }

    private SparkClusterService sparkClusterService() {
        return this.sparkClusterService;
    }

    public Future<BoxedUnit> executeDag(DagInstance dagInstance) {
        return this.jobInstanceRepository.getJobInstances(dagInstance.id(), (ExecutionContext)this.executionContext()).flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            Future future;
            Seq seq = x0$1;
            if (seq.exists((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)Executors.$anonfun$executeDag$2((JobInstance)x$1)))) {
                DagInstanceStatuses.Failed$ x$12 = DagInstanceStatuses.Failed$.MODULE$;
                Option x$22 = Option$.MODULE$.apply((Object)LocalDateTime.now());
                String x$32 = dagInstance.copy$default$2();
                long x$42 = dagInstance.copy$default$3();
                LocalDateTime x$52 = dagInstance.copy$default$4();
                long x$62 = dagInstance.copy$default$6();
                DagInstance updatedDagInstance = dagInstance.copy((DagInstanceStatuses.DagInstanceStatus)x$12, x$32, x$42, x$52, x$22, x$62);
                Future fut = $this.jobInstanceRepository.updateJobsStatus((Seq)((TraversableLike)seq.filter((Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)Executors.$anonfun$executeDag$3((JobInstance)x$2)))).map((Function1 & Serializable & scala.Serializable)x$3 -> BoxesRunTime.boxToLong((long)Executors.$anonfun$executeDag$4((JobInstance)x$3)), Seq$.MODULE$.canBuildFrom()), (JobStatuses.JobStatus)JobStatuses.FailedPreviousJob$.MODULE$, (ExecutionContext)this.executionContext()).flatMap((Function1 & Serializable & scala.Serializable)_ -> $this.dagInstanceRepository.update(updatedDagInstance).flatMap((Function1 & Serializable & scala.Serializable)_2 -> $this.notificationSender.createNotifications(updatedDagInstance, seq, (ExecutionContext)this.executionContext()).map((Function1 & Serializable & scala.Serializable)_ -> {
                    Executors.$anonfun$executeDag$7((BoxedUnit)_);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.executionContext()), (ExecutionContext)this.executionContext()), (ExecutionContext)this.executionContext());
                fut.onComplete((Function1 & Serializable & scala.Serializable)x0$2 -> {
                    Executors.$anonfun$executeDag$8((Executors)this, (DagInstance)dagInstance, (Try)x0$2);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.executionContext());
                future = fut;
            } else if (seq.forall((Function1 & Serializable & scala.Serializable)ji -> BoxesRunTime.boxToBoolean((boolean)Executors.$anonfun$executeDag$9((JobInstance)ji)))) {
                DagInstanceStatuses.Succeeded$ x$7 = DagInstanceStatuses.Succeeded$.MODULE$;
                Option x$8 = Option$.MODULE$.apply((Object)LocalDateTime.now());
                String x$9 = dagInstance.copy$default$2();
                long x$10 = dagInstance.copy$default$3();
                LocalDateTime x$11 = dagInstance.copy$default$4();
                long x$12 = dagInstance.copy$default$6();
                DagInstance updatedDagInstance = dagInstance.copy((DagInstanceStatuses.DagInstanceStatus)x$7, x$9, x$10, x$11, x$8, x$12);
                Future fut = $this.dagInstanceRepository.update(updatedDagInstance).flatMap((Function1 & Serializable & scala.Serializable)_2 -> $this.notificationSender.createNotifications(updatedDagInstance, seq, (ExecutionContext)this.executionContext()).map((Function1 & Serializable & scala.Serializable)_ -> {
                    Executors.$anonfun$executeDag$11((BoxedUnit)_);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.executionContext()), (ExecutionContext)this.executionContext());
                fut.onComplete((Function1 & Serializable & scala.Serializable)x0$3 -> {
                    Executors.$anonfun$executeDag$12((Executors)this, (DagInstance)dagInstance, (Try)x0$3);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.executionContext());
                future = fut;
            } else {
                Option jobInstance = ((TraversableLike)((SeqLike)seq.filter((Function1 & Serializable & scala.Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)Executors.$anonfun$executeDag$13((JobInstance)x$4)))).sortBy((Function1 & Serializable & scala.Serializable)x$5 -> BoxesRunTime.boxToInteger((int)Executors.$anonfun$executeDag$14((JobInstance)x$5)), (Ordering)Ordering.Int$.MODULE$)).headOption();
                Future fut = $this.dagInstanceRepository.update(dagInstance.copy((DagInstanceStatuses.DagInstanceStatus)DagInstanceStatuses.Running$.MODULE$, dagInstance.copy$default$2(), dagInstance.copy$default$3(), dagInstance.copy$default$4(), dagInstance.copy$default$5(), dagInstance.copy$default$6())).flatMap((Function1 & Serializable & scala.Serializable)x$6 -> {
                    Future future;
                    Option option = jobInstance;
                    if (option instanceof Some) {
                        Future future2;
                        Some some = (Some)option;
                        JobInstance ji = (JobInstance)some.value();
                        JobInstanceParameters jobInstanceParameters = ji.jobParameters();
                        if (jobInstanceParameters instanceof SparkInstanceParameters) {
                            SparkInstanceParameters sparkInstanceParameters = (SparkInstanceParameters)jobInstanceParameters;
                            future2 = SparkExecutor$.MODULE$.execute(ji, sparkInstanceParameters, (Function1 & Serializable & scala.Serializable)jobInstance -> this.updateJob(jobInstance), this.sparkClusterService(), (ExecutionContext)this.executionContext(), this.sparkConfig());
                        } else if (jobInstanceParameters instanceof ShellInstanceParameters) {
                            ShellInstanceParameters shellInstanceParameters = (ShellInstanceParameters)jobInstanceParameters;
                            future2 = ShellExecutor$.MODULE$.execute(ji, shellInstanceParameters, (Function1 & Serializable & scala.Serializable)jobInstance -> this.updateJob(jobInstance), (ExecutionContext)this.executionContext());
                        } else {
                            JobStatuses.InvalidExecutor$ x$13 = JobStatuses.InvalidExecutor$.MODULE$;
                            String x$14 = ji.copy$default$1();
                            JobInstanceParameters x$15 = ji.copy$default$2();
                            Option x$16 = ji.copy$default$4();
                            Option x$17 = ji.copy$default$5();
                            Option x$18 = ji.copy$default$6();
                            LocalDateTime x$19 = ji.copy$default$7();
                            Option x$20 = ji.copy$default$8();
                            int x$21 = ji.copy$default$9();
                            long x$22 = ji.copy$default$10();
                            long x$23 = ji.copy$default$11();
                            future2 = this.updateJob(ji.copy(x$14, x$15, (JobStatuses.JobStatus)x$13, x$16, x$17, x$18, x$19, x$20, x$21, x$22, x$23));
                        }
                        future = future2;
                    } else if (None$.MODULE$.equals(option)) {
                        future = Future$.MODULE$.successful((Object)BoxedUnit.UNIT);
                    } else {
                        throw new MatchError((Object)option);
                    }
                    return future;
                }, (ExecutionContext)this.executionContext());
                fut.onComplete((Function1 & Serializable & scala.Serializable)x0$4 -> {
                    Executors.$anonfun$executeDag$18((Executors)this, (Option)jobInstance, (Try)x0$4);
                    return BoxedUnit.UNIT;
                }, (ExecutionContext)this.executionContext());
                future = fut;
            }
            return future;
        }, (ExecutionContext)this.executionContext());
    }

    private Future<BoxedUnit> updateJob(JobInstance jobInstance) {
        return this.jobInstanceRepository.updateJob(jobInstance, (ExecutionContext)this.executionContext());
    }

    public static final /* synthetic */ boolean $anonfun$executeDag$2(JobInstance x$1) {
        return x$1.jobStatus().isFailed();
    }

    public static final /* synthetic */ boolean $anonfun$executeDag$3(JobInstance x$2) {
        return !x$2.jobStatus().isFinalStatus();
    }

    public static final /* synthetic */ long $anonfun$executeDag$4(JobInstance x$3) {
        return x$3.id();
    }

    public static final /* synthetic */ void $anonfun$executeDag$7(BoxedUnit _) {
    }

    public static final /* synthetic */ void $anonfun$executeDag$8(Executors $this, DagInstance dagInstance$1, Try x0$2) {
        Try try_ = x0$2;
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable exception = failure.exception();
            $this.logger().error(new StringBuilder(57).append("Updating status failed for failed run. Dag instance id = ").append(dagInstance$1.id()).toString(), exception);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ boolean $anonfun$executeDag$9(JobInstance ji) {
        return ji.jobStatus().isFinalStatus() && !ji.jobStatus().isFailed();
    }

    public static final /* synthetic */ void $anonfun$executeDag$11(BoxedUnit _) {
    }

    public static final /* synthetic */ void $anonfun$executeDag$12(Executors $this, DagInstance dagInstance$1, Try x0$3) {
        Try try_ = x0$3;
        if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable exception = failure.exception();
            $this.logger().error(new StringBuilder(61).append("Updating status failed for successful run. Dag instance id = ").append(dagInstance$1.id()).toString(), exception);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ boolean $anonfun$executeDag$13(JobInstance x$4) {
        return !x$4.jobStatus().isFinalStatus();
    }

    public static final /* synthetic */ int $anonfun$executeDag$14(JobInstance x$5) {
        return x$5.order();
    }

    public static final /* synthetic */ void $anonfun$executeDag$18(Executors $this, Option jobInstance$1, Try x0$4) {
        Try try_ = x0$4;
        if (try_ instanceof Success) {
            $this.logger().debug(new StringBuilder(33).append("Executing job. Job instance id = ").append(jobInstance$1).toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (try_ instanceof Failure) {
            Failure failure = (Failure)try_;
            Throwable exception = failure.exception();
            $this.logger().error(new StringBuilder(41).append("Executing job failed. Job instance id = ").append(jobInstance$1).append(".").toString(), exception);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            throw new MatchError((Object)try_);
        }
    }

    @Inject
    public Executors(DagInstanceRepository dagInstanceRepository, JobInstanceRepository jobInstanceRepository, NotificationSender notificationSender, BeanFactory beanFactory, SparkConfig sparkConfig, SchedulerConfig schedulerConfig) {
        SparkClusterService sparkClusterService;
        this.dagInstanceRepository = dagInstanceRepository;
        this.jobInstanceRepository = jobInstanceRepository;
        this.notificationSender = notificationSender;
        this.sparkConfig = sparkConfig;
        this.logger = LoggerFactory.getLogger((Class)this.getClass());
        this.executionContext = ExecutionContext$.MODULE$.fromExecutor((Executor)java.util.concurrent.Executors.newFixedThreadPool(schedulerConfig.executors().threadPoolSize()));
        String string = sparkConfig.submitApi().toLowerCase();
        if ("yarn".equals(string)) {
            this.logger().info("Using yarn cluster");
            sparkClusterService = (SparkClusterService)beanFactory.getBean(SparkYarnClusterServiceImpl.class);
        } else if ("emr".equals(string)) {
            this.logger().info("Using emr cluster");
            sparkClusterService = (SparkClusterService)beanFactory.getBean(SparkEmrClusterServiceImpl.class);
        } else {
            throw new IllegalArgumentException("Invalid spark cluster api - use one of: yarn, emr");
        }
        this.sparkClusterService = sparkClusterService;
    }
}

