/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.JetCancelJobCodec;
import com.hazelcast.client.impl.protocol.codec.JetGetJobIdsCodec;
import com.hazelcast.client.impl.protocol.codec.JetGetJobStatusCodec;
import com.hazelcast.client.impl.protocol.codec.JetJoinSubmittedJobCodec;
import com.hazelcast.client.impl.protocol.codec.JetSubmitJobCodec;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Member;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.AbstractSubmittedJobImpl;
import com.hazelcast.jet.impl.AbstractTrackedJobImpl;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.serialization.SerializationService;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * {@link JetInstance} implementation backed by a Hazelcast client.
 *
 * <p>Job operations (submit, join, status, list, cancel) are encoded as client
 * messages and sent to the member address returned by {@link #masterAddress()}.
 * The Jet configuration is not available through this class; see
 * {@link #getConfig()}.
 */
public class JetClientInstanceImpl extends AbstractJetInstance {
    private final HazelcastClientInstanceImpl client;
    private final ILogger logger;
    private final SerializationService serializationService;

    /**
     * Wraps the given Hazelcast client and registers the Jet exception types
     * with the client's exception factory.
     *
     * @param hazelcastInstance the client through which all requests are sent
     */
    public JetClientInstanceImpl(HazelcastClientInstanceImpl hazelcastInstance) {
        super(hazelcastInstance);
        this.client = hazelcastInstance;
        this.logger = getLogger(JetInstance.class);
        this.serializationService = hazelcastInstance.getSerializationService();
        ExceptionUtil.registerJetExceptions(hazelcastInstance.getClientExceptionFactory());
    }

    /**
     * Always fails: this implementation does not expose a Jet configuration.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public JetConfig getConfig() {
        throw new UnsupportedOperationException("Jet Configuration is not available on the client");
    }

    /**
     * Creates and initialises a job for the given DAG using a fresh, default
     * {@link JobConfig}.
     */
    @Override
    public Job newJob(DAG dag) {
        return submit(dag, new JobConfig());
    }

    /**
     * Creates and initialises a job for the given DAG with the supplied
     * configuration.
     */
    @Override
    public Job newJob(DAG dag, JobConfig config) {
        return submit(dag, config);
    }

    /** Shared body of both {@code newJob} overloads: build the job, call {@code init()}, return it. */
    private Job submit(DAG dag, JobConfig config) {
        SubmittedJobImpl job = new SubmittedJobImpl(this, getLogger(SubmittedJobImpl.class), dag, config);
        job.init();
        return job;
    }

    /**
     * Requests the set of job ids from the cluster and returns one initialised
     * tracked job per id.
     *
     * <p>Any exception raised while invoking or decoding the request is passed
     * through {@link ExceptionUtil#rethrow}.
     *
     * @return a job handle for every id contained in the response
     */
    @Override
    public Collection<Job> getJobs() {
        ClientMessage request = JetGetJobIdsCodec.encodeRequest();
        ClientInvocation invocation = new ClientInvocation(client, request, null, masterAddress());
        Set<Long> jobIds;
        try {
            ClientMessage clientMessage = invocation.invoke().get();
            JetGetJobIdsCodec.ResponseParameters response = JetGetJobIdsCodec.decodeResponse(clientMessage);
            jobIds = serializationService.toObject(response.response);
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
        // All handles are constructed first and only then initialised, in the same order.
        List<Job> jobs = jobIds.stream()
                .map(jobId -> new TrackedJobImpl(getLogger(TrackedJobImpl.class), jobId))
                .collect(Collectors.toList());
        jobs.forEach(job -> ((TrackedJobImpl) job).init());
        return jobs;
    }

    /**
     * Sends a job-status request for the given job and blocks for the answer.
     *
     * @param jobId           id of the job to query
     * @param retryOnNotFound flag forwarded unchanged into the encoded request
     * @return the deserialised status from the response
     */
    private JobStatus sendJobStatusRequest(long jobId, boolean retryOnNotFound) {
        ClientMessage request = JetGetJobStatusCodec.encodeRequest(jobId, retryOnNotFound);
        ClientInvocation invocation = new ClientInvocation(client, request, jobObjectName(jobId), masterAddress());
        try {
            ClientMessage clientMessage = invocation.invoke().get();
            JetGetJobStatusCodec.ResponseParameters response = JetGetJobStatusCodec.decodeResponse(clientMessage);
            return serializationService.toObject(response.response);
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /** Obtains a logger for the given type from the client's logging service. */
    private ILogger getLogger(Class<?> type) {
        return client.getLoggingService().getLogger(type);
    }

    /**
     * Returns the address of the first member reported by the client's cluster view.
     *
     * <p>NOTE(review): the method name implies that the first member is the
     * master; that ordering guarantee is not visible here — confirm against
     * the cluster API.
     *
     * @throws IllegalStateException if the cluster reports no members
     */
    private Address masterAddress() {
        Optional<Member> first = client.getCluster().getMembers().stream().findFirst();
        return first.orElseThrow(() -> new IllegalStateException("No members found in cluster")).getAddress();
    }

    /** Builds the object name attached to invocations that concern a single job. */
    private static String jobObjectName(long jobId) {
        return "jobId=" + Util.idToString(jobId);
    }

    /**
     * Adapts a {@link ClientInvocationFuture} to an {@code ICompletableFuture<Void>}:
     * the response payload is discarded and only completion or failure is surfaced.
     * Cancelling it additionally sends a cancel-job request to the cluster.
     */
    private final class ExecutionFuture implements ICompletableFuture<Void> {
        private final ClientInvocationFuture future;
        private final long jobId;
        private final Address executionAddress;

        ExecutionFuture(ClientInvocationFuture future, long jobId, Address executionAddress) {
            this.future = future;
            this.jobId = jobId;
            this.executionAddress = executionAddress;
        }

        /**
         * Cancels the local future and, if that succeeds, sends a cancel-job
         * request to the execution address without waiting for the reply.
         *
         * @param mayInterruptIfRunning ignored; the wrapped future is always
         *                              cancelled with {@code true}
         * @return {@code false} if the wrapped future could not be cancelled,
         *         {@code true} once the cancel request has been dispatched
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (!future.cancel(true)) {
                return false;
            }
            ClientInvocation cancelInvocation = new ClientInvocation(
                    JetClientInstanceImpl.this.client,
                    JetCancelJobCodec.encodeRequest(jobId),
                    JetClientInstanceImpl.jobObjectName(jobId),
                    executionAddress);
            cancelInvocation.invoke().andThen(new ExecutionCallback<ClientMessage>() {
                @Override
                public void onResponse(ClientMessage clientMessage) {
                    // Nothing to do: a response is enough and its content is not used.
                }

                @Override
                public void onFailure(Throwable throwable) {
                    JetClientInstanceImpl.this.logger.warning(
                            "Error cancelling job with jobId " + Util.idToString(ExecutionFuture.this.jobId),
                            throwable);
                }
            });
            return true;
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }

        /** Waits for the wrapped future and discards its result. */
        @Override
        public Void get() throws InterruptedException, ExecutionException {
            future.get();
            return null;
        }

        /** Waits up to the given timeout for the wrapped future and discards its result. */
        @Override
        public Void get(long timeout, @Nonnull TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            future.get(timeout, unit);
            return null;
        }

        @Override
        public void andThen(final ExecutionCallback<Void> callback) {
            future.andThen(discardingResponse(callback));
        }

        @Override
        public void andThen(final ExecutionCallback<Void> callback, Executor executor) {
            future.andThen(discardingResponse(callback), executor);
        }

        /** Wraps a {@code Void} callback so that it can be registered on the message-typed future. */
        private ExecutionCallback<ClientMessage> discardingResponse(final ExecutionCallback<Void> callback) {
            return new ExecutionCallback<ClientMessage>() {
                @Override
                public void onResponse(ClientMessage response) {
                    callback.onResponse(null);
                }

                @Override
                public void onFailure(Throwable t) {
                    callback.onFailure(t);
                }
            };
        }
    }

    /**
     * Handle for a job known only by its id, as produced by {@link #getJobs()}.
     * Joins with a join-submitted-job request and queries status with
     * {@code retryOnNotFound = false}.
     */
    private class TrackedJobImpl extends AbstractTrackedJobImpl {
        TrackedJobImpl(ILogger logger, long jobId) {
            super(logger, jobId);
        }

        @Override
        protected Address getMasterAddress() {
            return JetClientInstanceImpl.this.masterAddress();
        }

        @Override
        protected ICompletableFuture<Void> sendJoinRequest(Address masterAddress) {
            long jobId = getJobId();
            ClientMessage request = JetJoinSubmittedJobCodec.encodeRequest(jobId);
            ClientInvocation invocation = new ClientInvocation(
                    JetClientInstanceImpl.this.client, request, JetClientInstanceImpl.jobObjectName(jobId),
                    masterAddress);
            return new ExecutionFuture(invocation.invoke(), jobId, masterAddress);
        }

        @Override
        protected JobStatus sendJobStatusRequest() {
            return JetClientInstanceImpl.this.sendJobStatusRequest(getJobId(), false);
        }
    }

    /**
     * Handle for a job created through {@code newJob}. Joins with a submit-job
     * request carrying the serialised DAG and configuration, and queries status
     * with {@code retryOnNotFound = true}.
     */
    private class SubmittedJobImpl extends AbstractSubmittedJobImpl {
        SubmittedJobImpl(JetInstance jetInstance, ILogger logger, DAG dag, JobConfig config) {
            super(jetInstance, logger, dag, config);
        }

        @Override
        protected Address getMasterAddress() {
            return JetClientInstanceImpl.this.masterAddress();
        }

        @Override
        protected ICompletableFuture<Void> sendJoinRequest(Address masterAddress) {
            ClientInvocation invocation = new ClientInvocation(
                    JetClientInstanceImpl.this.client, createJoinJobRequest(),
                    JetClientInstanceImpl.jobObjectName(getJobId()), masterAddress);
            return new ExecutionFuture(invocation.invoke(), getJobId(), masterAddress);
        }

        @Override
        protected JobStatus sendJobStatusRequest() {
            return JetClientInstanceImpl.this.sendJobStatusRequest(getJobId(), true);
        }

        /** Serialises the DAG and the job configuration and encodes them into a submit-job request. */
        private ClientMessage createJoinJobRequest() {
            Data serializedDag = JetClientInstanceImpl.this.serializationService.toData(this.dag);
            Data serializedConfig = JetClientInstanceImpl.this.serializationService.toData(this.config);
            return JetSubmitJobCodec.encodeRequest(getJobId(), serializedDag, serializedConfig);
        }
    }
}

