/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.DefaultJobTable;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesBuilder;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.TestTaskManagerActions;
import org.apache.flink.runtime.taskexecutor.TestingJobServices;
import org.apache.flink.runtime.taskexecutor.TestingPartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutor;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.TriFunction;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

class TaskSubmissionTestEnvironment
implements AutoCloseable {
    private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
    private final TestingRpcService testingRpcService;
    private final BlobCacheService blobCacheService = new BlobCacheService(new Configuration(), (BlobView)new VoidBlobStore(), null);
    private final Time timeout = Time.milliseconds((long)10000L);
    private final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    private final TimerService<AllocationID> timerService = new DefaultTimerService(TestingUtils.defaultExecutor(), this.timeout.toMilliseconds());
    private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    private final TemporaryFolder temporaryFolder;
    private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable;
    private final JobMasterId jobMasterId;
    private TestingTaskExecutor taskExecutor;

    private TaskSubmissionTestEnvironment(JobID jobId, JobMasterId jobMasterId, int slotSize, TestingJobMasterGateway testingJobMasterGateway, Configuration configuration, List<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>> taskManagerActionListeners, @Nullable String metricQueryServiceAddress, TestingRpcService testingRpcService, ShuffleEnvironment<?, ?> shuffleEnvironment) throws Exception {
        Object taskManagerActions;
        this.haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
        this.haServices.setJobMasterLeaderRetriever(jobId, new SettableLeaderRetrievalService());
        this.temporaryFolder = new TemporaryFolder();
        this.temporaryFolder.create();
        this.jobMasterId = jobMasterId;
        TaskSlotTableImpl taskSlotTable = slotSize > 0 ? TaskSlotUtils.createTaskSlotTable(slotSize) : TestingTaskSlotTable.newBuilder().tryMarkSlotActiveReturns(true).addTaskReturns(true).closeAsyncReturns(CompletableFuture.completedFuture(null)).allocateSlotReturns(true).memoryManagerGetterReturns(null).build();
        TestingJobMasterGateway jobMasterGateway = testingJobMasterGateway == null ? new TestingJobMasterGatewayBuilder().setFencingTokenSupplier(() -> jobMasterId).build() : testingJobMasterGateway;
        this.testingRpcService = testingRpcService;
        DefaultJobTable jobTable = DefaultJobTable.create();
        TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(false, new File[]{this.temporaryFolder.newFolder()}, Executors.directExecutor());
        TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setShuffleEnvironment(shuffleEnvironment).setTaskSlotTable((TaskSlotTable<Task>)taskSlotTable).setJobTable((JobTable)jobTable).setTaskStateManager(localStateStoresManager).build();
        this.taskExecutor = this.createTaskExecutor(taskManagerServices, metricQueryServiceAddress, configuration);
        this.taskExecutor.start();
        this.taskExecutor.waitUntilStarted();
        this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable(taskSlotTable, this.taskExecutor.getMainThreadExecutableForTesting());
        if (taskManagerActionListeners.size() == 0) {
            taskManagerActions = new NoOpTaskManagerActions();
        } else {
            TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(this.threadSafeTaskSlotTable, jobMasterGateway);
            for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) {
                testTaskManagerActions.addListener((ExecutionAttemptID)listenerTuple.f0, (ExecutionState)listenerTuple.f1, (CompletableFuture)listenerTuple.f2);
            }
            taskManagerActions = testTaskManagerActions;
        }
        TaskSubmissionTestEnvironment.registerJobMasterConnection((JobTable)jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, this.timeout, this.taskExecutor.getMainThreadExecutableForTesting());
    }

    static void registerJobMasterConnection(JobTable jobTable, JobID jobId, RpcService testingRpcService, JobMasterGateway jobMasterGateway, TaskManagerActions taskManagerActions, Time timeout, MainThreadExecutable mainThreadExecutable) {
        mainThreadExecutable.runAsync(() -> {
            JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
            job.connect(ResourceID.generate(), jobMasterGateway, taskManagerActions, (CheckpointResponder)new TestCheckpointResponder(), (GlobalAggregateManager)new TestGlobalAggregateManager(), (ResultPartitionConsumableNotifier)new RpcResultPartitionConsumableNotifier(jobMasterGateway, (Executor)testingRpcService.getScheduledExecutor(), timeout), (PartitionProducerStateChecker)TestingPartitionProducerStateChecker.newBuilder().setPartitionProducerStateFunction((TriFunction<JobID, IntermediateDataSetID, ResultPartitionID, CompletableFuture<ExecutionState>>)((TriFunction)(jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))).build());
        });
    }

    public TestingTaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    public TaskExecutorGateway getTaskExecutorGateway() {
        return (TaskExecutorGateway)this.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
    }

    public TaskSlotTable<Task> getTaskSlotTable() {
        return this.threadSafeTaskSlotTable;
    }

    public JobMasterId getJobMasterId() {
        return this.jobMasterId;
    }

    public TestingFatalErrorHandler getTestingFatalErrorHandler() {
        return this.testingFatalErrorHandler;
    }

    @Nonnull
    private TestingTaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices, @Nullable String metricQueryServiceAddress, Configuration configuration) {
        Configuration copiedConf = new Configuration(configuration);
        return new TestingTaskExecutor(this.testingRpcService, TaskManagerConfiguration.fromConfiguration((Configuration)copiedConf, (TaskExecutorResourceSpec)TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution((Configuration)copiedConf), (String)InetAddress.getLoopbackAddress().getHostAddress()), this.haServices, taskManagerServices, ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, this.heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), metricQueryServiceAddress, this.blobCacheService, this.testingFatalErrorHandler, (TaskExecutorPartitionTracker)new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()));
    }

    private static ShuffleEnvironment<?, ?> createShuffleEnvironment(ResourceID taskManagerLocation, boolean localCommunication, Configuration configuration, RpcService testingRpcService, boolean mockShuffleEnvironment) throws Exception {
        ShuffleEnvironment shuffleEnvironment;
        if (mockShuffleEnvironment) {
            shuffleEnvironment = (ShuffleEnvironment)Mockito.mock(ShuffleEnvironment.class, (Answer)Mockito.RETURNS_MOCKS);
        } else {
            InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(NettyShuffleEnvironmentOptions.DATA_PORT));
            NettyConfig nettyConfig = new NettyConfig(socketAddress.getAddress(), socketAddress.getPort(), ConfigurationParserUtils.getPageSize((Configuration)configuration), ConfigurationParserUtils.getSlot((Configuration)configuration), configuration);
            shuffleEnvironment = new NettyShuffleEnvironmentBuilder().setTaskManagerLocation(taskManagerLocation).setPartitionRequestInitialBackoff(configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL)).setPartitionRequestMaxBackoff(configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX)).setNettyConfig(localCommunication ? null : nettyConfig).build();
            shuffleEnvironment.start();
        }
        return shuffleEnvironment;
    }

    @Override
    public void close() throws Exception {
        this.testingRpcService.stopService().join();
        this.timerService.stop();
        this.blobCacheService.close();
        this.temporaryFolder.delete();
        this.testingFatalErrorHandler.rethrowError();
    }

    public static final class Builder {
        private JobID jobId;
        private boolean mockShuffleEnvironment = true;
        private int slotSize;
        private JobMasterId jobMasterId = JobMasterId.generate();
        private TestingJobMasterGateway jobMasterGateway;
        private boolean localCommunication = true;
        private Configuration configuration = new Configuration();
        private Optional<ShuffleEnvironment<?, ?>> optionalShuffleEnvironment = Optional.empty();
        private ResourceID resourceID = ResourceID.generate();
        @Nullable
        private String metricQueryServiceAddress;
        private List<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>> taskManagerActionListeners = new ArrayList<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>>();

        public Builder(JobID jobId) {
            this.jobId = jobId;
        }

        public Builder setMetricQueryServiceAddress(String metricQueryServiceAddress) {
            this.metricQueryServiceAddress = metricQueryServiceAddress;
            return this;
        }

        public Builder useRealNonMockShuffleEnvironment() {
            this.optionalShuffleEnvironment = Optional.empty();
            this.mockShuffleEnvironment = false;
            return this;
        }

        public Builder setShuffleEnvironment(ShuffleEnvironment<?, ?> optionalShuffleEnvironment) {
            this.mockShuffleEnvironment = false;
            this.optionalShuffleEnvironment = Optional.of(optionalShuffleEnvironment);
            return this;
        }

        public Builder setSlotSize(int slotSize) {
            this.slotSize = slotSize;
            return this;
        }

        public Builder setJobMasterId(JobMasterId jobMasterId) {
            this.jobMasterId = jobMasterId;
            return this;
        }

        public Builder setJobMasterGateway(TestingJobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = jobMasterGateway;
            return this;
        }

        public Builder setLocalCommunication(boolean localCommunication) {
            this.localCommunication = localCommunication;
            return this;
        }

        public Builder setConfiguration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        public Builder addTaskManagerActionListener(ExecutionAttemptID eid, ExecutionState executionState, CompletableFuture<Void> future) {
            this.taskManagerActionListeners.add((Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>)Tuple3.of((Object)eid, (Object)executionState, future));
            return this;
        }

        public Builder setResourceID(ResourceID resourceID) {
            this.resourceID = resourceID;
            return this;
        }

        public TaskSubmissionTestEnvironment build() throws Exception {
            TestingRpcService testingRpcService = new TestingRpcService();
            ShuffleEnvironment network = this.optionalShuffleEnvironment.orElseGet(() -> {
                try {
                    return TaskSubmissionTestEnvironment.createShuffleEnvironment(this.resourceID, this.localCommunication, this.configuration, testingRpcService, this.mockShuffleEnvironment);
                }
                catch (Exception e) {
                    throw new FlinkRuntimeException("Failed to build TaskSubmissionTestEnvironment", (Throwable)e);
                }
            });
            return new TaskSubmissionTestEnvironment(this.jobId, this.jobMasterId, this.slotSize, this.jobMasterGateway, this.configuration, this.taskManagerActionListeners, this.metricQueryServiceAddress, testingRpcService, network);
        }
    }
}

