package org.apache.doris.common.util;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/common/util/KafkaUtil.class */
public class KafkaUtil {
    private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
    private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;

    public static List<Integer> getAllKafkaPartitions(String str, String str2, Map<String, String> map) throws UserException {
        Backend backend = null;
        try {
            List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
            if (allBackendIds.isEmpty()) {
                throw new LoadException("Failed to get all partitions. No alive backends");
            }
            Collections.shuffle(allBackendIds);
            Backend backend2 = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0).longValue());
            InternalService.PProxyResult pProxyResult = BackendServiceProxy.getInstance().getInfo(new TNetworkAddress(backend2.getHost(), backend2.getBrpcPort()), InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(InternalService.PKafkaMetaProxyRequest.newBuilder().setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder().setBrokers(str).setTopic(str2).addAllProperties((Iterable) map.entrySet().stream().map(entry -> {
                return InternalService.PStringPair.newBuilder().setKey((String) entry.getKey()).setVal((String) entry.getValue()).build();
            }).collect(Collectors.toList())))).build()).get(60L, TimeUnit.SECONDS);
            if (TStatusCode.findByValue(pProxyResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new UserException("failed to get kafka partition info: " + pProxyResult.getStatus().mo9445getErrorMsgsList());
            }
            return pProxyResult.getKafkaMetaResult().getPartitionIdsList();
        } catch (Exception e) {
            LOG.warn("failed to get partitions from backend[{}].", Long.valueOf(backend.getId()), e);
            throw new LoadException("Failed to get all partitions of kafka topic: " + str2 + " from backend[" + backend.getId() + "]. error: " + e.getMessage());
        }
    }

    public static List<Pair<Integer, Long>> getOffsetsForTimes(String str, String str2, Map<String, String> map, List<Pair<Integer, Long>> list) throws LoadException {
        LOG.debug("begin to get offsets for times of topic: {}, {}", str2, list);
        try {
            List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
            if (allBackendIds.isEmpty()) {
                throw new LoadException("Failed to get offset for times. No alive backends");
            }
            Collections.shuffle(allBackendIds);
            Backend backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0).longValue());
            TNetworkAddress tNetworkAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
            InternalService.PKafkaMetaProxyRequest.Builder kafkaInfo = InternalService.PKafkaMetaProxyRequest.newBuilder().setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder().setBrokers(str).setTopic(str2).addAllProperties((Iterable) map.entrySet().stream().map(entry -> {
                return InternalService.PStringPair.newBuilder().setKey((String) entry.getKey()).setVal((String) entry.getValue()).build();
            }).collect(Collectors.toList())));
            for (Pair<Integer, Long> pair : list) {
                kafkaInfo.addOffsetTimes(InternalService.PIntegerPair.newBuilder().setKey(((Integer) pair.first).intValue()).setVal(((Long) pair.second).longValue()).build());
            }
            InternalService.PProxyResult pProxyResult = BackendServiceProxy.getInstance().getInfo(tNetworkAddress, InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(kafkaInfo).build()).get(5L, TimeUnit.SECONDS);
            if (TStatusCode.findByValue(pProxyResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new UserException("failed to get offsets for times: " + pProxyResult.getStatus().mo9445getErrorMsgsList());
            }
            List<InternalService.PIntegerPair> offsetTimesList = pProxyResult.getPartitionOffsets().getOffsetTimesList();
            ArrayList newArrayList = Lists.newArrayList();
            for (InternalService.PIntegerPair pIntegerPair : offsetTimesList) {
                newArrayList.add(Pair.of(Integer.valueOf(pIntegerPair.getKey()), Long.valueOf(pIntegerPair.getVal())));
            }
            LOG.debug("finish to get offsets for times of topic: {}, {}", str2, newArrayList);
            return newArrayList;
        } catch (Exception e) {
            LOG.warn("failed to get offsets for times.", e);
            throw new LoadException("Failed to get offsets for times of kafka topic: " + str2 + ". error: " + e.getMessage());
        }
    }

    public static List<Pair<Integer, Long>> getLatestOffsets(long j, UUID uuid, String str, String str2, Map<String, String> map, List<Integer> list) throws LoadException {
        LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}", list, str2, uuid, Long.valueOf(j));
        try {
            List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
            if (allBackendIds.isEmpty()) {
                throw new LoadException("Failed to get latest offsets. No alive backends");
            }
            Collections.shuffle(allBackendIds);
            Backend backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0).longValue());
            TNetworkAddress tNetworkAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
            InternalService.PKafkaMetaProxyRequest.Builder kafkaInfo = InternalService.PKafkaMetaProxyRequest.newBuilder().setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder().setBrokers(str).setTopic(str2).addAllProperties((Iterable) map.entrySet().stream().map(entry -> {
                return InternalService.PStringPair.newBuilder().setKey((String) entry.getKey()).setVal((String) entry.getValue()).build();
            }).collect(Collectors.toList())));
            Iterator<Integer> it = list.iterator();
            while (it.hasNext()) {
                kafkaInfo.addPartitionIdForLatestOffsets(it.next().intValue());
            }
            InternalService.PProxyResult pProxyResult = BackendServiceProxy.getInstance().getInfo(tNetworkAddress, InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(kafkaInfo).build()).get(5L, TimeUnit.SECONDS);
            if (TStatusCode.findByValue(pProxyResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new UserException("failed to get latest offsets: " + pProxyResult.getStatus().mo9445getErrorMsgsList());
            }
            List<InternalService.PIntegerPair> offsetTimesList = pProxyResult.getPartitionOffsets().getOffsetTimesList();
            ArrayList newArrayList = Lists.newArrayList();
            for (InternalService.PIntegerPair pIntegerPair : offsetTimesList) {
                newArrayList.add(Pair.of(Integer.valueOf(pIntegerPair.getKey()), Long.valueOf(pIntegerPair.getVal())));
            }
            LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}", newArrayList, str2, uuid, Long.valueOf(j));
            return newArrayList;
        } catch (Exception e) {
            LOG.warn("failed to get latest offsets.", e);
            throw new LoadException("Failed to get latest offsets of kafka topic: " + str2 + ". error: " + e.getMessage());
        }
    }
}
