/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.BufferObjectDataInput;
import com.hazelcast.nio.BufferObjectDataOutput;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Networking {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final ScheduledFuture<?> flowControlSender;
    private final ConcurrentHashMap<Long, ExecutionContext> executionContexts;

    Networking(NodeEngine nodeEngine, ConcurrentHashMap<Long, ExecutionContext> executionContexts, int flowControlPeriodMs) {
        this.nodeEngine = (NodeEngineImpl)nodeEngine;
        this.executionContexts = executionContexts;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.flowControlSender = nodeEngine.getExecutionService().scheduleWithRepetition(this::broadcastFlowControlPacket, 0L, flowControlPeriodMs, TimeUnit.MILLISECONDS);
    }

    void destroy() {
        this.flowControlSender.cancel(false);
    }

    void handle(Packet packet) throws IOException {
        if (!packet.isFlagRaised(2)) {
            this.handleStreamPacket(packet);
            return;
        }
        this.handleFlowControlPacket(packet.getConn().getEndPoint(), packet.toByteArray());
    }

    private void handleStreamPacket(Packet packet) throws IOException {
        BufferObjectDataInput in = Util.createObjectDataInput(this.nodeEngine, packet.toByteArray());
        long executionId = in.readLong();
        int vertexId = in.readInt();
        int ordinal = in.readInt();
        this.executionContexts.get(executionId).handlePacket(vertexId, ordinal, packet.getConn().getEndPoint(), in);
    }

    public static byte[] createStreamPacketHeader(NodeEngine nodeEngine, long executionId, int destinationVertexId, int ordinal) {
        BufferObjectDataOutput out = Util.createObjectDataOutput(nodeEngine);
        try {
            out.writeLong(executionId);
            out.writeInt(destinationVertexId);
            out.writeInt(ordinal);
            return out.toByteArray();
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private void broadcastFlowControlPacket() {
        try {
            Util.getRemoteMembers(this.nodeEngine).forEach(member -> Util.uncheckRun(() -> {
                byte[] packetBuf = this.createFlowControlPacket((Address)member);
                if (packetBuf.length == 0) {
                    return;
                }
                Connection conn = Util.getMemberConnection(this.nodeEngine, member);
                conn.write(new Packet(packetBuf).setPacketType(Packet.Type.JET).raiseFlags(18));
            }));
        }
        catch (Throwable t) {
            this.logger.severe("Flow-control packet broadcast failed", t);
        }
    }

    private byte[] createFlowControlPacket(Address member) throws IOException {
        BufferObjectDataOutput out = Util.createObjectDataOutput(this.nodeEngine);
        boolean[] hasData = new boolean[]{false};
        out.writeInt(this.executionContexts.size());
        this.executionContexts.forEach((execId, exeCtx) -> Util.uncheckRun(() -> {
            if (!exeCtx.isParticipating(member)) {
                return;
            }
            out.writeLong((long)execId);
            out.writeInt(exeCtx.receiverMap().values().stream().mapToInt(Map::size).sum());
            exeCtx.receiverMap().forEach((vertexId, ordinalToSenderToTasklet) -> ordinalToSenderToTasklet.forEach((ordinal, senderToTasklet) -> Util.uncheckRun(() -> {
                out.writeInt((int)vertexId);
                out.writeInt((int)ordinal);
                out.writeInt(((ReceiverTasklet)senderToTasklet.get(member)).updateAndGetSendSeqLimitCompressed());
                hasData[0] = true;
            })));
        }));
        return hasData[0] ? out.toByteArray() : EMPTY_BYTES;
    }

    private void handleFlowControlPacket(Address fromAddr, byte[] packet) throws IOException {
        BufferObjectDataInput in = Util.createObjectDataInput(this.nodeEngine, packet);
        int executionCtxCount = in.readInt();
        for (int j = 0; j < executionCtxCount; ++j) {
            long exeCtxId = in.readLong();
            Map senderMap = Optional.ofNullable(this.executionContexts).map(exeCtxs -> (ExecutionContext)exeCtxs.get(exeCtxId)).map(ExecutionContext::senderMap).orElse(null);
            if (senderMap == null) {
                this.logMissingExeCtx(exeCtxId);
                continue;
            }
            int flowCtlMsgCount = in.readInt();
            for (int k = 0; k < flowCtlMsgCount; ++k) {
                int destVertexId = in.readInt();
                int destOrdinal = in.readInt();
                int sendSeqLimitCompressed = in.readInt();
                SenderTasklet t = Optional.ofNullable(senderMap.get(destVertexId)).map(ordinalMap -> (Map)ordinalMap.get(destOrdinal)).map(addrMap -> (SenderTasklet)addrMap.get(fromAddr)).orElse(null);
                if (t == null) {
                    this.logMissingSenderTasklet(destVertexId, destOrdinal);
                    return;
                }
                t.setSendSeqLimitCompressed(sendSeqLimitCompressed);
            }
        }
    }

    private void logMissingExeCtx(long exeCtxId) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Ignoring flow control message applying to non-existent execution context " + exeCtxId);
        }
    }

    private void logMissingSenderTasklet(int destVertexId, int destOrdinal) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest(String.format("Ignoring flow control message applying to non-existent sender tasklet (%d, %d)", destVertexId, destOrdinal));
        }
    }
}

