/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.client.impl.ClientEngineImpl;
import com.hazelcast.core.Member;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.instance.HazelcastInstanceImpl;
import com.hazelcast.instance.JetBuildInfo;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.TopologyChangedException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.impl.ClientInvocationRegistry;
import com.hazelcast.jet.impl.JetInstanceImpl;
import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.jet.impl.Networking;
import com.hazelcast.jet.impl.deployment.JetClassLoader;
import com.hazelcast.jet.impl.deployment.ResourceStore;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.ExecutionService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.CanCancelOperations;
import com.hazelcast.spi.ConfigurableService;
import com.hazelcast.spi.LiveOperations;
import com.hazelcast.spi.LiveOperationsTracker;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.MemberAttributeServiceEvent;
import com.hazelcast.spi.MembershipAwareService;
import com.hazelcast.spi.MembershipServiceEvent;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.PacketHandler;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Member-side Jet service, registered under {@link #SERVICE_NAME}.
 *
 * <p>It owns the per-execution state kept on this member, keyed by execution id:
 * the {@link ExecutionContext}, the {@link ResourceStore} and the
 * {@link ClassLoader} built on top of that store. It also forwards packets to
 * {@link Networking} and live-operation queries to {@link LiveOperationRegistry}.
 *
 * <p>NOTE(review): entries in the resource-store and class-loader maps are created
 * lazily but nothing in this class ever removes them — confirm whether cleanup
 * happens elsewhere or whether they should be dropped when an execution completes.
 */
public class JetService
        implements ManagedService,
        ConfigurableService<JetConfig>,
        PacketHandler,
        LiveOperationsTracker,
        CanCancelOperations,
        MembershipAwareService {
    public static final String SERVICE_NAME = "hz:impl:jetService";
    private final ILogger logger;
    private final ClientInvocationRegistry clientInvocationRegistry;
    private final LiveOperationRegistry liveOperationRegistry;
    // executionId -> per-execution state; the maps are populated lazily by the accessors below.
    private final ConcurrentHashMap<Long, ExecutionContext> executionContexts = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, ResourceStore> resourceStores = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Long, ClassLoader> classLoaders = new ConcurrentHashMap<>();
    private final NodeEngineImpl nodeEngine;
    // Default configuration; replaced if configure(JetConfig) is called.
    private JetConfig config = new JetConfig();
    // The three fields below stay null until init(...) has run.
    private JetInstance jetInstance;
    private Networking networking;
    private ExecutionService executionService;

    /**
     * Creates the service and its registries; the heavier components are built in
     * {@link #init(NodeEngine, Properties)}.
     *
     * @param nodeEngine the node engine; it is cast to {@link NodeEngineImpl}
     */
    public JetService(NodeEngine nodeEngine) {
        this.nodeEngine = (NodeEngineImpl)nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.clientInvocationRegistry = new ClientInvocationRegistry();
        this.liveOperationRegistry = new LiveOperationRegistry();
    }

    /**
     * Replaces the configuration used by {@link #init(NodeEngine, Properties)} and
     * by the per-execution accessors.
     *
     * @param config the Jet configuration to use
     */
    public void configure(JetConfig config) {
        this.config = config;
    }

    /**
     * Builds the Jet instance, the networking layer and the execution service,
     * registers the Jet exceptions with the client engine's exception factory and
     * logs the startup banner.
     *
     * @param engine     the node engine supplying the Hazelcast instance and services
     * @param properties not used by this implementation
     */
    public void init(NodeEngine engine, Properties properties) {
        this.jetInstance = new JetInstanceImpl((HazelcastInstanceImpl)engine.getHazelcastInstance(), this.config);
        this.networking = new Networking(engine, this.executionContexts, this.config.getInstanceConfig().getFlowControlPeriodMs());
        this.executionService = new ExecutionService(this.nodeEngine.getHazelcastInstance(), this.config.getInstanceConfig().getCooperativeThreadCount());

        ClientEngineImpl clientEngine = (ClientEngineImpl)engine.getService("hz:core:clientEngine");
        ExceptionUtil.registerJetExceptions(clientEngine.getClientExceptionFactory());

        JetBuildInfo jetBuildInfo = BuildInfoProvider.getBuildInfo().getJetBuildInfo();
        this.logger.info("Starting Jet " + jetBuildInfo.getVersion() + " (" + jetBuildInfo.getBuild() + " - " + jetBuildInfo.getRevision() + ") ");
        this.logger.info("Setting number of cooperative threads and default parallelism to " + this.config.getInstanceConfig().getCooperativeThreadCount());
        this.logger.info("\n\to   o   o   o---o o---o o     o---o   o   o---o o-o-o        o o---o o-o-o\n\t|   |  / \\     /  |     |     |      / \\  |       |          | |       |  \n\to---o o---o   o   o-o   |     o     o---o o---o   |          | o-o     |  \n\t|   | |   |  /    |     |     |     |   |     |   |      \\   | |       |  \n\to   o o   o o---o o---o o---o o---o o   o o---o   o       o--o o---o   o   ");
        this.logger.info("Copyright (c) 2008-2017, Hazelcast, Inc. All Rights Reserved.");
    }

    /**
     * Destroys the networking layer and shuts down the execution service.
     *
     * <p>Components that were never created (for example because
     * {@link #init(NodeEngine, Properties)} did not run or failed part-way) are
     * skipped, and the execution service is shut down even if destroying the
     * networking layer throws.
     *
     * @param terminate not used by this implementation
     */
    public void shutdown(boolean terminate) {
        try {
            if (this.networking != null) {
                this.networking.destroy();
            }
        } finally {
            if (this.executionService != null) {
                this.executionService.shutdown();
            }
        }
    }

    /** Does nothing. */
    public void reset() {
    }

    /**
     * Creates an {@link ExecutionContext} for the given execution id, initializes
     * it with the plan and registers it.
     *
     * @param executionId id of the execution to set up
     * @param plan        the plan passed to {@code ExecutionContext.initialize}
     * @throws IllegalStateException if a context is already registered for {@code executionId}
     */
    public void initExecution(long executionId, ExecutionPlan plan) {
        // One-element holder so the lambda can hand the context it built back to the catch block.
        ExecutionContext[] created = new ExecutionContext[]{null};
        try {
            this.executionContexts.compute(executionId, (k, v) -> {
                if (v != null) {
                    throw new IllegalStateException("Execution context " + executionId + " already exists");
                }
                created[0] = new ExecutionContext(executionId, (NodeEngine)this.nodeEngine, this.executionService);
                return created[0].initialize(plan);
            });
        }
        catch (Throwable t) {
            // compute() stores nothing when the remapping function throws. If the context was
            // already constructed (i.e. initialize failed), register it anyway before rethrowing —
            // presumably so that completeExecution can still find and complete it; confirm.
            if (created[0] != null) {
                this.executionContexts.put(executionId, created[0]);
            }
            throw t;
        }
    }

    /**
     * Unregisters the execution context for the given id and, if one was
     * registered, completes it with {@code error}.
     *
     * @param executionId id of the execution to finish
     * @param error       value handed to {@code ExecutionContext.complete}
     */
    public void completeExecution(long executionId, Throwable error) {
        ExecutionContext context = this.executionContexts.remove(executionId);
        if (context != null) {
            context.complete(error);
        }
    }

    /** @return the Jet instance created in {@link #init(NodeEngine, Properties)}; {@code null} before that */
    public JetInstance getJetInstance() {
        return this.jetInstance;
    }

    /** @return the registry of live operations owned by this service */
    public LiveOperationRegistry getLiveOperationRegistry() {
        return this.liveOperationRegistry;
    }

    /** @return the registry of client invocations owned by this service */
    public ClientInvocationRegistry getClientInvocationRegistry() {
        return this.clientInvocationRegistry;
    }

    /**
     * Returns the resource store for the given execution, creating it on first
     * access from the configured temp directory.
     *
     * @param executionId id of the execution
     * @return the existing or newly created store
     */
    public ResourceStore getResourceStore(long executionId) {
        return this.resourceStores.computeIfAbsent(executionId, k -> new ResourceStore(this.config.getInstanceConfig().getTempDir()));
    }

    /**
     * Returns the class loader for the given execution, creating a
     * {@link JetClassLoader} over the execution's resource store on first access.
     *
     * @param executionId id of the execution
     * @return the existing or newly created class loader
     */
    public ClassLoader getClassLoader(long executionId) {
        // The explicit PrivilegedAction cast selects the doPrivileged overload; a bare lambda
        // matches both PrivilegedAction and PrivilegedExceptionAction and is ambiguous.
        return this.classLoaders.computeIfAbsent(executionId,
                k -> AccessController.doPrivileged(
                        (PrivilegedAction<ClassLoader>) () -> new JetClassLoader(this.getResourceStore(k))));
    }

    /**
     * @param executionId id of the execution
     * @return the registered context, or {@code null} if none is registered
     */
    public ExecutionContext getExecutionContext(long executionId) {
        return this.executionContexts.get(executionId);
    }

    /**
     * Delegates to {@code ExecutionPlan.createExecutionPlans} with this node's
     * engine and the configured cooperative thread count.
     *
     * @param dag the DAG to plan
     * @return the plans keyed by member
     */
    public Map<Member, ExecutionPlan> createExecutionPlans(DAG dag) {
        return ExecutionPlan.createExecutionPlans((NodeEngine)this.nodeEngine, dag, this.config.getInstanceConfig().getCooperativeThreadCount());
    }

    /** Delegates to the live-operation registry. */
    public void populate(LiveOperations liveOperations) {
        this.liveOperationRegistry.populate(liveOperations);
    }

    /**
     * Delegates cancellation to the live-operation registry.
     *
     * @return whatever the registry's {@code cancel} returns for this caller and call id
     */
    public boolean cancelOperation(Address caller, long callId) {
        return this.liveOperationRegistry.cancel(caller, callId);
    }

    /** Forwards the packet to the networking layer. */
    public void handle(Packet packet) throws IOException {
        this.networking.handle(packet);
    }

    /** Does nothing. */
    public void memberAdded(MembershipServiceEvent event) {
    }

    /**
     * For every live operation registered under the departed member's address,
     * arranges for the operation's execution to be completed with a
     * {@link TopologyChangedException}.
     *
     * @param event the membership event naming the member that left
     */
    public void memberRemoved(MembershipServiceEvent event) {
        Address address = event.getMember().getAddress();
        this.liveOperationRegistry.liveOperations.entrySet().stream()
                .filter(e -> address.equals(e.getKey()))
                .flatMap(e -> e.getValue().values().stream())
                .forEach(op -> this.failOnTopologyChange(op.getExecutionId()));
    }

    /** Does nothing. */
    public void memberAttributeChanged(MemberAttributeServiceEvent event) {
    }

    /**
     * If a context is registered for {@code executionId}, hooks onto its completion
     * stage so that, once the stage completes (with any outcome), the execution is
     * completed with a {@link TopologyChangedException}.
     */
    private void failOnTopologyChange(long executionId) {
        Optional.ofNullable(this.executionContexts.get(executionId))
                .map(ExecutionContext::getExecutionCompletionStage)
                .ifPresent(stage -> stage.whenComplete((ignoredResult, ignoredError) ->
                        this.completeExecution(executionId, new TopologyChangedException("Topology has been changed"))));
    }
}

