/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;

public class IncrementalCooperativeConnectProtocol {
    public static final String ALLOCATION_KEY_NAME = "allocation";
    public static final String REVOKED_KEY_NAME = "revoked";
    public static final String SCHEDULED_DELAY_KEY_NAME = "delay";
    public static final short CONNECT_PROTOCOL_V1 = 1;
    public static final boolean TOLERATE_MISSING_FIELDS_WITH_DEFAULTS = true;
    private static final Struct CONNECT_PROTOCOL_HEADER_V1 = new Struct(ConnectProtocol.CONNECT_PROTOCOL_HEADER_SCHEMA).set("version", (Object)1);
    public static final Schema CONFIG_STATE_V1 = ConnectProtocol.CONFIG_STATE_V0;
    public static final Schema ALLOCATION_V1 = new Schema(true, new Field[]{new Field("allocation", (Type)Type.NULLABLE_BYTES, null, true, null)});
    public static final Schema CONNECTOR_ASSIGNMENT_V1 = ConnectProtocol.CONNECTOR_ASSIGNMENT_V0;
    public static final Schema ASSIGNMENT_V1 = new Schema(true, new Field[]{new Field("error", (Type)Type.INT16), new Field("leader", (Type)Type.STRING), new Field("leader-url", (Type)Type.STRING), new Field("config-offset", (Type)Type.INT64), new Field("assignment", (Type)ArrayOf.nullable((Type)CONNECTOR_ASSIGNMENT_V1), null, true, null), new Field("revoked", (Type)ArrayOf.nullable((Type)CONNECTOR_ASSIGNMENT_V1), null, true, null), new Field("delay", (Type)Type.INT32, null, (Object)0)});

    public static ByteBuffer serializeMetadata(ExtendedWorkerState workerState) {
        Struct configState = new Struct(CONFIG_STATE_V1).set("url", (Object)workerState.url()).set("config-offset", (Object)workerState.offset());
        Struct allocation = new Struct(ALLOCATION_V1).set(ALLOCATION_KEY_NAME, (Object)IncrementalCooperativeConnectProtocol.serializeAssignment(workerState.assignment()));
        ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf() + CONFIG_STATE_V1.sizeOf((Object)configState) + ALLOCATION_V1.sizeOf((Object)allocation));
        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
        CONFIG_STATE_V1.write(buffer, (Object)configState);
        ALLOCATION_V1.write(buffer, (Object)allocation);
        buffer.flip();
        return buffer;
    }

    public static JoinGroupRequestData.JoinGroupRequestProtocolCollection metadataRequest(ExtendedWorkerState workerState) {
        return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Arrays.asList(new JoinGroupRequestData.JoinGroupRequestProtocol().setName(ConnectProtocolCompatibility.COMPATIBLE.protocol()).setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(workerState).array()), new JoinGroupRequestData.JoinGroupRequestProtocol().setName(ConnectProtocolCompatibility.EAGER.protocol()).setMetadata(ConnectProtocol.serializeMetadata(workerState).array())).iterator());
    }

    public static ExtendedWorkerState deserializeMetadata(ByteBuffer buffer) {
        Struct header = ConnectProtocol.CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
        Short version = header.getShort("version");
        IncrementalCooperativeConnectProtocol.checkVersionCompatibility(version);
        Struct configState = CONFIG_STATE_V1.read(buffer);
        long configOffset = configState.getLong("config-offset");
        String url = configState.getString("url");
        Struct allocation = ALLOCATION_V1.read(buffer);
        ExtendedAssignment assignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(allocation.getBytes(ALLOCATION_KEY_NAME));
        return new ExtendedWorkerState(url, configOffset, assignment);
    }

    public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) {
        if (assignment == null || ExtendedAssignment.empty().equals(assignment)) {
            return null;
        }
        Struct struct = assignment.toStruct();
        ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf() + ASSIGNMENT_V1.sizeOf((Object)struct));
        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
        ASSIGNMENT_V1.write(buffer, (Object)struct);
        buffer.flip();
        return buffer;
    }

    public static ExtendedAssignment deserializeAssignment(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        Struct header = ConnectProtocol.CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
        Short version = header.getShort("version");
        IncrementalCooperativeConnectProtocol.checkVersionCompatibility(version);
        Struct struct = ASSIGNMENT_V1.read(buffer);
        return ExtendedAssignment.fromStruct(version, struct);
    }

    private static void checkVersionCompatibility(short version) {
        if (version < 0) {
            throw new SchemaException("Unsupported subscription version: " + version);
        }
    }
}

