/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.collections;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import net.openhft.collections.AbstractChannelReplicator;
import net.openhft.collections.Replica;
import net.openhft.collections.ReplicaExternalizable;
import net.openhft.lang.collection.DirectBitSet;
import net.openhft.lang.collection.SingleThreadedDirectBitSet;
import net.openhft.lang.io.AbstractBytes;
import net.openhft.lang.io.ByteBufferBytes;
import net.openhft.lang.io.Bytes;
import net.openhft.lang.io.RandomDataInput;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ClusterReplicator<K, V>
implements ReplicaExternalizable<K, V>,
Closeable {
    public static final int WRITE_BUFFER_SIZE = 1024;
    private final byte localIdentifer;
    private static final Logger LOG = LoggerFactory.getLogger((String)ClusterReplicator.class.getName());
    private final byte BOOTSTAP_MESSAGE = (byte)66;
    private final DirectBitSet chronicalChannelBitSet;
    private final ReplicaExternalizable[] chronicalChannels;
    private final AtomicReferenceArray<PayloadProvider> systemModificationIterator = new AtomicReferenceArray(128);
    private final DirectBitSet systemModificationIteratorBitSet = ClusterReplicator.newBitSet(this.systemModificationIterator.length());
    private final AtomicReferenceArray<Replica.ModificationIterator> modificationIterator = new AtomicReferenceArray(128);
    private final Set<AbstractChannelReplicator> replicators = new CopyOnWriteArraySet<AbstractChannelReplicator>();
    private final MessageHandler systemMessageHandler = new MessageHandler(){

        @Override
        public void onMessage(Bytes bytes) {
            byte type = bytes.readByte();
            if (type == 66) {
                ClusterReplicator.this.onBootstrapMessage(bytes);
            } else {
                LOG.info("message of type=" + type + " was ignored.");
            }
        }
    };
    private final SystemQueue systemMessageQueue;

    public ClusterReplicator(byte identifier, int maxNumberOfChronicleChannels) {
        this.localIdentifer = identifier;
        this.chronicalChannels = new ReplicaExternalizable[maxNumberOfChronicleChannels];
        this.chronicalChannelBitSet = ClusterReplicator.newBitSet(this.chronicalChannels.length);
        this.systemMessageQueue = new SystemQueue(this.systemModificationIteratorBitSet, this.systemModificationIterator, this.systemMessageHandler);
        this.add((short)0, this.systemMessageQueue.replicaExternalizable);
    }

    private void onBootstrapMessage(Bytes bytes) {
        short remoteIdentifier = bytes.readByte();
        int chronicleChannel = bytes.readUnsignedShort();
        long lastModificationTime = bytes.readLong();
        if (this.chronicalChannels[chronicleChannel] != null) {
            this.chronicalChannels[chronicleChannel].acquireModificationIterator(remoteIdentifier, Replica.ModificationNotifier.NOP).dirtyEntries(lastModificationTime);
        }
    }

    private ByteBufferBytes toBootstrapMessage(short chronicleChannel, long lastModificationTime) {
        ByteBufferBytes writeBuffer = new ByteBufferBytes(ByteBuffer.allocate(12));
        writeBuffer.writeByte(66);
        writeBuffer.writeByte((int)this.localIdentifer);
        writeBuffer.writeUnsignedShort((int)chronicleChannel);
        writeBuffer.writeLong(lastModificationTime);
        writeBuffer.flip();
        return writeBuffer;
    }

    private static DirectBitSet newBitSet(int numberOfBits) {
        ByteBufferBytes bytes = new ByteBufferBytes(ByteBuffer.wrap(new byte[(numberOfBits + 7) / 8]));
        return new SingleThreadedDirectBitSet((Bytes)bytes);
    }

    public void add(short chronicleChannel, ReplicaExternalizable replica) {
        if (this.chronicalChannels[chronicleChannel] != null) {
            throw new IllegalArgumentException("chronicleId=" + chronicleChannel + " is already in use.");
        }
        this.chronicalChannels[chronicleChannel] = replica;
        this.chronicalChannelBitSet.set((long)chronicleChannel);
        if (chronicleChannel == 0) {
            return;
        }
        int i = (int)this.systemModificationIteratorBitSet.nextSetBit(0L);
        while (i > 0) {
            byte remoteIdentifier = (byte)i;
            long lastModificationTime = replica.lastModificationTime(remoteIdentifier);
            ByteBufferBytes message = this.toBootstrapMessage(chronicleChannel, lastModificationTime);
            this.systemModificationIterator.get(remoteIdentifier).addPayload((Bytes)message);
            i = (int)this.systemModificationIteratorBitSet.nextSetBit((long)(i + 1));
        }
    }

    public void writeExternalEntry(@NotNull AbstractBytes entry, @NotNull Bytes destination, int chronicleChannel) {
        destination.writeStopBit((long)chronicleChannel);
        this.chronicalChannels[chronicleChannel].writeExternalEntry(entry, destination, chronicleChannel);
    }

    public void readExternalEntry(@NotNull Bytes source) {
        int chronicleId = (int)source.readStopBit();
        if (chronicleId < this.chronicalChannels.length) {
            this.chronicalChannels[chronicleId].readExternalEntry(source);
        } else {
            LOG.info("skipped entry with chronicleId=" + chronicleId + ", ");
        }
    }

    public byte identifier() {
        return this.localIdentifer;
    }

    public Replica.ModificationIterator acquireModificationIterator(final short remoteIdentifier, final Replica.ModificationNotifier notifier) {
        Replica.ModificationIterator result = this.modificationIterator.get(remoteIdentifier);
        if (result != null) {
            return result;
        }
        Replica.ModificationIterator result0 = new Replica.ModificationIterator(){

            public boolean hasNext() {
                int i = (int)ClusterReplicator.this.chronicalChannelBitSet.nextSetBit(0L);
                while (i >= 0) {
                    Replica.ModificationIterator modificationIterator = ClusterReplicator.this.chronicalChannels[i].acquireModificationIterator(remoteIdentifier, notifier);
                    if (modificationIterator.hasNext()) {
                        return true;
                    }
                    i = (int)ClusterReplicator.this.chronicalChannelBitSet.nextSetBit((long)(i + 1));
                }
                return false;
            }

            public boolean nextEntry(@NotNull Replica.AbstractEntryCallback callback, int na) {
                int i = (int)ClusterReplicator.this.chronicalChannelBitSet.nextSetBit(0L);
                while (i >= 0) {
                    Replica.ModificationIterator modificationIterator = ClusterReplicator.this.chronicalChannels[i].acquireModificationIterator(remoteIdentifier, notifier);
                    if (modificationIterator.nextEntry(callback, i)) {
                        return true;
                    }
                    i = (int)ClusterReplicator.this.chronicalChannelBitSet.nextSetBit((long)(i + 1));
                }
                return false;
            }

            public void dirtyEntries(long fromTimeStamp) {
                int i = (int)ClusterReplicator.this.chronicalChannelBitSet.nextSetBit(0L);
                while (i >= 0) {
                    ClusterReplicator.this.chronicalChannels[i].acquireModificationIterator(remoteIdentifier, notifier).dirtyEntries(fromTimeStamp);
                    i = (int)ClusterReplicator.this.chronicalChannelBitSet.nextSetBit((long)(i + 1));
                }
                notifier.onChange();
            }
        };
        this.modificationIterator.set(remoteIdentifier, result0);
        return result0;
    }

    public long lastModificationTime(byte remoteIdentifier) {
        if (this.chronicalChannelBitSet.nextSetBit(0L) == -1L) {
            return 0L;
        }
        long t = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(1L);
        int i = (int)this.chronicalChannelBitSet.nextSetBit(0L);
        while (i > 0) {
            t = Math.min(t, this.chronicalChannels[i].lastModificationTime(remoteIdentifier));
            i = (int)this.chronicalChannelBitSet.nextSetBit((long)(i + 1));
        }
        return t;
    }

    @Override
    public void close() throws IOException {
        for (AbstractChannelReplicator replicator : this.replicators) {
            replicator.close();
        }
        int i = (int)this.chronicalChannelBitSet.nextSetBit(0L);
        while (i > 0) {
            this.chronicalChannels[i].close();
            i = (int)this.chronicalChannelBitSet.nextSetBit((long)(i + 1));
        }
    }

    public void add(AbstractChannelReplicator replicator) {
        this.replicators.add(replicator);
    }

    static class SystemQueue {
        private final DirectBitSet systemModificationIteratorBitSet;
        private final AtomicReferenceArray<PayloadProvider> systemModificationIterator;
        private final MessageHandler messageHandler;
        final ReplicaExternalizable replicaExternalizable = new ReplicaExternalizable(){

            public byte identifier() {
                return 0;
            }

            public Replica.ModificationIterator acquireModificationIterator(short remoteIdentifier, final Replica.ModificationNotifier modificationNotifier) {
                Replica.ModificationIterator result = (Replica.ModificationIterator)systemModificationIterator.get(remoteIdentifier);
                if (result != null) {
                    return result;
                }
                PayloadProvider iterator = new PayloadProvider(){
                    final ByteBufferBytes writeBuffer = new ByteBufferBytes(ByteBuffer.allocate(1024));
                    final Queue<Bytes> payloads = new LinkedTransferQueue<Bytes>();

                    public boolean hasNext() {
                        return this.payloads.peek() != null;
                    }

                    public boolean nextEntry(@NotNull Replica.AbstractEntryCallback callback, int na) {
                        Bytes bytes = this.payloads.poll();
                        if (bytes == null) {
                            return false;
                        }
                        if (bytes instanceof AbstractBytes) {
                            callback.onEntry((AbstractBytes)bytes, 0);
                        } else {
                            this.writeBuffer.clear();
                            this.writeBuffer.write((RandomDataInput)bytes);
                            callback.onEntry((AbstractBytes)this.writeBuffer, 0);
                        }
                        return true;
                    }

                    public void dirtyEntries(long fromTimeStamp) {
                    }

                    @Override
                    public void addPayload(Bytes bytes) {
                        if (bytes.remaining() == 0L) {
                            return;
                        }
                        if (bytes.remaining() > this.writeBuffer.capacity()) {
                            throw new IllegalArgumentException("BUFFER_OVERFLOW: bytes.remaining()=" + bytes.remaining() + " " + " the maximum allow number of bytes is " + this.writeBuffer.capacity());
                        }
                        this.payloads.add(bytes);
                        modificationNotifier.onChange();
                    }
                };
                systemModificationIterator.set(remoteIdentifier, iterator);
                systemModificationIteratorBitSet.set((long)remoteIdentifier);
                return iterator;
            }

            public long lastModificationTime(byte remoteIdentifier) {
                return 0L;
            }

            public void close() throws IOException {
            }

            public void writeExternalEntry(@NotNull AbstractBytes entry, @NotNull Bytes destination, int na) {
                destination.write((RandomDataInput)entry);
            }

            public void readExternalEntry(@NotNull Bytes source) {
                messageHandler.onMessage(source);
            }
        };

        SystemQueue(DirectBitSet systemModificationIteratorBitSet, AtomicReferenceArray<PayloadProvider> systemModificationIterator, MessageHandler messageHandler) {
            this.systemModificationIteratorBitSet = systemModificationIteratorBitSet;
            this.systemModificationIterator = systemModificationIterator;
            this.messageHandler = messageHandler;
        }
    }

    static interface PayloadProvider
    extends Replica.ModificationIterator {
        public void addPayload(Bytes var1);
    }

    static interface MessageHandler {
        public void onMessage(Bytes var1);
    }
}

