/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.commandrouter.impl.kafka;

import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Collection;
import java.util.Objects;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommand;
import org.eclipse.hono.client.command.kafka.KafkaBasedCommandContext;
import org.eclipse.hono.commandrouter.impl.AbstractCommandProcessingQueue;

/**
 * Command processing queue whose entries are keyed by the Kafka
 * {@link TopicPartition} from which the corresponding command record was read.
 * <p>
 * The queueing behavior itself is inherited from
 * {@link AbstractCommandProcessingQueue}; this class only supplies the queue key,
 * a log label for it, and two partition-based clean-up operations.
 */
public class KafkaCommandProcessingQueue
extends AbstractCommandProcessingQueue<KafkaBasedCommandContext, TopicPartition> {

    /**
     * Creates a new queue.
     *
     * @param vertx The Vert.x instance, passed on unchanged to the superclass constructor.
     */
    public KafkaCommandProcessingQueue(Vertx vertx) {
        super(vertx);
    }

    /**
     * Derives the queue key of a command from the Kafka record it is associated with.
     *
     * @param commandContext The context of the command to get the key for.
     * @return A topic partition built from the topic name and partition index of the command's record.
     */
    @Override
    protected TopicPartition getQueueKey(KafkaBasedCommandContext commandContext) {
        // Only topic() and partition() are read, so the record's key/value types are irrelevant here;
        // an unbounded wildcard avoids the raw type without assuming a concrete parameterization.
        KafkaConsumerRecord<?, ?> record = ((KafkaBasedCommand)commandContext.getCommand()).getRecord();
        return new TopicPartition(record.topic(), record.partition());
    }

    /**
     * Builds the label used to refer to the source of a command in log output.
     *
     * @param queueKey The topic partition to describe.
     * @return The string {@code "partition [" + queueKey + "]"}.
     */
    @Override
    protected String getCommandSourceForLog(TopicPartition queueKey) {
        return "partition [" + queueKey + "]";
    }

    /**
     * Restricts the queue to the given partitions.
     * <p>
     * Invokes the inherited {@code removeCommandQueueEntries} with a predicate that matches
     * every queue key that is <em>not</em> contained in the given collection.
     *
     * @param partitions The partitions whose entries are to be kept.
     * @throws NullPointerException if partitions is {@code null}.
     */
    public void setCurrentlyHandledPartitions(Collection<TopicPartition> partitions) {
        Objects.requireNonNull(partitions, "partitions must not be null");
        this.removeCommandQueueEntries(queueKey -> !partitions.contains(queueKey));
    }

    /**
     * Drops the given partitions from the queue.
     * <p>
     * Invokes the inherited {@code removeCommandQueueEntries} with a predicate that matches
     * every queue key that is contained in the given collection.
     *
     * @param partitions The partitions whose entries are to be removed.
     * @throws NullPointerException if partitions is {@code null}.
     */
    public void setRevokedPartitions(Collection<TopicPartition> partitions) {
        Objects.requireNonNull(partitions, "partitions must not be null");
        this.removeCommandQueueEntries(partitions::contains);
    }
}

