/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.serialization;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.InstantiationUtil;

public class EventSerializer {
    private static final int END_OF_PARTITION_EVENT = 0;
    private static final int CHECKPOINT_BARRIER_EVENT = 1;
    private static final int END_OF_SUPERSTEP_EVENT = 2;
    private static final int OTHER_EVENT = 3;
    private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
    private static final int END_OF_CHANNEL_STATE_EVENT = 5;
    private static final int ANNOUNCEMENT_EVENT = 6;
    private static final int VIRTUAL_CHANNEL_SELECTOR_EVENT = 7;
    private static final int END_OF_USER_RECORDS_EVENT = 8;
    private static final int END_OF_SEGMENT = 9;
    private static final byte CHECKPOINT_TYPE_CHECKPOINT = 0;
    private static final byte CHECKPOINT_TYPE_SAVEPOINT = 1;
    private static final byte CHECKPOINT_TYPE_SAVEPOINT_SUSPEND = 2;
    private static final byte CHECKPOINT_TYPE_SAVEPOINT_TERMINATE = 3;
    private static final byte CHECKPOINT_TYPE_FULL_CHECKPOINT = 4;
    private static final byte SAVEPOINT_FORMAT_CANONICAL = 0;
    private static final byte SAVEPOINT_FORMAT_NATIVE = 1;

    public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
        Class<?> eventClass = event.getClass();
        if (eventClass == EndOfPartitionEvent.class) {
            return ByteBuffer.wrap(new byte[]{0, 0, 0, 0});
        }
        if (eventClass == CheckpointBarrier.class) {
            return EventSerializer.serializeCheckpointBarrier((CheckpointBarrier)event);
        }
        if (eventClass == EndOfSuperstepEvent.class) {
            return ByteBuffer.wrap(new byte[]{0, 0, 0, 2});
        }
        if (eventClass == EndOfChannelStateEvent.class) {
            return ByteBuffer.wrap(new byte[]{0, 0, 0, 5});
        }
        if (eventClass == EndOfData.class) {
            return ByteBuffer.wrap(new byte[]{0, 0, 0, 8, (byte)((EndOfData)event).getStopMode().ordinal()});
        }
        if (eventClass == CancelCheckpointMarker.class) {
            CancelCheckpointMarker marker = (CancelCheckpointMarker)event;
            ByteBuffer buf = ByteBuffer.allocate(12);
            buf.putInt(0, 4);
            buf.putLong(4, marker.getCheckpointId());
            return buf;
        }
        if (eventClass == EventAnnouncement.class) {
            EventAnnouncement announcement = (EventAnnouncement)event;
            ByteBuffer serializedAnnouncedEvent = EventSerializer.toSerializedEvent(announcement.getAnnouncedEvent());
            ByteBuffer serializedAnnouncement = ByteBuffer.allocate(8 + serializedAnnouncedEvent.capacity());
            serializedAnnouncement.putInt(0, 6);
            serializedAnnouncement.putInt(4, announcement.getSequenceNumber());
            serializedAnnouncement.position(8);
            serializedAnnouncement.put(serializedAnnouncedEvent);
            serializedAnnouncement.flip();
            return serializedAnnouncement;
        }
        if (eventClass == SubtaskConnectionDescriptor.class) {
            SubtaskConnectionDescriptor selector = (SubtaskConnectionDescriptor)event;
            ByteBuffer buf = ByteBuffer.allocate(12);
            buf.putInt(7);
            buf.putInt(selector.getInputSubtaskIndex());
            buf.putInt(selector.getOutputSubtaskIndex());
            buf.flip();
            return buf;
        }
        if (eventClass == EndOfSegmentEvent.class) {
            return ByteBuffer.wrap(new byte[]{0, 0, 0, 9});
        }
        try {
            DataOutputSerializer serializer = new DataOutputSerializer(128);
            serializer.writeInt(3);
            serializer.writeUTF(event.getClass().getName());
            event.write((DataOutputView)serializer);
            return serializer.wrapAsByteBuffer();
        }
        catch (IOException e) {
            throw new IOException("Error while serializing event.", e);
        }
    }

    public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
        if (buffer.remaining() < 4) {
            throw new IOException("Incomplete event");
        }
        ByteOrder bufferOrder = buffer.order();
        buffer.order(ByteOrder.BIG_ENDIAN);
        try {
            int type = buffer.getInt();
            if (type == 0) {
                EndOfPartitionEvent endOfPartitionEvent = EndOfPartitionEvent.INSTANCE;
                return endOfPartitionEvent;
            }
            if (type == 1) {
                CheckpointBarrier checkpointBarrier = EventSerializer.deserializeCheckpointBarrier(buffer);
                return checkpointBarrier;
            }
            if (type == 2) {
                EndOfSuperstepEvent endOfSuperstepEvent = EndOfSuperstepEvent.INSTANCE;
                return endOfSuperstepEvent;
            }
            if (type == 5) {
                EndOfChannelStateEvent endOfChannelStateEvent = EndOfChannelStateEvent.INSTANCE;
                return endOfChannelStateEvent;
            }
            if (type == 8) {
                EndOfData endOfData = new EndOfData(StopMode.values()[buffer.get()]);
                return endOfData;
            }
            if (type == 4) {
                long id = buffer.getLong();
                CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(id);
                return cancelCheckpointMarker;
            }
            if (type == 6) {
                int sequenceNumber = buffer.getInt();
                AbstractEvent announcedEvent = EventSerializer.fromSerializedEvent(buffer, classLoader);
                EventAnnouncement eventAnnouncement = new EventAnnouncement(announcedEvent, sequenceNumber);
                return eventAnnouncement;
            }
            if (type == 7) {
                SubtaskConnectionDescriptor sequenceNumber = new SubtaskConnectionDescriptor(buffer.getInt(), buffer.getInt());
                return sequenceNumber;
            }
            if (type == 9) {
                EndOfSegmentEvent sequenceNumber = EndOfSegmentEvent.INSTANCE;
                return sequenceNumber;
            }
            if (type == 3) {
                try {
                    Class<AbstractEvent> clazz;
                    DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
                    String className = deserializer.readUTF();
                    try {
                        clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
                    }
                    catch (ClassNotFoundException e) {
                        throw new IOException("Could not load event class '" + className + "'.", e);
                    }
                    catch (ClassCastException e) {
                        throw new IOException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
                    }
                    AbstractEvent event = (AbstractEvent)InstantiationUtil.instantiate(clazz, AbstractEvent.class);
                    event.read((DataInputView)deserializer);
                    AbstractEvent abstractEvent = event;
                    return abstractEvent;
                }
                catch (Exception e) {
                    throw new IOException("Error while deserializing or instantiating event.", e);
                }
            }
            throw new IOException("Corrupt byte stream for event");
        }
        finally {
            buffer.order(bufferOrder);
        }
    }

    private static ByteBuffer serializeCheckpointBarrier(CheckpointBarrier barrier) throws IOException {
        CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
        byte[] locationBytes = checkpointOptions.getTargetLocation().isDefaultReference() ? null : checkpointOptions.getTargetLocation().getReferenceBytes();
        ByteBuffer buf = ByteBuffer.allocate(38 + (locationBytes == null ? 0 : locationBytes.length));
        buf.putInt(1);
        buf.putLong(barrier.getId());
        buf.putLong(barrier.getTimestamp());
        SnapshotType snapshotType = checkpointOptions.getCheckpointType();
        if (snapshotType.isSavepoint()) {
            EventSerializer.encodeSavepointType(snapshotType, buf);
        } else if (snapshotType.equals(CheckpointType.CHECKPOINT)) {
            buf.put((byte)0);
        } else if (snapshotType.equals(CheckpointType.FULL_CHECKPOINT)) {
            buf.put((byte)4);
        } else {
            throw new IOException("Unknown checkpoint type: " + snapshotType);
        }
        if (locationBytes == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(locationBytes.length);
            buf.put(locationBytes);
        }
        buf.put((byte)checkpointOptions.getAlignment().ordinal());
        buf.putLong(checkpointOptions.getAlignedCheckpointTimeout());
        buf.flip();
        return buf;
    }

    private static void encodeSavepointType(SnapshotType snapshotType, ByteBuffer buf) throws IOException {
        SavepointType savepointType = (SavepointType)snapshotType;
        switch (savepointType.getPostCheckpointAction()) {
            case NONE: {
                buf.put((byte)1);
                break;
            }
            case SUSPEND: {
                buf.put((byte)2);
                break;
            }
            case TERMINATE: {
                buf.put((byte)3);
                break;
            }
            default: {
                throw new IOException("Unknown savepoint type: " + snapshotType);
            }
        }
        switch (savepointType.getFormatType()) {
            case CANONICAL: {
                buf.put((byte)0);
                break;
            }
            case NATIVE: {
                buf.put((byte)1);
                break;
            }
            default: {
                throw new IOException("Unknown savepoint format type: " + snapshotType);
            }
        }
    }

    private static CheckpointBarrier deserializeCheckpointBarrier(ByteBuffer buffer) throws IOException {
        CheckpointStorageLocationReference locationRef;
        SnapshotType snapshotType;
        long id = buffer.getLong();
        long timestamp = buffer.getLong();
        byte checkpointTypeCode = buffer.get();
        if (checkpointTypeCode == 0) {
            snapshotType = CheckpointType.CHECKPOINT;
        } else if (checkpointTypeCode == 4) {
            snapshotType = CheckpointType.FULL_CHECKPOINT;
        } else if (checkpointTypeCode == 1 || checkpointTypeCode == 2 || checkpointTypeCode == 3) {
            snapshotType = EventSerializer.decodeSavepointType(checkpointTypeCode, buffer);
        } else {
            throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
        }
        int locationRefLen = buffer.getInt();
        if (locationRefLen == -1) {
            locationRef = CheckpointStorageLocationReference.getDefault();
        } else {
            byte[] bytes = new byte[locationRefLen];
            buffer.get(bytes);
            locationRef = new CheckpointStorageLocationReference(bytes);
        }
        CheckpointOptions.AlignmentType alignmentType = CheckpointOptions.AlignmentType.values()[buffer.get()];
        long alignedCheckpointTimeout = buffer.getLong();
        return new CheckpointBarrier(id, timestamp, new CheckpointOptions(snapshotType, locationRef, alignmentType, alignedCheckpointTimeout));
    }

    private static SavepointType decodeSavepointType(byte checkpointTypeCode, ByteBuffer buffer) throws IOException {
        SavepointFormatType formatType;
        byte formatTypeCode = buffer.get();
        if (formatTypeCode == 0) {
            formatType = SavepointFormatType.CANONICAL;
        } else if (formatTypeCode == 1) {
            formatType = SavepointFormatType.NATIVE;
        } else {
            throw new IOException("Unknown savepoint format type code: " + formatTypeCode);
        }
        if (checkpointTypeCode == 1) {
            return SavepointType.savepoint(formatType);
        }
        if (checkpointTypeCode == 2) {
            return SavepointType.suspend(formatType);
        }
        if (checkpointTypeCode == 3) {
            return SavepointType.terminate(formatType);
        }
        throw new IOException("Unknown savepoint type code: " + checkpointTypeCode);
    }

    public static Buffer toBuffer(AbstractEvent event, boolean hasPriority) throws IOException {
        ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
        MemorySegment data = MemorySegmentFactory.wrap((byte[])serializedEvent.array());
        NetworkBuffer buffer = new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, Buffer.DataType.getDataType(event, hasPriority));
        buffer.setSize(serializedEvent.remaining());
        return buffer;
    }

    public static BufferConsumer toBufferConsumer(AbstractEvent event, boolean hasPriority) throws IOException {
        ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
        MemorySegment data = MemorySegmentFactory.wrap((byte[])serializedEvent.array());
        return new BufferConsumer(new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, Buffer.DataType.getDataType(event, hasPriority)), data.size());
    }

    public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
        return EventSerializer.fromSerializedEvent(buffer.getNioBufferReadable(), classLoader);
    }
}

