/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

abstract class NettyMessage {
    /**
     * Number of header bytes that precede every payload: a 4-byte frame length, the 4-byte
     * magic number and a 1-byte message ID (the three values written by allocateBuffer).
     */
    static final int HEADER_LENGTH = 9;
    /**
     * Marker written into every header (0xBADC0FFE when read as an unsigned 32-bit value).
     * The decoder rejects any frame that does not carry it.
     */
    static final int MAGIC_NUMBER = -1159983106;

    /** Package-private no-op constructor. */
    NettyMessage() {
    }

    /**
     * Serializes this message, header included, into a buffer obtained from the given allocator.
     *
     * @param var1 allocator used to obtain the buffer the message is written into
     * @return the buffer holding the complete serialized frame
     * @throws Exception if serialization fails
     */
    abstract ByteBuf write(ByteBufAllocator var1) throws Exception;

    /**
     * Populates this message's fields from the given buffer. NettyMessageDecoder calls this
     * after it has already consumed the magic number and the message ID.
     *
     * @param var1 buffer positioned at the start of the message payload
     * @throws Exception if the payload cannot be read
     */
    abstract void readFrom(ByteBuf var1) throws Exception;

    /**
     * Allocates a buffer for a message whose payload size is not declared up front.
     *
     * <p>Delegates to the three-argument overload with a payload length of zero, so the
     * frame-length field initially covers the header only.
     *
     * @param allocator allocator to obtain the buffer from
     * @param id message ID to write into the header
     * @return a buffer containing the message header
     */
    private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
        final int undeclaredPayloadLength = 0;
        return allocateBuffer(allocator, id, undeclaredPayloadLength);
    }

    /**
     * Allocates a direct buffer and writes the message header into it: frame length
     * (header plus payload), magic number and message ID.
     *
     * @param allocator allocator to obtain the buffer from
     * @param id message ID to write into the header
     * @param length payload length in bytes; when zero the buffer is allocated with the
     *     allocator's default capacity instead of an exact size
     * @return the buffer with the header written and the writer index positioned after it
     */
    private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
        final int frameLength = HEADER_LENGTH + length;

        final ByteBuf frame;
        if (length == 0) {
            // No declared payload size: let the allocator pick its default capacity.
            frame = allocator.directBuffer();
        } else {
            frame = allocator.directBuffer(frameLength);
        }

        frame.writeInt(frameLength);
        frame.writeInt(MAGIC_NUMBER);
        frame.writeByte(id);
        return frame;
    }

    /** Message with ID 5 that carries no payload; only the header is written. */
    static class CloseRequest extends NettyMessage {
        private static final byte ID = 5;

        /**
         * Serializes this request as a header-only frame.
         *
         * @param allocator allocator to obtain the buffer from
         * @return buffer containing just the message header
         */
        @Override
        ByteBuf write(ByteBufAllocator allocator) throws Exception {
            // The header-only overload passes a payload length of zero.
            return NettyMessage.allocateBuffer(allocator, ID);
        }

        /** Nothing to read: the message has no payload. */
        @Override
        void readFrom(ByteBuf buffer) throws Exception {
        }
    }

    /** Message with ID 4 whose payload is a single {@link InputChannelID}. */
    static class CancelPartitionRequest extends NettyMessage {
        static final byte ID = 4;
        InputChannelID receiverId;

        /** Creates an empty instance to be filled by {@link #readFrom(ByteBuf)}. */
        public CancelPartitionRequest() {
        }

        /**
         * @param receiverId ID of the input channel this request refers to
         */
        public CancelPartitionRequest(InputChannelID receiverId) {
            this.receiverId = receiverId;
        }

        /**
         * Serializes the header followed by the receiver ID.
         *
         * @param allocator allocator to obtain the buffer from
         * @return buffer holding the serialized frame
         * @throws IOException wrapping any failure; the partially written buffer is released first
         */
        @Override
        ByteBuf write(ByteBufAllocator allocator) throws Exception {
            ByteBuf frame = null;
            try {
                // Payload is declared as 16 bytes (presumably the serialized size of InputChannelID).
                frame = NettyMessage.allocateBuffer(allocator, ID, 16);
                this.receiverId.writeTo(frame);
                return frame;
            } catch (Throwable failure) {
                if (frame != null) {
                    frame.release();
                }
                throw new IOException(failure);
            }
        }

        /** Reads the receiver ID from the payload. */
        @Override
        void readFrom(ByteBuf buffer) throws Exception {
            this.receiverId = InputChannelID.fromByteBuf(buffer);
        }
    }

    /**
     * Message with ID 3 carrying a serialized {@link TaskEvent} together with the result
     * partition and the input channel it is addressed to.
     *
     * <p>Payload layout: event length (int), event bytes, partition ID, producer ID, receiver ID.
     */
    static class TaskEventRequest extends NettyMessage {
        static final byte ID = 3;
        TaskEvent event;
        InputChannelID receiverId;
        ResultPartitionID partitionId;

        /** Creates an empty instance to be filled by {@link #readFrom(ByteBuf)}. */
        public TaskEventRequest() {
        }

        /**
         * @param event event to transfer
         * @param partitionId result partition the event is addressed to
         * @param receiverId ID of the requesting input channel
         */
        TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId) {
            this.event = event;
            this.receiverId = receiverId;
            this.partitionId = partitionId;
        }

        /**
         * Serializes the header, the length-prefixed event and the three IDs.
         *
         * @param allocator allocator to obtain the buffer from
         * @return buffer holding the serialized frame
         * @throws IOException wrapping any failure; an already allocated buffer is released first
         */
        @Override
        ByteBuf write(ByteBufAllocator allocator) throws IOException {
            ByteBuf result = null;
            try {
                ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(this.event);
                // 4 bytes for the length prefix, the event itself, and 16 bytes declared for each of the three IDs.
                result = NettyMessage.allocateBuffer(allocator, ID, 4 + serializedEvent.remaining() + 16 + 16 + 16);
                result.writeInt(serializedEvent.remaining());
                result.writeBytes(serializedEvent);
                this.partitionId.getPartitionId().writeTo(result);
                this.partitionId.getProducerId().writeTo(result);
                this.receiverId.writeTo(result);
                return result;
            }
            catch (Throwable t) {
                if (result != null) {
                    result.release();
                }
                throw new IOException(t);
            }
        }

        /**
         * Reads the length-prefixed event and the three IDs from the payload.
         *
         * @param buffer buffer positioned at the start of the payload
         * @throws IOException if the announced event length is negative or larger than the
         *     number of bytes left in the buffer, or if the event cannot be deserialized
         */
        @Override
        public void readFrom(ByteBuf buffer) throws IOException {
            int length = buffer.readInt();
            // The length comes off the wire: validate it before allocating, so a corrupt
            // frame cannot trigger an oversized heap allocation or a negative-capacity error.
            int available = buffer.readableBytes();
            if (length < 0 || length > available) {
                throw new IOException("Corrupt task event request: invalid serialized event length "
                        + length + " (readable bytes: " + available + ").");
            }
            ByteBuffer serializedEvent = ByteBuffer.allocate(length);
            buffer.readBytes(serializedEvent);
            serializedEvent.flip();
            this.event = (TaskEvent)EventSerializer.fromSerializedEvent(serializedEvent, this.getClass().getClassLoader());
            this.partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
            this.receiverId = InputChannelID.fromByteBuf(buffer);
        }
    }

    /**
     * Message with ID 2 naming a result partition, a queue index within it and the
     * input channel making the request.
     *
     * <p>Payload layout: partition ID, producer ID, queue index (int), receiver ID.
     */
    static class PartitionRequest extends NettyMessage {
        static final byte ID = 2;
        ResultPartitionID partitionId;
        int queueIndex;
        InputChannelID receiverId;

        /** Creates an empty instance to be filled by {@link #readFrom(ByteBuf)}. */
        public PartitionRequest() {
        }

        /**
         * @param partitionId requested result partition
         * @param queueIndex index of the requested queue
         * @param receiverId ID of the requesting input channel
         */
        PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId) {
            this.partitionId = partitionId;
            this.queueIndex = queueIndex;
            this.receiverId = receiverId;
        }

        /**
         * Serializes the header followed by the partition ID, producer ID, queue index and receiver ID.
         *
         * @param allocator allocator to obtain the buffer from
         * @return buffer holding the serialized frame
         * @throws IOException wrapping any failure; an already allocated buffer is released first
         */
        @Override
        ByteBuf write(ByteBufAllocator allocator) throws IOException {
            ByteBuf frame = null;
            try {
                // 52 payload bytes: three IDs declared at 16 bytes each plus the 4-byte queue index.
                frame = NettyMessage.allocateBuffer(allocator, ID, 16 + 16 + 4 + 16);

                final ResultPartitionID target = this.partitionId;
                target.getPartitionId().writeTo(frame);
                target.getProducerId().writeTo(frame);
                frame.writeInt(this.queueIndex);
                this.receiverId.writeTo(frame);
                return frame;
            } catch (Throwable failure) {
                if (frame != null) {
                    frame.release();
                }
                throw new IOException(failure);
            }
        }

        /** Reads the fields back in the order they were written. */
        @Override
        public void readFrom(ByteBuf buffer) {
            // Argument evaluation order matters: the partition ID precedes the producer ID on the wire.
            this.partitionId = new ResultPartitionID(
                    IntermediateResultPartitionID.fromByteBuf(buffer),
                    ExecutionAttemptID.fromByteBuf(buffer));
            this.queueIndex = buffer.readInt();
            this.receiverId = InputChannelID.fromByteBuf(buffer);
        }

        @Override
        public String toString() {
            return String.format("PartitionRequest(%s:%d)", this.partitionId, this.queueIndex);
        }
    }

    /**
     * Message with ID 1 transporting a {@link Throwable}, optionally tagged with the input
     * channel it concerns. Without a receiver ID the error is reported as fatal.
     *
     * <p>Payload layout: Java-serialized cause, boolean "has receiver", receiver ID (if present).
     */
    static class ErrorResponse
    extends NettyMessage {
        private static final byte ID = 1;
        Throwable cause;
        InputChannelID receiverId;

        /** Creates an empty instance to be filled by {@link #readFrom(ByteBuf)}. */
        public ErrorResponse() {
        }

        /**
         * Creates an error without a receiver, i.e. one for which {@link #isFatalError()} is true.
         *
         * @param cause the error to transport
         */
        ErrorResponse(Throwable cause) {
            this.cause = cause;
        }

        /**
         * @param cause the error to transport
         * @param receiverId ID of the input channel the error concerns
         */
        ErrorResponse(Throwable cause, InputChannelID receiverId) {
            this.cause = cause;
            this.receiverId = receiverId;
        }

        /**
         * @return true when no receiver ID is set
         */
        boolean isFatalError() {
            return this.receiverId == null;
        }

        /**
         * Serializes the header, the cause and the optional receiver ID, then patches the
         * frame-length field because the payload size is only known after serialization.
         *
         * @param allocator allocator to obtain the buffer from
         * @return buffer holding the serialized frame
         * @throws IOException if serialization fails; the buffer is released first
         */
        @Override
        ByteBuf write(ByteBufAllocator allocator) throws IOException {
            ByteBuf result = NettyMessage.allocateBuffer(allocator, ID);
            try (ObjectOutputStream oos = new ObjectOutputStream((OutputStream)new ByteBufOutputStream(result))) {
                oos.writeObject(this.cause);
                // The following fields bypass the stream and go straight into the buffer, so make
                // sure every byte of the serialized cause has reached the buffer first.
                oos.flush();
                if (this.receiverId != null) {
                    result.writeBoolean(true);
                    this.receiverId.writeTo(result);
                } else {
                    result.writeBoolean(false);
                }
                // Overwrite the placeholder frame length written with the header.
                result.setInt(0, result.readableBytes());
                return result;
            }
            catch (Throwable t) {
                result.release();
                if (t instanceof IOException) {
                    throw (IOException)t;
                }
                throw new IOException(t);
            }
        }

        /**
         * Deserializes the cause and, if flagged as present, the receiver ID.
         *
         * @param buffer buffer positioned at the start of the payload
         * @throws ClassCastException if the deserialized object is null or not a Throwable
         * @throws Exception if deserialization fails
         */
        @Override
        void readFrom(ByteBuf buffer) throws Exception {
            // NOTE(review): this is Java-native deserialization of bytes received from the network.
            // If peers are not fully trusted, consider restricting classes with an ObjectInputFilter.
            try (ObjectInputStream ois = new ObjectInputStream((InputStream)new ByteBufInputStream(buffer))) {
                Object obj = ois.readObject();
                if (!(obj instanceof Throwable)) {
                    // obj may be null (a null cause serializes fine), so do not dereference it blindly.
                    String actualType = obj == null ? "null" : obj.getClass().toString();
                    throw new ClassCastException("Read object expected to be of type Throwable, actual type is " + actualType + ".");
                }
                this.cause = (Throwable)obj;
                if (buffer.readBoolean()) {
                    this.receiverId = InputChannelID.fromByteBuf(buffer);
                }
            }
        }
    }

    /**
     * Message with ID 0 carrying the contents of a {@link Buffer} for a given input channel.
     *
     * <p>Payload layout: receiver ID, sequence number (int), is-buffer flag, data size (int), data.
     * On the sending side the message wraps {@link #buffer}; on the receiving side
     * {@link #readFrom(ByteBuf)} exposes the data as a retained slice of the incoming frame.
     */
    static class BufferResponse extends NettyMessage {
        private static final byte ID = 0;
        final Buffer buffer;
        InputChannelID receiverId;
        int sequenceNumber;
        boolean isBuffer;
        int size;
        ByteBuf retainedSlice;

        /** Creates an empty instance (no outgoing buffer) to be filled by {@link #readFrom(ByteBuf)}. */
        public BufferResponse() {
            this.buffer = null;
        }

        /**
         * @param buffer data to send
         * @param sequenceNumber sequence number written alongside the data
         * @param receiverId ID of the input channel the data is addressed to
         */
        public BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId) {
            this.buffer = buffer;
            this.sequenceNumber = sequenceNumber;
            this.receiverId = receiverId;
        }

        /** @return the is-buffer flag read from the wire */
        boolean isBuffer() {
            return this.isBuffer;
        }

        /** @return the data size read from the wire */
        int getSize() {
            return this.size;
        }

        /** @return the retained slice holding the received data, or null if none is held */
        ByteBuf getNettyBuffer() {
            return this.retainedSlice;
        }

        /** Releases the retained slice, if one is held, and clears the reference. */
        void releaseBuffer() {
            final ByteBuf held = this.retainedSlice;
            if (held == null) {
                return;
            }
            held.release();
            this.retainedSlice = null;
        }

        /**
         * Serializes the header, the metadata fields and a copy of the buffer's data. The
         * wrapped buffer is recycled once the try block is left, on success and on failure.
         *
         * @param allocator allocator to obtain the frame buffer from
         * @return buffer holding the serialized frame
         * @throws IOException wrapping any failure; an already allocated frame is released first
         */
        @Override
        ByteBuf write(ByteBufAllocator allocator) throws IOException {
            // 25 fixed bytes (16 declared for the receiver ID + 4 sequence number + 1 flag + 4 size) plus the data.
            final int payloadLength = 16 + 4 + 1 + 4 + this.buffer.getSize();

            ByteBuf frame = null;
            try {
                frame = NettyMessage.allocateBuffer(allocator, ID, payloadLength);
                this.receiverId.writeTo(frame);
                frame.writeInt(this.sequenceNumber);
                frame.writeBoolean(this.buffer.isBuffer());
                frame.writeInt(this.buffer.getSize());
                frame.writeBytes(this.buffer.getNioBuffer());
                return frame;
            } catch (Throwable failure) {
                if (frame != null) {
                    frame.release();
                }
                throw new IOException(failure);
            } finally {
                if (this.buffer != null) {
                    this.buffer.recycle();
                }
            }
        }

        /**
         * Reads the metadata fields and keeps a retained slice over the data bytes; the
         * slice is handed back through {@link #releaseBuffer()}.
         */
        @Override
        void readFrom(ByteBuf buffer) {
            this.receiverId = InputChannelID.fromByteBuf(buffer);
            this.sequenceNumber = buffer.readInt();
            this.isBuffer = buffer.readBoolean();
            this.size = buffer.readInt();

            // Slice out exactly the announced number of data bytes, then take a reference on it.
            this.retainedSlice = buffer.readSlice(this.size);
            this.retainedSlice.retain();
        }
    }

    @ChannelHandler.Sharable
    static class NettyMessageDecoder
    extends MessageToMessageDecoder<ByteBuf> {
        NettyMessageDecoder() {
        }

        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
            int magicNumber = msg.readInt();
            if (magicNumber != -1159983106) {
                throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
            }
            byte msgId = msg.readByte();
            NettyMessage decodedMsg = null;
            if (msgId == 0) {
                decodedMsg = new BufferResponse();
            } else if (msgId == 2) {
                decodedMsg = new PartitionRequest();
            } else if (msgId == 3) {
                decodedMsg = new TaskEventRequest();
            } else if (msgId == 1) {
                decodedMsg = new ErrorResponse();
            } else if (msgId == 4) {
                decodedMsg = new CancelPartitionRequest();
            } else if (msgId == 5) {
                decodedMsg = new CloseRequest();
            } else {
                throw new IllegalStateException("Received unknown message from producer: " + msg);
            }
            if (decodedMsg != null) {
                decodedMsg.readFrom(msg);
                out.add(decodedMsg);
            }
        }
    }

    @ChannelHandler.Sharable
    static class NettyMessageEncoder
    extends ChannelOutboundHandlerAdapter {
        NettyMessageEncoder() {
        }

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (msg instanceof NettyMessage) {
                ByteBuf serialized = null;
                try {
                    serialized = ((NettyMessage)msg).write(ctx.alloc());
                }
                catch (Throwable t) {
                    throw new IOException("Error while serializing message: " + msg, t);
                }
                finally {
                    if (serialized != null) {
                        ctx.write((Object)serialized, promise);
                    }
                }
            } else {
                ctx.write(msg, promise);
            }
        }

        static LengthFieldBasedFrameDecoder createFrameLengthDecoder() {
            return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 4);
        }
    }
}

