/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.internal.networking.OutboundFrame;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.BufferObjectDataInput;
import com.hazelcast.nio.BufferObjectDataOutput;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Networking {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final JobExecutionService jobExecutionService;
    private final ScheduledFuture<?> flowControlSender;

    Networking(NodeEngine nodeEngine, JobExecutionService jobExecutionService, int flowControlPeriodMs) {
        this.nodeEngine = (NodeEngineImpl)nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobExecutionService = jobExecutionService;
        this.flowControlSender = nodeEngine.getExecutionService().scheduleWithRepetition(this::broadcastFlowControlPacket, 0L, (long)flowControlPeriodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        this.flowControlSender.cancel(false);
    }

    void handle(Packet packet) throws IOException {
        if (!packet.isFlagRaised(2)) {
            this.handleStreamPacket(packet);
            return;
        }
        this.handleFlowControlPacket(packet.getConn().getEndPoint(), packet.toByteArray());
    }

    private void handleStreamPacket(Packet packet) throws IOException {
        BufferObjectDataInput in = Util.createObjectDataInput((NodeEngine)this.nodeEngine, packet.toByteArray());
        long executionId = in.readLong();
        int vertexId = in.readInt();
        int ordinal = in.readInt();
        ExecutionContext executionContext = this.jobExecutionService.getExecutionContext(executionId);
        executionContext.handlePacket(vertexId, ordinal, packet.getConn().getEndPoint(), in);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public static byte[] createStreamPacketHeader(NodeEngine nodeEngine, long executionId, int destinationVertexId, int ordinal) {
        try (BufferObjectDataOutput out = Util.createObjectDataOutput(nodeEngine);){
            out.writeLong(executionId);
            out.writeInt(destinationVertexId);
            out.writeInt(ordinal);
            byte[] byArray = out.toByteArray();
            return byArray;
        }
        catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    private void broadcastFlowControlPacket() {
        try {
            Util.getRemoteMembers((NodeEngine)this.nodeEngine).forEach(member -> Util.uncheckRun(() -> {
                byte[] packetBuf = this.createFlowControlPacket((Address)member);
                if (packetBuf.length == 0) {
                    return;
                }
                Connection conn = Util.getMemberConnection((NodeEngine)this.nodeEngine, member);
                if (conn != null) {
                    conn.write((OutboundFrame)new Packet(packetBuf).setPacketType(Packet.Type.JET).raiseFlags(18));
                }
            }));
        }
        catch (Throwable t) {
            this.logger.severe("Flow-control packet broadcast failed", t);
        }
    }

    private byte[] createFlowControlPacket(Address member) throws IOException {
        try (BufferObjectDataOutput out = Util.createObjectDataOutput((NodeEngine)this.nodeEngine);){
            boolean[] hasData = new boolean[]{false};
            Map<Long, ExecutionContext> executionContexts = this.jobExecutionService.getExecutionContextsFor(member);
            out.writeInt(executionContexts.size());
            executionContexts.forEach((execId, exeCtx) -> Util.uncheckRun(() -> {
                out.writeLong(execId.longValue());
                out.writeInt(exeCtx.receiverMap().values().stream().mapToInt(Map::size).sum());
                exeCtx.receiverMap().forEach((vertexId, ordinalToSenderToTasklet) -> ordinalToSenderToTasklet.forEach((ordinal, senderToTasklet) -> Util.uncheckRun(() -> {
                    out.writeInt(vertexId.intValue());
                    out.writeInt(ordinal.intValue());
                    out.writeInt(((ReceiverTasklet)senderToTasklet.get(member)).updateAndGetSendSeqLimitCompressed());
                    hasData[0] = true;
                })));
            }));
            byte[] byArray = hasData[0] ? out.toByteArray() : EMPTY_BYTES;
            return byArray;
        }
    }

    private void handleFlowControlPacket(Address fromAddr, byte[] packet) throws IOException {
        try (BufferObjectDataInput in = Util.createObjectDataInput((NodeEngine)this.nodeEngine, packet);){
            int executionCtxCount = in.readInt();
            for (int j = 0; j < executionCtxCount; ++j) {
                long executionId = in.readLong();
                Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap = this.jobExecutionService.getSenderMap(executionId);
                if (senderMap == null) {
                    this.logMissingExeCtx(executionId);
                    continue;
                }
                int flowCtlMsgCount = in.readInt();
                for (int k = 0; k < flowCtlMsgCount; ++k) {
                    int destVertexId = in.readInt();
                    int destOrdinal = in.readInt();
                    int sendSeqLimitCompressed = in.readInt();
                    SenderTasklet t = Optional.ofNullable(senderMap.get(destVertexId)).map(ordinalMap -> (Map)ordinalMap.get(destOrdinal)).map(addrMap -> (SenderTasklet)addrMap.get(fromAddr)).orElse(null);
                    if (t == null) {
                        this.logMissingSenderTasklet(destVertexId, destOrdinal);
                        return;
                    }
                    t.setSendSeqLimitCompressed(sendSeqLimitCompressed);
                }
            }
        }
    }

    private void logMissingExeCtx(long executionId) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Ignoring flow control message applying to non-existent execution context " + Util.idToString(executionId));
        }
    }

    private void logMissingSenderTasklet(int destVertexId, int destOrdinal) {
        if (this.logger.isFinestEnabled()) {
            this.logger.finest(String.format("Ignoring flow control message applying to non-existent sender tasklet (%d, %d)", destVertexId, destOrdinal));
        }
    }
}

