/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.BaseRemoteService;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBatch;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteParams;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Serving side of the remote service: beans registered against a remote interface
 * are invoked for requests taken from that interface's request queue, and
 * acknowledgements / responses are pushed to the per-request response queue.
 *
 * <p>Each worker is a chain of asynchronous steps: take a request, optionally
 * acknowledge it, run the target method on the supplied executor, send the
 * response, then take the next request.
 */
public class RedissonRemoteService extends BaseRemoteService implements RRemoteService {

    private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);

    /**
     * Expiration, in milliseconds, applied to a response queue when the request does not
     * carry its own execution timeout, and to cancel-response queues.
     */
    private static final long DEFAULT_RESPONSE_TIMEOUT_MILLIS = 60000L;

    /** Registered invocation targets, keyed by interface, method name and method signature. */
    private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();

    public RedissonRemoteService(RedissonClient redisson, CommandExecutor commandExecutor) {
        super(redisson, commandExecutor);
    }

    public RedissonRemoteService(RedissonClient redisson, String name, CommandExecutor commandExecutor) {
        super(redisson, name, commandExecutor);
    }

    public RedissonRemoteService(Codec codec, RedissonClient redisson, CommandExecutor commandExecutor) {
        super(codec, redisson, commandExecutor);
    }

    public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor) {
        super(codec, redisson, name, commandExecutor);
    }

    /**
     * Registers {@code object} as the implementation of {@code remoteInterface} with a single worker.
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object) {
        this.register(remoteInterface, object, 1);
    }

    /**
     * Registers {@code object} as the implementation of {@code remoteInterface} with the given
     * number of workers, running invocations on the connection manager's executor.
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object, int workers) {
        this.register(remoteInterface, object, workers, this.commandExecutor.getConnectionManager().getExecutor());
    }

    /**
     * Registers every public method of {@code remoteInterface} against {@code object} and starts
     * {@code workers} request-queue subscriptions.
     *
     * @param remoteInterface interface whose methods become remotely invocable
     * @param object          bean the methods are invoked on
     * @param workers         number of concurrent subscriptions to start; must be at least 1
     * @param executor        executor the target methods are run on
     * @throws IllegalArgumentException if {@code workers} is lower than 1
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
        if (workers < 1) {
            throw new IllegalArgumentException("executorsAmount can't be lower than 1");
        }
        for (Method method : remoteInterface.getMethods()) {
            RemoteServiceMethod value = new RemoteServiceMethod(method, object);
            RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName(), this.getMethodSignatures(method));
            if (this.beans.put(key, value) != null) {
                // A mapping for this method already existed, i.e. the interface was registered
                // before: stop here without starting any additional workers.
                return;
            }
        }
        for (int i = 0; i < workers; i++) {
            String requestQueueName = this.getRequestQueueName(remoteInterface);
            RBlockingQueue<RemoteServiceRequest> requestQueue = this.redisson.getBlockingQueue(requestQueueName, this.getCodec());
            this.subscribe(remoteInterface, requestQueue, executor);
        }
    }

    /**
     * Asynchronously takes the next request from {@code requestQueue} and processes it.
     *
     * <p>Every path that does not hand the request over to {@link #executeMethod} re-subscribes
     * immediately, except when the failure is a {@link RedissonShutdownException}.
     */
    private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue, final ExecutorService executor) {
        RFuture<RemoteServiceRequest> take = requestQueue.takeAsync();
        take.addListener(new FutureListener<RemoteServiceRequest>() {

            @Override
            public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
                if (!future.isSuccess()) {
                    if (future.cause() instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error("Can't process the remote service request.", future.cause());
                    RedissonRemoteService.this.subscribe(remoteInterface, requestQueue, executor);
                    return;
                }

                final RemoteServiceRequest request = future.getNow();

                // The ack window has already elapsed since the request was created: skip it.
                if (request.getOptions().isAckExpected()
                        && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) {
                    log.debug("request: {} has been skipped due to ackTimeout", request);
                    RedissonRemoteService.this.subscribe(remoteInterface, requestQueue, executor);
                    return;
                }

                if (!request.getOptions().isAckExpected()) {
                    RedissonRemoteService.this.executeMethod(remoteInterface, requestQueue, executor, request);
                    return;
                }

                String responseName = RedissonRemoteService.this.getResponseQueueName(remoteInterface, request.getRequestId());
                String ackName = RedissonRemoteService.this.getAckName(remoteInterface, request.getRequestId());
                // The script claims the ack key with SETNX; only when the claim succeeds does it push
                // the ack into the response queue. Both keys get the ack timeout as expiration.
                RFuture<Boolean> ackClientsFuture = RedissonRemoteService.this.commandExecutor.evalWriteAsync(
                        responseName, (Codec) LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if redis.call('setnx', KEYS[1], 1) == 1 then "
                            + "redis.call('pexpire', KEYS[1], ARGV[2]);"
                            + "redis.call('rpush', KEYS[2], ARGV[1]);"
                            + "redis.call('pexpire', KEYS[2], ARGV[2]);"
                            + "return 1;"
                        + "end;"
                        + "return 0;",
                        Arrays.<Object>asList(ackName, responseName),
                        RedissonRemoteService.this.encode(new RemoteServiceAck()),
                        request.getOptions().getAckTimeoutInMillis());
                ackClientsFuture.addListener(new FutureListener<Boolean>() {

                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        if (!future.isSuccess()) {
                            log.error("Can't send ack for request: " + request, future.cause());
                            if (future.cause() instanceof RedissonShutdownException) {
                                return;
                            }
                            RedissonRemoteService.this.subscribe(remoteInterface, requestQueue, executor);
                            return;
                        }
                        if (!future.getNow().booleanValue()) {
                            // The ack key was already set, so the ack was not sent: do not execute.
                            RedissonRemoteService.this.subscribe(remoteInterface, requestQueue, executor);
                            return;
                        }
                        RedissonRemoteService.this.executeMethod(remoteInterface, requestQueue, executor, request);
                    }
                });
            }
        });
    }

    /**
     * Submits the invocation for {@code request} to {@code executor} and, in parallel, waits for
     * a cancel request addressed to the same request id.
     */
    private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue, final ExecutorService executor, final RemoteServiceRequest request) {
        // May be null when no method with this name/signature was registered; invokeMethod reports that.
        final RemoteServiceMethod method = this.beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignatures()));
        final String responseName = this.getResponseQueueName(remoteInterface, request.getRequestId());

        RBlockingQueue<RemoteServiceCancelRequest> cancelRequestQueue = this.redisson.getBlockingQueue(
                this.getCancelRequestQueueName(remoteInterface, request.getRequestId()), this.getCodec());
        final RFuture<RemoteServiceCancelRequest> cancelRequestFuture = cancelRequestQueue.takeAsync();

        // Holds whichever response is set first: the invocation outcome or the cancellation.
        final AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();

        final java.util.concurrent.Future<?> submitFuture = executor.submit(new Runnable() {

            @Override
            public void run() {
                RedissonRemoteService.this.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder);
            }
        });

        cancelRequestFuture.addListener(new FutureListener<RemoteServiceCancelRequest>() {

            @Override
            public void operationComplete(Future<RemoteServiceCancelRequest> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                // NOTE(review): if the task is cancelled before it started running, run() is presumably
                // never executed, so invokeMethod's re-subscribe is not reached for this worker -
                // confirm whether that is intended.
                boolean cancelled = submitFuture.cancel(future.getNow().isMayInterruptIfRunning());
                if (!cancelled) {
                    return;
                }
                RemoteServiceCancelResponse response = new RemoteServiceCancelResponse();
                if (!responseHolder.compareAndSet(null, response)) {
                    // The invocation already stored its own response.
                    response = new RemoteServiceCancelResponse(false);
                }
                if (future.getNow().getResponseId() != null) {
                    String cancelResponseName = RedissonRemoteService.this.getResponseQueueName(remoteInterface, future.getNow().getResponseId());
                    RedissonRemoteService.this.send(DEFAULT_RESPONSE_TIMEOUT_MILLIS, cancelResponseName, response);
                }
            }
        });
    }

    /**
     * Invokes the target method, stores the outcome in {@code responseHolder} unless a response
     * is already set, sends the response when one is expected, and re-subscribes the worker.
     *
     * <p>A failure is reported to the caller as the cause of the thrown exception when there is
     * one (the case for reflective invocation failures), otherwise as the exception itself, so
     * that an error response never carries a {@code null} error.
     */
    private <T> void invokeMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request, RemoteServiceMethod method, String responseName, final ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture, final AtomicReference<RRemoteServiceResponse> responseHolder) {
        try {
            if (method == null) {
                throw new NoSuchMethodException("No registered method '" + request.getMethodName()
                        + "' with the requested signature for " + remoteInterface.getName());
            }
            if (method.getBean() instanceof RemoteParams) {
                ((RemoteParams) method.getBean()).setRequestId(request.getRequestId());
            }
            Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
            RemoteServiceResponse response = new RemoteServiceResponse(result);
            responseHolder.compareAndSet(null, response);
        } catch (Exception e) {
            // Only wrapping exceptions carry a cause; fall back to the exception itself otherwise.
            Throwable error = e.getCause() != null ? e.getCause() : e;
            RemoteServiceResponse response = new RemoteServiceResponse(error);
            responseHolder.compareAndSet(null, response);
            log.error("Can't execute: " + request, e);
        }

        // The invocation is over: stop waiting for a cancel request.
        if (cancelRequestFuture != null) {
            cancelRequestFuture.cancel(false);
        }

        if (request.getOptions().isResultExpected() || responseHolder.get() instanceof RemoteServiceCancelResponse) {
            long timeout = DEFAULT_RESPONSE_TIMEOUT_MILLIS;
            if (request.getOptions().getExecutionTimeoutInMillis() != null) {
                timeout = request.getOptions().getExecutionTimeoutInMillis();
            }
            RFuture<List<?>> clientsFuture = this.send(timeout, responseName, responseHolder.get());
            clientsFuture.addListener(new FutureListener<List<?>>() {

                @Override
                public void operationComplete(Future<List<?>> future) throws Exception {
                    if (!future.isSuccess()) {
                        log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause());
                        if (future.cause() instanceof RedissonShutdownException) {
                            return;
                        }
                    }
                    RedissonRemoteService.this.subscribe(remoteInterface, requestQueue, executor);
                }
            });
        } else {
            this.subscribe(remoteInterface, requestQueue, executor);
        }
    }

    /**
     * Puts {@code response} into the queue named {@code responseName} and sets the queue's
     * expiration to {@code timeout} milliseconds, both as part of one batch.
     *
     * @return future of the batch execution
     */
    private <T extends RRemoteServiceResponse> RFuture<List<?>> send(long timeout, String responseName, T response) {
        RBatch batch = this.redisson.createBatch();
        RBlockingQueueAsync<T> queue = batch.getBlockingQueue(responseName, this.getCodec());
        queue.putAsync(response);
        queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
        return batch.executeAsync();
    }
}

