/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.map;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.hash.impl.util.CloseablesManager;
import net.openhft.chronicle.hash.replication.ThrottlingConfig;
import net.openhft.chronicle.map.BufferResizer;
import net.openhft.chronicle.map.Replica;
import net.openhft.chronicle.map.ReplicatedChronicleMap;
import net.openhft.chronicle.map.SelectedSelectionKeySet;
import net.openhft.chronicle.map.StatelessChronicleMap;
import net.openhft.chronicle.map.VanillaContext;
import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.thread.NamedThreadFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractChannelReplicator
implements Closeable {
    /** Number of bits in one byte. */
    public static final int BITS_IN_A_BYTE = 8;
    // The three SIZE_OF_* constants are not referenced inside this class; their names suggest
    // byte widths of wire-format fields. NOTE(review): confirm meaning against the callers.
    static final int SIZE_OF_SIZE = 4;
    public static final int SIZE_OF_TRANSACTION_ID = 8;
    public static final int SIZE_OF_TIME_SHIFT = 2;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractChannelReplicator.class);
    // Read once from the system property "useJavaNIOSelectionKeys" (false when the property is unset).
    // When false, openSelector() installs selectedKeys into the selector via reflection.
    static boolean useJavaNIOSelectionKeys = Boolean.valueOf(System.getProperty("useJavaNIOSelectionKeys"));
    // Declaration order of the next three fields matters: the selector initializer calls
    // openSelector(), which reads both selectedKeys and closeables.
    final SelectedSelectionKeySet selectedKeys = new SelectedSelectionKeySet();
    final CloseablesManager closeables = new CloseablesManager();
    final Selector selector = this.openSelector(this.closeables);
    // Single-thread executor created in the constructor; start() submits the event task to it.
    private final ExecutorService executorService;
    // Most recent thread handed out by the executor's thread factory; close() joins on it.
    private volatile Thread lastThread;
    // Throwable captured in the constructor; close() logs it if interrupted while waiting for the thread.
    private Throwable startedHere;
    // Handle of the task submitted by start(); stays null until start() has been called.
    private Future<?> future;
    // Filled by addPendingRegistration() and drained by registerPendingRegistrations().
    private final Queue<Runnable> pendingRegistrations = new ConcurrentLinkedQueue<Runnable>();
    // Null when the configured throttling rate is not positive (see the constructor).
    @Nullable
    private final Throttler throttler;
    // Set to true by closeResources().
    volatile boolean isClosed = false;
    // Assigned by the task submitted in start(), immediately before processEvent() is invoked.
    ReplicatedChronicleMap.BytesReplicatedContext context;

    /**
     * Sets up the single-thread executor that will run the event task and, when a positive
     * throttling rate is configured, the {@link Throttler} bound to this replicator's selector.
     *
     * @param name             name passed to the {@code NamedThreadFactory} that creates the worker thread
     * @param throttlingConfig source of the throttling rate and bucket interval; a rate that is not
     *                         greater than zero leaves throttling switched off
     * @throws IOException declared for the field initializers (opening the selector) that run
     *                     as part of construction
     */
    AbstractChannelReplicator(String name, ThrottlingConfig throttlingConfig) throws IOException {
        // The factory remembers every thread it creates so close() can join on the latest one.
        final ThreadFactory trackingFactory = new NamedThreadFactory(name, true){

            public Thread newThread(@net.openhft.lang.model.constraints.NotNull Runnable r) {
                final Thread created = super.newThread(r);
                AbstractChannelReplicator.this.lastThread = created;
                return created;
            }
        };
        this.executorService = Executors.newSingleThreadExecutor(trackingFactory);

        if (throttlingConfig.throttling(TimeUnit.DAYS) > 0L) {
            this.throttler = new Throttler(this.selector, throttlingConfig.bucketInterval(TimeUnit.MILLISECONDS), throttlingConfig.throttling(TimeUnit.DAYS));
        } else {
            this.throttler = null;
        }

        // Captured now so close() can report where this replicator was created.
        this.startedHere = new Throwable("Started here");
    }

    /**
     * Opens a selector and registers it with {@code closeables}. Unless
     * {@code useJavaNIOSelectionKeys} is set, it also registers a cleanup hook that nulls out the
     * entries of both arrays returned by successive {@code selectedKeys.flip()} calls, and then
     * tries to install {@code selectedKeys} into the selector.
     *
     * <p>This method runs from the {@code selector} field initializer, so it may only rely on
     * fields declared before {@code selector}.
     *
     * @param closeables manager that will later close the selector (and run the cleanup hook)
     * @return the opened selector
     * @throws IOException if the selector cannot be opened
     */
    Selector openSelector(CloseablesManager closeables) throws IOException {
        final Selector opened = Selector.open();
        closeables.add(opened);
        if (useJavaNIOSelectionKeys) {
            return opened;
        }
        closeables.add(new Closeable(){

            @Override
            public void close() throws IOException {
                // flip() is called twice so that both key arrays get cleared.
                for (int pass = 0; pass < 2; ++pass) {
                    final SelectionKey[] keys = AbstractChannelReplicator.this.selectedKeys.flip();
                    int idx = 0;
                    // Stop at the first empty slot, as the populated entries form a prefix.
                    while (idx < keys.length && keys[idx] != null) {
                        keys[idx] = null;
                        ++idx;
                    }
                }
            }
        });
        return AbstractChannelReplicator.openSelector(opened, this.selectedKeys);
    }

    /**
     * Opens a socket channel with {@code TCP_NODELAY} enabled and hands it to {@code closeables}
     * for later cleanup.
     *
     * <p>The channel is registered with {@code closeables} even when setting the socket option
     * fails, so it is still closed with the other resources. If {@code closeables} refuses the
     * channel with an {@link IllegalStateException}, the channel is closed here instead of being
     * returned open and untracked; this mirrors what {@code AbstractConnector} does in the same
     * situation. Callers then receive a closed channel, and any further I/O on it fails with a
     * {@code ClosedChannelException}.
     *
     * @param closeables manager that takes ownership of the channel
     * @return the opened channel (closed already if {@code closeables} rejected it)
     * @throws IOException if the channel cannot be opened or the socket option cannot be set
     */
    static SocketChannel openSocketChannel(CloseablesManager closeables) throws IOException {
        // If open() itself throws there is nothing to register or close.
        final SocketChannel result = SocketChannel.open();
        try {
            result.socket().setTcpNoDelay(true);
        }
        finally {
            try {
                closeables.add(result);
            }
            catch (IllegalStateException rejected) {
                // Nobody else will close this channel now, so do it here to avoid leaking the socket.
                try {
                    result.close();
                }
                catch (IOException ignored) {
                    // best-effort close; there is nothing useful to do if it fails
                }
            }
        }
        return result;
    }

    /**
     * Attempts to replace the selector's internal selected-key sets with {@code selectedKeySet}
     * by reflectively writing the {@code selectedKeys} and {@code publicSelectedKeys} fields of
     * {@code sun.nio.ch.SelectorImpl}.
     *
     * <p>If the selector is not a {@code SelectorImpl} nothing is changed. Any exception raised
     * along the way is logged and otherwise ignored.
     *
     * @param selector       selector to patch
     * @param selectedKeySet key set to install
     * @return the same {@code selector} instance, patched or not
     */
    private static Selector openSelector(@NotNull Selector selector, @NotNull SelectedSelectionKeySet selectedKeySet) {
        try {
            final ClassLoader loader = AbstractChannelReplicator.getSystemClassLoader();
            final Class<?> implClass = Class.forName("sun.nio.ch.SelectorImpl", false, loader);
            if (implClass.isAssignableFrom(selector.getClass())) {
                final Field internalKeys = implClass.getDeclaredField("selectedKeys");
                final Field publicKeys = implClass.getDeclaredField("publicSelectedKeys");
                internalKeys.setAccessible(true);
                publicKeys.setAccessible(true);
                internalKeys.set(selector, selectedKeySet);
                publicKeys.set(selector, selectedKeySet);
            }
        }
        catch (Exception e) {
            LOG.error("", (Throwable)e);
        }
        return selector;
    }

    /**
     * Returns the system class loader, going through {@link AccessController#doPrivileged}
     * when a security manager is installed.
     *
     * @return the result of {@link ClassLoader#getSystemClassLoader()}
     */
    static ClassLoader getSystemClassLoader() {
        if (System.getSecurityManager() != null) {
            final PrivilegedAction<ClassLoader> lookup = new PrivilegedAction<ClassLoader>(){

                @Override
                public ClassLoader run() {
                    return ClassLoader.getSystemClassLoader();
                }
            };
            return AccessController.doPrivileged(lookup);
        }
        // No security manager: a direct call is sufficient.
        return ClassLoader.getSystemClassLoader();
    }

    /**
     * Queues a task to be executed by the next call to {@code registerPendingRegistrations()}.
     *
     * @param registration task to enqueue
     */
    void addPendingRegistration(Runnable registration) {
        // The queue is an unbounded ConcurrentLinkedQueue, for which offer() and add() are equivalent.
        this.pendingRegistrations.offer(registration);
    }

    /**
     * Drains the pending-registration queue, running each queued task in order on the calling
     * thread. An exception thrown by one task is logged at info level and does not stop the
     * remaining tasks from running.
     *
     * @throws ClosedChannelException declared by the signature; the body shown here logs task
     *                                exceptions rather than propagating them
     */
    void registerPendingRegistrations() throws ClosedChannelException {
        Runnable next;
        while ((next = this.pendingRegistrations.poll()) != null) {
            try {
                next.run();
            }
            catch (Exception e) {
                LOG.info("", (Throwable)e);
            }
        }
    }

    /**
     * Work performed by the task that {@code start()} submits to the executor. It is invoked
     * after {@code context} has been assigned, and any {@code Exception} it throws is caught
     * and logged by that task.
     *
     * @throws IOException if the implementation hits an I/O failure
     */
    abstract void processEvent() throws IOException;

    /**
     * Submits the event task to the executor and remembers its {@link Future} so that
     * {@code close()} can cancel it. The task first obtains the replicated context and then
     * calls {@code processEvent()}; any exception is logged at error level.
     */
    final void start() {
        final Runnable eventTask = new Runnable(){

            @Override
            public void run() {
                try {
                    // The context is fetched on the executor thread itself, not on the caller's thread.
                    AbstractChannelReplicator.this.context = VanillaContext.get(ReplicatedChronicleMap.BytesReplicatedContextFactory.INSTANCE);
                    AbstractChannelReplicator.this.processEvent();
                }
                catch (Exception e) {
                    LOG.error("", (Throwable)e);
                }
            }
        };
        this.future = this.executorService.submit(eventTask);
    }

    /**
     * Shuts the replicator down: releases resources via {@code closeResources()}, cancels the
     * submitted task, and then waits (up to ten one-second joins) for the worker thread to die,
     * logging the worker's stack trace after every join for which it is still alive.
     *
     * <p>A pending interrupt on the calling thread is cleared up front (with a warning) so that
     * the shutdown sequence is not aborted immediately. If the caller is interrupted while
     * waiting for the worker, the wait is abandoned, the situation is logged together with the
     * creation site of this replicator, and the interrupt status is restored so the caller can
     * observe it.
     */
    @Override
    public void close() {
        if (Thread.interrupted()) {
            LOG.warn("Already interrupted");
        }
        long start = System.currentTimeMillis();
        this.closeResources();
        try {
            if (this.future != null) {
                this.future.cancel(true);
            }
            // Snapshot the volatile field once so the null check, identity check and join all
            // refer to the same thread.
            final Thread thread = this.lastThread;
            // Never join on ourselves: close() may be called from the worker thread.
            if (thread != null && Thread.currentThread() != thread) {
                // Stop as soon as the worker has terminated; a dead thread cannot become alive again.
                for (int i = 0; i < 10 && thread.isAlive(); ++i) {
                    thread.join(1000L);
                    this.dumpThreadStackTrace(start);
                }
            }
        }
        catch (InterruptedException e) {
            this.dumpThreadStackTrace(start);
            LOG.error("", (Throwable)e);
            LOG.error("", this.startedHere);
            // Preserve the interruption for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks the replicator as closed, asks the executor to shut down, and closes everything
     * registered with {@code closeables}.
     *
     * <p>The executor is given 5 ms to terminate after an orderly {@code shutdown()}. Only if
     * that wait throws (in practice: the caller is interrupted) is {@code shutdownNow()} used,
     * followed by a wait of up to 10 s. The interrupt status is intentionally not restored here
     * because {@code close()} goes on to join the worker thread after this method returns.
     */
    public void closeResources() {
        this.isClosed = true;
        this.executorService.shutdown();
        try {
            // NOTE(review): the boolean result (terminated vs. timed out) is ignored, so a plain
            // timeout does not escalate to shutdownNow(); the worker is expected to exit once
            // its resources are closed below.
            this.executorService.awaitTermination(5L, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            this.executorService.shutdownNow();
            try {
                this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e1) {
                // Log the exception that was actually raised by this second wait.
                LOG.error("", (Throwable)e1);
            }
        }
        this.closeables.closeQuietly();
    }

    /**
     * Logs a warning containing the worker thread's current stack trace, provided the worker
     * exists and is still alive; otherwise does nothing.
     *
     * @param start wall-clock time in milliseconds at which closing began, used to report how
     *              long the worker has kept running (in seconds, truncated to one decimal)
     */
    private void dumpThreadStackTrace(long start) {
        final Thread worker = this.lastThread;
        if (worker == null || !worker.isAlive()) {
            return;
        }
        // Integer-divide to tenths of a second first, then convert to seconds.
        final double elapsedSecs = (double)((System.currentTimeMillis() - start) / 100L) / 10.0;
        final StringBuilder report = new StringBuilder()
                .append("Replicator thread still running after ")
                .append(elapsedSecs)
                .append(" secs ")
                .append(worker)
                .append(" isAlive= ")
                .append(worker.isAlive());
        final StackTraceElement[] frames = worker.getStackTrace();
        for (int i = 0; i < frames.length; ++i) {
            report.append("\n\t").append(frames[i]);
        }
        LOG.warn(report.toString());
    }

    /**
     * Removes the channel from the throttler (when throttling is enabled) and then asks
     * {@code closeables} to close it quietly.
     *
     * @param channel channel to stop throttling and close
     */
    void closeEarlyAndQuietly(SelectableChannel channel) {
        final Throttler active = this.throttler;
        if (active != null) {
            active.remove(channel);
        }
        this.closeables.closeQuietly(channel);
    }

    /**
     * Delegates to {@link Throttler#checkThrottleInterval()}; a no-op when throttling is disabled.
     *
     * @throws ClosedChannelException propagated from the throttler
     */
    void checkThrottleInterval() throws ClosedChannelException {
        if (this.throttler == null) {
            return;
        }
        this.throttler.checkThrottleInterval();
    }

    /**
     * Reports freshly written bytes to the throttler; a no-op when throttling is disabled.
     *
     * @param bytesJustWritten number of bytes written since the previous report
     * @throws ClosedChannelException propagated from the throttler
     */
    void contemplateThrottleWrites(int bytesJustWritten) throws ClosedChannelException {
        if (this.throttler == null) {
            return;
        }
        this.throttler.contemplateThrottleWrites(bytesJustWritten);
    }

    /**
     * Places the channel under throttler control; a no-op when throttling is disabled.
     *
     * @param channel channel whose writes should be throttled
     */
    void throttle(SelectableChannel channel) {
        if (this.throttler == null) {
            return;
        }
        this.throttler.add(channel);
    }

    /**
     * Base class for connection attempts that run on their own daemon thread. Subclasses
     * provide {@link #doConnect()}; this class adds a retry delay that grows by 100 ms per
     * attempt (the attempt counter stops increasing at 5) and registers the resulting channel
     * with the enclosing replicator's {@code closeables}.
     */
    abstract class AbstractConnector {
        private final String name;
        private int connectionAttempts = 0;
        private volatile SelectableChannel socketChannel;

        /**
         * @param name name given to every thread spawned for a connection attempt
         */
        public AbstractConnector(String name) {
            this.name = name;
        }

        /**
         * Performs the actual connection.
         *
         * @return the connected channel
         */
        abstract SelectableChannel doConnect() throws IOException, InterruptedException;

        /**
         * Closes the channel from the previous attempt, if any, and schedules a new attempt
         * after a delay of {@code connectionAttempts * 100} milliseconds.
         */
        public final void connectLater() {
            if (this.socketChannel != null) {
                AbstractChannelReplicator.this.closeables.closeQuietly(this.socketChannel);
                this.socketChannel = null;
            }
            // The delay is computed from the counter value before it is bumped.
            final long delayMillis = this.connectionAttempts * 100;
            if (this.connectionAttempts < 5) {
                this.connectionAttempts = this.connectionAttempts + 1;
            }
            this.doConnect(delayMillis);
        }

        /** Starts a connection attempt without any delay. */
        public void connect() {
            this.doConnect(0L);
        }

        /**
         * Spawns a daemon thread that optionally sleeps, calls {@link #doConnect()}, and hands
         * the channel to {@code closeables}. If {@code closeables} rejects the channel, or if
         * anything else fails, the channel is closed and the attempt ends.
         */
        private void doConnect(final long reconnectionInterval) {
            final Runnable attempt = new Runnable(){

                @Override
                public void run() {
                    SelectableChannel opened = null;
                    try {
                        if (reconnectionInterval > 0L) {
                            Thread.sleep(reconnectionInterval);
                        }
                        opened = AbstractConnector.this.doConnect();
                        try {
                            AbstractChannelReplicator.this.closeables.add(opened);
                        }
                        catch (IllegalStateException rejected) {
                            // closeables would not take the channel, so nobody else will close it.
                            AbstractConnector.this.closeSilently(opened);
                            return;
                        }
                        AbstractConnector.this.socketChannel = opened;
                    }
                    catch (Exception e) {
                        AbstractConnector.this.closeSilently(opened);
                        LOG.debug("", (Throwable)e);
                    }
                }
            };
            final Thread worker = new Thread(attempt);
            worker.setName(this.name);
            worker.setDaemon(true);
            worker.start();
        }

        /** Closes the channel if it is non-null, ignoring any exception from {@code close()}. */
        private void closeSilently(SelectableChannel channel) {
            if (channel != null) {
                try {
                    channel.close();
                }
                catch (Exception ignored) {
                    // best-effort close
                }
            }
        }

        /** Resets the attempt counter so the next {@link #connectLater()} starts without delay. */
        public void setSuccessfullyConnected() {
            this.connectionAttempts = 0;
        }
    }

    /**
     * Entry callback that serializes entries into an internal buffer, framing each one as
     * {@code [event-id byte][4-byte payload size][payload]}, and grows that buffer on demand.
     *
     * <p>{@code in} is a {@code Bytes} wrapper around the {@code ByteBuffer} {@code out}; both
     * are replaced together when the buffer is resized.
     */
    static class EntryCallback
    extends Replica.EntryCallback
    implements BufferResizer {
        /** Bytes written ahead of every payload: one event-id byte plus the four-byte size field. */
        private static final int ENTRY_HEADER_SIZE = 1 + 4;

        private final Replica.EntryExternalizable externalizable;
        @NotNull
        private ByteBufferBytes in;
        @NotNull
        private ByteBuffer out;

        /**
         * @param externalizable serializer used to write entries and report their size
         * @param tcpBufferSize  initial capacity, in bytes, of the (direct) buffer
         */
        EntryCallback(@NotNull Replica.EntryExternalizable externalizable, int tcpBufferSize) {
            this.externalizable = externalizable;
            this.out = ByteBuffer.allocateDirect(tcpBufferSize);
            this.in = new ByteBufferBytes(this.out);
        }

        /** @return the {@code Bytes} wrapper entries are written into */
        @NotNull
        public ByteBufferBytes in() {
            return this.in;
        }

        /** @return the {@code ByteBuffer} backing {@link #in()} */
        @NotNull
        public ByteBuffer out() {
            return this.out;
        }

        /**
         * Replaces the buffer with a new heap buffer of {@code size} bytes in native byte order,
         * carrying over the bytes from offset 0 up to the current write position.
         *
         * @param size new capacity in bytes; must not be smaller than the current capacity
         * @return the new {@code Bytes} wrapper (also returned by {@link #in()} from now on)
         * @throws IllegalStateException if {@code size} is smaller than the current capacity
         */
        @Override
        public Bytes resizeBuffer(int size) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("resizing buffer to size=" + size);
            }
            if (size < this.out.capacity()) {
                throw new IllegalStateException("it not possible to resize the buffer smaller");
            }
            assert (size < Integer.MAX_VALUE);
            ByteBuffer result = ByteBuffer.allocate(size).order(ByteOrder.nativeOrder());
            long bytesPosition = this.in.position();
            this.in = new ByteBufferBytes(result);
            // Expose exactly the already-written prefix of the old buffer, then copy it across.
            this.out.position(0);
            this.out.limit((int)bytesPosition);
            this.in.write(this.out);
            this.out = result;
            assert ((long)this.out.capacity() == this.in.capacity());
            assert (this.out.capacity() == size);
            assert (this.in.limit() == this.in.capacity());
            return this.in;
        }

        /** @return {@code true} when the externalizable's identifier check rejects the entry */
        @Override
        public boolean shouldBeIgnored(Bytes entry, int chronicleId) {
            return !this.externalizable.identifierCheck(entry, chronicleId);
        }

        /**
         * Appends one framed entry to the buffer, growing the buffer and retrying if the write
         * fails with an {@link IllegalArgumentException} and the entry does not fit in the
         * remaining space.
         *
         * @param entry       source bytes of the entry; its position is rewound before a retry
         * @param chronicleId identifier passed through to the externalizable
         * @return {@code true} if an entry was written; {@code false} if the externalizable
         *         wrote nothing, or if the buffer would have to grow beyond
         *         {@code Integer.MAX_VALUE} bytes
         * @throws IllegalArgumentException rethrown when the write failed although the entry
         *                                  fits in the remaining space
         * @throws IllegalStateException    if the written payload exceeds {@code Integer.MAX_VALUE} bytes
         */
        @Override
        public boolean onEntry(Bytes entry, int chronicleId) {
            long startOfEntry = entry.position();
            long pos0 = this.in.position();
            // Stays 0 until the header has been written; the catch block relies on that.
            long start = 0L;
            try {
                this.in.writeByte(StatelessChronicleMap.EventId.STATEFUL_UPDATE.ordinal());
                long sizeLocation = this.in.position();
                // Reserve the size field; it is filled in once the payload length is known.
                this.in.skip(4L);
                start = this.in.position();
                this.externalizable.writeExternalEntry(entry, (Bytes)this.in, chronicleId);
                if (this.in.position() == start) {
                    // Nothing was written: drop the header again.
                    this.in.position(pos0);
                    return false;
                }
                // Keep the full long value here: narrowing to int before the range check would
                // make the check below impossible to trigger.
                long bytesWritten = this.in.position() - start;
                if (bytesWritten > Integer.MAX_VALUE) {
                    throw new IllegalStateException("entry too large, entries are limited to a size of 2147483647");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("sending entry of entrySize=" + (int)bytesWritten);
                }
                this.in.writeInt(sizeLocation, (int)bytesWritten);
            }
            catch (IllegalArgumentException e) {
                entry.position(startOfEntry);
                this.in.position(pos0);
                long remaining = this.in.remaining();
                int entrySize = this.externalizable.sizeOfEntry(entry, chronicleId);
                if ((long)entrySize > remaining) {
                    // If the failure happened while writing the header, 'start' was never set;
                    // derive the payload offset from pos0 so the new size still accounts for
                    // the bytes already in the buffer plus the header.
                    long payloadStart = start != 0L ? start : pos0 + (long)ENTRY_HEADER_SIZE;
                    long newSize = payloadStart + (long)entrySize;
                    if (newSize > Integer.MAX_VALUE) {
                        return false;
                    }
                    this.resizeBuffer((int)newSize);
                    this.in.position(pos0);
                    entry.position(startOfEntry);
                    return this.onEntry(entry, chronicleId);
                }
                // The entry should have fitted, so the failure has another cause.
                throw e;
            }
            return true;
        }
    }

    /**
     * Immutable pairing of a socket address with a local identifier byte.
     */
    static class Details {
        private final InetSocketAddress address;
        private final byte localIdentifier;

        /**
         * @param address         socket address to hold
         * @param localIdentifier identifier byte to hold
         */
        Details(@NotNull InetSocketAddress address, byte localIdentifier) {
            this.address = address;
            this.localIdentifier = localIdentifier;
        }

        /** @return the socket address given at construction */
        public InetSocketAddress address() {
            return this.address;
        }

        /** @return the identifier byte given at construction */
        public byte localIdentifier() {
            return this.localIdentifier;
        }

        /** @return a string of the form {@code Details{address=..., localIdentifier=...}} */
        public String toString() {
            final StringBuilder text = new StringBuilder("Details{address=");
            text.append(this.address);
            text.append(", localIdentifier=");
            text.append(this.localIdentifier);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Limits the number of bytes written per time interval by switching {@code OP_WRITE}
     * interest off on all registered channels once the interval's byte budget is exceeded,
     * and back on when a new interval begins.
     */
    static class Throttler {
        private final Selector selector;
        private final Set<SelectableChannel> channels = new CopyOnWriteArraySet<SelectableChannel>();
        private final long throttleInterval;
        private final long maxBytesInInterval;
        private long lastTime = System.currentTimeMillis();
        private long bytesWritten;

        /**
         * @param selector                 selector whose keys are adjusted
         * @param throttleIntervalInMillis length of one throttling interval in milliseconds
         * @param bitsPerDay               permitted rate in bits per day; converted to a byte
         *                                 budget per interval
         */
        Throttler(@NotNull Selector selector, long throttleIntervalInMillis, long bitsPerDay) {
            this.selector = selector;
            this.throttleInterval = throttleIntervalInMillis;
            // bits/day -> bits/ms -> bytes/ms, then scale by the interval length.
            final double bitsPerMs = (double)bitsPerDay / (double)TimeUnit.DAYS.toMillis(1L);
            final double bytesPerMs = bitsPerMs / 8.0;
            this.maxBytesInInterval = Math.round(bytesPerMs * (double)this.throttleInterval);
        }

        /** Registers a channel to be throttled. */
        public void add(SelectableChannel selectableChannel) {
            this.channels.add(selectableChannel);
        }

        /** Stops throttling the given channel. */
        public void remove(SelectableChannel socketChannel) {
            this.channels.remove(socketChannel);
        }

        /**
         * If more than one interval has elapsed since the last reset, resets the byte counter
         * and re-enables {@code OP_WRITE} interest on every registered channel that has a key
         * for the selector.
         */
        public void checkThrottleInterval() throws ClosedChannelException {
            final long now = System.currentTimeMillis();
            if (now <= this.lastTime + this.throttleInterval) {
                return;
            }
            this.lastTime = now;
            this.bytesWritten = 0L;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Restoring OP_WRITE on all channels");
            }
            for (SelectableChannel registered : this.channels) {
                final SelectionKey key = registered.keyFor(this.selector);
                if (key != null) {
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                }
            }
        }

        /**
         * Adds to the interval's byte counter and, once the budget is exceeded, removes
         * {@code OP_WRITE} interest from every registered channel that has a key for the selector.
         *
         * @param bytesJustWritten number of bytes written since the previous call
         */
        public void contemplateThrottleWrites(int bytesJustWritten) throws ClosedChannelException {
            this.bytesWritten += (long)bytesJustWritten;
            if (this.bytesWritten <= this.maxBytesInInterval) {
                return;
            }
            for (SelectableChannel registered : this.channels) {
                final SelectionKey key = registered.keyFor(this.selector);
                if (key != null) {
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
                // Logged once per channel, including channels without a key.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Throttling UDP writes");
                }
            }
        }
    }
}

