/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.LogOffsetCommittable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;

public class CommittableSerializer
implements SimpleVersionedSerializer<Committable> {
    private final CommitMessageSerializer commitMessageSerializer;

    public CommittableSerializer(CommitMessageSerializer commitMessageSerializer) {
        this.commitMessageSerializer = commitMessageSerializer;
    }

    public int getVersion() {
        return 2;
    }

    public byte[] serialize(Committable committable) throws IOException {
        byte[] wrapped;
        int version;
        switch (committable.kind()) {
            case FILE: {
                version = this.commitMessageSerializer.getVersion();
                wrapped = this.commitMessageSerializer.serialize((CommitMessage)committable.wrappedCommittable());
                break;
            }
            case LOG_OFFSET: {
                version = 1;
                wrapped = ((LogOffsetCommittable)committable.wrappedCommittable()).toBytes();
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unsupported kind: " + committable.kind());
            }
        }
        return ByteBuffer.allocate(9 + wrapped.length + 4).putLong(committable.checkpointId()).put(committable.kind().toByteValue()).put(wrapped).putInt(version).array();
    }

    public Committable deserialize(int committableVersion, byte[] bytes) throws IOException {
        Object wrappedCommittable;
        if (committableVersion != this.getVersion()) {
            throw new RuntimeException("Can not deserialize version: " + committableVersion);
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long checkpointId = buffer.getLong();
        Committable.Kind kind = Committable.Kind.fromByteValue(buffer.get());
        byte[] wrapped = new byte[bytes.length - 13];
        buffer.get(wrapped);
        int version = buffer.getInt();
        switch (kind) {
            case FILE: {
                wrappedCommittable = this.commitMessageSerializer.deserialize(version, wrapped);
                break;
            }
            case LOG_OFFSET: {
                wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unsupported kind: " + kind);
            }
        }
        return new Committable(checkpointId, kind, wrappedCommittable);
    }
}

