/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.JetCancelJobCodec;
import com.hazelcast.client.impl.protocol.codec.JetCompleteResourceCodec;
import com.hazelcast.client.impl.protocol.codec.JetExecuteJobCodec;
import com.hazelcast.client.impl.protocol.codec.JetUpdateResourceCodec;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.jet.DAG;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ResourceConfig;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.deployment.ResourceIterator;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.util.function.Supplier;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class JetClientInstanceImpl
extends AbstractJetInstance {
    private final HazelcastClientInstanceImpl client;
    private final ILogger logger;

    public JetClientInstanceImpl(HazelcastClientInstanceImpl hazelcastInstance) {
        super(hazelcastInstance);
        this.client = hazelcastInstance;
        this.logger = hazelcastInstance.getLoggingService().getLogger(JetInstance.class);
        ExceptionUtil.registerJetExceptions(hazelcastInstance.getClientExceptionFactory());
    }

    @Override
    public JetConfig getConfig() {
        throw new UnsupportedOperationException("Jet Configuration is not available on the client");
    }

    @Override
    public Job newJob(DAG dag) {
        return new JobImpl(dag);
    }

    @Override
    public Job newJob(DAG dag, JobConfig config) {
        return new JobImpl(dag, config);
    }

    private final class ExecutionFuture
    implements Future<Void> {
        private final ClientInvocationFuture future;
        private final long executionId;
        private final Address executionAddress;

        protected ExecutionFuture(ClientInvocationFuture future, long executionId, Address executionAddress) {
            this.future = future;
            this.executionId = executionId;
            this.executionAddress = executionAddress;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = this.future.cancel(true);
            if (!cancelled) {
                return false;
            }
            new ClientInvocation(JetClientInstanceImpl.this.client, JetCancelJobCodec.encodeRequest(this.executionId), this.executionAddress).invoke().andThen(new ExecutionCallback<ClientMessage>(){

                @Override
                public void onResponse(ClientMessage clientMessage) {
                }

                @Override
                public void onFailure(Throwable throwable) {
                    JetClientInstanceImpl.this.logger.warning("Error cancelling job with id " + ExecutionFuture.this.executionId, throwable);
                }
            });
            return true;
        }

        @Override
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override
        public Void get() throws InterruptedException, ExecutionException {
            this.future.get();
            return null;
        }

        @Override
        public Void get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            this.future.get(timeout, unit);
            return null;
        }
    }

    private class JobImpl
    implements Job {
        private final DAG dag;
        private final JobConfig config;

        protected JobImpl(DAG dag) {
            this(dag, new JobConfig());
        }

        protected JobImpl(DAG dag, JobConfig config) {
            this.dag = dag;
            this.config = config;
        }

        @Override
        public Future<Void> execute() {
            long executionId = JetClientInstanceImpl.this.getIdGenerator().newId();
            this.deployResources(executionId);
            Object dagData = JetClientInstanceImpl.this.client.getSerializationService().toData(this.dag);
            Address executionAddress = JetClientInstanceImpl.this.client.getPartitionService().getPartition(executionId).getOwner().getAddress();
            ClientInvocation invocation = new ClientInvocation(JetClientInstanceImpl.this.client, JetExecuteJobCodec.encodeRequest(executionId, dagData), executionAddress);
            return new ExecutionFuture(invocation.invoke(), executionId, executionAddress);
        }

        private void deployResources(long executionId) {
            Set<ResourceConfig> resources = this.config.getResourceConfigs();
            if (JetClientInstanceImpl.this.logger.isFineEnabled() && resources.size() > 0) {
                JetClientInstanceImpl.this.logger.fine("Deploying the following resources for " + executionId + ':' + resources);
            }
            try (ResourceIterator it = new ResourceIterator(resources);){
                it.forEachRemaining(part -> {
                    Object partData = JetClientInstanceImpl.this.client.getSerializationService().toData(part);
                    this.invokeOnCluster(() -> JetUpdateResourceCodec.encodeRequest(executionId, partData));
                });
            }
            resources.forEach(r -> {
                Object descriptorData = JetClientInstanceImpl.this.client.getSerializationService().toData(r.getDescriptor());
                this.invokeOnCluster(() -> JetCompleteResourceCodec.encodeRequest(executionId, descriptorData));
            });
            JetClientInstanceImpl.this.logger.fine("Resource deployment for job " + executionId + " completed.");
        }

        private List<ClientMessage> invokeOnCluster(Supplier<ClientMessage> messageSupplier) {
            return JetClientInstanceImpl.this.client.getCluster().getMembers().stream().map(m -> new ClientInvocation(JetClientInstanceImpl.this.client, (ClientMessage)messageSupplier.get(), m.getAddress()).invoke()).collect(Collectors.toList()).stream().map(f -> Util.uncheckCall(f::get)).collect(Collectors.toList());
        }
    }
}

