/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

public class StreamPartitionAssignor
implements PartitionAssignor,
Configurable {
    // Sentinel partition count (-1); the same value InternalTopicMetadata starts with before a count is resolved.
    private static final int UNKNOWN = -1;
    // Sentinel partition count (-2); the same value assign() stores for a repartition topic when a source
    // topic's partition count is missing from the cluster metadata, and that prepareTopic() skips.
    public static final int NOT_AVAILABLE = -2;
    // Logger obtained in configure() from a LogContext built with logPrefix.
    private Logger log;
    // Prefix of the form "stream-thread [<client.id>] ", built in configure() and used in exception messages.
    private String logPrefix;
    /** Orders partitions by topic name and, within the same topic, by ascending partition number. */
    private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
        @Override
        public int compare(TopicPartition left, TopicPartition right) {
            int byTopic = left.topic().compareTo(right.topic());
            // Only fall back to the partition number when both partitions belong to the same topic.
            return byTopic == 0 ? Integer.compare(left.partition(), right.partition()) : byTopic;
        }
    };
    // Value of "application.server"; assigned in configure() only when non-empty and parseable as host:port.
    private String userEndPoint;
    // Value of "num.standby.replicas"; handed to the task assignor in assign().
    private int numStandbyReplicas;
    // Taken from the "__task.manager.instance__" config entry in configure().
    private TaskManager taskManager;
    // Instance configured under "partition.grouper"; groups source-topic partitions into tasks in assign().
    private PartitionGrouper partitionGrouper;
    // Created in configure(); can be replaced through setInternalTopicManager().
    private InternalTopicManager internalTopicManager;
    // Created in configure(); applied to every co-partition group by ensureCopartitioning().
    private CopartitionedTopicsValidator copartitionedTopicsValidator;

    /**
     * Initializes this assignor from the consumer configs.
     *
     * <p>Sets up logging, picks the {@link TaskManager} out of the
     * {@code "__task.manager.instance__"} entry, reads the standby replica count and the
     * partition grouper, validates the optional {@code "application.server"} endpoint and
     * creates the internal topic manager and the co-partitioning validator.
     *
     * @param configs the raw configuration map
     * @throws KafkaException if no task manager is supplied or the supplied object is not a {@link TaskManager}
     * @throws ConfigException if {@code "application.server"} is set but is not a valid host:port pair
     */
    public void configure(Map<String, ?> configs) {
        StreamsConfig streamsConfig = new StreamsConfig(configs);
        logPrefix = String.format("stream-thread [%s] ", streamsConfig.getString("client.id"));
        log = new LogContext(logPrefix).logger(getClass());

        // A null entry fails the instanceof test as well, so both failure modes share one guard.
        Object taskManagerCandidate = configs.get("__task.manager.instance__");
        if (!(taskManagerCandidate instanceof TaskManager)) {
            String reason = taskManagerCandidate == null
                ? "TaskManager is not specified"
                : String.format("%s is not an instance of %s", taskManagerCandidate.getClass().getName(), TaskManager.class.getName());
            KafkaException failure = new KafkaException(reason);
            log.error(failure.getMessage(), failure);
            throw failure;
        }
        taskManager = (TaskManager) taskManagerCandidate;

        numStandbyReplicas = streamsConfig.getInt("num.standby.replicas");
        partitionGrouper = (PartitionGrouper) streamsConfig.getConfiguredInstance("partition.grouper", PartitionGrouper.class);

        String configuredEndPoint = streamsConfig.getString("application.server");
        boolean endPointProvided = configuredEndPoint != null && !configuredEndPoint.isEmpty();
        if (endPointProvided) {
            String host;
            Integer port;
            try {
                host = Utils.getHost(configuredEndPoint);
                port = Utils.getPort(configuredEndPoint);
            } catch (NumberFormatException nfe) {
                throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s", logPrefix, configuredEndPoint, "application.server"));
            }
            if (host == null || port == null) {
                throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair but received %s", logPrefix, "application.server", configuredEndPoint));
            }
            // Only a validated endpoint is remembered; otherwise the field keeps its previous value.
            userEndPoint = configuredEndPoint;
        }

        internalTopicManager = new InternalTopicManager(taskManager.adminClient, streamsConfig);
        copartitionedTopicsValidator = new CopartitionedTopicsValidator(logPrefix);
    }

    /**
     * Returns the name of this assignor.
     *
     * @return the constant string {@code "stream"}
     */
    public String name() {
        return "stream";
    }

    /**
     * Builds the subscription for this consumer.
     *
     * <p>The user data is an encoded {@link SubscriptionInfo} holding the task manager's process id,
     * its previously active task ids, its cached task ids minus the previously active ones, and the
     * configured user endpoint. The task manager is also told about the subscribed topics.
     *
     * @param topics the topics this consumer subscribes to
     * @return a subscription over a copy of {@code topics} with the encoded info as user data
     */
    public PartitionAssignor.Subscription subscription(Set<String> topics) {
        Set<TaskId> prevActive = taskManager.prevActiveTaskIds();
        // Anything cached but not previously active is reported as a standby task; note that this
        // removes elements from the very set instance returned by cachedTasksIds().
        Set<TaskId> prevStandby = taskManager.cachedTasksIds();
        prevStandby.removeAll(prevActive);

        SubscriptionInfo subscriptionInfo = new SubscriptionInfo(taskManager.processId(), prevActive, prevStandby, userEndPoint);
        taskManager.updateSubscriptionsFromMetadata(topics);

        List<String> subscribedTopics = new ArrayList<String>(topics);
        return new PartitionAssignor.Subscription(subscribedTopics, subscriptionInfo.encode());
    }

    /*
     * WARNING - void declaration
     */
    public Map<String, PartitionAssignor.Assignment> assign(Cluster metadata, Map<String, PartitionAssignor.Subscription> subscriptions) {
        ClientState state;
        boolean numPartitionsNeeded;
        HashMap<UUID, ClientMetadata> clientsMetadata = new HashMap<UUID, ClientMetadata>();
        for (Map.Entry<String, PartitionAssignor.Subscription> entry : subscriptions.entrySet()) {
            void var9_10;
            String consumerId = entry.getKey();
            PartitionAssignor.Subscription subscription = entry.getValue();
            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
            ClientMetadata clientMetadata = (ClientMetadata)clientsMetadata.get(info.processId);
            if (clientMetadata == null) {
                ClientMetadata clientMetadata2 = new ClientMetadata(info.userEndPoint);
                clientsMetadata.put(info.processId, clientMetadata2);
            }
            var9_10.addConsumer(consumerId, info);
        }
        this.log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = this.taskManager.builder().topicGroups();
        HashMap<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<String, InternalTopicMetadata>();
        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
            for (InternalTopicConfig internalTopicConfig : topicsInfo.repartitionSourceTopics.values()) {
                repartitionTopicMetadata.put(internalTopicConfig.name(), new InternalTopicMetadata(internalTopicConfig));
            }
        }
        do {
            numPartitionsNeeded = false;
            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                    int numPartitions = ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions;
                    if (numPartitions != -1) continue;
                    for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                        Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
                        if (!otherSinkTopics.contains(topicName)) continue;
                        for (String string : otherTopicsInfo.sourceTopics) {
                            Integer numPartitionsCandidate;
                            if (repartitionTopicMetadata.containsKey(string)) {
                                numPartitionsCandidate = ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)string)).numPartitions;
                            } else {
                                numPartitionsCandidate = metadata.partitionCountForTopic(string);
                                if (numPartitionsCandidate == null) {
                                    ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions = -2;
                                }
                            }
                            if (numPartitionsCandidate == null || numPartitionsCandidate <= numPartitions) continue;
                            numPartitions = numPartitionsCandidate;
                        }
                    }
                    if (numPartitions == -1) {
                        numPartitionsNeeded = true;
                        continue;
                    }
                    ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions = numPartitions;
                }
            }
        } while (numPartitionsNeeded);
        this.ensureCopartitioning(this.taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
        this.prepareTopic(repartitionTopicMetadata);
        HashMap<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<TopicPartition, PartitionInfo>();
        for (Map.Entry entry : repartitionTopicMetadata.entrySet()) {
            String topic = (String)entry.getKey();
            Integer numPartitions = ((InternalTopicMetadata)entry.getValue()).numPartitions;
            for (int partition = 0; partition < numPartitions; ++partition) {
                allRepartitionTopicPartitions.put(new TopicPartition(topic, partition), new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
            }
        }
        Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
        this.taskManager.setClusterMetadata(fullMetadata);
        this.log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
        HashSet<String> hashSet = new HashSet<String>();
        HashMap<Integer, Set<String>> sourceTopicsByGroup = new HashMap<Integer, Set<String>>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            hashSet.addAll(entry.getValue().sourceTopics);
            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionsForTask = this.partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
        HashSet<TopicPartition> allAssignedPartitions = new HashSet<TopicPartition>();
        HashMap<Integer, HashSet<TaskId>> tasksByTopicGroup = new HashMap<Integer, HashSet<TaskId>>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
            Set<TopicPartition> set = entry.getValue();
            for (TopicPartition partition : set) {
                if (!allAssignedPartitions.contains(partition)) continue;
                this.log.warn("Partition {} is assigned to more than one tasks: {}", (Object)partition, partitionsForTask);
            }
            allAssignedPartitions.addAll(set);
            TaskId id = entry.getKey();
            HashSet<TaskId> ids = (HashSet<TaskId>)tasksByTopicGroup.get(id.topicGroupId);
            if (ids == null) {
                ids = new HashSet<TaskId>();
                tasksByTopicGroup.put(id.topicGroupId, ids);
            }
            ids.add(id);
        }
        for (String topic : hashSet) {
            List list = fullMetadata.partitionsForTopic(topic);
            if (!list.isEmpty()) {
                for (PartitionInfo partitionInfo : list) {
                    TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    if (allAssignedPartitions.contains(partition)) continue;
                    this.log.warn("Partition {} is not assigned to any tasks: {}", (Object)partition, partitionsForTask);
                }
                continue;
            }
            this.log.warn("No partitions found for topic {}", (Object)topic);
        }
        HashMap<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<String, InternalTopicMetadata>();
        for (Map.Entry entry : topicGroups.entrySet()) {
            int topicGroupId = (Integer)entry.getKey();
            Map<String, InternalTopicConfig> stateChangelogTopics = ((InternalTopologyBuilder.TopicsInfo)entry.getValue()).stateChangelogTopics;
            for (InternalTopicConfig internalTopicConfig : stateChangelogTopics.values()) {
                int numPartitions = -1;
                if (tasksByTopicGroup.get(topicGroupId) != null) {
                    for (TaskId task : (Set)tasksByTopicGroup.get(topicGroupId)) {
                        if (numPartitions >= task.partition + 1) continue;
                        numPartitions = task.partition + 1;
                    }
                    InternalTopicMetadata topicMetadata = new InternalTopicMetadata(internalTopicConfig);
                    topicMetadata.numPartitions = numPartitions;
                    changelogTopicMetadata.put(internalTopicConfig.name(), topicMetadata);
                    continue;
                }
                this.log.debug("No tasks found for topic group {}", (Object)topicGroupId);
            }
        }
        this.prepareTopic(changelogTopicMetadata);
        this.log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
        HashMap states = new HashMap();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            states.put(entry.getKey(), ((ClientMetadata)entry.getValue()).state);
        }
        this.log.debug("Assigning tasks {} to clients {} with number of replicas {}", new Object[]{partitionsForTask.keySet(), states, this.numStandbyReplicas});
        StickyTaskAssignor stickyTaskAssignor = new StickyTaskAssignor(states, partitionsForTask.keySet());
        stickyTaskAssignor.assign(this.numStandbyReplicas);
        this.log.info("Assigned tasks to clients as {}.", states);
        HashMap<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<HostInfo, Set<TopicPartition>>();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            HostInfo hostInfo = ((ClientMetadata)entry.getValue()).hostInfo;
            if (hostInfo == null) continue;
            HashSet topicPartitions = new HashSet();
            state = ((ClientMetadata)entry.getValue()).state;
            for (TaskId id : state.activeTasks()) {
                topicPartitions.addAll(partitionsForTask.get(id));
            }
            partitionsByHostState.put(hostInfo, topicPartitions);
        }
        this.taskManager.setPartitionsByHostState(partitionsByHostState);
        HashMap<String, PartitionAssignor.Assignment> assignment = new HashMap<String, PartitionAssignor.Assignment>();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            Set<String> consumers = ((ClientMetadata)entry.getValue()).consumers;
            state = ((ClientMetadata)entry.getValue()).state;
            List<List<TaskId>> interleavedActive = this.interleaveTasksByGroupId(state.activeTasks(), consumers.size());
            List<List<TaskId>> interleavedStandby = this.interleaveTasksByGroupId(state.standbyTasks(), consumers.size());
            int consumerTaskIndex = 0;
            for (String consumer : consumers) {
                HashMap<TaskId, Set<TopicPartition>> standby = new HashMap<TaskId, Set<TopicPartition>>();
                ArrayList<AssignedPartition> assignedPartitions = new ArrayList<AssignedPartition>();
                List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex);
                for (TaskId taskId : assignedActiveList) {
                    for (TopicPartition topicPartition : partitionsForTask.get(taskId)) {
                        assignedPartitions.add(new AssignedPartition(taskId, topicPartition));
                    }
                }
                if (!state.standbyTasks().isEmpty()) {
                    List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
                    for (TaskId taskId : assignedStandbyList) {
                        void var34_73;
                        Set set = (Set)standby.get(taskId);
                        if (set == null) {
                            HashSet hashSet2 = new HashSet();
                            standby.put(taskId, hashSet2);
                        }
                        var34_73.addAll((Collection)partitionsForTask.get(taskId));
                    }
                }
                ++consumerTaskIndex;
                Collections.sort(assignedPartitions);
                ArrayList<TaskId> active = new ArrayList<TaskId>();
                ArrayList<TopicPartition> activePartitions = new ArrayList<TopicPartition>();
                for (AssignedPartition assignedPartition : assignedPartitions) {
                    active.add(assignedPartition.taskId);
                    activePartitions.add(assignedPartition.partition);
                }
                assignment.put(consumer, new PartitionAssignor.Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
            }
        }
        return assignment;
    }

    /**
     * Distributes the given tasks round-robin over {@code numberThreads} buckets.
     *
     * <p>The tasks are sorted by their natural order first, then the i-th task of that order is put
     * into bucket {@code i % numberThreads}.
     *
     * @param taskIds the tasks to distribute
     * @param numberThreads the number of buckets (one per consumer)
     * @return a list of exactly {@code numberThreads} buckets; buckets may be empty
     * @throws IllegalArgumentException if {@code numberThreads} is negative, or if it is zero while
     *         {@code taskIds} is not empty (there would be no bucket to receive the tasks)
     */
    List<List<TaskId>> interleaveTasksByGroupId(Collection<TaskId> taskIds, int numberThreads) {
        List<TaskId> sortedTasks = new ArrayList<TaskId>(taskIds);
        Collections.sort(sortedTasks);

        // Without a bucket, the tasks could never be handed out; fail fast instead of looping forever.
        if (numberThreads <= 0 && !sortedTasks.isEmpty()) {
            throw new IllegalArgumentException("numberThreads must be positive to distribute " + sortedTasks.size() + " task(s), but was " + numberThreads);
        }

        List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<List<TaskId>>(numberThreads);
        for (int i = 0; i < numberThreads; ++i) {
            taskIdsForConsumerAssignment.add(new ArrayList<TaskId>());
        }

        int bucket = 0;
        for (TaskId taskId : sortedTasks) {
            taskIdsForConsumerAssignment.get(bucket).add(taskId);
            bucket = (bucket + 1) % numberThreads;
        }
        return taskIdsForConsumerAssignment;
    }

    /**
     * Applies a received assignment to the task manager.
     *
     * <p>The assigned partitions are sorted with {@code PARTITION_COMPARATOR} and paired by position
     * with the active task ids decoded from the user data. The cluster metadata, the host-to-partitions
     * map, the task assignment and the subscriptions of the task manager are then updated.
     *
     * @param assignment the assignment received for this consumer
     * @throws TaskAssignmentException if the number of partitions differs from the number of active task ids
     */
    public void onAssignment(PartitionAssignor.Assignment assignment) {
        List<TopicPartition> sortedPartitions = new ArrayList<TopicPartition>(assignment.partitions());
        Collections.sort(sortedPartitions, PARTITION_COMPARATOR);
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());

        int partitionCount = sortedPartitions.size();
        if (partitionCount != info.activeTasks.size()) {
            throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", logPrefix, partitionCount, info.activeTasks.size(), info.toString()));
        }

        // The i-th sorted partition belongs to the i-th active task id.
        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        int position = 0;
        for (TopicPartition partition : sortedPartitions) {
            TaskId owner = info.activeTasks.get(position++);
            Set<TopicPartition> ownedPartitions = activeTasks.get(owner);
            if (ownedPartitions == null) {
                ownedPartitions = new HashSet<TopicPartition>();
                activeTasks.put(owner, ownedPartitions);
            }
            ownedPartitions.add(partition);
        }

        // Build placeholder partition infos (no leader, no replicas) for every partition any host owns.
        Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<TopicPartition, PartitionInfo>();
        for (Set<TopicPartition> hostPartitions : info.partitionsByHost.values()) {
            for (TopicPartition tp : hostPartitions) {
                PartitionInfo placeholder = new PartitionInfo(tp.topic(), tp.partition(), null, new Node[0], new Node[0]);
                topicToPartitionInfo.put(tp, placeholder);
            }
        }

        taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
        taskManager.setPartitionsByHostState(info.partitionsByHost);
        taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks);
        taskManager.updateSubscriptionsFromAssignment(sortedPartitions);
    }

    /**
     * Applies the resolved partition counts to the internal topic configs and hands the topics to the
     * internal topic manager.
     *
     * <p>Topics whose count is -2 (the value of {@code NOT_AVAILABLE}) are skipped. Any other negative
     * count is an error. The topic manager is only called when at least one topic remains.
     *
     * @param topicPartitions internal topic metadata keyed by topic name
     * @throws TopologyBuilderException if a topic has a negative partition count other than -2
     */
    private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
        log.debug("Starting to validate internal topics in partition assignor.");

        Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<String, InternalTopicConfig>();
        for (InternalTopicMetadata topicMetadata : topicPartitions.values()) {
            int partitionCount = topicMetadata.numPartitions;
            InternalTopicConfig config = topicMetadata.config;
            if (partitionCount == -2) {
                // Source metadata was not available for this topic; leave it alone.
            } else if (partitionCount < 0) {
                throw new TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, config.name()));
            } else {
                config.setNumberOfPartitions(partitionCount);
                topicsToMakeReady.put(config.name(), config);
            }
        }

        if (!topicsToMakeReady.isEmpty()) {
            internalTopicManager.makeReady(topicsToMakeReady);
        }
        log.debug("Completed validating internal topics in partition assignor.");
    }

    /**
     * Runs the co-partitioning validator over every co-partition group, one group at a time.
     *
     * @param copartitionGroups the groups of topics that must be co-partitioned
     * @param allRepartitionTopicsNumPartitions repartition topic metadata keyed by topic name; passed to the validator
     * @param metadata the cluster metadata passed to the validator
     */
    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, Cluster metadata) {
        for (Set<String> group : copartitionGroups) {
            copartitionedTopicsValidator.validate(group, allRepartitionTopicsNumPartitions, metadata);
        }
    }

    /**
     * Replaces the internal topic manager that configure() created.
     * Package-private; presumably intended for tests — confirm with callers.
     *
     * @param internalTopicManager the manager to use from now on
     */
    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    /**
     * Checks that the topics of a co-partition group agree on their partition count and propagates the
     * agreed count to the repartition topics of the group.
     */
    static class CopartitionedTopicsValidator {
        private final String logPrefix;

        CopartitionedTopicsValidator(String logPrefix) {
            this.logPrefix = logPrefix;
        }

        /**
         * Validates one co-partition group.
         *
         * <p>Topics that are not repartition topics must exist in {@code metadata} and must all have the
         * same partition count. If any repartition topic of the group carries -2 (the value of
         * {@code NOT_AVAILABLE}), scanning stops and -2 becomes the group's count. If no count was
         * obtained from a non-repartition topic, the maximum count among the group's repartition topics
         * is used. The resulting count is written to every repartition topic of the group.
         *
         * @param copartitionGroup the topic names that must be co-partitioned
         * @param allRepartitionTopicsNumPartitions repartition topic metadata keyed by topic name; updated in place
         * @param metadata the cluster metadata used to look up non-repartition topics
         * @throws TopologyBuilderException if a non-repartition topic is missing from {@code metadata}
         *         or two non-repartition topics differ in their partition count
         */
        void validate(Set<String> copartitionGroup, Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, Cluster metadata) {
            // -1 means "no count determined yet".
            int agreedCount = -1;

            for (String topic : copartitionGroup) {
                if (allRepartitionTopicsNumPartitions.containsKey(topic)) {
                    if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == -2) {
                        agreedCount = -2;
                        break;
                    }
                } else {
                    Integer externalCount = metadata.partitionCountForTopic(topic);
                    if (externalCount == null) {
                        throw new TopologyBuilderException(String.format("%sTopic not found: %s", logPrefix, topic));
                    }
                    if (agreedCount == -1) {
                        agreedCount = externalCount;
                    } else if (agreedCount != externalCount) {
                        String[] sortedTopics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                        Arrays.sort(sortedTopics);
                        throw new TopologyBuilderException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(sortedTopics), ",")));
                    }
                }
            }

            // No count from a non-repartition topic: fall back to the largest repartition topic of the group.
            if (agreedCount == -1) {
                for (Map.Entry<String, InternalTopicMetadata> candidate : allRepartitionTopicsNumPartitions.entrySet()) {
                    if (copartitionGroup.contains(candidate.getKey())) {
                        agreedCount = Math.max(agreedCount, candidate.getValue().numPartitions);
                    }
                }
            }

            // Enforce the agreed count on every repartition topic of the group.
            for (Map.Entry<String, InternalTopicMetadata> member : allRepartitionTopicsNumPartitions.entrySet()) {
                if (copartitionGroup.contains(member.getKey())) {
                    member.getValue().numPartitions = agreedCount;
                }
            }
        }
    }

    /**
     * Holds the most recently reported set of topic names.
     * Each call to {@link #updateTopics(Collection)} replaces the previous content.
     */
    public static class SubscriptionUpdates {
        private final Set<String> updatedTopicSubscriptions = new HashSet<String>();

        /**
         * Replaces the stored topic names with the given ones.
         *
         * @param topicNames the new topic names; duplicates collapse because a set is used
         */
        public void updateTopics(Collection<String> topicNames) {
            updatedTopicSubscriptions.clear();
            updatedTopicSubscriptions.addAll(topicNames);
        }

        /**
         * @return an unmodifiable view of the stored topic names
         */
        public Collection<String> getUpdates() {
            Set<String> readOnlyView = Collections.unmodifiableSet(updatedTopicSubscriptions);
            return readOnlyView;
        }

        /**
         * @return {@code true} if at least one topic name is stored
         */
        public boolean hasUpdates() {
            return updatedTopicSubscriptions.size() > 0;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("SubscriptionUpdates{updatedTopicSubscriptions=");
            text.append(updatedTopicSubscriptions).append('}');
            return text.toString();
        }
    }

    /**
     * Pairs an internal topic's config with its partition count.
     * The count starts at -1 (the value of {@code UNKNOWN}) and is filled in by the assignor.
     */
    static class InternalTopicMetadata {
        public final InternalTopicConfig config;
        public int numPartitions = -1;

        InternalTopicMetadata(InternalTopicConfig config) {
            this.config = config;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("InternalTopicMetadata(config=");
            text.append(config);
            text.append(", numPartitions=").append(numPartitions);
            text.append(")");
            return text.toString();
        }
    }

    /**
     * Per-client bookkeeping used during assignment: the optional host endpoint, the member ids of the
     * client's consumers and the client's task state.
     */
    private static class ClientMetadata {
        final HostInfo hostInfo;
        final Set<String> consumers;
        final ClientState state;

        /**
         * @param endPoint the client's endpoint as {@code host:port}, or {@code null} if it has none
         * @throws ConfigException if {@code endPoint} is not {@code null} and cannot be split into host and port
         */
        ClientMetadata(String endPoint) {
            this.hostInfo = parseEndPoint(endPoint);
            this.consumers = new HashSet<String>();
            this.state = new ClientState();
        }

        /** Converts an endpoint string into a {@link HostInfo}; a {@code null} endpoint yields {@code null}. */
        private static HostInfo parseEndPoint(String endPoint) {
            if (endPoint == null) {
                return null;
            }
            String host = Utils.getHost(endPoint);
            Integer port = Utils.getPort(endPoint);
            if (host == null || port == null) {
                throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
            }
            return new HostInfo(host, port);
        }

        /**
         * Registers one consumer of this client: remembers its member id, merges its previous active
         * and standby tasks into the client state and raises the client's capacity by one.
         */
        void addConsumer(String consumerMemberId, SubscriptionInfo info) {
            consumers.add(consumerMemberId);
            state.addPreviousActiveTasks(info.prevTasks);
            state.addPreviousStandbyTasks(info.standbyTasks);
            state.incrementCapacity();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ClientMetadata{hostInfo=");
            text.append(hostInfo);
            text.append(", consumers=").append(consumers);
            text.append(", state=").append(state);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * A partition together with the task it is assigned to. Ordering, equality and hashing all look at
     * the partition only; the task id does not take part.
     */
    private static class AssignedPartition
    implements Comparable<AssignedPartition> {
        public final TaskId taskId;
        public final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition partition) {
            this.taskId = taskId;
            this.partition = partition;
        }

        @Override
        public int compareTo(AssignedPartition that) {
            TopicPartition mine = this.partition;
            TopicPartition theirs = that.partition;
            return PARTITION_COMPARATOR.compare(mine, theirs);
        }

        @Override
        public boolean equals(Object o) {
            if (o instanceof AssignedPartition) {
                // Equal exactly when the ordering considers both the same.
                return compareTo((AssignedPartition) o) == 0;
            }
            return false;
        }

        @Override
        public int hashCode() {
            return partition.hashCode();
        }
    }
}

