/*
 * Decompiled with CFR 0.152.
 */
package io.nextop.client.node.nextop;

import com.google.common.io.ByteStreams;
import io.nextop.Id;
import io.nextop.Message;
import io.nextop.Wire;
import io.nextop.WireValue;
import io.nextop.client.MessageContext;
import io.nextop.client.MessageControl;
import io.nextop.client.MessageControlChannel;
import io.nextop.client.MessageControlNode;
import io.nextop.client.MessageControlState;
import io.nextop.client.node.AbstractMessageControlNode;
import io.nextop.log.NL;
import io.nextop.util.NoCopyByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;

public class NextopNode
extends AbstractMessageControlNode {
    /** Default configuration: a chunk size of 4096 bytes. */
    public static final Config DEFAULT_CONFIG = new Config(4096);

    /**
     * Strategy that compresses every message except those whose content type is
     * {@code IMAGE} or {@code BLOB}. A message without content is compressed.
     */
    public static final CompressionStrategy COMPRESS_NON_BINARY = new CompressionStrategy(){

        @Override
        public boolean isCompress(Message message) {
            final WireValue payload = message.getContent();
            if (payload != null) {
                switch (payload.getType()) {
                    case IMAGE:
                    case BLOB:
                        return false;
                    default:
                        break;
                }
            }
            return true;
        }
    };
    // Default durations in milliseconds. They carry the same values as startupMs/dropTimeoutMs
    // below; they are not referenced anywhere else in this file.
    private static final int DEFAULT_T_STARTUP_MS = 3000;
    private static final int DEFAULT_T_DROP_MS = 6000;
    /** Configuration supplied at construction; provides the chunk size used by the write loop. */
    final Config config;
    /** Source of wires for the control loop; may also be a MessageControlNode (see initDownstream/onActive). */
    @Nullable
    Wire.Factory wireFactory;
    /** Optional adapter applied to every wire the factory creates. Volatile: read by the control loop thread. */
    @Nullable
    volatile Wire.Adapter wireAdapter = null;
    /** Last value passed to onActive; the control loop runs while this is true. */
    boolean active = false;
    /** Control loop thread; non-null only while the node is active. */
    @Nullable
    ControlLooper controlLooper = null;
    /** Transfer state that outlives individual wires (read/write states, pending acks, urgent frames). */
    final SharedTransferState sts;
    /** Decides per message whether the serialized bytes are gzip-compressed before chunking. */
    CompressionStrategy compressionStrategy = COMPRESS_NON_BINARY;
    /** Tracks whether the upstream has been told this node is active, with delayed deactivation. */
    final UpstreamActive upstreamActive;
    // Milliseconds the upstream is held active after init (see initSelf).
    final int startupMs = 3000;
    // Milliseconds to wait after a disconnect before marking the upstream inactive (see onDisconnected).
    final int dropTimeoutMs = 6000;
    /** Posted by the control loop after a wire has been created and synchronized. */
    private final Runnable ON_CONNECTED = new Runnable(){

        @Override
        public void run() {
            NextopNode.this.onConnected();
        }
    };
    /** Posted by the control loop when a wire could not be obtained, failed to sync, or ended. */
    private final Runnable ON_DISCONNECTED = new Runnable(){

        @Override
        public void run() {
            NextopNode.this.onDisconnected();
        }
    };
    /** Frame type: message start. Followed by [32-byte id][int length][int chunk count][1-byte compressed flag]. */
    public static final byte F_MESSAGE_START = 1;
    /** Frame type: message chunk. Followed by [int chunk index][int start offset][int chunk length][chunk bytes]. */
    public static final byte F_MESSAGE_CHUNK = 2;
    /** Frame type: message end. No payload; applies to the most recently started message. */
    public static final byte F_MESSAGE_END = 3;
    /** Frame type: acknowledgement. Followed by [32-byte id]. */
    static final byte F_ACK = 4;
    /** Frame type: negative acknowledgement. Followed by [32-byte id]. */
    static final byte F_NACK = 5;
    /** Sync handshake header byte. Followed by [int write state count]. */
    static final byte F_SYNC_WRITE_STATE = 112;
    // NOTE(review): same value as F_SYNC_WRITE_STATE. The two frames are told apart only by
    // their position in the handshake. Confirm against the peer implementation before changing.
    /** Sync handshake trailer byte. Followed by [1-byte status]. */
    static final byte F_SYNC_END = 112;
    /** Sync status byte: handshake succeeded. */
    static final byte SYNC_STATUS_OK = 0;
    /** Sync status byte: handshake failed. Not referenced in the visible code. */
    static final byte SYNC_STATUS_ERROR = 1;
    /** Creates a node that uses {@link #DEFAULT_CONFIG}. */
    public NextopNode() {
        this(DEFAULT_CONFIG);
    }

    /**
     * Creates a node with the given configuration and allocates its shared transfer state
     * and upstream-activity tracker.
     *
     * @param config the configuration to use (provides the chunk size)
     */
    public NextopNode(Config config) {
        this.config = config;
        // this node is handed to the transfer state as its MessageContext
        this.sts = new SharedTransferState(this);
        this.upstreamActive = new UpstreamActive();
    }

    /**
     * Sets the factory the control loop uses to create wires.
     *
     * @param wireFactory the factory; if it is also a MessageControlNode it receives
     *                    init/onActive calls from this node
     */
    public void setWireFactory(Wire.Factory wireFactory) {
        this.wireFactory = wireFactory;
    }

    /**
     * Sets the adapter applied to each wire created by the factory.
     *
     * @param wireAdapter the adapter, or null to use wires unadapted
     */
    public void setWireAdapter(Wire.Adapter wireAdapter) {
        this.wireAdapter = wireAdapter;
    }

    /**
     * Initializes the wire factory as a downstream node, when the factory is itself a
     * {@code MessageControlNode}. Otherwise does nothing.
     *
     * @param savedState state forwarded unchanged to the factory's {@code init}
     */
    @Override
    protected void initDownstream(MessageControlNode.Bundle savedState) {
        final Object factory = this.wireFactory;
        if (!(factory instanceof MessageControlNode)) {
            return;
        }
        final MessageControlNode downstream = (MessageControlNode) factory;
        downstream.init(this, savedState);
    }

    /**
     * Marks the upstream active and schedules it to go inactive again after the startup
     * hold time ({@code startupMs}, 3000 ms).
     *
     * @param savedState unused
     */
    @Override
    protected void initSelf(@Nullable MessageControlNode.Bundle savedState) {
        final int holdMs = this.startupMs;
        this.upstreamActive.up(holdMs);
    }

    /**
     * Switches this node on or off.
     *
     * <p>When activating, a wire factory that is a {@code MessageControlNode} is notified
     * before the control loop starts. When deactivating, it is notified after the control
     * loop has been interrupted. The control loop is only started or stopped when the
     * requested state differs from the current one.
     *
     * @param active the new activity state
     */
    @Override
    public void onActive(boolean active) {
        if (active) {
            if (this.wireFactory instanceof MessageControlNode) {
                ((MessageControlNode)((Object)this.wireFactory)).onActive(active);
            }
        }

        final boolean changed = active != this.active;
        if (changed) {
            this.active = active;
            if (!active) {
                assert (null != this.controlLooper);
                this.controlLooper.interrupt();
                this.controlLooper = null;
            } else {
                assert (null == this.controlLooper);
                final ControlLooper looper = new ControlLooper();
                this.controlLooper = looper;
                looper.start();
            }
        }

        if (!active) {
            if (this.wireFactory instanceof MessageControlNode) {
                ((MessageControlNode)((Object)this.wireFactory)).onActive(active);
            }
        }
    }

    /**
     * Accepts an outgoing message control while the node is active.
     *
     * <p>The control is first offered to the message control state via
     * {@code onActiveMessageControl}; if that returns false it is added to the state.
     * When the node is not active the control is ignored (and the assertion fails when
     * assertions are enabled).
     *
     * @param mc the message control; expected to have direction SEND
     */
    @Override
    public void onMessageControl(MessageControl mc) {
        assert (MessageControl.Direction.SEND.equals((Object)mc.dir));
        assert (this.active);
        if (!this.active) {
            return;
        }
        final MessageControlState state = this.getMessageControlState();
        final boolean handled = state.onActiveMessageControl(mc, this.upstream);
        if (!handled) {
            state.add(mc);
        }
    }

    /** Runs when a wire is connected: marks the upstream active and cancels any pending deactivation. */
    private void onConnected() {
        this.upstreamActive.up();
    }

    /**
     * Runs when the wire is lost: schedules the upstream to be marked inactive after the
     * drop timeout ({@code dropTimeoutMs}, 6000 ms).
     */
    private void onDisconnected() {
        final int delayMs = this.dropTimeoutMs;
        this.upstreamActive.down(delayMs);
    }

    /**
     * Builds a NACK frame for the given message id.
     *
     * @param id the id of the message being rejected
     * @return a 33-byte frame: the {@code F_NACK} type byte followed by the id bytes
     */
    static byte[] nack(Id id) {
        final byte[] frame = new byte[33];
        frame[0] = F_NACK;
        Id.toBytes(id, frame, 1);
        return frame;
    }

    /**
     * Builds an ACK frame for the given message id.
     *
     * @param id the id of the message being acknowledged
     * @return a 33-byte frame: the {@code F_ACK} type byte followed by the id bytes
     */
    static byte[] ack(Id id) {
        final byte[] frame = new byte[33];
        frame[0] = F_ACK;
        Id.toBytes(id, frame, 1);
        return frame;
    }

    /**
     * Converts a message control to its wire value via {@code MessageControl.toWireValue}.
     *
     * @param mc the message control to package
     * @return the wire value representation
     */
    static WireValue pkg(MessageControl mc) {
        return MessageControl.toWireValue(mc);
    }

    /**
     * Converts a wire value back to a message control via {@code MessageControl.fromWireValue}.
     *
     * @param value the wire value produced by {@link #pkg}
     * @return the decoded message control
     */
    static MessageControl unpkg(WireValue value) {
        return MessageControl.fromWireValue(value);
    }

    /** Decides whether a message's serialized bytes are compressed before being written to the wire. */
    public static interface CompressionStrategy {
        /**
         * @param var1 the message about to be written
         * @return true to gzip the serialized message, false to send it uncompressed
         */
        public boolean isCompress(Message var1);
    }

    /**
     * Reassembly state for one incoming message: the destination buffer plus, per chunk,
     * its start offset and whether it has been received.
     */
    static final class MessageReadState {
        final Id id;
        /** Whether {@link #bytes} holds gzip-compressed data. */
        final boolean compressed;
        /** Destination buffer for the whole message. */
        final byte[] bytes;
        /** Start offset of each chunk within {@link #bytes}, filled in as chunks arrive. */
        final int[] chunkOffsets;
        /** {@code chunkReads[i]} is true once chunk {@code i} has been read into {@link #bytes}. */
        final boolean[] chunkReads;

        /**
         * @param id         the message id
         * @param length     total message length in bytes
         * @param chunkCount number of chunks the message is split into
         * @param compressed whether the message bytes are gzip-compressed
         * @throws IllegalArgumentException if {@code chunkCount} is negative or
         *         {@code length} is smaller than {@code chunkCount}
         */
        MessageReadState(Id id, int length, int chunkCount, boolean compressed) {
            // Both values come off the wire. Checking chunkCount first also guarantees
            // length >= 0, so neither allocation can fail with NegativeArraySizeException.
            if (chunkCount < 0 || length < chunkCount) {
                throw new IllegalArgumentException(
                        "Invalid message shape: length=" + length + ", chunkCount=" + chunkCount);
            }
            this.id = id;
            this.compressed = compressed;
            this.bytes = new byte[length];
            this.chunkOffsets = new int[chunkCount];
            this.chunkReads = new boolean[chunkCount];
        }
    }

    /**
     * Transmission state for one outgoing message: the serialized bytes, the start offset
     * of each chunk, and which chunks have already been written.
     */
    static final class MessageWriteState {
        final Id id;
        final byte[] bytes;
        final boolean compressed;
        final int[] chunkOffsets;
        final boolean[] chunkWrites;

        /**
         * @param id           the message id
         * @param bytes        the serialized (possibly compressed) message
         * @param chunkOffsets start offset of each chunk within {@code bytes}
         * @param compressed   whether {@code bytes} is gzip-compressed
         */
        MessageWriteState(Id id, byte[] bytes, int[] chunkOffsets, boolean compressed) {
            final int chunkCount = chunkOffsets.length;
            this.id = id;
            this.compressed = compressed;
            this.bytes = bytes;
            this.chunkOffsets = chunkOffsets;
            // no chunk has been written yet
            this.chunkWrites = new boolean[chunkCount];
        }
    }

    /**
     * Transfer state held by the node rather than by a single wire: messages awaiting
     * acknowledgement, per-message read and write states, and the queue of urgent frames.
     */
    static final class SharedTransferState {
        /** Messages fully written and waiting for an ACK or NACK from the peer. */
        MessageControlState writePendingAck;
        Map<Id, MessageWriteState> writeStates = new HashMap<Id, MessageWriteState>(32);
        Map<Id, MessageReadState> readStates = new HashMap<Id, MessageReadState>(32);
        /** Pre-built frames (ACK/NACK) queued by the read side and sent by the write loop. */
        Queue<byte[]> writeUrgentMessages = new ConcurrentLinkedQueue<byte[]>();

        SharedTransferState(MessageContext context) {
            this.writePendingAck = new MessageControlState(context);
        }

        /**
         * Empty synchronized method. The loopers call it when they start and finish;
         * acquiring and releasing the monitor orders memory between those threads.
         */
        synchronized void membar() {
        }
    }

    /**
     * Thread that reads frames from one wire until the wire ends.
     *
     * <p>Frames handled:
     * <ul>
     *   <li>{@code F_MESSAGE_START}: selects (or creates) the read state for a message id.</li>
     *   <li>{@code F_MESSAGE_CHUNK}: reads one chunk into the current read state, or NACKs the
     *       message when the chunk's position conflicts with what was already received.</li>
     *   <li>{@code F_MESSAGE_END}: ACKs and dispatches the message when every chunk has been
     *       read, otherwise NACKs it.</li>
     *   <li>{@code F_ACK} / {@code F_NACK}: completes, or re-queues, a message this side wrote.</li>
     * </ul>
     * Any {@code IOException}, including a protocol error, ends the shared wire state.
     */
    final class ReadLooper
    extends Thread {
        final SharedWireState sws;
        final MessageControlState mcs;
        final byte[] controlBuffer;

        ReadLooper(SharedWireState sws) {
            this.mcs = NextopNode.this.getMessageControlState();
            this.controlBuffer = new byte[1024];
            this.sws = sws;
        }

        @Override
        public void run() {
            NextopNode.this.sts.membar();
            // id and read state of the message most recently announced by a START frame
            Id id = null;
            MessageReadState readState = null;
            try {
                NL.nl.message("node.nextop.read", "Start read loop", new Object[0]);
                readLoop: while (this.sws.active) {
                    // every frame begins with a single type byte
                    this.sws.wire.read(this.controlBuffer, 0, 1, 0);
                    long startNanos = System.nanoTime();
                    byte type = this.controlBuffer[0];
                    switch (type) {
                        case F_MESSAGE_START: {
                            // [32-byte id][int length][int chunk count][1-byte compressed flag]
                            this.sws.wire.read(this.controlBuffer, 0, 41, 0);
                            id = Id.fromBytes(this.controlBuffer, 0);
                            int length = WireValue.getint(this.controlBuffer, 32);
                            int chunkCount = WireValue.getint(this.controlBuffer, 36);
                            boolean compressed = (0xFF & this.controlBuffer[40]) != 0;
                            readState = NextopNode.this.sts.readStates.get(id);
                            if (null != readState) break;
                            // A malformed header must end the wire through the IOException path.
                            // An unchecked exception would kill this thread without sws.end().
                            if (chunkCount < 0 || length < chunkCount) {
                                throw new IOException("Protocol error.");
                            }
                            readState = new MessageReadState(id, length, chunkCount, compressed);
                            NextopNode.this.sts.readStates.put(id, readState);
                            break;
                        }
                        case F_MESSAGE_CHUNK: {
                            // NOTE(review): with no current read state, the chunk header and payload
                            // are not consumed here, so they will be parsed as frames. Confirm intent.
                            if (null == readState) continue readLoop;
                            // [int chunk index][int start offset][int chunk length]
                            this.sws.wire.read(this.controlBuffer, 0, 12, 0);
                            int chunkIndex = WireValue.getint(this.controlBuffer, 0);
                            int start = WireValue.getint(this.controlBuffer, 4);
                            int chunkLength = WireValue.getint(this.controlBuffer, 8);
                            if (chunkLength < 0) {
                                // the payload can be neither read nor skipped; the stream is unusable
                                throw new IOException("Protocol error.");
                            }
                            int end = start + chunkLength;
                            boolean conflict = false;
                            try {
                                if (start < 0 || end < start || readState.bytes.length < end) {
                                    // the chunk does not fit inside the message buffer
                                    conflict = true;
                                } else if (!readState.chunkReads[chunkIndex]) {
                                    // if the previous chunk was read, this chunk must start where it ended
                                    if (0 <= chunkIndex - 1 && readState.chunkReads[chunkIndex - 1] && start != readState.chunkOffsets[chunkIndex]) {
                                        conflict = true;
                                    }
                                    if (chunkIndex + 1 < readState.chunkOffsets.length) {
                                        // if the next chunk was read, this chunk must end where it starts
                                        if (readState.chunkReads[chunkIndex + 1] && end != readState.chunkOffsets[chunkIndex + 1]) {
                                            conflict = true;
                                        }
                                    } else if (end != readState.bytes.length) {
                                        // the last chunk must end exactly at the end of the message
                                        conflict = true;
                                    }
                                }
                                // a chunk that was already read is accepted again without checks
                            }
                            catch (IndexOutOfBoundsException e) {
                                // chunkIndex is outside the range announced by the START frame
                                e.printStackTrace();
                                conflict = true;
                            }
                            if (conflict) {
                                NL.nl.count("node.nextop.read.conflict", new Object[0]);
                                this.sws.wire.skip(chunkLength, 0);
                                NextopNode.this.sts.writeUrgentMessages.add(NextopNode.nack(id));
                                // wake the writer so the NACK goes out promptly
                                this.sws.writeLooper.interrupt();
                                NextopNode.this.sts.readStates.remove(id);
                                continue readLoop;
                            }
                            this.sws.wire.read(readState.bytes, start, chunkLength, 0);
                            readState.chunkReads[chunkIndex] = true;
                            readState.chunkOffsets[chunkIndex] = start;
                            if (chunkIndex + 1 >= readState.chunkOffsets.length) break;
                            // record where the next chunk is expected to start
                            readState.chunkOffsets[chunkIndex + 1] = end;
                            break;
                        }
                        case F_MESSAGE_END: {
                            if (null == readState) continue readLoop;
                            int n = readState.chunkOffsets.length;
                            for (int i = 0; i < n; ++i) {
                                if (readState.chunkReads[i]) continue;
                                // at least one chunk is missing: reject the whole message
                                NextopNode.this.sts.writeUrgentMessages.add(NextopNode.nack(id));
                                this.sws.writeLooper.interrupt();
                                NextopNode.this.sts.readStates.remove(id);
                                readState = null;
                                continue readLoop;
                            }
                            NextopNode.this.sts.writeUrgentMessages.offer(NextopNode.ack(id));
                            this.sws.writeLooper.interrupt();
                            NextopNode.this.sts.readStates.remove(id);
                            // decoding and delivery are posted to the node rather than run on this thread
                            NextopNode.this.post(new Dispatch(id, readState));
                            readState = null;
                            break;
                        }
                        case F_ACK: {
                            this.sws.wire.read(this.controlBuffer, 0, 32, 0);
                            Id uid = Id.fromBytes(this.controlBuffer, 0);
                            NextopNode.this.sts.writePendingAck.remove(uid, MessageControlState.End.COMPLETED);
                            break;
                        }
                        case F_NACK: {
                            NL.nl.count("node.nextop.read.nack", new Object[0]);
                            this.sws.wire.read(this.controlBuffer, 0, 32, 0);
                            Id uid = Id.fromBytes(this.controlBuffer, 0);
                            MessageControl mc = NextopNode.this.sts.writePendingAck.remove(uid, MessageControlState.End.ERROR);
                            if (null != mc) {
                                // put the message back so the write loop sends it again
                                this.mcs.add(mc);
                                break;
                            }
                            assert (false);
                            break;
                        }
                        default: {
                            throw new IOException("Protocol error.");
                        }
                    }
                    NL.nl.metric("node.nextop.read.%s", System.nanoTime() - startNanos, (Object)TimeUnit.NANOSECONDS, type);
                }
            }
            catch (IOException e) {
                e.printStackTrace();
                this.sws.end();
            }
            NL.nl.message("node.nextop.read", "End read loop", new Object[0]);
            NextopNode.this.sts.membar();
        }

        /**
         * Decodes a fully received message and delivers it upstream as a received message
         * control. If decoding or delivery fails, a NACK for the message is queued.
         */
        final class Dispatch
        implements Runnable {
            final Id id;
            final MessageReadState readState;

            Dispatch(Id id, MessageReadState readState) {
                this.id = id;
                this.readState = readState;
            }

            @Override
            public void run() {
                try {
                    WireValue pkg;
                    if (this.readState.compressed) {
                        NoCopyByteArrayOutputStream os = new NoCopyByteArrayOutputStream(1024);
                        GZIPInputStream gzis = new GZIPInputStream(new ByteArrayInputStream(this.readState.bytes));
                        try {
                            ByteStreams.copy((InputStream)gzis, (OutputStream)os);
                        }
                        finally {
                            // closing releases the inflater instead of leaving it to finalization
                            gzis.close();
                        }
                        pkg = WireValue.valueOf(os.getBytes(), os.getOffset());
                    } else {
                        pkg = WireValue.valueOf(this.readState.bytes);
                    }
                    MessageControl mc = NextopNode.unpkg(pkg);
                    NL.nl.count("node.nextop.read.%s", new Object[]{mc.type});
                    NextopNode.this.upstream.onMessageControl(MessageControl.receive(mc.type, mc.message));
                }
                catch (Exception e) {
                    NextopNode.this.sts.writeUrgentMessages.add(NextopNode.nack(this.id));
                    ReadLooper.this.sws.writeLooper.interrupt();
                    NL.nl.unhandled("node.nextop.read", e);
                }
            }
        }
    }

    /**
     * Thread that writes messages to one wire until the wire ends.
     *
     * <p>Each loop iteration first sends any queued urgent frames (ACK/NACK), then takes an
     * entry from the message control state, serializes it (optionally gzip-compressed),
     * and writes it as a START frame, one CHUNK frame per chunk, and an END frame.
     * After the END frame the message is moved to the pending-ack state.
     */
    final class WriteLooper
    extends Thread {
        final SharedWireState sws;
        /** Serialization scratch space, reused for every message. */
        final SerializationState ss;
        final MessageControlState mcs;
        /** Scratch buffer for frame headers. */
        final byte[] controlBuffer;

        WriteLooper(SharedWireState sws, SerializationState ss) {
            this.mcs = NextopNode.this.getMessageControlState();
            this.controlBuffer = new byte[1024];
            this.sws = sws;
            this.ss = ss;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * (Decompiler note kept from the CFR output.)
         */
        @Override
        public void run() {
            NextopNode.this.sts.membar();
            // the entry currently being written; kept across iterations when a write is preempted
            MessageControlState.Entry entry = null;
            try {
                NL.nl.message("node.nextop.write", "Start write loop", new Object[0]);
                block12: while (this.sws.active) {
                    MessageWriteState writeState;
                    this.pollUrgent();
                    if (null == entry && null == (entry = this.mcs.takeFirstAvailable(NextopNode.this))) {
                        // nothing to send right now: push out buffered bytes, then block for an entry
                        this.sws.wire.flush();
                        try {
                            entry = this.mcs.takeFirstAvailable(NextopNode.this, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
                            assert (null != entry);
                            if (null == entry) {
                            }
                        }
                        // The read side and SharedWireState.end() interrupt this thread. The loop
                        // restarts, so pollUrgent() and the active check run again.
                        catch (InterruptedException e) {}
                        continue;
                    }
                    // NOTE(review): nothing in this file puts entries into sts.writeStates, so in
                    // the visible code this lookup always misses and the state is rebuilt.
                    if (null == (writeState = NextopNode.this.sts.writeStates.get(entry.id))) {
                        boolean compressed;
                        byte[] bytes;
                        long startNanos;
                        block26: {
                            startNanos = System.nanoTime();
                            ByteBuffer serBuffer = this.ss.serBuffer;
                            byte[] serBytes = this.ss.serBytes;
                            try {
                                // serialize into the shared buffer; serBuffer wraps serBytes
                                NextopNode.pkg(entry.mc).toBytes(serBuffer);
                                serBuffer.flip();
                                int n = serBuffer.remaining();
                                assert (NextopNode.pkg(entry.mc).equals(WireValue.valueOf(serBytes)));
                                if (NextopNode.this.compressionStrategy.isCompress(entry.message)) {
                                    try {
                                        // presumably writes the gzip output into serBytes after the first n bytes; verify against NoCopyByteArrayOutputStream
                                        NoCopyByteArrayOutputStream os = new NoCopyByteArrayOutputStream(serBytes, n);
                                        GZIPOutputStream gzos = new GZIPOutputStream(os);
                                        try {
                                            gzos.write(serBytes, 0, n);
                                            gzos.finish();
                                        }
                                        finally {
                                            gzos.close();
                                        }
                                        bytes = os.toByteArray();
                                        compressed = true;
                                    }
                                    catch (OutOfMemoryError e) {
                                        // compression ran out of memory: fall back to the uncompressed bytes
                                        bytes = new byte[n];
                                        System.arraycopy(serBytes, 0, bytes, 0, n);
                                        compressed = false;
                                    }
                                    break block26;
                                }
                                bytes = new byte[n];
                                System.arraycopy(serBytes, 0, bytes, 0, n);
                                compressed = false;
                            }
                            finally {
                                // reset the shared buffer for the next message
                                serBuffer.clear();
                            }
                        }
                        assert (0 < bytes.length);
                        // ceil(bytes.length / chunkBytes); chunk i starts at i * chunkBytes
                        int chunkCount = (bytes.length + NextopNode.this.config.chunkBytes - 1) / NextopNode.this.config.chunkBytes;
                        int[] chunkOffsets = new int[chunkCount];
                        chunkOffsets[0] = 0;
                        for (int i = 1; i < chunkCount; ++i) {
                            chunkOffsets[i] = chunkOffsets[i - 1] + NextopNode.this.config.chunkBytes;
                        }
                        writeState = new MessageWriteState(entry.id, bytes, chunkOffsets, compressed);
                        NL.nl.metric("node.nextop.write.state", System.nanoTime() - startNanos, (Object)TimeUnit.NANOSECONDS, new Object[0]);
                        NL.nl.count("node.nextop.write.%s", new Object[]{entry.mc.type});
                    }
                    int n = writeState.chunkOffsets.length;
                    long startNanos = System.nanoTime();
                    // START frame: [type][32-byte id][int length][int chunk count][1-byte compressed flag]
                    int c = 0;
                    this.controlBuffer[c] = 1;
                    Id.toBytes(entry.id, this.controlBuffer, ++c);
                    WireValue.putint(this.controlBuffer, c += 32, writeState.bytes.length);
                    WireValue.putint(this.controlBuffer, c += 4, n);
                    this.controlBuffer[c += 4] = writeState.compressed ? (byte)1 : 0;
                    this.sws.wire.write(this.controlBuffer, 0, ++c, 0);
                    NL.nl.metric("node.nextop.write.start", System.nanoTime() - startNanos, (Object)TimeUnit.NANOSECONDS, new Object[0]);
                    for (int i = 0; i < n; ++i) {
                        int end;
                        // urgent frames are interleaved between chunks
                        this.pollUrgent();
                        if (writeState.chunkWrites[i]) continue;
                        if (null != entry.end) {
                            // the entry has already ended: abandon it without writing an END frame
                            entry = null;
                            continue block12;
                        }
                        int start = writeState.chunkOffsets[i];
                        // the last chunk runs to the end of the message (n2 is an unused decompiler temporary)
                        int n2 = end = i + 1 < n ? writeState.chunkOffsets[i + 1] : writeState.bytes.length;
                        assert (start < end);
                        long startNanos2 = System.nanoTime();
                        // CHUNK frame: [type][int chunk index][int start offset][int chunk length][chunk bytes]
                        int c2 = 0;
                        this.controlBuffer[c2] = 2;
                        WireValue.putint(this.controlBuffer, ++c2, i);
                        WireValue.putint(this.controlBuffer, c2 += 4, start);
                        WireValue.putint(this.controlBuffer, c2 += 4, end - start);
                        this.sws.wire.write(this.controlBuffer, 0, c2 += 4, 0);
                        this.sws.wire.write(writeState.bytes, start, end - start, 0);
                        NL.nl.metric("node.nextop.write.chunk", System.nanoTime() - startNanos2, (Object)TimeUnit.NANOSECONDS, new Object[0]);
                        writeState.chunkWrites[i] = true;
                        // presumably returns an entry that should preempt the current one; verify against MessageControlState
                        MessageControlState.Entry preemptEntry = this.mcs.takeFirstAvailable(entry.id, (MessageControlChannel)NextopNode.this);
                        if (null == preemptEntry) continue;
                        // hand the current entry back and switch to the preempting one
                        this.mcs.release(entry.id, NextopNode.this);
                        entry = preemptEntry;
                        continue block12;
                    }
                    startNanos = System.nanoTime();
                    // END frame: [type]
                    c = 0;
                    this.controlBuffer[c] = 3;
                    this.sws.wire.write(this.controlBuffer, 0, ++c, 0);
                    NL.nl.metric("node.nextop.write.end", System.nanoTime() - startNanos, (Object)TimeUnit.NANOSECONDS, new Object[0]);
                    // fully written: the message now waits for the peer's ACK or NACK
                    this.mcs.remove(entry.id, MessageControlState.End.COMPLETED);
                    NextopNode.this.sts.writePendingAck.add(entry.mc);
                    entry = null;
                }
                this.sws.wire.flush();
            }
            catch (IOException e) {
                e.printStackTrace();
                this.sws.end();
            }
            if (null != entry) {
                // return a partially written entry so it can be taken again later
                this.mcs.release(entry.id, NextopNode.this);
                entry = null;
            }
            NL.nl.message("node.nextop.write", "End write loop", new Object[0]);
            NextopNode.this.sts.membar();
        }

        /**
         * Writes every queued urgent frame to the wire, in queue order, and records a
         * metric when at least one frame was written.
         *
         * @throws IOException if writing to the wire fails
         */
        private void pollUrgent() throws IOException {
            byte[] urgentMessage;
            int u = 0;
            long startNanos = System.nanoTime();
            while (null != (urgentMessage = NextopNode.this.sts.writeUrgentMessages.poll())) {
                this.sws.wire.write(urgentMessage, 0, urgentMessage.length, 0);
                ++u;
            }
            if (0 < u) {
                NL.nl.metric("node.nextop.write.urgent", System.nanoTime() - startNanos, (Object)TimeUnit.NANOSECONDS, new Object[0]);
            }
        }
    }

    /**
     * Scratch space for serializing outgoing messages: an 8 MiB byte array and a
     * {@code ByteBuffer} view that wraps the same array.
     */
    static final class SerializationState {
        final byte[] serBytes;
        final ByteBuffer serBuffer;

        SerializationState() {
            final byte[] backing = new byte[8 * 1024 * 1024];
            this.serBytes = backing;
            this.serBuffer = ByteBuffer.wrap(backing);
        }
    }

    /**
     * State shared by the read and write loopers of one wire: the wire itself, an
     * "active" flag, and references to both looper threads.
     */
    static final class SharedWireState {
        final Wire wire;
        volatile boolean active = true;
        WriteLooper writeLooper;
        ReadLooper readLooper;

        SharedWireState(Wire wire) {
            this.wire = wire;
        }

        /**
         * Marks the wire inactive, wakes every thread waiting in {@link #awaitEnd()}, and
         * interrupts both loopers.
         */
        void end() {
            synchronized (this) {
                this.active = false;
                notifyAll();
            }
            this.writeLooper.interrupt();
            this.readLooper.interrupt();
        }

        /**
         * Blocks until the wire has been marked inactive and both looper threads have finished.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        void awaitEnd() throws InterruptedException {
            synchronized (this) {
                // loop guards against spurious wakeups
                while (this.active) {
                    wait();
                }
            }
            this.writeLooper.join();
            this.readLooper.join();
        }
    }

    /**
     * Thread that owns the wire lifecycle while the node is active: it obtains a wire from
     * the factory, runs the sync handshake, starts a read/write looper pair on the wire,
     * waits for that wire to end, and repeats.
     */
    final class ControlLooper
    extends Thread {
        /** Scratch buffer for handshake frames and for ids received from the peer. */
        final byte[] controlBuffer = new byte[4096];

        ControlLooper() {
        }

        /**
         * Runs until the node becomes inactive, then ends the current wire (if any) and
         * waits for its loopers to finish.
         */
        @Override
        public void run() {
            // allocated lazily and reused across wires
            SerializationState ss = null;
            SharedWireState sws = null;
            while (NextopNode.this.active) {
                try {
                    if (null == sws || !sws.active) {
                        Wire wire;
                        try {
                            // the previous wire, if any, is passed to the factory
                            wire = NextopNode.this.wireFactory.create(null != sws ? sws.wire : null);
                        }
                        catch (NoSuchElementException e) {
                            sws = null;
                            NextopNode.this.post(NextopNode.this.ON_DISCONNECTED);
                            continue;
                        }
                        Wire.Adapter wireAdapter = NextopNode.this.wireAdapter;
                        if (null != wireAdapter) {
                            wire = wireAdapter.adapt(wire);
                        }
                        long startNanos = System.nanoTime();
                        try {
                            this.syncTransferState(wire);
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                            NextopNode.this.post(NextopNode.this.ON_DISCONNECTED);
                            continue;
                        }
                        NL.nl.metric("node.nextop.control.sync", System.nanoTime() - startNanos, (Object)TimeUnit.NANOSECONDS, new Object[0]);
                        NextopNode.this.post(NextopNode.this.ON_CONNECTED);
                        sws = new SharedWireState(wire);
                        if (null == ss) {
                            ss = new SerializationState();
                        }
                        WriteLooper writeLooper = new WriteLooper(sws, ss);
                        ReadLooper readLooper = new ReadLooper(sws);
                        // both references must be set before either thread starts: each looper uses the other
                        sws.writeLooper = writeLooper;
                        sws.readLooper = readLooper;
                        writeLooper.start();
                        readLooper.start();
                    }
                    try {
                        sws.awaitEnd();
                    }
                    catch (InterruptedException e) {
                        NextopNode.this.post(NextopNode.this.ON_DISCONNECTED);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    // every pass ends with the wire down or unavailable
                    NextopNode.this.post(NextopNode.this.ON_DISCONNECTED);
                }
            }
            if (null != sws) {
                sws.end();
                // wait uninterruptibly for the loopers to finish
                while (true) {
                    try {
                        sws.awaitEnd();
                    }
                    catch (InterruptedException e) {
                        continue;
                    }
                    break;
                }
            }
        }

        /**
         * Runs the sync handshake on a freshly created wire.
         *
         * <p>Both sides exchange a header with their number of write states, then the
         * 32-byte ids of those write states, then an end frame with a status byte. Local
         * read states whose ids the peer did not send are dropped. Ids are written on a
         * helper thread while this thread reads the peer's ids.
         *
         * @param wire the wire to synchronize on
         * @throws IOException if wire I/O fails or the peer sends an unexpected header,
         *         count, end marker, or status
         */
        void syncTransferState(final Wire wire) throws IOException {
            NextopNode.this.sts.membar();
            final int bytesPerId = 32;
            final int n = NextopNode.this.sts.writeStates.size();

            // header: [F_SYNC_WRITE_STATE][int write state count]
            this.controlBuffer[0] = F_SYNC_WRITE_STATE;
            WireValue.putint(this.controlBuffer, 1, n);
            wire.write(this.controlBuffer, 0, 5, 0);
            wire.flush();
            wire.read(this.controlBuffer, 0, 5, 0);
            if (F_SYNC_WRITE_STATE != this.controlBuffer[0]) {
                throw new IOException("Bad sync header.");
            }
            final int m = WireValue.getint(this.controlBuffer, 1);
            if (m < 0) {
                // a negative count would otherwise fail the array allocation below
                throw new IOException("Bad sync header.");
            }

            class Writer
            extends Thread {
                // Separate buffer: this thread fills it while the calling thread is reading
                // into controlBuffer, so the two must not share an array.
                final byte[] writeBuffer = new byte[ControlLooper.this.controlBuffer.length];
                int i = 0;
                Iterator<MessageWriteState> itr;
                @Nullable
                IOException e;

                Writer() {
                    this.itr = NextopNode.this.sts.writeStates.values().iterator();
                    this.e = null;
                }

                @Override
                public void run() {
                    try {
                        int writeCount;
                        // send ids in batches of as many as fit in the buffer
                        while (0 < (writeCount = Math.min(n - this.i, this.writeBuffer.length / bytesPerId))) {
                            for (int k = 0; k < writeCount; ++k) {
                                MessageWriteState writeState = this.itr.next();
                                Id.toBytes(writeState.id, this.writeBuffer, k * bytesPerId);
                            }
                            wire.write(this.writeBuffer, 0, writeCount * bytesPerId, 0);
                            this.i += writeCount;
                        }
                        wire.flush();
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                        // rethrown by the calling thread after join()
                        this.e = e;
                    }
                }
            }
            Writer writer = new Writer();
            writer.start();

            // read the peer's write state ids in batches
            int readCount;
            int j = 0;
            Id[] pairs = new Id[m];
            while (0 < (readCount = Math.min(m - j, this.controlBuffer.length / bytesPerId))) {
                wire.read(this.controlBuffer, 0, readCount * bytesPerId, 0);
                for (int k = 0; k < readCount; ++k) {
                    pairs[j + k] = Id.fromBytes(this.controlBuffer, k * bytesPerId);
                }
                j += readCount;
            }
            // keep only the read states the peer is still writing
            NextopNode.this.sts.readStates.keySet().retainAll(Arrays.asList(pairs));

            // wait uninterruptibly for the writer to finish
            while (true) {
                try {
                    writer.join();
                }
                catch (InterruptedException e) {
                    continue;
                }
                break;
            }
            if (null != writer.e) {
                throw writer.e;
            }

            // trailer: [F_SYNC_END][status]
            this.controlBuffer[0] = F_SYNC_END;
            this.controlBuffer[1] = SYNC_STATUS_OK;
            wire.write(this.controlBuffer, 0, 2, 0);
            wire.flush();
            wire.read(this.controlBuffer, 0, 2, 0);
            if (F_SYNC_END != this.controlBuffer[0]) {
                throw new IOException("Bad sync end.");
            }
            if (SYNC_STATUS_OK != this.controlBuffer[1]) {
                throw new IOException("Bad sync status.");
            }
            NextopNode.this.sts.membar();
        }
    }

    /**
     * Tracks whether the upstream has been told this node is active, and supports
     * deactivating it after a delay. A pending delayed deactivation is cancelled by any
     * immediate {@link #up()} or {@link #down()}.
     */
    final class UpstreamActive {
        boolean active = false;
        /** Wall-clock time (ms) of the latest scheduled deactivation, or 0 when none is pending. */
        long pendingDownTime = 0L;
        /** The scheduled deactivation task that is still valid, or null when none is pending. */
        @Nullable
        Runnable pendingDown = null;

        UpstreamActive() {
        }

        /** Forgets any scheduled deactivation. A task that was already posted becomes a no-op. */
        private void clearPendingDown() {
            this.pendingDown = null;
            this.pendingDownTime = 0L;
        }

        /** Marks the upstream active now, notifying it only on a state change. */
        void up() {
            clearPendingDown();
            if (this.active) {
                return;
            }
            this.active = true;
            NextopNode.this.upstream.onActive(true);
        }

        /** Marks the upstream inactive now, notifying it only on a state change. */
        void down() {
            clearPendingDown();
            if (!this.active) {
                return;
            }
            this.active = false;
            NextopNode.this.upstream.onActive(false);
        }

        /**
         * Marks the upstream active now and schedules it to go inactive after the hold time.
         *
         * @param holdMs milliseconds to stay active before the scheduled deactivation
         */
        void up(int holdMs) {
            up();
            down(holdMs);
        }

        /**
         * Schedules the upstream to be marked inactive after a delay. The request is ignored
         * unless its deadline is later than the one already pending.
         *
         * @param delayMs milliseconds from now until deactivation
         */
        void down(int delayMs) {
            final long downTime = System.currentTimeMillis() + (long)delayMs;
            if (downTime <= this.pendingDownTime) {
                return;
            }
            this.pendingDownTime = downTime;
            final Runnable scheduled = new Runnable(){

                @Override
                public void run() {
                    // only the most recently scheduled task may deactivate
                    if (UpstreamActive.this.pendingDown == this) {
                        UpstreamActive.this.down();
                    }
                }
            };
            this.pendingDown = scheduled;
            NextopNode.this.postDelayed(scheduled, delayMs);
        }
    }

    /** Immutable node configuration. */
    public static final class Config {
        /** Maximum number of bytes sent in a single message chunk; always positive. */
        public final int chunkBytes;

        /**
         * @param chunkBytes maximum number of bytes per message chunk; must be positive
         * @throws IllegalArgumentException if {@code chunkBytes} is zero or negative
         */
        public Config(int chunkBytes) {
            // The write loop divides by chunkBytes to compute the chunk count. Rejecting a
            // bad value here avoids a division by zero or negative array size later.
            if (chunkBytes <= 0) {
                throw new IllegalArgumentException("chunkBytes must be positive: " + chunkBytes);
            }
            this.chunkBytes = chunkBytes;
        }
    }
}

