/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.client.impl.ClientEngine;
import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MergePolicyConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastBootstrap;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.instance.impl.NodeState;
import com.hazelcast.internal.cluster.ClusterStateListener;
import com.hazelcast.internal.metrics.impl.MetricsService;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.impl.JetInstanceImpl;
import com.hazelcast.jet.impl.JobClassLoaderService;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.jet.impl.Networking;
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
import com.hazelcast.jet.impl.metrics.JobMetricsPublisher;
import com.hazelcast.jet.impl.operation.NotifyMemberShutdownOperation;
import com.hazelcast.jet.impl.operation.PrepareForPassiveClusterOperation;
import com.hazelcast.jet.impl.serialization.DelegatingSerializationService;
import com.hazelcast.jet.impl.submitjob.memberside.JobMetaDataParameterObject;
import com.hazelcast.jet.impl.submitjob.memberside.JobMultiPartParameterObject;
import com.hazelcast.jet.impl.submitjob.memberside.JobUploadStatus;
import com.hazelcast.jet.impl.submitjob.memberside.JobUploadStore;
import com.hazelcast.jet.impl.submitjob.memberside.validator.JarOnClientValidator;
import com.hazelcast.jet.impl.submitjob.memberside.validator.JarOnMemberValidator;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.merge.DiscardMergePolicy;
import com.hazelcast.spi.merge.LatestUpdateMergePolicy;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class JetServiceBackend
implements ManagedService,
MembershipAwareService,
ClusterStateListener,
LiveOperationsTracker,
Consumer<Packet> {
    public static final String SERVICE_NAME = "hz:impl:jetService";
    public static final String SQL_ARGUMENTS_KEY_NAME = "__sql.arguments";
    public static final String SQL_CATALOG_MAP_NAME = "__sql.catalog";
    public static final int MAX_PARALLEL_ASYNC_OPS = 1000;
    private static final int NOTIFY_MEMBER_SHUTDOWN_DELAY = 5;
    private static final int SHUTDOWN_JOBS_MAX_WAIT_SECONDS = 10;
    private static final int JOB_UPLOAD_STORE_PERIOD = 30;
    private NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final LiveOperationRegistry liveOperationRegistry;
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference();
    private final JetConfig jetConfig;
    private JetService jet;
    private Networking networking;
    private TaskletExecutionService taskletExecutionService;
    private JobRepository jobRepository;
    private JobCoordinationService jobCoordinationService;
    private JobClassLoaderService jobClassLoaderService;
    private JobExecutionService jobExecutionService;
    private final AtomicInteger numConcurrentAsyncOps = new AtomicInteger();
    private final Supplier<int[]> sharedPartitionKeys = Util.memoizeConcurrent(this::computeSharedPartitionKeys);
    private final JobUploadStore jobUploadStore = new JobUploadStore();
    private ScheduledFuture<?> jobUploadStoreCheckerFuture;

    public JetServiceBackend(Node node) {
        this.logger = node.getLogger(this.getClass());
        this.liveOperationRegistry = new LiveOperationRegistry();
        this.jetConfig = node.getConfig().getJetConfig();
    }

    @Override
    public void init(NodeEngine engine, Properties hzProperties) {
        this.nodeEngine = (NodeEngineImpl)engine;
        this.jet = new JetInstanceImpl(this.nodeEngine.getNode().hazelcastInstance, this.jetConfig);
        this.jobRepository = new JobRepository(engine.getHazelcastInstance());
        this.taskletExecutionService = new TaskletExecutionService(this.nodeEngine, this.jetConfig.getCooperativeThreadCount(), this.nodeEngine.getProperties());
        this.jobCoordinationService = this.createJobCoordinationService();
        this.jobClassLoaderService = new JobClassLoaderService(this.nodeEngine, this.jobRepository);
        this.jobExecutionService = new JobExecutionService(this.nodeEngine, this.taskletExecutionService, this.jobClassLoaderService);
        MetricsService metricsService = (MetricsService)this.nodeEngine.getService("hz:impl:metricsService");
        metricsService.registerPublisher(nodeEngine -> new JobMetricsPublisher(this.jobExecutionService, nodeEngine.getLocalMember()));
        this.nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(this.jobExecutionService);
        this.networking = new Networking(engine, this.jobExecutionService, this.jetConfig.getFlowControlPeriodMs());
        ClientEngine clientEngine = (ClientEngine)engine.getService("hz:core:clientEngine");
        ClientExceptionFactory clientExceptionFactory = clientEngine.getExceptionFactory();
        if (clientExceptionFactory != null) {
            ExceptionUtil.registerJetExceptions(clientExceptionFactory);
        } else {
            this.logger.fine("Jet exceptions are not registered to the ClientExceptionFactory since the ClientExceptionFactory is not accessible.");
        }
        this.logger.info("Setting number of cooperative threads and default parallelism to " + this.jetConfig.getCooperativeThreadCount());
        this.jobUploadStoreCheckerFuture = this.nodeEngine.getExecutionService().scheduleWithRepetition(this.jobUploadStore::cleanExpiredUploads, 0L, 30L, TimeUnit.SECONDS);
    }

    public void configureJetInternalObjects(Config config, HazelcastProperties properties) {
        JetConfig jetConfig = config.getJetConfig();
        MapConfig internalMapConfig = new MapConfig("__jet.*").setBackupCount(jetConfig.getBackupCount()).setStatisticsEnabled(true);
        internalMapConfig.getMergePolicyConfig().setPolicy(DiscardMergePolicy.class.getName());
        MapConfig resultsMapConfig = new MapConfig(internalMapConfig).setName("__jet.results").setTimeToLiveSeconds(properties.getSeconds(ClusterProperty.JOB_RESULTS_TTL_SECONDS));
        MapConfig metricsMapConfig = new MapConfig(internalMapConfig).setName("__jet.results.metrics").setTimeToLiveSeconds(properties.getSeconds(ClusterProperty.JOB_RESULTS_TTL_SECONDS));
        config.addMapConfig(internalMapConfig).addMapConfig(resultsMapConfig).addMapConfig(metricsMapConfig).addMapConfig(JetServiceBackend.createSqlCatalogConfig());
    }

    static MapConfig createSqlCatalogConfig() {
        return new MapConfig(SQL_CATALOG_MAP_NAME).setBackupCount(6).setAsyncBackupCount(0).setTimeToLiveSeconds(0).setReadBackupData(true).setMergePolicyConfig(new MergePolicyConfig().setPolicy(LatestUpdateMergePolicy.class.getName())).setPerEntryStatsEnabled(true);
    }

    public void shutDownJobs() {
        if (this.shutdownFuture.compareAndSet(null, new CompletableFuture())) {
            this.notifyMasterWeAreShuttingDown(this.shutdownFuture.get());
        }
        try {
            CompletableFuture<Void> future = this.shutdownFuture.get();
            future.get(10L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            this.logger.severe("Shutdown jobs timeout", e);
        }
    }

    private void notifyMasterWeAreShuttingDown(CompletableFuture<Void> future) {
        NotifyMemberShutdownOperation op = new NotifyMemberShutdownOperation();
        this.nodeEngine.getOperationService().invokeOnTarget(SERVICE_NAME, op, this.nodeEngine.getClusterService().getMasterAddress()).whenCompleteAsync((response, throwable) -> {
            NodeState nodeState = this.nodeEngine.getNode().getState();
            if (throwable != null && nodeState == NodeState.ACTIVE) {
                this.logger.warning("Failed to notify master member that this member is shutting down, will retry in 5 seconds", (Throwable)throwable);
                this.nodeEngine.getExecutionService().schedule(() -> this.notifyMasterWeAreShuttingDown(future), 5L, TimeUnit.SECONDS);
            } else {
                if (throwable != null) {
                    this.logger.warning("Failed to notify master member that this member is shutting down, but this member is " + (Object)((Object)nodeState) + ", so not retrying", (Throwable)throwable);
                }
                future.complete(null);
            }
        });
    }

    @Override
    public void shutdown(boolean forceful) {
        this.jobUploadStoreCheckerFuture.cancel(true);
        this.jobExecutionService.shutdown();
        this.taskletExecutionService.shutdown();
        this.taskletExecutionService.awaitWorkerTermination();
        this.networking.shutdown();
    }

    @Override
    public void reset() {
        this.jobExecutionService.reset();
        this.jobCoordinationService.reset();
    }

    JobCoordinationService createJobCoordinationService() {
        return new JobCoordinationService(this.nodeEngine, this, this.jetConfig, this.jobRepository);
    }

    public InternalSerializationService createSerializationService(Map<String, String> serializerConfigs) {
        return DelegatingSerializationService.from(this.getNodeEngine().getSerializationService(), serializerConfigs);
    }

    public Operation createExportSnapshotOperation(long jobId, String name, boolean cancelJob) {
        throw new UnsupportedOperationException("You need Hazelcast Enterprise to use this feature");
    }

    public JetService getJet() {
        return this.jet;
    }

    public LiveOperationRegistry getLiveOperationRegistry() {
        return this.liveOperationRegistry;
    }

    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    public NodeEngineImpl getNodeEngine() {
        return this.nodeEngine;
    }

    public JetConfig getJetConfig() {
        return this.jetConfig;
    }

    public JobCoordinationService getJobCoordinationService() {
        return this.jobCoordinationService;
    }

    public JobClassLoaderService getJobClassLoaderService() {
        return this.jobClassLoaderService;
    }

    public JobExecutionService getJobExecutionService() {
        return this.jobExecutionService;
    }

    public JobConfig getJobConfig(long jobId, boolean isLightJob) {
        if (isLightJob) {
            return this.jobCoordinationService.getLightJobConfig(jobId);
        }
        JobRecord jobRecord = this.jobRepository.getJobRecord(jobId);
        if (jobRecord != null) {
            return jobRecord.getConfig();
        }
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.getJobConfig();
        }
        throw new JobNotFoundException(jobId);
    }

    @Override
    public void accept(Packet packet) {
        try {
            this.networking.handle(packet);
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    @Override
    public void memberRemoved(MembershipServiceEvent event) {
        this.jobExecutionService.onMemberRemoved(event.getMember());
        this.jobCoordinationService.onMemberRemoved(event.getMember().getUuid());
    }

    @Override
    public void memberAdded(MembershipServiceEvent event) {
        this.jobCoordinationService.onMemberAdded(event.getMember());
    }

    @Override
    public void onClusterStateChange(ClusterState newState) {
        this.getJobCoordinationService().clusterChangeDone();
    }

    public AtomicInteger numConcurrentAsyncOps() {
        return this.numConcurrentAsyncOps;
    }

    @Override
    public void populate(LiveOperations liveOperations) {
        this.liveOperationRegistry.populate(liveOperations);
    }

    public int[] getSharedPartitionKeys() {
        return this.sharedPartitionKeys.get();
    }

    private int[] computeSharedPartitionKeys() {
        InternalPartitionService partitionService = this.nodeEngine.getPartitionService();
        int[] keys2 = new int[partitionService.getPartitionCount()];
        int remainingCount = partitionService.getPartitionCount();
        int i = 1;
        while (remainingCount > 0) {
            int partitionId = partitionService.getPartitionId(i);
            if (keys2[partitionId] == 0) {
                keys2[partitionId] = i;
                --remainingCount;
            }
            ++i;
        }
        return keys2;
    }

    public TaskletExecutionService getTaskletExecutionService() {
        return this.taskletExecutionService;
    }

    public void beforeClusterStateChange(ClusterState requestedState) {
        if (requestedState == ClusterState.PASSIVE) {
            try {
                this.nodeEngine.getOperationService().createInvocationBuilder(SERVICE_NAME, (Operation)new PrepareForPassiveClusterOperation(), this.nodeEngine.getMasterAddress()).invoke().get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw com.hazelcast.internal.util.ExceptionUtil.rethrow(e);
            }
        }
    }

    public void startScanningForJobs() {
        this.jobCoordinationService.startScanningForJobs();
    }

    public void jarOnMember(JobMetaDataParameterObject jobMetaDataParameterObject) {
        JarOnMemberValidator.validate(jobMetaDataParameterObject);
        this.executeJar(jobMetaDataParameterObject);
    }

    public void jarOnClient(JobMetaDataParameterObject jobMetaDataParameterObject) {
        this.checkResourceUploadEnabled();
        JarOnClientValidator.validate(jobMetaDataParameterObject);
        try {
            this.jobUploadStore.processJobMetaData(jobMetaDataParameterObject);
        }
        catch (Exception exception) {
            this.jobUploadStore.removeBadSession(jobMetaDataParameterObject.getSessionId());
            this.throwJetExceptionFromJobMetaData(jobMetaDataParameterObject, exception);
        }
    }

    public void storeJobMultiPart(JobMultiPartParameterObject jobMultiPartParameterObject) {
        try {
            JobMetaDataParameterObject partsComplete = this.jobUploadStore.processJobMultipart(jobMultiPartParameterObject);
            if (partsComplete != null) {
                this.executeJar(partsComplete);
            }
        }
        catch (Exception exception) {
            JobUploadStatus jobUploadStatus = this.jobUploadStore.removeBadSession(jobMultiPartParameterObject.getSessionId());
            if (jobUploadStatus != null) {
                JobMetaDataParameterObject jobMetaDataParameterObject = jobUploadStatus.getJobMetaDataParameterObject();
                if (jobMetaDataParameterObject != null) {
                    this.throwJetExceptionFromJobMetaData(jobMetaDataParameterObject, exception);
                }
            }
            JetServiceBackend.wrapWithJetException(exception);
        }
    }

    private void throwJetExceptionFromJobMetaData(JobMetaDataParameterObject jobMetaDataParameterObject, Exception exception) {
        String exceptionString = jobMetaDataParameterObject.exceptionString();
        JetException jetExceptionWithMetaData = new JetException(exceptionString, exception);
        JetServiceBackend.wrapWithJetException(jetExceptionWithMetaData);
    }

    static void wrapWithJetException(Exception exception) {
        if (!(exception instanceof JetException)) {
            ExceptionUtil.rethrow(exception);
        } else {
            ExceptionUtil.sneakyThrow(exception);
        }
    }

    private void checkResourceUploadEnabled() {
        if (!this.jetConfig.isResourceUploadEnabled()) {
            throw new JetException("Resource upload is not enabled");
        }
    }

    public void executeJar(JobMetaDataParameterObject jobMetaDataParameterObject) {
        String message;
        if (this.logger.isInfoEnabled()) {
            message = String.format("Try executing jar file %s for session %s", jobMetaDataParameterObject.getJarPath(), jobMetaDataParameterObject.getSessionId());
            this.logger.info(message);
        }
        this.checkResourceUploadEnabled();
        try {
            HazelcastBootstrap.executeJarOnMember(this::getHazelcastInstance, jobMetaDataParameterObject.getJarPath().toString(), jobMetaDataParameterObject.getSnapshotName(), jobMetaDataParameterObject.getJobName(), jobMetaDataParameterObject.getMainClass(), jobMetaDataParameterObject.getJobParameters());
            if (this.logger.isInfoEnabled()) {
                message = String.format("executing jar file %s for session %s finished successfully", jobMetaDataParameterObject.getJarPath(), jobMetaDataParameterObject.getSessionId());
                this.logger.info(message);
            }
        }
        catch (Exception exception) {
            this.logger.severe("caught exception when running the jar", exception);
            this.throwJetExceptionFromJobMetaData(jobMetaDataParameterObject, exception);
        }
        finally {
            JobUploadStatus.cleanup(jobMetaDataParameterObject);
        }
    }

    private HazelcastInstance getHazelcastInstance() {
        return this.getNodeEngine().getHazelcastInstance();
    }
}

