/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Redisson;
import org.redisson.RedissonClient;
import org.redisson.api.MessageListener;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RBucket;
import org.redisson.api.RExecutorService;
import org.redisson.api.RKeys;
import org.redisson.api.RTopic;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RInject;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.ConnectionManager;
import org.redisson.executor.ExecutorRemoteService;
import org.redisson.executor.RemoteExecutorService;
import org.redisson.executor.RemoteExecutorServiceAsync;
import org.redisson.executor.RemoteExecutorServiceImpl;
import org.redisson.executor.RemotePromise;

public class RedissonExecutorService
implements RExecutorService {
    /** Status value that {@link #shutdown()} stores when the task counter is present and not '0'. */
    public static final int SHUTDOWN_STATE = 1;
    /** Status value that {@link #shutdown()} stores and publishes when the task counter is '0' or missing. */
    public static final int TERMINATED_STATE = 2;
    // Runs the shutdown script and is handed to the remote-service helpers.
    private final CommandExecutor commandExecutor;
    // Obtained from commandExecutor; used here to create promises.
    private final ConnectionManager connectionManager;
    // Encodes task state and is passed to the status bucket, the topic and the remote services.
    private final Codec codec;
    // Factory for the distributed objects below and for the remote service used by registerWorkers.
    private final Redisson redisson;
    // Atomic long named "<requestQueueName>:counter"; the shutdown script reads it.
    private final RAtomicLong tasksCounter;
    // Bucket named "<requestQueueName>:status" holding the lifecycle state (absent, 1 or 2).
    private final RBucket<Integer> status;
    // Topic named "<requestQueueName>:topic"; the shutdown script publishes the terminated state to it.
    private final RTopic<Integer> topic;
    // Used by delete() to remove the executor's keys.
    private final RKeys keys;
    // Remote proxy created with noAck(); used by the submit(...) methods.
    private final RemoteExecutorServiceAsync asyncService;
    // Remote proxy created with noAck().noResult(); used by execute(Runnable).
    private final RemoteExecutorServiceAsync asyncServiceWithoutResult;
    // Cache of class-file bytes per task class, filled by getClassBody.
    private final Map<Class<?>, byte[]> class2bytes = PlatformDependent.newConcurrentHashMap();
    // Executor name as passed to the constructor.
    private final String name;
    // "{" + name + ":" + RemoteExecutorService class name + "}"; prefix of all derived object names.
    private final String requestQueueName;

    /**
     * Creates an executor service bound to the given name.
     *
     * <p>All backing objects (task counter, status bucket, topic) are named with the common
     * prefix {@code "{" + name + ":" + RemoteExecutorService.class.getName() + "}"}.
     *
     * @param codec           codec used for the status bucket, the topic and task state
     * @param commandExecutor command executor; also the source of the connection manager
     * @param redisson        client used to create the backing distributed objects
     * @param name            executor name
     */
    public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name) {
        this.codec = codec;
        this.commandExecutor = commandExecutor;
        this.connectionManager = commandExecutor.getConnectionManager();
        this.name = name;
        this.redisson = redisson;

        // Common prefix shared by the request queue and every derived object name.
        final String prefix = "{" + name + ":" + RemoteExecutorService.class.getName() + "}";
        this.requestQueueName = prefix;

        this.tasksCounter = redisson.getAtomicLong(prefix + ":counter");
        this.status = redisson.getBucket(prefix + ":status", codec);
        this.topic = redisson.getTopic(prefix + ":topic", codec);
        this.keys = redisson.getKeys();

        // Client-side remote service that produces the two async proxies.
        ExecutorRemoteService proxyFactory = new ExecutorRemoteService(codec, redisson, name, commandExecutor);
        proxyFactory.setTasksCounterName(this.tasksCounter.getName());
        proxyFactory.setStatusName(this.status.getName());

        this.asyncService = proxyFactory.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck());
        this.asyncServiceWithoutResult = proxyFactory.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
    }

    /**
     * Registers workers without supplying an executor; delegates to
     * {@link #registerWorkers(int, Executor)} with a {@code null} executor.
     *
     * @param executors number of workers to register
     */
    @Override
    public void registerWorkers(int executors) {
        final Executor noExecutor = null;
        registerWorkers(executors, noExecutor);
    }

    /**
     * Builds a {@link RemoteExecutorServiceImpl} wired to this executor's status, counter and
     * topic names, and registers it with the remote service under this executor's name.
     *
     * @param executors number of workers, passed through to the remote service registration
     * @param executor  executor passed through to the remote service registration; may be {@code null}
     */
    @Override
    public void registerWorkers(int executors, Executor executor) {
        RemoteExecutorServiceImpl worker = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName);
        worker.setStatusName(status.getName());
        worker.setTasksCounterName(tasksCounter.getName());
        // The topic's first channel name is the one the worker is pointed at.
        worker.setTopicName(topic.getChannelNames().get(0));

        redisson.getRemoteSerivce(name, codec)
                .register(RemoteExecutorService.class, worker, executors, executor);
    }

    /**
     * Validates and serialises the task, sends it through the no-result proxy, and blocks
     * until the task has been accepted or rejected.
     *
     * @param task task to run; must satisfy {@code check(Object)}
     * @throws IllegalArgumentException   if the task class is anonymous or not serializable
     * @throws RejectedExecutionException if the add-future completes with {@code false}
     */
    @Override
    public void execute(Runnable task) {
        check(task);
        final byte[] classBytes = getClassBody(task);
        final byte[] encodedState = encode(task);
        final String taskClassName = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncServiceWithoutResult.executeVoid(taskClassName, classBytes, encodedState);
        this.execute(promise);
    }

    /**
     * Serialises the task with this executor's codec.
     *
     * <p>Before encoding, every field declared directly on the task's class that is both
     * assignable to {@link RedissonClient} and annotated with {@link RInject} is set to
     * {@code null} on the given instance.
     *
     * @param task task instance to encode (its injected client fields are cleared)
     * @return encoded task state
     * @throws IllegalStateException    if a matching field cannot be written
     * @throws IllegalArgumentException if the codec fails with an {@link IOException}
     */
    private byte[] encode(Object task) {
        for (Field candidate : task.getClass().getDeclaredFields()) {
            boolean injectedClient = RedissonClient.class.isAssignableFrom(candidate.getType())
                    && candidate.isAnnotationPresent(RInject.class);
            if (injectedClient) {
                candidate.setAccessible(true);
                try {
                    candidate.set(task, null);
                }
                catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }

        try {
            return codec.getValueEncoder().encode(task);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Returns the raw {@code .class} file bytes of the task's class, loading them from the
     * class's class loader on first use and caching them in {@code class2bytes}.
     *
     * @param task task whose class definition is needed
     * @return class-file bytes of {@code task.getClass()}
     * @throws IllegalArgumentException if the class resource cannot be found or read
     */
    private byte[] getClassBody(Object task) {
        Class<?> c = task.getClass();
        byte[] classBody = this.class2bytes.get(c);
        if (classBody != null) {
            return classBody;
        }

        String classAsPath = c.getName().replace('.', '/') + ".class";
        // getClassLoader() is null for classes loaded by the bootstrap loader.
        ClassLoader loader = c.getClassLoader();
        InputStream classStream = loader != null
                ? loader.getResourceAsStream(classAsPath)
                : ClassLoader.getSystemResourceAsStream(classAsPath);
        if (classStream == null) {
            throw new IllegalArgumentException("Class definition not found: " + classAsPath);
        }

        try {
            // Read until EOF: available() is only an estimate and may be smaller than the file.
            java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = classStream.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            classBody = out.toByteArray();
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        finally {
            try {
                classStream.close();
            }
            catch (IOException ignored) {
                // Nothing useful can be done if closing a read-only resource stream fails.
            }
        }

        this.class2bytes.put(c, classBody);
        return classBody;
    }

    /**
     * Runs a script that initiates shutdown if no status has been stored yet.
     *
     * <p>When the status key does not exist: if the task counter is '0' or missing, the status
     * is set to {@link #TERMINATED_STATE} and that value is published on the topic; otherwise
     * the status is set to {@link #SHUTDOWN_STATE}. When a status already exists the script
     * does nothing.
     */
    @Override
    public void shutdown() {
        commandExecutor.evalWrite(getName(), codec, RedisCommands.EVAL_VOID_WITH_VALUES_6,
                "if redis.call('exists', KEYS[2]) == 0 then if redis.call('get', KEYS[1]) == '0' or redis.call('exists', KEYS[1]) == 0 then redis.call('set', KEYS[2], ARGV[2]);redis.call('publish', KEYS[3], ARGV[2]);else redis.call('set', KEYS[2], ARGV[1]);end;end;",
                Arrays.asList(tasksCounter.getName(), status.getName(), topic.getChannelNames().get(0)),
                SHUTDOWN_STATE, TERMINATED_STATE);
    }

    /**
     * @return the executor name supplied at construction time
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Deletes the request queue, status and task counter keys.
     *
     * @return {@code true} if the delete call reports more than zero removed keys
     */
    @Override
    public boolean delete() {
        long removed = keys.delete(requestQueueName, status.getName(), tasksCounter.getName());
        return removed > 0L;
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException();
    }

    /**
     * Reports whether shutdown has been initiated.
     *
     * @return {@code true} if a status is stored and it is at least {@link #SHUTDOWN_STATE}
     */
    @Override
    public boolean isShutdown() {
        if (!this.status.isExists()) {
            return false;
        }
        // The key can disappear between the exists check and the read (e.g. via delete());
        // guard the unboxing so that case reports "not shut down" instead of throwing.
        Integer state = this.status.get();
        return state != null && state >= SHUTDOWN_STATE;
    }

    /**
     * Reports whether the executor has reached the terminated state.
     *
     * @return {@code true} if a status is stored and it equals {@link #TERMINATED_STATE}
     */
    @Override
    public boolean isTerminated() {
        if (!this.status.isExists()) {
            return false;
        }
        // The key can disappear between the exists check and the read (e.g. via delete());
        // guard the unboxing so that case reports "not terminated" instead of throwing.
        Integer state = this.status.get();
        return state != null && state == TERMINATED_STATE;
    }

    /**
     * Blocks until the terminated state is observed, the timeout elapses, or the calling
     * thread is interrupted.
     *
     * <p>A topic listener is registered to catch the published {@link #TERMINATED_STATE}; the
     * status is re-checked after registration so a termination that happened in between is
     * not missed. The listener is always removed before returning or throwing.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if termination was observed, {@code false} on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        if (this.isTerminated()) {
            return true;
        }
        final CountDownLatch latch = new CountDownLatch(1);
        MessageListener<Integer> listener = new MessageListener<Integer>(){

            @Override
            public void onMessage(String channel, Integer msg) {
                if (msg != null && msg == TERMINATED_STATE) {
                    latch.countDown();
                }
            }
        };
        int listenerId = this.topic.addListener(listener);
        try {
            if (this.isTerminated()) {
                return true;
            }
            return latch.await(timeout, unit);
        }
        finally {
            // Runs on interrupt as well, so the listener never outlives this call.
            this.topic.removeListener(listenerId);
        }
    }

    /**
     * Validates and serialises the callable, sends it through the result-returning proxy,
     * and blocks until the task has been accepted or rejected.
     *
     * @param task callable to run; must satisfy {@code check(Object)}
     * @return the remote promise for the task's result
     * @throws IllegalArgumentException   if the task class is anonymous or not serializable
     * @throws RejectedExecutionException if the add-future completes with {@code false}
     */
    @Override
    public <T> java.util.concurrent.Future<T> submit(Callable<T> task) {
        check(task);
        final byte[] classBytes = getClassBody(task);
        final byte[] encodedState = encode(task);
        final String taskClassName = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncService.execute(taskClassName, classBytes, encodedState);
        this.execute(promise);
        return promise;
    }

    /**
     * Rejects tasks that cannot be shipped to a worker.
     *
     * @param task task to validate
     * @throws IllegalArgumentException if the task's class is anonymous, or if the task does
     *                                  not implement {@link Serializable}
     */
    private void check(Object task) {
        Class<?> taskClass = task.getClass();
        if (taskClass.isAnonymousClass()) {
            throw new IllegalArgumentException("Task can't be created using anonymous class");
        }
        boolean serializable = task instanceof Serializable;
        if (!serializable) {
            throw new IllegalArgumentException("Task class should implement Serializable interface");
        }
    }

    /**
     * Waits (uninterruptibly) for the promise's add-future and fails if the task was not added.
     *
     * @param promise promise returned by one of the remote proxies
     * @throws RejectedExecutionException if the add-future's value is {@code false}
     */
    private <T> void execute(RemotePromise<T> promise) {
        Future<Boolean> added = promise.getAddFuture();
        added.syncUninterruptibly();
        boolean accepted = added.getNow();
        if (accepted) {
            return;
        }
        throw new RejectedExecutionException("Task rejected. ExecutorService is in shutdown state");
    }

    /**
     * Submits the runnable and returns a future that completes with {@code result} once the
     * task succeeds, or fails with the task's cause otherwise.
     *
     * @param task   runnable to run remotely
     * @param result value the returned future yields on success
     * @return future bridging the task's completion to {@code result}
     */
    @Override
    public <T> java.util.concurrent.Future<T> submit(Runnable task, final T result) {
        final Promise bridged = this.connectionManager.newPromise();
        Future taskFuture = (Future)this.submit(task);
        taskFuture.addListener(new FutureListener<Object>(){

            @Override
            public void operationComplete(Future<Object> completed) throws Exception {
                if (completed.isSuccess()) {
                    bridged.setSuccess(result);
                } else {
                    bridged.setFailure(completed.cause());
                }
            }
        });
        return bridged;
    }

    /**
     * Validates and serialises the runnable, sends it through the result-returning proxy,
     * and blocks until the task has been accepted or rejected.
     *
     * @param task runnable to run; must satisfy {@code check(Object)}
     * @return the remote promise tracking the task
     * @throws IllegalArgumentException   if the task class is anonymous or not serializable
     * @throws RejectedExecutionException if the add-future completes with {@code false}
     */
    @Override
    public java.util.concurrent.Future<?> submit(Runnable task) {
        check(task);
        final byte[] classBytes = getClassBody(task);
        final byte[] encodedState = encode(task);
        final String taskClassName = task.getClass().getName();
        RemotePromise promise = (RemotePromise)asyncService.executeVoid(taskClassName, classBytes, encodedState);
        this.execute(promise);
        return promise;
    }

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long millis) throws InterruptedException, ExecutionException, TimeoutException {
        ExecutionException ee;
        ArrayList<java.util.concurrent.Future<T>> futures;
        block17: {
            if (tasks == null) {
                throw new NullPointerException();
            }
            int ntasks = tasks.size();
            if (ntasks == 0) {
                throw new IllegalArgumentException();
            }
            futures = new ArrayList<java.util.concurrent.Future<T>>(ntasks);
            ee = null;
            long lastTime = timed ? System.currentTimeMillis() : 0L;
            Iterator<Callable<T>> it = tasks.iterator();
            futures.add(this.submit(it.next()));
            --ntasks;
            int active = 1;
            while (true) {
                T now22;
                java.util.concurrent.Future<T> f;
                if ((f = this.poll(futures)) == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(this.submit(it.next()));
                        ++active;
                    } else {
                        if (active == 0) break;
                        if (timed) {
                            f = this.poll(futures, millis, TimeUnit.MILLISECONDS);
                            if (f == null) {
                                throw new TimeoutException();
                            }
                            long now22 = System.currentTimeMillis();
                            millis -= now22 - lastTime;
                            lastTime = now22;
                        } else {
                            f = this.poll(futures, -1L, null);
                        }
                    }
                }
                if (f == null) continue;
                --active;
                try {
                    now22 = f.get();
                }
                catch (ExecutionException eex) {
                    ee = eex;
                    continue;
                }
                catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                    continue;
                }
                return now22;
                break;
            }
            if (ee != null) break block17;
            ee = new ExecutionException("No tasks were finised", null);
        }
        throw ee;
        finally {
            for (java.util.concurrent.Future future : futures) {
                future.cancel(true);
            }
        }
    }

    /**
     * Blocks until one of the given futures completes or the timeout elapses.
     *
     * <p>A shared listener is attached to every future; the first completion is recorded and
     * wakes the caller. The listener is detached from all futures before returning, including
     * when the wait is interrupted.
     *
     * @param futures  futures to watch; each must be a Netty {@link Future}
     * @param timeout  maximum wait, in {@code timeUnit}; ignored when {@code timeUnit} is null
     * @param timeUnit unit of {@code timeout}, or {@code null} to wait without a time limit
     * @return the first future observed to complete, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    private <T> java.util.concurrent.Future<T> poll(List<java.util.concurrent.Future<T>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Future<T>> result = new AtomicReference<Future<T>>();
        FutureListener<T> listener = new FutureListener<T>(){

            @Override
            public void operationComplete(Future<T> future) throws Exception {
                // Publish the winner BEFORE releasing the waiter; in the opposite order the
                // waiter can wake up and still read null.
                result.compareAndSet(null, future);
                latch.countDown();
            }
        };
        for (java.util.concurrent.Future<T> future : futures) {
            ((Future<T>)future).addListener(listener);
        }
        try {
            // A null unit is the "no time limit" request (callers pass -1 with a null unit).
            if (timeUnit == null) {
                latch.await();
            } else {
                latch.await(timeout, timeUnit);
            }
        }
        finally {
            for (java.util.concurrent.Future<T> future : futures) {
                ((Future<T>)future).removeListener(listener);
            }
        }
        return result.get();
    }

    /**
     * Non-blocking scan for a completed future.
     *
     * @param futures futures to inspect, in iteration order
     * @return the first future for which {@code isDone()} is true, or {@code null} if none is
     */
    private <T> java.util.concurrent.Future<T> poll(List<java.util.concurrent.Future<T>> futures) {
        java.util.concurrent.Future<T> completed = null;
        Iterator<java.util.concurrent.Future<T>> cursor = futures.iterator();
        while (completed == null && cursor.hasNext()) {
            java.util.concurrent.Future<T> candidate = cursor.next();
            if (candidate.isDone()) {
                completed = candidate;
            }
        }
        return completed;
    }

    /**
     * Runs the tasks until one completes successfully, without a time limit.
     *
     * @param tasks tasks to try
     * @return the first successful result; {@code null} if the untimed helper ever reports a timeout
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no task completes successfully
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        T outcome;
        try {
            outcome = doInvokeAny(tasks, false, 0L);
        }
        catch (TimeoutException cannotHappen) {
            // The helper is called in untimed mode, so this is not expected to occur.
            outcome = null;
        }
        return outcome;
    }

    /**
     * Runs the tasks until one completes successfully or the timeout elapses.
     *
     * @param tasks   tasks to try
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}; converted to milliseconds
     * @return the first successful result
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException   if no task completes successfully
     * @throws TimeoutException     if the timeout elapses first
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        final long budgetMillis = unit.toMillis(timeout);
        return doInvokeAny(tasks, true, budgetMillis);
    }

    /**
     * Submits every task and waits for all of them to finish.
     *
     * <p>Cancellation and execution failures of individual tasks are swallowed here; they
     * remain observable through the returned futures. If this method exits abnormally (for
     * example on interrupt or a rejected submission), every future submitted so far is cancelled.
     *
     * @param tasks tasks to run
     * @return one future per task, in submission order
     * @throws NullPointerException if {@code tasks} is null
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        if (tasks == null) {
            throw new NullPointerException();
        }
        List<java.util.concurrent.Future<T>> submitted = new ArrayList<java.util.concurrent.Future<T>>(tasks.size());
        boolean finishedNormally = false;
        try {
            for (Callable<T> task : tasks) {
                submitted.add(this.submit(task));
            }
            for (java.util.concurrent.Future<T> outstanding : submitted) {
                if (!outstanding.isDone()) {
                    try {
                        outstanding.get();
                    }
                    catch (CancellationException ignored) {
                        // Outcome stays visible through the returned future.
                    }
                    catch (ExecutionException ignored) {
                        // Outcome stays visible through the returned future.
                    }
                }
            }
            finishedNormally = true;
            return submitted;
        }
        finally {
            if (!finishedNormally) {
                for (java.util.concurrent.Future<T> outstanding : submitted) {
                    outstanding.cancel(true);
                }
            }
        }
    }

    /**
     * Submits every task and waits for all of them to finish, within a time budget.
     *
     * <p>The budget covers submission as well as waiting. If it runs out while tasks are
     * still being submitted, the remaining slots are filled with already-cancelled promises
     * so the result has one future per task. Whenever the method returns before all tasks
     * finished (or exits abnormally), every future in the list is cancelled.
     *
     * @param tasks   tasks to run
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the futures collected so far, in submission order
     * @throws NullPointerException if {@code tasks} or {@code unit} is null
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        if (tasks == null || unit == null) {
            throw new NullPointerException();
        }
        long millis = unit.toMillis(timeout);
        List<java.util.concurrent.Future<T>> futures = new ArrayList<java.util.concurrent.Future<T>>(tasks.size());
        boolean done = false;
        try {
            long lastTime = System.currentTimeMillis();
            for (Callable<T> task : tasks) {
                futures.add(this.submit(task));

                // Deduct the elapsed time BEFORE moving lastTime forward; the other order
                // always subtracts zero and the budget never shrinks during submission.
                long now = System.currentTimeMillis();
                millis -= now - lastTime;
                lastTime = now;
                if (millis > 0L) {
                    continue;
                }

                // Out of time: pad with cancelled placeholders for the unsubmitted tasks.
                int remaining = tasks.size() - futures.size();
                for (int i = 0; i < remaining; ++i) {
                    Promise cancelledFuture = this.connectionManager.newPromise();
                    cancelledFuture.cancel(true);
                    futures.add(cancelledFuture);
                }
                return futures;
            }

            for (java.util.concurrent.Future<T> future : futures) {
                if (future.isDone()) {
                    continue;
                }
                if (millis <= 0L) {
                    return futures;
                }
                try {
                    future.get(millis, TimeUnit.MILLISECONDS);
                }
                catch (CancellationException ignored) {
                    // Outcome stays visible through the returned future.
                }
                catch (ExecutionException ignored) {
                    // Outcome stays visible through the returned future.
                }
                catch (TimeoutException toe) {
                    return futures;
                }
                long now = System.currentTimeMillis();
                millis -= now - lastTime;
                lastTime = now;
            }
            done = true;
            return futures;
        }
        finally {
            if (!done) {
                for (java.util.concurrent.Future<T> future : futures) {
                    future.cancel(true);
                }
            }
        }
    }
}

