package org.apache.spark.deploy;

import java.util.concurrent.TimeoutException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.deploy.master.RecoveryState$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.Deploy$;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import scala.App;
import scala.Enumeration;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.IterableOnceOps;
import scala.collection.StringOps$;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.package;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.sys.package$;

/* compiled from: FaultToleranceTest.scala */
/* loaded from: input_file:org/apache/spark/deploy/FaultToleranceTest$.class */
public final class FaultToleranceTest$ implements App, Logging {
    /** The single instance of this class; the constructor is private and only invoked here. */
    public static final FaultToleranceTest$ MODULE$ = new FaultToleranceTest$();
    // Test state. conf, zkDir, masters, workers, zk, numPassed, numFailed, sparkHome,
    // containerSparkHome and dockerMountDir are assigned at the start of
    // delayedEndpoint$org$apache$spark$deploy$FaultToleranceTest$1(); sc is set by createClient().
    private static SparkConf conf;
    private static String zkDir;
    private static ListBuffer<TestMasterInfo> masters;
    private static ListBuffer<TestWorkerInfo> workers;
    private static SparkContext sc;
    private static CuratorFramework zk;
    private static int numPassed;
    private static int numFailed;
    private static String sparkHome;
    private static String containerSparkHome;
    private static String dockerMountDir;
    // Backing field for the Logging accessors org$apache$spark$internal$Logging$$log_() / ..._$eq().
    private static transient Logger org$apache$spark$internal$Logging$$log_;
    // Backing fields for the App accessors (executionStart, scala$App$$_args, scala$App$$initCode).
    private static long executionStart;
    private static String[] scala$App$$_args;
    private static ListBuffer<Function0<BoxedUnit>> scala$App$$initCode;

    // Static initializer: runs the App and Logging trait initializers on the singleton, then
    // passes the object body to delayedInit(...) rather than executing it inline here.
    static {
        App.$init$(MODULE$);
        Logging.$init$(MODULE$);
        FaultToleranceTest$ faultToleranceTest$ = MODULE$;
        final FaultToleranceTest$ faultToleranceTest$2 = MODULE$;
        // The function handed to delayedInit simply invokes
        // delayedEndpoint$org$apache$spark$deploy$FaultToleranceTest$1() on the captured instance.
        faultToleranceTest$.delayedInit(new AbstractFunction0(faultToleranceTest$2) { // from class: org.apache.spark.deploy.FaultToleranceTest$delayedInit$body
            private final FaultToleranceTest$ $outer;

            public final Object apply() {
                this.$outer.delayedEndpoint$org$apache$spark$deploy$FaultToleranceTest$1();
                return BoxedUnit.UNIT;
            }

            {
                // Null check on the captured outer instance; throws if it is null.
                if (faultToleranceTest$2 == null) {
                    throw null;
                }
                this.$outer = faultToleranceTest$2;
            }
        });
        Statics.releaseFence();
    }

    // ---------------------------------------------------------------------------------------
    // Logging forwarders. Each method below delegates to the static implementation of the same
    // name (with a trailing '$') on the Logging interface, passing this instance as receiver.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    // Message-only variants: the message is supplied as a Function0<String>.
    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    // Variants that additionally take a Throwable to pass along with the message.
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    // Log-system initialization hooks, also forwarded unchanged to Logging.
    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    // ---------------------------------------------------------------------------------------
    // App plumbing. args(), delayedInit() and main() delegate to the static implementations on
    // the App interface; the remaining methods read or write the static backing fields declared
    // at the top of this class.
    // ---------------------------------------------------------------------------------------

    public final String[] args() {
        return App.args$(this);
    }

    public void delayedInit(Function0<BoxedUnit> function0) {
        App.delayedInit$(this, function0);
    }

    /** Program entry point; forwards the command-line arguments to App.main$. */
    public final void main(String[] strArr) {
        App.main$(this, strArr);
    }

    // Getter/setter pair for the logger field used by the Logging forwarders.
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    public final long executionStart() {
        return executionStart;
    }

    public String[] scala$App$$_args() {
        return scala$App$$_args;
    }

    public void scala$App$$_args_$eq(String[] strArr) {
        scala$App$$_args = strArr;
    }

    public ListBuffer<Function0<BoxedUnit>> scala$App$$initCode() {
        return scala$App$$initCode;
    }

    // Setters for the executionStart and initCode backing fields.
    public final void scala$App$_setter_$executionStart_$eq(long j) {
        executionStart = j;
    }

    public final void scala$App$_setter_$scala$App$$initCode_$eq(ListBuffer<Function0<BoxedUnit>> listBuffer) {
        scala$App$$initCode = listBuffer;
    }

    // ---------------------------------------------------------------------------------------
    // Private accessors for the test state held in static fields. Only sc, numPassed and
    // numFailed have mutators (the *_$eq methods); the other fields are written directly in
    // delayedEndpoint$org$apache$spark$deploy$FaultToleranceTest$1().
    // ---------------------------------------------------------------------------------------

    private SparkConf conf() {
        return conf;
    }

    private String zkDir() {
        return zkDir;
    }

    private ListBuffer<TestMasterInfo> masters() {
        return masters;
    }

    private ListBuffer<TestWorkerInfo> workers() {
        return workers;
    }

    // Current SparkContext, or null when no client exists (see createClient() / afterEach()).
    private SparkContext sc() {
        return sc;
    }

    private void sc_$eq(SparkContext sparkContext) {
        sc = sparkContext;
    }

    private CuratorFramework zk() {
        return zk;
    }

    // Counters updated by test(...) and reported at the end of the run.
    private int numPassed() {
        return numPassed;
    }

    private void numPassed_$eq(int i) {
        numPassed = i;
    }

    private int numFailed() {
        return numFailed;
    }

    private void numFailed_$eq(int i) {
        numFailed = i;
    }

    private String sparkHome() {
        return sparkHome;
    }

    private String containerSparkHome() {
        return containerSparkHome;
    }

    private String dockerMountDir() {
        return dockerMountDir;
    }

    /**
     * Cleans up after a test case: stops and clears the current SparkContext if one exists,
     * terminates the cluster, and removes the "/spark_leader" and "/master_status" nodes under
     * zkDir() through SparkCuratorUtil.
     */
    private void afterEach() {
        SparkContext active = sc();
        if (active != null) {
            active.stop();
            sc_$eq(null);
        }
        terminateCluster();
        SparkCuratorUtil$.MODULE$.deleteRecursive(zk(), zkDir() + "/spark_leader");
        SparkCuratorUtil$.MODULE$.deleteRecursive(zk(), zkDir() + "/master_status");
    }

    /**
     * Runs one named test case.
     *
     * <p>On success the pass counter is incremented, a "Passed" banner is logged and afterEach()
     * is invoked. If the body (or the success path) throws an Exception, the fail counter is
     * incremented, the failure is logged with the exception, and the process exits with status 1.
     *
     * @param str       name of the test case, used in the log output
     * @param function0 body of the test case
     */
    private void test(String str, Function0<BoxedUnit> function0) {
        final String passBanner = "==============================================";
        final String failBanner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!";
        try {
            function0.apply$mcV$sp();
            numPassed_$eq(numPassed() + 1);
            logInfo(() -> passBanner);
            logInfo(() -> "Passed: " + str);
            logInfo(() -> passBanner);
            afterEach();
        } catch (Exception e) {
            numFailed_$eq(numFailed() + 1);
            logInfo(() -> failBanner);
            logError(() -> "FAILED: " + str, e);
            logInfo(() -> failBanner);
            throw package$.MODULE$.exit(1);
        }
    }

    /**
     * Logs the request and then starts {@code i} masters, one per element of the inclusive
     * range 1..i, via $anonfun$addMasters$2.
     *
     * @param i number of masters to add
     */
    private void addMasters(int i) {
        logInfo(() -> ">>>>> ADD MASTERS " + i + " <<<<<");
        RichInt$.MODULE$
            .to$extension(Predef$.MODULE$.intWrapper(1), i)
            .foreach(index -> $anonfun$addMasters$2(BoxesRunTime.unboxToInt(index)));
    }

    /**
     * Logs the request and then starts {@code i} workers. The master URL string is computed
     * once from the current masters and passed to every worker that is started.
     *
     * @param i number of workers to add
     */
    private void addWorkers(int i) {
        logInfo(() -> ">>>>> ADD WORKERS " + i + " <<<<<");
        Seq<TestMasterInfo> currentMasters = masters().toSeq();
        String masterUrls = getMasterUrls(currentMasters);
        RichInt$.MODULE$
            .to$extension(Predef$.MODULE$.intWrapper(1), i)
            .foreach(index -> $anonfun$addWorkers$2(masterUrls, BoxesRunTime.unboxToInt(index)));
    }

    /**
     * Replaces the current client: stops the existing SparkContext if there is one, sets the
     * driver-port system property to "0", and creates a new SparkContext named
     * "fault-tolerance" against the URLs of the current masters.
     */
    private void createClient() {
        logInfo(() -> ">>>>> CREATE CLIENT <<<<<");
        SparkContext previous = sc();
        if (previous != null) {
            previous.stop();
        }
        String driverPortKey = org.apache.spark.internal.config.package$.MODULE$.DRIVER_PORT().key();
        System.setProperty(driverPortKey, "0");
        String masterUrls = getMasterUrls(masters().toSeq());
        sc_$eq(new SparkContext(masterUrls, "fault-tolerance", containerSparkHome()));
    }

    /**
     * Builds a master URL of the form {@code spark://ip1:7077,ip2:7077,...} from the given masters.
     *
     * @param seq masters whose IPs are included, in order
     * @return the "spark://" prefix followed by the comma-separated "ip:7077" entries
     */
    private String getMasterUrls(Seq<TestMasterInfo> seq) {
        IterableOnceOps hostPorts = (IterableOnceOps) seq.map(master -> master.ip() + ":7077");
        return "spark://" + hostPorts.mkString(",");
    }

    /**
     * Returns the master whose last-read state is ALIVE.
     *
     * @return the single ALIVE master
     * @throws IllegalStateException if the number of ALIVE masters is not exactly one
     */
    private TestMasterInfo getLeader() {
        ListBuffer aliveMasters = (ListBuffer) masters().filter(
            master -> BoxesRunTime.boxToBoolean($anonfun$getLeader$1(master)));
        boolean exactlyOne = aliveMasters.size() == 1;
        assertTrue(exactlyOne, assertTrue$default$2());
        return (TestMasterInfo) aliveMasters.apply(0);
    }

    /**
     * Refreshes the state of every master, then removes the current leader from the masters
     * list and kills it.
     */
    private void killLeader() {
        logInfo(() -> ">>>>> KILL LEADER <<<<<");
        // Re-read every master's state so that getLeader() sees current values.
        masters().foreach(master -> {
            master.readState();
            return BoxedUnit.UNIT;
        });
        TestMasterInfo currentLeader = getLeader();
        masters().$minus$eq(currentLeader);
        currentLeader.kill();
    }

    /**
     * Blocks the calling thread for the given duration.
     *
     * @param duration how long to sleep; converted to milliseconds
     */
    private void delay(Duration duration) {
        long millis = duration.toMillis();
        Thread.sleep(millis);
    }

    private Duration delay$default$1() {
        return new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(5)).seconds();
    }

    /**
     * Kills every tracked master and then every tracked worker, and afterwards empties both
     * tracking lists.
     */
    private void terminateCluster() {
        logInfo(() -> ">>>>> TERMINATE CLUSTER <<<<<");
        ListBuffer<TestMasterInfo> trackedMasters = masters();
        trackedMasters.foreach(master -> {
            master.kill();
            return BoxedUnit.UNIT;
        });
        ListBuffer<TestWorkerInfo> trackedWorkers = workers();
        trackedWorkers.foreach(worker -> {
            worker.kill();
            return BoxedUnit.UNIT;
        });
        masters().clear();
        workers().clear();
    }

    /**
     * Asserts that the current SparkContext can run a trivial job.
     *
     * <p>A future parallelizes the range 0 until 10, collects it, and checks that the collected
     * values equal the same range as a list. The future yields {@code true} when that check
     * passes and {@code false} when any Exception is raised (the exception is logged and its
     * stack trace printed). The caller waits up to 2 minutes for the result through
     * ThreadUtils.awaitResult and asserts that it is {@code true}.
     *
     * @throws IllegalStateException if the future yields {@code false}
     */
    private void assertUsable() {
        assertTrue(BoxesRunTime.unboxToBoolean(ThreadUtils$.MODULE$.awaitResult((Awaitable) Future$.MODULE$.apply(() -> {
            try {
                SparkContext sc2 = MODULE$.sc();
                int[] iArr = (int[]) sc2.parallelize(RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10), sc2.parallelize$default$2(), ClassTag$.MODULE$.Int()).collect();
                List actual = Predef$.MODULE$.wrapIntArray(iArr).toList();
                List expected = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 10).toList();
                // Null-safe equality. The previous form only assigned the flag when the collected
                // list was null, so the comparison was never made for a real result.
                boolean sameElements = actual != null ? actual.equals(expected) : expected == null;
                MODULE$.assertTrue(sameElements, MODULE$.assertTrue$default$2());
                return true;
            } catch (Exception e) {
                MODULE$.logError(() -> {
                    return "assertUsable() had exception";
                }, e);
                e.printStackTrace();
                return false;
            }
        }, ExecutionContext$Implicits$.MODULE$.global()), (Duration) new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes())), assertTrue$default$2());
    }

    /**
     * Asserts that the cluster reaches a valid state within 2 minutes.
     *
     * <p>First checks that the client is usable (assertUsable()). Then a future polls once per
     * second: it re-reads every master's state, recounts the tallies below, and stops as soon as
     * stateValid$1 accepts them. If waiting for the future raises a TimeoutException, the master
     * states, app count and expected/found worker IPs are logged and a RuntimeException is thrown.
     */
    private void assertValidClusterState() {
        logInfo(() -> {
            return ">>>>> ASSERT VALID CLUSTER STATE <<<<<";
        });
        assertUsable();
        // Tallies shared with the polling lambda (see $anonfun$assertValidClusterState$6):
        //   create  - number of masters in state ALIVE
        //   create2 - number of masters in state STANDBY
        //   create3 - sum of numLiveApps() over all masters
        //   create4 - liveWorkerIPs() of the most recently visited ALIVE master
        IntRef create = IntRef.create(0);
        IntRef create2 = IntRef.create(0);
        IntRef create3 = IntRef.create(0);
        ObjectRef create4 = ObjectRef.create((Seq) scala.package$.MODULE$.List().apply(Nil$.MODULE$));
        try {
            assertTrue(BoxesRunTime.unboxToBoolean(ThreadUtils$.MODULE$.awaitResult((Awaitable) Future$.MODULE$.apply(() -> {
                // Poll until the tallies describe a valid cluster.
                while (!this.stateValid$1(create4, create, create2, create3)) {
                    try {
                        Thread.sleep(1000L);
                        // Reset the counters before recounting from freshly read master state.
                        create.elem = 0;
                        create2.elem = 0;
                        create3.elem = 0;
                        MODULE$.masters().foreach(testMasterInfo -> {
                            testMasterInfo.readState();
                            return BoxedUnit.UNIT;
                        });
                        MODULE$.masters().foreach(testMasterInfo2 -> {
                            $anonfun$assertValidClusterState$6(create, create4, create2, create3, testMasterInfo2);
                            return BoxedUnit.UNIT;
                        });
                    } catch (Exception e) {
                        // Any exception while polling ends the future with a false result.
                        MODULE$.logError(() -> {
                            return "assertValidClusterState() had exception";
                        }, e);
                        return false;
                    }
                }
                return true;
            }, ExecutionContext$Implicits$.MODULE$.global()), (Duration) new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).minutes())), assertTrue$default$2());
        } catch (TimeoutException e) {
            // Log the last observed state to help diagnose why the cluster never became valid.
            logError(() -> {
                return new StringBuilder(15).append("Master states: ").append(MODULE$.masters().map(testMasterInfo -> {
                    return testMasterInfo.state();
                })).toString();
            });
            logError(() -> {
                return new StringBuilder(10).append("Num apps: ").append(create3.elem).toString();
            });
            logError(() -> {
                return new StringBuilder(24).append("IPs expected: ").append(MODULE$.workers().map(testWorkerInfo -> {
                    return testWorkerInfo.ip();
                })).append(" / found: ").append((Seq) create4.elem).toString();
            });
            throw new RuntimeException("Failed to get into acceptable cluster state after 2 min.", e);
        }
    }

    /**
     * Fails with an IllegalStateException when the condition does not hold.
     *
     * @param z   condition that must be true
     * @param str text appended to "Assertion failed: " in the exception message
     * @throws IllegalStateException if {@code z} is false
     */
    private void assertTrue(boolean z, String str) {
        if (z) {
            return;
        }
        throw new IllegalStateException("Assertion failed: " + str);
    }

    /** Default value for the message argument of assertTrue(boolean, String): the empty string. */
    private String assertTrue$default$2() {
        return "";
    }

    /**
     * Loop body of addMasters: starts one master through SparkDocker with the configured mount
     * directory and appends it to the masters list.
     *
     * @param i loop index (unused)
     * @return the masters list after the append
     */
    public static final /* synthetic */ ListBuffer $anonfun$addMasters$2(int i) {
        ListBuffer<TestMasterInfo> tracked = MODULE$.masters();
        return tracked.$plus$eq(SparkDocker$.MODULE$.startMaster(MODULE$.dockerMountDir()));
    }

    /**
     * Loop body of addWorkers: starts one worker through SparkDocker with the configured mount
     * directory and the given master URLs, and appends it to the workers list.
     *
     * @param str master URL string handed to the worker
     * @param i   loop index (unused)
     * @return the workers list after the append
     */
    public static final /* synthetic */ ListBuffer $anonfun$addWorkers$2(String str, int i) {
        ListBuffer<TestWorkerInfo> tracked = MODULE$.workers();
        return tracked.$plus$eq(SparkDocker$.MODULE$.startWorker(MODULE$.dockerMountDir(), str));
    }

    /**
     * Predicate used by getLeader: whether the master's state equals RecoveryState.ALIVE
     * (null-safe comparison).
     *
     * @param testMasterInfo master to inspect
     * @return true if the master's state equals ALIVE
     */
    public static final /* synthetic */ boolean $anonfun$getLeader$1(TestMasterInfo testMasterInfo) {
        Enumeration.Value current = testMasterInfo.state();
        Enumeration.Value alive = RecoveryState$.MODULE$.ALIVE();
        if (current == null) {
            return alive == null;
        }
        return current.equals(alive);
    }

    /**
     * Decides whether the tallies gathered by assertValidClusterState describe a valid cluster.
     *
     * @param objectRef holds the worker IPs reported by the ALIVE master
     * @param intRef    count of ALIVE masters; must be exactly 1
     * @param intRef2   count of STANDBY masters; must equal the number of masters minus one
     * @param intRef3   number of live apps; must be at least 1
     * @return true only if every tracked worker IP is among the reported IPs and all three
     *         counter conditions hold
     */
    private final boolean stateValid$1(ObjectRef objectRef, IntRef intRef, IntRef intRef2, IntRef intRef3) {
        IterableOnceOps expectedIps = (IterableOnceOps) workers().map(worker -> worker.ip());
        Seq reportedIps = (Seq) objectRef.elem;
        boolean allWorkersReported = expectedIps.forall(ip -> BoxesRunTime.boxToBoolean(reportedIps.contains(ip)));
        if (!allWorkersReported) {
            return false;
        }
        if (intRef.elem != 1) {
            return false;
        }
        if (intRef2.elem != masters().size() - 1) {
            return false;
        }
        return intRef3.elem >= 1;
    }

    /**
     * Folds one master into the cluster-state tallies.
     *
     * <p>An ALIVE master increments {@code intRef} and replaces {@code objectRef} with its live
     * worker IPs; a STANDBY master increments {@code intRef2}; any other state changes neither.
     * In every case the master's live-app count is added to {@code intRef3}.
     *
     * @param intRef         ALIVE master counter
     * @param objectRef      receives the live worker IPs of an ALIVE master
     * @param intRef2        STANDBY master counter
     * @param intRef3        running total of live apps
     * @param testMasterInfo master being inspected
     */
    public static final /* synthetic */ void $anonfun$assertValidClusterState$6(IntRef intRef, ObjectRef objectRef, IntRef intRef2, IntRef intRef3, TestMasterInfo testMasterInfo) {
        Enumeration.Value current = testMasterInfo.state();
        Enumeration.Value alive = RecoveryState$.MODULE$.ALIVE();
        boolean isAlive = alive == null ? current == null : alive.equals(current);
        if (isAlive) {
            intRef.elem += 1;
            objectRef.elem = testMasterInfo.liveWorkerIPs();
        } else {
            // STANDBY is only looked up when the master is not ALIVE.
            Enumeration.Value standby = RecoveryState$.MODULE$.STANDBY();
            boolean isStandby = standby == null ? current == null : standby.equals(current);
            if (isStandby) {
                intRef2.elem += 1;
            }
        }
        intRef3.elem += testMasterInfo.numLiveApps();
    }

    /**
     * Body of the test program, invoked through the function registered with delayedInit in the
     * static initializer. It sets up the shared test state, runs the fault-tolerance scenarios
     * in order through test(...), and finally logs a summary of passed and failed counts.
     */
    public final void delayedEndpoint$org$apache$spark$deploy$FaultToleranceTest$1() {
        // --- Shared state setup ---
        conf = new SparkConf();
        // ZooKeeper directory from configuration, falling back to "/spark" when it is not set.
        zkDir = (String) ((Option) conf().get(Deploy$.MODULE$.ZOOKEEPER_DIRECTORY())).getOrElse(() -> {
            return "/spark";
        });
        masters = (ListBuffer) ListBuffer$.MODULE$.apply(Nil$.MODULE$);
        workers = (ListBuffer) ListBuffer$.MODULE$.apply(Nil$.MODULE$);
        zk = SparkCuratorUtil$.MODULE$.newClient(conf(), SparkCuratorUtil$.MODULE$.newClient$default$2());
        numPassed = 0;
        numFailed = 0;
        sparkHome = System.getenv("SPARK_HOME");
        assertTrue(sparkHome() != null, "Run with a valid SPARK_HOME");
        containerSparkHome = "/opt/spark";
        // "<sparkHome>:<containerSparkHome>", passed to SparkDocker when starting masters/workers.
        dockerMountDir = StringOps$.MODULE$.format$extension(Predef$.MODULE$.augmentString("%s:%s"), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{sparkHome(), containerSparkHome()}));
        // NOTE(review): hard-coded driver host address; presumably the host's address as seen from
        // the containers - confirm for the target environment.
        System.setProperty(org.apache.spark.internal.config.package$.MODULE$.DRIVER_HOST_ADDRESS().key(), "172.17.42.1");
        // One master, one worker, one client: the cluster must reach a valid state.
        test("sanity-basic", () -> {
            MODULE$.addMasters(1);
            MODULE$.addWorkers(1);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
        });
        // Same check with three masters and three workers.
        test("sanity-many-masters", () -> {
            MODULE$.addMasters(3);
            MODULE$.addWorkers(3);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
        });
        // Kill the leader of a three-master cluster, wait 30 seconds, and check that the cluster
        // is valid both with the existing client and with a newly created one.
        test("single-master-halt", () -> {
            MODULE$.addMasters(3);
            MODULE$.addWorkers(2);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
            MODULE$.killLeader();
            MODULE$.delay(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
            MODULE$.assertValidClusterState();
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
        });
        // Single master: kill it and add a replacement, twice, validating after each 30-second wait.
        test("single-master-restart", () -> {
            MODULE$.addMasters(1);
            MODULE$.addWorkers(2);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
            MODULE$.killLeader();
            MODULE$.addMasters(1);
            MODULE$.delay(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
            MODULE$.assertValidClusterState();
            MODULE$.killLeader();
            MODULE$.addMasters(1);
            MODULE$.delay(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
            MODULE$.assertValidClusterState();
        });
        // Terminate all masters and workers, start fresh ones, and validate with the same client.
        test("cluster-failure", () -> {
            MODULE$.addMasters(2);
            MODULE$.addWorkers(2);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
            MODULE$.terminateCluster();
            MODULE$.addMasters(2);
            MODULE$.addWorkers(2);
            MODULE$.assertValidClusterState();
        });
        // Kill the leader and every worker so only the remaining master is left, wait 30 seconds,
        // add new workers and validate.
        test("all-but-standby-failure", () -> {
            MODULE$.addMasters(2);
            MODULE$.addWorkers(2);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
            MODULE$.killLeader();
            MODULE$.workers().foreach(testWorkerInfo -> {
                testWorkerInfo.kill();
                return BoxedUnit.UNIT;
            });
            MODULE$.workers().clear();
            MODULE$.delay(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
            MODULE$.addWorkers(2);
            MODULE$.assertValidClusterState();
        });
        // Start three masters with a default delay between the first three additions, then
        // three times: kill the leader, wait 30 seconds, validate, and add a new master. Before the
        // loop and after each kill, the leader is asserted to be the head of the masters list.
        test("rolling-outage", () -> {
            MODULE$.addMasters(1);
            MODULE$.delay(MODULE$.delay$default$1());
            MODULE$.addMasters(1);
            MODULE$.delay(MODULE$.delay$default$1());
            MODULE$.addMasters(1);
            MODULE$.addWorkers(2);
            MODULE$.createClient();
            MODULE$.assertValidClusterState();
            FaultToleranceTest$ faultToleranceTest$ = MODULE$;
            TestMasterInfo leader = MODULE$.getLeader();
            Object head = MODULE$.masters().head();
            faultToleranceTest$.assertTrue(leader != null ? leader.equals(head) : head == null, MODULE$.assertTrue$default$2());
            RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 3).foreach$mVc$sp(i -> {
                MODULE$.killLeader();
                MODULE$.delay(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(30)).seconds());
                MODULE$.assertValidClusterState();
                FaultToleranceTest$ faultToleranceTest$2 = MODULE$;
                TestMasterInfo leader2 = MODULE$.getLeader();
                Object head2 = MODULE$.masters().head();
                faultToleranceTest$2.assertTrue(leader2 != null ? leader2.equals(head2) : head2 == null, MODULE$.assertTrue$default$2());
                MODULE$.addMasters(1);
            });
        });
        // Summary: total run, passed, failed.
        logInfo(() -> {
            return StringOps$.MODULE$.format$extension(Predef$.MODULE$.augmentString("Ran %s tests, %s passed and %s failed"), ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(MODULE$.numPassed() + MODULE$.numFailed()), BoxesRunTime.boxToInteger(MODULE$.numPassed()), BoxesRunTime.boxToInteger(MODULE$.numFailed())}));
        });
    }

    /** Private constructor: the only instance is the one created for MODULE$. */
    private FaultToleranceTest$() {
    }
}
