/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.client.impl.ClientEngineImpl;
import com.hazelcast.config.Config;
import com.hazelcast.config.ConfigAccessor;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.impl.MetricsService;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.partition.InternalPartitionService;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.services.ManagedService;
import com.hazelcast.internal.services.MembershipAwareService;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JetProperties;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.JetInstanceImpl;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.jet.impl.Networking;
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
import com.hazelcast.jet.impl.metrics.JobMetricsPublisher;
import com.hazelcast.jet.impl.operation.NotifyMemberShutdownOperation;
import com.hazelcast.jet.impl.serialization.DelegatingSerializationService;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.sql.impl.JetSqlCoreBackend;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
 * Member-side Jet service. It owns the Jet components created in
 * {@link #init(NodeEngine, Properties)} (job repository, job coordination and execution
 * services, tasklet execution service and networking), forwards membership events and
 * incoming packets to them, and coordinates shutdown with the cluster master.
 *
 * <p>Most fields are assigned in {@code init}; accessors return {@code null} before it
 * has been called.
 */
public class JetService
implements ManagedService,
MembershipAwareService,
LiveOperationsTracker {
    /** Name under which this service is registered and under which its config object is looked up. */
    public static final String SERVICE_NAME = "hz:impl:jetService";
    /**
     * Not referenced inside this class. NOTE(review): presumably the upper bound for
     * {@link #numConcurrentAsyncOps()} enforced by callers - confirm against usages.
     */
    public static final int MAX_PARALLEL_ASYNC_OPS = 1000;
    /** Delay, in seconds, before retrying the "member is shutting down" notification to the master. */
    private static final int NOTIFY_MEMBER_SHUTDOWN_DELAY = 5;
    private NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final LiveOperationRegistry liveOperationRegistry;
    /** Set exactly once by the first {@link #shutDownJobs()} caller; later callers wait on the same future. */
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference<>();
    private final Thread shutdownHookThread;
    private JetConfig config;
    private AbstractJetInstance jetInstance;
    private Networking networking;
    private TaskletExecutionService taskletExecutionService;
    private JobRepository jobRepository;
    private JobCoordinationService jobCoordinationService;
    private JobExecutionService jobExecutionService;
    private final AtomicInteger numConcurrentAsyncOps = new AtomicInteger();
    /** Memoized result of {@link #computeSharedPartitionKeys()}. */
    private final Supplier<int[]> sharedPartitionKeys = Util.memoizeConcurrent(this::computeSharedPartitionKeys);
    /** SQL backend loaded reflectively; {@code null} when its implementation class is not on the classpath. */
    @Nullable
    private final JetSqlCoreBackend sqlCoreBackend;

    /**
     * Creates the service. Only the logger, the live-operation registry, the (not yet
     * registered) shutdown hook thread and the optional SQL backend are set up here;
     * everything else is created in {@link #init(NodeEngine, Properties)}.
     *
     * @param node the node this service belongs to
     * @throws RuntimeException if the SQL backend class is present but cannot be instantiated
     */
    public JetService(Node node) {
        this.logger = node.getLogger(this.getClass());
        this.liveOperationRegistry = new LiveOperationRegistry();
        this.shutdownHookThread = this.shutdownHookThread(node);
        this.sqlCoreBackend = JetService.loadSqlCoreBackend();
    }

    /**
     * Instantiates the optional SQL backend through its no-arg constructor.
     *
     * @return the backend, or {@code null} if the implementation class cannot be found
     * @throws RuntimeException wrapping any other reflective failure, including an
     *         exception thrown by the backend's constructor
     */
    @Nullable
    private static JetSqlCoreBackend loadSqlCoreBackend() {
        try {
            Class<?> backendClass = Class.forName("com.hazelcast.jet.sql.impl.JetSqlCoreBackendImpl");
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(),
            // which would rethrow checked constructor exceptions without declaring them.
            return (JetSqlCoreBackend)backendClass.getDeclaredConstructor().newInstance();
        }
        catch (ClassNotFoundException e) {
            // The SQL module is optional: its absence is not an error.
            return null;
        }
        catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates and wires the Jet components, registers the job metrics publisher and the
     * dynamic metrics provider, registers Jet exceptions with the client engine,
     * optionally installs the JVM shutdown hook and initializes the SQL backend.
     *
     * @param engine the node engine; must be a {@link NodeEngineImpl}
     * @param hzProperties not used by this implementation
     * @throws RuntimeException if the SQL backend's {@code init} method cannot be invoked or fails
     */
    public void init(NodeEngine engine, Properties hzProperties) {
        this.nodeEngine = (NodeEngineImpl)engine;
        this.config = JetService.findJetServiceConfig(engine.getConfig());
        this.jetInstance = new JetInstanceImpl((HazelcastInstanceImpl)engine.getHazelcastInstance(), this.config);
        this.taskletExecutionService = new TaskletExecutionService(this.nodeEngine, this.config.getInstanceConfig().getCooperativeThreadCount(), this.nodeEngine.getProperties());
        this.jobRepository = new JobRepository(this.jetInstance);
        this.jobExecutionService = new JobExecutionService(this.nodeEngine, this.taskletExecutionService, this.jobRepository);
        this.jobCoordinationService = this.createJobCoordinationService();
        MetricsService metricsService = (MetricsService)this.nodeEngine.getService("hz:impl:metricsService");
        // The lambda parameter is named differently from the nodeEngine field to avoid shadowing it.
        metricsService.registerPublisher(publisherEngine -> new JobMetricsPublisher(this.jobExecutionService, publisherEngine.getLocalMember()));
        this.nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider((DynamicMetricsProvider)this.jobExecutionService);
        this.networking = new Networking(engine, this.jobExecutionService, this.config.getInstanceConfig().getFlowControlPeriodMs());
        ClientEngineImpl clientEngine = (ClientEngineImpl)engine.getService("hz:core:clientEngine");
        ExceptionUtil.registerJetExceptions(clientEngine.getExceptionFactory());
        // parseBoolean(null) is false, so the hook is installed only when the property is explicitly "true".
        if (Boolean.parseBoolean(this.config.getHazelcastConfig().getProperties().getProperty(JetProperties.JET_SHUTDOWNHOOK_ENABLED.getName()))) {
            this.logger.finest("Adding Jet shutdown hook");
            Runtime.getRuntime().addShutdownHook(this.shutdownHookThread);
        }
        this.logger.info("Setting number of cooperative threads and default parallelism to " + this.config.getInstanceConfig().getCooperativeThreadCount());
        if (this.sqlCoreBackend != null) {
            try {
                // Looked up by name on the runtime class of the backend.
                Method initJetInstanceMethod = this.sqlCoreBackend.getClass().getMethod("init", AbstractJetInstance.class);
                initJetInstanceMethod.invoke(this.sqlCoreBackend, this.jetInstance);
            }
            catch (ReflectiveOperationException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Extracts the {@link JetConfig} stored as the config object of the service config
     * registered under {@link #SERVICE_NAME}.
     *
     * @param hzConfig the Hazelcast member config
     * @return the Jet config object
     */
    static JetConfig findJetServiceConfig(Config hzConfig) {
        return (JetConfig)ConfigAccessor.getServicesConfig(hzConfig).getServiceConfig(SERVICE_NAME).getConfigObject();
    }

    /**
     * Notifies the master that this member is shutting down and blocks until the master
     * has acknowledged it. Only the first caller sends the notification; concurrent and
     * later callers wait on the same future.
     */
    void shutDownJobs() {
        if (this.shutdownFuture.compareAndSet(null, new CompletableFuture<>())) {
            this.notifyMasterWeAreShuttingDown(this.shutdownFuture.get());
        }
        this.shutdownFuture.get().join();
        assert (this.jobExecutionService.numberOfExecutions() == 0) : "numberOfExecutions should be zero, but is " + this.jobExecutionService.numberOfExecutions();
    }

    /**
     * Sends a {@link NotifyMemberShutdownOperation} to the current master address. On
     * success the given future is completed; on failure a warning is logged and the
     * notification is rescheduled after {@link #NOTIFY_MEMBER_SHUTDOWN_DELAY} seconds,
     * re-reading the master address at that time.
     *
     * @param future completed with {@code null} once the master has responded without error
     */
    private void notifyMasterWeAreShuttingDown(CompletableFuture<Void> future) {
        NotifyMemberShutdownOperation op = new NotifyMemberShutdownOperation();
        this.nodeEngine.getOperationService().invokeOnTarget(SERVICE_NAME, (Operation)op, this.nodeEngine.getClusterService().getMasterAddress()).whenCompleteAsync((response, throwable) -> {
            if (throwable != null) {
                this.logger.warning("Failed to notify master member that this member is shutting down, will retry in " + NOTIFY_MEMBER_SHUTDOWN_DELAY + " seconds", throwable);
                this.nodeEngine.getExecutionService().schedule(() -> this.notifyMasterWeAreShuttingDown(future), NOTIFY_MEMBER_SHUTDOWN_DELAY, TimeUnit.SECONDS);
            } else {
                future.complete(null);
            }
        });
    }

    /**
     * Stops the job execution service, the tasklet execution service (waiting for its
     * workers to terminate) and networking. Unless called from the Jet shutdown hook
     * thread itself, the hook is deregistered first.
     *
     * @param forceful not used by this implementation
     */
    public void shutdown(boolean forceful) {
        if (!Thread.currentThread().equals(this.shutdownHookThread)) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownHookThread);
            }
            catch (IllegalStateException e) {
                // Thrown when the JVM is already shutting down; hooks can no longer be removed.
                // Must not prevent the services below from being stopped.
                this.logger.finest("Jet shutdown hook not removed, JVM shutdown is already in progress");
            }
        }
        this.jobExecutionService.shutdown();
        this.taskletExecutionService.shutdown();
        this.taskletExecutionService.awaitWorkerTermination();
        this.networking.shutdown();
    }

    /** Resets the job execution service and then the job coordination service. */
    public void reset() {
        this.jobExecutionService.reset();
        this.jobCoordinationService.reset();
    }

    /**
     * Factory for the coordination service, called from {@code init} after the job
     * repository has been created. Package-private and non-final, so it can be overridden.
     */
    JobCoordinationService createJobCoordinationService() {
        return new JobCoordinationService(this.nodeEngine, this, this.config, this.jobRepository);
    }

    /**
     * Builds a serialization service that delegates to the node's serialization service,
     * configured with the given serializer configs.
     *
     * @param serializerConfigs serializer configuration passed to {@link DelegatingSerializationService#from}
     * @return the delegating serialization service
     */
    public InternalSerializationService createSerializationService(Map<String, String> serializerConfigs) {
        return DelegatingSerializationService.from(this.getNodeEngine().getSerializationService(), serializerConfigs);
    }

    /**
     * Always fails in this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    public Operation createExportSnapshotOperation(long jobId, String name, boolean cancelJob) {
        throw new UnsupportedOperationException("You need Hazelcast Jet Enterprise to use this feature");
    }

    /** @return the Jet instance created in {@code init} */
    public JetInstance getJetInstance() {
        return this.jetInstance;
    }

    /** @return the registry that backs {@link #populate(LiveOperations)} */
    public LiveOperationRegistry getLiveOperationRegistry() {
        return this.liveOperationRegistry;
    }

    /** @return the job repository created in {@code init} */
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    /** @return the node engine passed to {@code init} */
    public NodeEngineImpl getNodeEngine() {
        return this.nodeEngine;
    }

    /** @return the Jet config resolved in {@code init} */
    public JetConfig getConfig() {
        return this.config;
    }

    /** @return the job coordination service created in {@code init} */
    public JobCoordinationService getJobCoordinationService() {
        return this.jobCoordinationService;
    }

    /** @return the job execution service created in {@code init} */
    public JobExecutionService getJobExecutionService() {
        return this.jobExecutionService;
    }

    /**
     * Looks up the config of a job, consulting the job record first and the job result
     * second.
     *
     * @param jobId id of the job
     * @return the job's config
     * @throws JobNotFoundException if neither a record nor a result exists for the id
     */
    public JobConfig getJobConfig(long jobId) {
        JobRecord jobRecord = this.jobRepository.getJobRecord(jobId);
        if (jobRecord != null) {
            return jobRecord.getConfig();
        }
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.getJobConfig();
        }
        throw new JobNotFoundException(jobId);
    }

    /**
     * Obtains the class loader for a job from the job execution service.
     *
     * @param jobId id of the job
     * @return the job's class loader
     * @throws JobNotFoundException if the job's config cannot be found
     */
    public ClassLoader getClassLoader(long jobId) {
        return this.getJobExecutionService().getClassLoader(this.getJobConfig(jobId), jobId);
    }

    /**
     * Passes an incoming packet to {@link Networking}. An {@link IOException} from the
     * handler is rethrown through {@link ExceptionUtil#sneakyThrow}.
     */
    void handlePacket(Packet packet) {
        try {
            this.networking.handle(packet);
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Informs the execution service (by member address) and then the coordination
     * service (by member UUID) that a member has left.
     */
    public void memberRemoved(MembershipServiceEvent event) {
        this.jobExecutionService.onMemberRemoved(event.getMember().getAddress());
        this.jobCoordinationService.onMemberRemoved(event.getMember().getUuid());
    }

    /** Informs the coordination service that a member has joined. */
    public void memberAdded(MembershipServiceEvent event) {
        this.jobCoordinationService.onMemberAdded(event.getMember());
    }

    /** @return the shared, mutable counter instance itself (not a snapshot) */
    public AtomicInteger numConcurrentAsyncOps() {
        return this.numConcurrentAsyncOps;
    }

    /** Delegates to the live-operation registry. */
    public void populate(LiveOperations liveOperations) {
        this.liveOperationRegistry.populate(liveOperations);
    }

    /**
     * Returns the memoized partition keys, see {@link #computeSharedPartitionKeys()}.
     * The same array instance is handed to every caller, so it must not be modified.
     */
    public int[] getSharedPartitionKeys() {
        return this.sharedPartitionKeys.get();
    }

    /**
     * Builds an array indexed by partition id whose value is the smallest positive
     * {@code int} key that the partition service maps to that partition.
     *
     * @return array of length {@code getPartitionCount()}
     */
    private int[] computeSharedPartitionKeys() {
        InternalPartitionService partitionService = this.nodeEngine.getPartitionService();
        int[] keys = new int[partitionService.getPartitionCount()];
        int remainingCount = partitionService.getPartitionCount();
        // 0 marks "no key found yet" in the array, hence candidate keys start at 1.
        int i = 1;
        while (remainingCount > 0) {
            int partitionId = partitionService.getPartitionId((Object)i);
            if (keys[partitionId] == 0) {
                keys[partitionId] = i;
                --remainingCount;
            }
            ++i;
        }
        return keys;
    }

    /**
     * Creates (but does not register or start) the thread used as JVM shutdown hook.
     * When run, it reads the shutdown-hook policy from the node's properties and either
     * terminates the Hazelcast instance (policy {@code "TERMINATE"}) or shuts down the
     * Jet instance (any other value).
     */
    private Thread shutdownHookThread(Node node) {
        return new Thread(() -> {
            String policy = node.getProperties().getString(ClusterProperty.SHUTDOWNHOOK_POLICY);
            // Constant-first comparison: a missing policy falls through to shutdown().
            if ("TERMINATE".equals(policy)) {
                this.jetInstance.getHazelcastInstance().getLifecycleService().terminate();
            } else {
                this.jetInstance.shutdown();
            }
        }, "jet.ShutdownThread");
    }

    /** @return the SQL backend, or {@code null} if its implementation class was not found */
    @Nullable
    JetSqlCoreBackend getSqlCoreBackend() {
        return this.sqlCoreBackend;
    }
}

