package org.apache.storm.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.TaskMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/utils/TransferDrainer.class */
/**
 * Buffers outgoing {@link TaskMessage}s per destination task and hands them to the
 * {@link IConnection} of the node that hosts each task.
 *
 * <p>Typical use is {@link #add(TaskMessage)} repeatedly, then
 * {@link #send(Map, Map)}, then {@link #clear()}. {@code send} does not remove anything
 * from the buffer on its own.
 *
 * <p>This class performs no synchronization of its own.
 */
public class TransferDrainer {
    private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);

    /** Buffered messages, keyed by the destination task id reported by {@link TaskMessage#task()}. */
    private final Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap<>();

    /**
     * Appends a message to the buffer of its destination task, creating that buffer on first use.
     *
     * @param taskMessage the message to buffer; its {@code task()} value selects the buffer
     */
    public void add(TaskMessage taskMessage) {
        int task = taskMessage.task();
        this.bundles.computeIfAbsent(task, ignored -> new ArrayList<>()).add(taskMessage);
    }

    /**
     * Sends every buffered message to the connection of the node hosting its destination task.
     *
     * <p>Messages are first grouped per node. A node without an entry in {@code map2} is
     * logged at WARN level and its messages are not sent. The buffer itself is left
     * untouched; call {@link #clear()} afterwards to empty it.
     *
     * @param map  destination task id to the node hosting that task
     * @param map2 node to the connection used to reach it
     */
    public void send(Map<Integer, NodeInfo> map, Map<NodeInfo, IConnection> map2) {
        for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry : groupBundleByDestination(map).entrySet()) {
            NodeInfo node = entry.getKey();
            IConnection connection = map2.get(node);
            if (connection == null) {
                LOG.warn("Connection not available for hostPort {}", node);
                continue;
            }
            Iterator<TaskMessage> messages = entry.getValue().iterator();
            // Only call into the connection when there is at least one message to deliver.
            if (messages.hasNext()) {
                connection.send(messages);
            }
        }
    }

    /**
     * Regroups the per-task buffers into one message stream per destination node.
     *
     * <p>Empty buffers are skipped. A task with no entry in {@code taskToNode} is logged at
     * WARN level and skipped. Buffers of several tasks that map to the same node are
     * concatenated into a single stream.
     *
     * @param taskToNode destination task id to the node hosting that task
     * @return a new map from node to the stream of messages destined for it
     */
    private Map<NodeInfo, Stream<TaskMessage>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
        Map<NodeInfo, Stream<TaskMessage>> byNode = new HashMap<>();
        for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : this.bundles.entrySet()) {
            ArrayList<TaskMessage> messages = entry.getValue();
            if (messages.isEmpty()) {
                continue;
            }
            NodeInfo node = taskToNode.get(entry.getKey());
            if (node == null) {
                LOG.warn("No remote destination available for task {}", entry.getKey());
                continue;
            }
            byNode.merge(node, messages.stream(), Stream::concat);
        }
        return byNode;
    }

    /**
     * Empties every per-task buffer. The map entries and their lists are kept, so the
     * same list instances are reused by later {@link #add(TaskMessage)} calls.
     */
    public void clear() {
        for (ArrayList<TaskMessage> messages : this.bundles.values()) {
            messages.clear();
        }
    }
}
