package za.co.absa.hyperdrive.trigger.scheduler.executors;

import java.time.LocalDateTime;
import javax.inject.Inject;
import org.apache.hadoop.mapreduce.MRConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.stereotype.Component;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.math.Ordering$Int$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import za.co.absa.hyperdrive.trigger.configuration.application.SchedulerConfig;
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig;
import za.co.absa.hyperdrive.trigger.models.DagInstance;
import za.co.absa.hyperdrive.trigger.models.JobInstance;
import za.co.absa.hyperdrive.trigger.models.JobInstanceParameters;
import za.co.absa.hyperdrive.trigger.models.ShellInstanceParameters;
import za.co.absa.hyperdrive.trigger.models.SparkInstanceParameters;
import za.co.absa.hyperdrive.trigger.models.enums.DagInstanceStatuses$Failed$;
import za.co.absa.hyperdrive.trigger.models.enums.DagInstanceStatuses$Running$;
import za.co.absa.hyperdrive.trigger.models.enums.DagInstanceStatuses$Succeeded$;
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses$FailedPreviousJob$;
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses$InvalidExecutor$;
import za.co.absa.hyperdrive.trigger.persistance.DagInstanceRepository;
import za.co.absa.hyperdrive.trigger.persistance.JobInstanceRepository;
import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor$;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkClusterService;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkEmrClusterServiceImpl;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkExecutor$;
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.SparkYarnClusterServiceImpl;
import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender;

/* compiled from: Executors.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015c\u0001B\t\u0013\u0001\u0005B\u0001\u0002\u000b\u0001\u0003\u0002\u0003\u0006I!\u000b\u0005\t_\u0001\u0011\t\u0011)A\u0005a!A1\u0007\u0001B\u0001B\u0003%A\u0007\u0003\u0005;\u0001\t\u0005\t\u0015!\u0003<\u0011!9\u0005A!b\u0001\n\u0007A\u0005\u0002C)\u0001\u0005\u0003\u0005\u000b\u0011B%\t\u0011I\u0003!\u0011!Q\u0001\nMCQA\u0016\u0001\u0005\u0002]Cq!\u001b\u0001C\u0002\u0013%!\u000e\u0003\u0004r\u0001\u0001\u0006Ia\u001b\u0005\be\u0002\u0011\r\u0011b\u0003t\u0011\u0019Q\b\u0001)A\u0005i\"91\u0010\u0001b\u0001\n\u0013a\bbBA\u0004\u0001\u0001\u0006I! \u0005\b\u0003\u0013\u0001A\u0011AA\u0006\u0011\u001d\tI\u0003\u0001C\u0005\u0003W\u0011\u0011\"\u0012=fGV$xN]:\u000b\u0005M!\u0012!C3yK\u000e,Ho\u001c:t\u0015\t)b#A\u0005tG\",G-\u001e7fe*\u0011q\u0003G\u0001\biJLwmZ3s\u0015\tI\"$\u0001\u0006isB,'\u000f\u001a:jm\u0016T!a\u0007\u000f\u0002\t\u0005\u00147/\u0019\u0006\u0003;y\t!aY8\u000b\u0003}\t!A_1\u0004\u0001M\u0011\u0001A\t\t\u0003G\u0019j\u0011\u0001\n\u0006\u0002K\u0005)1oY1mC&\u0011q\u0005\n\u0002\u0007\u0003:L(+\u001a4\u0002+\u0011\fw-\u00138ti\u0006t7-\u001a*fa>\u001c\u0018\u000e^8ssB\u0011!&L\u0007\u0002W)\u0011AFF\u0001\fa\u0016\u00148/[:uC:\u001cW-\u0003\u0002/W\t)B)Y4J]N$\u0018M\\2f%\u0016\u0004xn]5u_JL\u0018!\u00066pE&s7\u000f^1oG\u0016\u0014V\r]8tSR|'/\u001f\t\u0003UEJ!AM\u0016\u0003+){'-\u00138ti\u0006t7-\u001a*fa>\u001c\u0018\u000e^8ss\u0006\u0011bn\u001c;jM&\u001c\u0017\r^5p]N+g\u000eZ3s!\t)\u0004(D\u00017\u0015\t9D#A\u0007o_RLg-[2bi&|gn]\u0005\u0003sY\u0012!CT8uS\u001aL7-\u0019;j_:\u001cVM\u001c3fe\u0006Y!-Z1o\r\u0006\u001cGo\u001c:z!\taT)D\u0001>\u0015\tqt(A\u0004gC\u000e$xN]=\u000b\u0005\u0001\u000b\u0015!\u00022fC:\u001c(B\u0001\"D\u0003=\u0019\bO]5oO\u001a\u0014\u0018-\\3x_J\\'\"\u0001#\u0002\u0007=\u0014x-\u0003\u0002G{\tY!)Z1o\r\u0006\u001cGo\u001c:z\u0003-\u0019\b/\u0019:l\u0007>tg-[4\u0016\u0003%\u0003\"AS(\u000e\u0003-S!\u0001T'\u0002\u0017\u0005\u0004\b\u000f\\5dCRLwN\u001c\u0006\u0003\u001dZ\tQbY8
oM&<WO]1uS>t\u0017B\u0001)L\u0005-\u0019\u0006/\u0019:l\u0007>tg-[4\u0002\u0019M\u0004\u0018M]6D_:4\u0017n\u001a\u0011\u0002\u001fM\u001c\u0007.\u001a3vY\u0016\u00148i\u001c8gS\u001e\u0004\"A\u0013+\n\u0005U[%aD*dQ\u0016$W\u000f\\3s\u0007>tg-[4\u0002\rqJg.\u001b;?)\u001dA&l\u0017/^=~\u0003\"!\u0017\u0001\u000e\u0003IAQ\u0001\u000b\u0005A\u0002%BQa\f\u0005A\u0002ABQa\r\u0005A\u0002QBQA\u000f\u0005A\u0002mBQa\u0012\u0005A\u0004%CQA\u0015\u0005A\u0002MC#\u0001C1\u0011\u0005\t<W\"A2\u000b\u0005\u0011,\u0017AB5oU\u0016\u001cGOC\u0001g\u0003\u0015Q\u0017M^1y\u0013\tA7M\u0001\u0004J]*,7\r^\u0001\u0007Y><w-\u001a:\u0016\u0003-\u0004\"\u0001\\8\u000e\u00035T!A\\\"\u0002\u000bMdg\r\u000e6\n\u0005Al'A\u0002'pO\u001e,'/A\u0004m_\u001e<WM\u001d\u0011\u0002!\u0015DXmY;uS>t7i\u001c8uKb$X#\u0001;\u0011\u0005UDX\"\u0001<\u000b\u0005]$\u0013AC2p]\u000e,(O]3oi&\u0011\u0011P\u001e\u0002\u0019\u000bb,7-\u001e;j_:\u001cuN\u001c;fqR,\u00050Z2vi>\u0014\u0018!E3yK\u000e,H/[8o\u0007>tG/\u001a=uA\u0005\u00192\u000f]1sW\u000ecWo\u001d;feN+'O^5dKV\tQ\u0010E\u0002\u007f\u0003\u0007i\u0011a \u0006\u0004\u0003\u0003\u0011\u0012!B:qCJ\\\u0017bAA\u0003\u007f\n\u00192\u000b]1sW\u000ecWo\u001d;feN+'O^5dK\u0006!2\u000f]1sW\u000ecWo\u001d;feN+'O^5dK\u0002\n!\"\u001a=fGV$X\rR1h)\u0011\ti!!\u0007\u0011\u000bU\fy!a\u0005\n\u0007\u0005EaO\u0001\u0004GkR,(/\u001a\t\u0004G\u0005U\u0011bAA\fI\t!QK\\5u\u0011\u001d\tYb\u0004a\u0001\u0003;\t1\u0002Z1h\u0013:\u001cH/\u00198dKB!\u0011qDA\u0013\u001b\t\t\tCC\u0002\u0002$Y\ta!\\8eK2\u001c\u0018\u0002BA\u0014\u0003C\u00111\u0002R1h\u0013:\u001cH/\u00198dK\u0006IQ\u000f\u001d3bi\u0016TuN\u0019\u000b\u0005\u0003\u001b\ti\u0003C\u0004\u00020A\u0001\r!!\r\u0002\u0017)|'-\u00138ti\u0006t7-\u001a\t\u0005\u0003?\t\u0019$\u0003\u0003\u00026\u0005\u0005\"a\u0003&pE&s7\u000f^1oG\u0016D3\u0001AA\u001d!\u0011\tY$!\u0011\u000e\u0005\u0005u\"bAA \u0003\u0006Q1\u000f^3sK>$\u0018\u0010]3\n\t\u0005\r\u0013Q\b\u0002\n\u0007>l\u0007o\u001c8f]R\u0004")
@Component
/* loaded from: input_file:WEB-INF/classes/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.class */
public class Executors {
    // Repository through which DAG instance status changes are persisted (see executeDag).
    private final DagInstanceRepository dagInstanceRepository;
    // Repository used to load the job instances of a DAG and to persist job status changes.
    private final JobInstanceRepository jobInstanceRepository;
    // Creates notifications once a DAG instance has been marked failed or succeeded.
    private final NotificationSender notificationSender;
    // Spark configuration; its submitApi value selects the cluster service in the constructor.
    private final SparkConfig sparkConfig;
    // Logger named after the runtime class of this instance.
    private final Logger logger = LoggerFactory.getLogger(getClass());
    // Execution context backed by a fixed-size thread pool created in the constructor.
    private final ExecutionContextExecutor executionContext;
    // Cluster service bean (YARN or EMR implementation) resolved in the constructor.
    private final SparkClusterService sparkClusterService;

    /**
     * Accessor for the Spark configuration this component was constructed with.
     *
     * @return the injected {@link SparkConfig}
     */
    public SparkConfig sparkConfig() {
        return sparkConfig;
    }

    /**
     * Accessor for this component's logger.
     *
     * @return the SLF4J logger held by this instance
     */
    private Logger logger() {
        return logger;
    }

    /**
     * Accessor for the execution context on which this component schedules its future callbacks.
     *
     * @return the execution context created in the constructor
     */
    private ExecutionContextExecutor executionContext() {
        return executionContext;
    }

    /**
     * Accessor for the Spark cluster service selected in the constructor.
     *
     * @return the cluster service handed to the Spark executor
     */
    private SparkClusterService sparkClusterService() {
        return sparkClusterService;
    }

    /**
     * Advances the given DAG instance by one step, based on the current state of its job instances.
     *
     * <p>The job instances of the DAG are loaded first; then exactly one of three paths is taken:
     * <ol>
     *   <li><b>Some job has failed</b>: every job that is not yet in a final status is set to
     *       {@code FailedPreviousJob}, the DAG instance is persisted with status {@code Failed},
     *       and notifications are created.</li>
     *   <li><b>All jobs are final and none has failed</b>: the DAG instance is persisted with
     *       status {@code Succeeded} and notifications are created.</li>
     *   <li><b>Otherwise</b>: the DAG instance is persisted with status {@code Running} and the
     *       non-final job with the lowest {@code order} is handed to the Spark or shell executor,
     *       depending on the type of its parameters. A job whose parameters are of neither type is
     *       persisted with status {@code InvalidExecutor}. If there is no non-final job, the step
     *       completes without doing anything further.</li>
     * </ol>
     *
     * <p>Each path attaches an {@code onComplete} callback that only logs; the returned future is
     * the persistence/execution chain itself, not the logging callback.
     *
     * @param dagInstance the DAG instance to advance
     * @return a future that completes when the chosen path's chain of operations completes
     */
    public Future<BoxedUnit> executeDag(DagInstance dagInstance) {
        return this.jobInstanceRepository.getJobInstances(dagInstance.id(), executionContext()).flatMap(seq -> {
            Future future;
            if (seq.exists(jobInstance -> {
                return BoxesRunTime.boxToBoolean($anonfun$executeDag$2(jobInstance));
            })) {
                // Path 1: at least one job failed. The fifth copy argument is set to the current time
                // (presumably the DAG's finish timestamp - confirm against DagInstance).
                DagInstance failedDag = dagInstance.copy(DagInstanceStatuses$Failed$.MODULE$, dagInstance.copy$default$2(), dagInstance.copy$default$3(), dagInstance.copy$default$4(), Option$.MODULE$.apply(LocalDateTime.now()), dagInstance.copy$default$6());
                // Order matters: jobs are updated first, then the DAG, then notifications are created.
                Future failedFlow = this.jobInstanceRepository.updateJobsStatus((Seq) seq.filter(jobInstance2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$executeDag$3(jobInstance2));
                }).map(jobInstance3 -> {
                    return BoxesRunTime.boxToLong(jobInstance3.id());
                }, Seq$.MODULE$.canBuildFrom()), JobStatuses$FailedPreviousJob$.MODULE$, this.executionContext()).flatMap(jobsUpdated -> {
                    return this.dagInstanceRepository.update(failedDag).flatMap(dagUpdated -> {
                        return this.notificationSender.createNotifications(failedDag, seq, this.executionContext()).map(notified -> {
                            $anonfun$executeDag$7(notified);
                            return BoxedUnit.UNIT;
                        }, this.executionContext());
                    }, this.executionContext());
                }, this.executionContext());
                // Logging only; does not change the outcome of failedFlow.
                failedFlow.onComplete(result -> {
                    $anonfun$executeDag$8(this, dagInstance, result);
                    return BoxedUnit.UNIT;
                }, this.executionContext());
                future = failedFlow;
            } else if (seq.forall(jobInstance4 -> {
                return BoxesRunTime.boxToBoolean($anonfun$executeDag$9(jobInstance4));
            })) {
                // Path 2: every job reached a final, non-failed status.
                DagInstance succeededDag = dagInstance.copy(DagInstanceStatuses$Succeeded$.MODULE$, dagInstance.copy$default$2(), dagInstance.copy$default$3(), dagInstance.copy$default$4(), Option$.MODULE$.apply(LocalDateTime.now()), dagInstance.copy$default$6());
                Future succeededFlow = this.dagInstanceRepository.update(succeededDag).flatMap(dagUpdated -> {
                    return this.notificationSender.createNotifications(succeededDag, seq, this.executionContext()).map(notified -> {
                        $anonfun$executeDag$11(notified);
                        return BoxedUnit.UNIT;
                    }, this.executionContext());
                }, this.executionContext());
                // Logging only; does not change the outcome of succeededFlow.
                succeededFlow.onComplete(result -> {
                    $anonfun$executeDag$12(this, dagInstance, result);
                    return BoxedUnit.UNIT;
                }, this.executionContext());
                future = succeededFlow;
            } else {
                // Path 3: work remains. Pick the non-final job with the smallest order value.
                Option nextJobOption = ((TraversableLike) seq.filter(jobInstance5 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$executeDag$13(jobInstance5));
                }).sortBy(jobInstance6 -> {
                    return BoxesRunTime.boxToInteger(jobInstance6.order());
                }, Ordering$Int$.MODULE$)).headOption();
                // The DAG is persisted as Running before the job is dispatched.
                Future runningFlow = this.dagInstanceRepository.update(dagInstance.copy(DagInstanceStatuses$Running$.MODULE$, dagInstance.copy$default$2(), dagInstance.copy$default$3(), dagInstance.copy$default$4(), dagInstance.copy$default$5(), dagInstance.copy$default$6())).flatMap(dagUpdated -> {
                    Future<BoxedUnit> stepResult;
                    Future<BoxedUnit> dispatched;
                    if (nextJobOption instanceof Some) {
                        JobInstance nextJob = (JobInstance) ((Some) nextJobOption).value();
                        JobInstanceParameters jobParameters = nextJob.jobParameters();
                        if (jobParameters instanceof SparkInstanceParameters) {
                            dispatched = SparkExecutor$.MODULE$.execute(nextJob, (SparkInstanceParameters) jobParameters, updatedJob -> {
                                return this.updateJob(updatedJob);
                            }, this.sparkClusterService(), this.executionContext(), this.sparkConfig());
                        } else if (jobParameters instanceof ShellInstanceParameters) {
                            dispatched = ShellExecutor$.MODULE$.execute(nextJob, (ShellInstanceParameters) jobParameters, updatedJob -> {
                                return this.updateJob(updatedJob);
                            }, this.executionContext());
                        } else {
                            // No executor handles this parameter type: persist the job as InvalidExecutor.
                            dispatched = this.updateJob(nextJob.copy(nextJob.copy$default$1(), nextJob.copy$default$2(), JobStatuses$InvalidExecutor$.MODULE$, nextJob.copy$default$4(), nextJob.copy$default$5(), nextJob.copy$default$6(), nextJob.copy$default$7(), nextJob.copy$default$8(), nextJob.copy$default$9(), nextJob.copy$default$10(), nextJob.copy$default$11()));
                        }
                        stepResult = dispatched;
                    } else {
                        // Mirrors the exhaustive Scala match on Option: anything but None is unexpected.
                        if (!None$.MODULE$.equals(nextJobOption)) {
                            throw new MatchError(nextJobOption);
                        }
                        stepResult = Future$.MODULE$.successful(BoxedUnit.UNIT);
                    }
                    return stepResult;
                }, this.executionContext());
                // Logging only; does not change the outcome of runningFlow.
                runningFlow.onComplete(result -> {
                    $anonfun$executeDag$18(this, nextJobOption, result);
                    return BoxedUnit.UNIT;
                }, this.executionContext());
                future = runningFlow;
            }
            return future;
        }, executionContext());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Persists the given job instance through the job instance repository.
     *
     * @param jobInstance the job instance state to store
     * @return the future returned by the repository update
     */
    public Future<BoxedUnit> updateJob(JobInstance jobInstance) {
        final JobInstanceRepository repository = this.jobInstanceRepository;
        final ExecutionContextExecutor context = executionContext();
        return repository.updateJob(jobInstance, context);
    }

    /**
     * Synthetic predicate used by {@code executeDag}: tells whether a job's status is a failed one.
     *
     * @param jobInstance the job instance to inspect
     * @return {@code true} if the job status reports itself as failed
     */
    public static final /* synthetic */ boolean $anonfun$executeDag$2(JobInstance jobInstance) {
        final boolean failed = jobInstance.jobStatus().isFailed();
        return failed;
    }

    /**
     * Synthetic predicate used by {@code executeDag}: selects jobs that have not reached a final status.
     *
     * @param jobInstance the job instance to inspect
     * @return {@code true} if the job status is not final
     */
    public static final /* synthetic */ boolean $anonfun$executeDag$3(JobInstance jobInstance) {
        final boolean finished = jobInstance.jobStatus().isFinalStatus();
        return !finished;
    }

    /**
     * Synthetic no-op body of the lambda that {@code executeDag} maps over the result of creating
     * notifications on the failed-DAG path; it intentionally does nothing with its argument.
     */
    public static final /* synthetic */ void $anonfun$executeDag$7(BoxedUnit boxedUnit) {
    }

    /**
     * Synthetic completion callback for the failed-DAG path of {@code executeDag}.
     * Logs an error when the status update chain failed; does nothing otherwise.
     *
     * @param executors   the component whose logger is used
     * @param dagInstance the DAG instance whose id is included in the log message
     * @param r7          the outcome of the status update chain
     */
    public static final /* synthetic */ void $anonfun$executeDag$8(Executors executors, DagInstance dagInstance, Try r7) {
        if (r7 instanceof Failure) {
            String message = "Updating status failed for failed run. Dag instance id = " + dagInstance.id();
            Throwable cause = ((Failure) r7).exception();
            executors.logger().error(message, cause);
        }
    }

    /**
     * Synthetic predicate used by {@code executeDag}: tells whether a job finished without failing.
     *
     * @param jobInstance the job instance to inspect
     * @return {@code true} if the job status is final and not failed
     */
    public static final /* synthetic */ boolean $anonfun$executeDag$9(JobInstance jobInstance) {
        if (!jobInstance.jobStatus().isFinalStatus()) {
            return false;
        }
        return !jobInstance.jobStatus().isFailed();
    }

    /**
     * Synthetic no-op body of the lambda that {@code executeDag} maps over the result of creating
     * notifications on the succeeded-DAG path; it intentionally does nothing with its argument.
     */
    public static final /* synthetic */ void $anonfun$executeDag$11(BoxedUnit boxedUnit) {
    }

    /**
     * Synthetic completion callback for the succeeded-DAG path of {@code executeDag}.
     * Logs an error when the status update chain failed; does nothing otherwise.
     *
     * @param executors   the component whose logger is used
     * @param dagInstance the DAG instance whose id is included in the log message
     * @param r7          the outcome of the status update chain
     */
    public static final /* synthetic */ void $anonfun$executeDag$12(Executors executors, DagInstance dagInstance, Try r7) {
        if (r7 instanceof Failure) {
            String message = "Updating status failed for successful run. Dag instance id = " + dagInstance.id();
            Throwable cause = ((Failure) r7).exception();
            executors.logger().error(message, cause);
        }
    }

    /**
     * Synthetic predicate used by {@code executeDag} to find candidate jobs to run next:
     * selects jobs that have not reached a final status.
     *
     * @param jobInstance the job instance to inspect
     * @return {@code true} if the job status is not final
     */
    public static final /* synthetic */ boolean $anonfun$executeDag$13(JobInstance jobInstance) {
        final boolean finished = jobInstance.jobStatus().isFinalStatus();
        return !finished;
    }

    /**
     * Synthetic completion callback for the running-DAG path of {@code executeDag}.
     * Logs at debug level on success and at error level (with the cause) on failure.
     *
     * @param executors the component whose logger is used
     * @param option    the optional job instance that was selected for execution; its string form
     *                  is appended to the log message
     * @param r7        the outcome of the execution chain
     * @throws MatchError if {@code r7} is neither a {@code Success} nor a {@code Failure}
     */
    public static final /* synthetic */ void $anonfun$executeDag$18(Executors executors, Option option, Try r7) {
        if (r7 instanceof Success) {
            executors.logger().debug("Executing job. Job instance id = " + option);
            return;
        }
        if (r7 instanceof Failure) {
            executors.logger().error("Executing job failed. Job instance id = " + option + ".", ((Failure) r7).exception());
            return;
        }
        throw new MatchError(r7);
    }

    /**
     * Creates the component, builds its execution context and selects the Spark cluster service.
     *
     * <p>The execution context wraps a fixed-size thread pool whose size is read from
     * {@code schedulerConfig.executors().threadPoolSize()}. The cluster service bean is chosen from
     * {@code sparkConfig.submitApi()}, compared case-insensitively: {@code "yarn"} selects
     * {@link SparkYarnClusterServiceImpl}, {@code "emr"} selects {@link SparkEmrClusterServiceImpl}.
     *
     * @param dagInstanceRepository repository used to persist DAG instance updates
     * @param jobInstanceRepository repository used to read and persist job instances
     * @param notificationSender    sender used to create notifications for finished DAG instances
     * @param beanFactory           factory from which the cluster service bean is obtained
     * @param sparkConfig           Spark configuration providing the submit API name
     * @param schedulerConfig       scheduler configuration providing the thread pool size
     * @throws IllegalArgumentException if the submit API is neither {@code yarn} nor {@code emr}
     */
    @Inject
    public Executors(DagInstanceRepository dagInstanceRepository, JobInstanceRepository jobInstanceRepository, NotificationSender notificationSender, BeanFactory beanFactory, SparkConfig sparkConfig, SchedulerConfig schedulerConfig) {
        this.dagInstanceRepository = dagInstanceRepository;
        this.jobInstanceRepository = jobInstanceRepository;
        this.notificationSender = notificationSender;
        this.sparkConfig = sparkConfig;
        this.executionContext = ExecutionContext$.MODULE$.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(schedulerConfig.executors().threadPoolSize()));

        // Locale.ROOT keeps the normalisation independent of the JVM's default locale.
        final String submitApi = sparkConfig.submitApi().toLowerCase(java.util.Locale.ROOT);
        final SparkClusterService selectedService;
        switch (submitApi) {
            case "yarn":
                logger().info("Using yarn cluster");
                selectedService = (SparkClusterService) beanFactory.getBean(SparkYarnClusterServiceImpl.class);
                break;
            case "emr":
                logger().info("Using emr cluster");
                selectedService = (SparkClusterService) beanFactory.getBean(SparkEmrClusterServiceImpl.class);
                break;
            default:
                throw new IllegalArgumentException("Invalid spark cluster api - use one of: yarn, emr");
        }
        this.sparkClusterService = selectedService;
    }
}
