package org.apache.doris.load;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.UnmodifiableIterator;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.doris.analysis.ShowStreamLoadStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.StreamLoadAuditEvent;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStreamLoadRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/doris/load/StreamLoadRecordMgr.class */
public class StreamLoadRecordMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
    Queue<StreamLoadItem> streamLoadRecordHeap;
    private Map<Long, Map<String, StreamLoadRecord>> dbIdToLabelToStreamLoadRecord;
    private ReentrantReadWriteLock lock;

    /* loaded from: input_file:org/apache/doris/load/StreamLoadRecordMgr$FetchStreamLoadRecord.class */
    public static class FetchStreamLoadRecord implements Writable {

        @SerializedName("beIdToLastStreamLoad")
        private Map<Long, Long> beIdToLastStreamLoad;

        public FetchStreamLoadRecord(Map<Long, Long> map) {
            this.beIdToLastStreamLoad = map;
        }

        public void setBeIdToLastStreamLoad(Map<Long, Long> map) {
            this.beIdToLastStreamLoad = map;
        }

        public Map<Long, Long> getBeIdToLastStreamLoad() {
            return this.beIdToLastStreamLoad;
        }

        public void write(DataOutput dataOutput) throws IOException {
            Text.writeString(dataOutput, GsonUtils.GSON.toJson(this));
        }

        public static FetchStreamLoadRecord read(DataInput dataInput) throws IOException {
            return (FetchStreamLoadRecord) GsonUtils.GSON.fromJson(Text.readString(dataInput), FetchStreamLoadRecord.class);
        }
    }

    /* loaded from: input_file:org/apache/doris/load/StreamLoadRecordMgr$StreamLoadComparator.class */
    class StreamLoadComparator implements Comparator<StreamLoadItem> {
        StreamLoadComparator() {
        }

        @Override // java.util.Comparator
        public int compare(StreamLoadItem streamLoadItem, StreamLoadItem streamLoadItem2) {
            return streamLoadItem.getFinishTime().compareTo(streamLoadItem2.getFinishTime());
        }
    }

    /* loaded from: input_file:org/apache/doris/load/StreamLoadRecordMgr$StreamLoadItem.class */
    public class StreamLoadItem {
        private String label;
        private long dbId;
        private String finishTime;

        public StreamLoadItem(String str, long j, String str2) {
            this.label = str;
            this.dbId = j;
            this.finishTime = str2;
        }

        public String getLabel() {
            return this.label;
        }

        public long getDbId() {
            return this.dbId;
        }

        public String getFinishTime() {
            return this.finishTime;
        }

        public List<String> getStatistics() {
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.add(this.label);
            newArrayList.add(String.valueOf(this.dbId));
            newArrayList.add(this.finishTime);
            return newArrayList;
        }
    }

    public StreamLoadRecordMgr(String str, long j) {
        super(str, j);
        this.streamLoadRecordHeap = new PriorityQueue(new StreamLoadComparator());
        this.dbIdToLabelToStreamLoadRecord = Maps.newConcurrentMap();
        this.lock = new ReentrantReadWriteLock();
    }

    public void addStreamLoadRecord(long j, String str, StreamLoadRecord streamLoadRecord) {
        writeLock();
        while (isQueueFull()) {
            StreamLoadItem poll = this.streamLoadRecordHeap.poll();
            if (poll != null) {
                String label = poll.getLabel();
                Iterator<Map.Entry<String, StreamLoadRecord>> it = this.dbIdToLabelToStreamLoadRecord.get(Long.valueOf(poll.getDbId())).entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    } else if (it.next().getKey().equals(label)) {
                        it.remove();
                        break;
                    }
                }
            }
        }
        this.streamLoadRecordHeap.offer(new StreamLoadItem(str, j, streamLoadRecord.getFinishTime()));
        if (!this.dbIdToLabelToStreamLoadRecord.containsKey(Long.valueOf(j))) {
            this.dbIdToLabelToStreamLoadRecord.put(Long.valueOf(j), new ConcurrentHashMap());
        }
        Map<String, StreamLoadRecord> map = this.dbIdToLabelToStreamLoadRecord.get(Long.valueOf(j));
        if (!map.containsKey(str)) {
            map.put(str, streamLoadRecord);
        } else if (map.get(str).getFinishTime().compareTo(streamLoadRecord.getFinishTime()) < 0) {
            map.put(str, streamLoadRecord);
        }
        writeUnlock();
    }

    public List<StreamLoadItem> getStreamLoadRecords() {
        return new ArrayList(this.streamLoadRecordHeap);
    }

    public List<List<Comparable>> getStreamLoadRecordByDb(long j, String str, boolean z, ShowStreamLoadStmt.StreamLoadState streamLoadState) {
        LinkedList linkedList = new LinkedList();
        readLock();
        try {
            if (!this.dbIdToLabelToStreamLoadRecord.containsKey(Long.valueOf(j))) {
                return linkedList;
            }
            ArrayList<StreamLoadRecord> newArrayList = Lists.newArrayList();
            Map<String, StreamLoadRecord> map = this.dbIdToLabelToStreamLoadRecord.get(Long.valueOf(j));
            if (Strings.isNullOrEmpty(str)) {
                newArrayList.addAll((Collection) map.values().stream().collect(Collectors.toList()));
            } else if (!z) {
                for (Map.Entry<String, StreamLoadRecord> entry : map.entrySet()) {
                    if (entry.getKey().contains(str)) {
                        newArrayList.add(entry.getValue());
                    }
                }
            } else {
                if (!map.containsKey(str)) {
                    readUnlock();
                    return linkedList;
                }
                newArrayList.add(map.get(str));
            }
            for (StreamLoadRecord streamLoadRecord : newArrayList) {
                if (streamLoadState != null) {
                    if (!String.valueOf(streamLoadState).equalsIgnoreCase(streamLoadRecord.getStatus())) {
                    }
                }
                linkedList.add(streamLoadRecord.getStreamLoadInfo());
            }
            readUnlock();
            return linkedList;
        } finally {
            readUnlock();
        }
    }

    public void clearStreamLoadRecord() {
        writeLock();
        if (this.streamLoadRecordHeap.size() > 0 || this.dbIdToLabelToStreamLoadRecord.size() > 0) {
            this.streamLoadRecordHeap.clear();
            this.dbIdToLabelToStreamLoadRecord.clear();
        }
        writeUnlock();
    }

    public boolean isQueueFull() {
        return this.streamLoadRecordHeap.size() >= Config.max_stream_load_record_size;
    }

    private void readLock() {
        this.lock.readLock().lock();
    }

    private void readUnlock() {
        this.lock.readLock().unlock();
    }

    private void writeLock() {
        this.lock.writeLock().lock();
    }

    private void writeUnlock() {
        this.lock.writeLock().unlock();
    }

    @Override // org.apache.doris.common.util.MasterDaemon
    protected void runAfterCatalogReady() {
        ImmutableMap<Long, Backend> idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        HashMap newHashMap = Maps.newHashMap();
        UnmodifiableIterator it = idToBackend.values().iterator();
        while (it.hasNext()) {
            Backend backend = (Backend) it.next();
            BackendService.Client client = null;
            TNetworkAddress tNetworkAddress = null;
            try {
                try {
                    tNetworkAddress = new TNetworkAddress(backend.getHost(), backend.getBePort());
                    client = ClientPool.backendPool.borrowObject(tNetworkAddress);
                    Map streamLoadRecord = client.getStreamLoadRecord(backend.getLastStreamLoadTime()).getStreamLoadRecord();
                    i += streamLoadRecord.size();
                    long j = -1;
                    for (Map.Entry entry : streamLoadRecord.entrySet()) {
                        TStreamLoadRecord tStreamLoadRecord = (TStreamLoadRecord) entry.getValue();
                        String longToTimeString = TimeUtils.longToTimeString(tStreamLoadRecord.getStartTime(), TimeUtils.DATETIME_MS_FORMAT);
                        String longToTimeString2 = TimeUtils.longToTimeString(tStreamLoadRecord.getFinishTime(), TimeUtils.DATETIME_MS_FORMAT);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}, status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", new Object[]{backend.getHost(), tStreamLoadRecord.getLabel(), tStreamLoadRecord.getDb(), tStreamLoadRecord.getTbl(), tStreamLoadRecord.getUser(), tStreamLoadRecord.getUserIp(), tStreamLoadRecord.getStatus(), tStreamLoadRecord.getMessage(), tStreamLoadRecord.getUrl(), Long.valueOf(tStreamLoadRecord.getTotalRows()), Long.valueOf(tStreamLoadRecord.getLoadedRows()), Long.valueOf(tStreamLoadRecord.getFilteredRows()), Long.valueOf(tStreamLoadRecord.getUnselectedRows()), Long.valueOf(tStreamLoadRecord.getLoadBytes()), longToTimeString, longToTimeString2});
                        }
                        Env.getCurrentEnv().getAuditEventProcessor().handleAuditEvent(new StreamLoadAuditEvent.AuditEventBuilder().setEventType(AuditEvent.EventType.STREAM_LOAD_FINISH).setLabel(tStreamLoadRecord.getLabel()).setDb(tStreamLoadRecord.getDb()).setTable(tStreamLoadRecord.getTbl()).setUser(tStreamLoadRecord.getUser()).setClientIp(tStreamLoadRecord.getUserIp()).setStatus(tStreamLoadRecord.getStatus()).setMessage(tStreamLoadRecord.getMessage()).setUrl(tStreamLoadRecord.getUrl()).setTotalRows(tStreamLoadRecord.getTotalRows()).setLoadedRows(tStreamLoadRecord.getLoadedRows()).setFilteredRows(tStreamLoadRecord.getFilteredRows()).setUnselectedRows(tStreamLoadRecord.getUnselectedRows()).setLoadBytes(tStreamLoadRecord.getLoadBytes()).setStartTime(longToTimeString).setFinishTime(longToTimeString2).build());
                        if (((TStreamLoadRecord) entry.getValue()).getFinishTime() > j) {
                            j = ((TStreamLoadRecord) entry.getValue()).getFinishTime();
                        }
                        if (!Config.disable_show_stream_load) {
                            StreamLoadRecord streamLoadRecord2 = new StreamLoadRecord(tStreamLoadRecord.getLabel(), tStreamLoadRecord.getDb(), tStreamLoadRecord.getTbl(), tStreamLoadRecord.getUserIp(), tStreamLoadRecord.getStatus(), tStreamLoadRecord.getMessage(), tStreamLoadRecord.getUrl(), String.valueOf(tStreamLoadRecord.getTotalRows()), String.valueOf(tStreamLoadRecord.getLoadedRows()), String.valueOf(tStreamLoadRecord.getFilteredRows()), String.valueOf(tStreamLoadRecord.getUnselectedRows()), String.valueOf(tStreamLoadRecord.getLoadBytes()), longToTimeString, longToTimeString2, tStreamLoadRecord.getUser(), tStreamLoadRecord.getComment());
                            String cluster = tStreamLoadRecord.getCluster();
                            if (Strings.isNullOrEmpty(cluster)) {
                                cluster = "default_cluster";
                            }
                            String fullName = ClusterNamespace.getFullName(cluster, tStreamLoadRecord.getDb());
                            Database dbNullable = Env.getCurrentInternalCatalog().getDbNullable(fullName);
                            if (dbNullable == null) {
                                String str = fullName;
                                if (Strings.isNullOrEmpty(tStreamLoadRecord.getCluster())) {
                                    str = tStreamLoadRecord.getDb();
                                }
                                throw new UserException("unknown database, database=" + str);
                                break;
                            }
                            Env.getCurrentEnv().getStreamLoadRecordMgr().addStreamLoadRecord(dbNullable.getId(), tStreamLoadRecord.getLabel(), streamLoadRecord2);
                        }
                    }
                    if (streamLoadRecord.size() > 0) {
                        backend.setLastStreamLoadTime(j);
                        newHashMap.put(Long.valueOf(backend.getId()), Long.valueOf(j));
                    } else {
                        newHashMap.put(Long.valueOf(backend.getId()), Long.valueOf(backend.getLastStreamLoadTime()));
                    }
                    if (1 != 0) {
                        ClientPool.backendPool.returnObject(tNetworkAddress, client);
                    } else {
                        ClientPool.backendPool.invalidateObject(tNetworkAddress, client);
                    }
                } catch (Exception e) {
                    LOG.warn("task exec error. backend[{}]", Long.valueOf(backend.getId()), e);
                    if (0 != 0) {
                        ClientPool.backendPool.returnObject(tNetworkAddress, client);
                    } else {
                        ClientPool.backendPool.invalidateObject(tNetworkAddress, client);
                    }
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    ClientPool.backendPool.returnObject(tNetworkAddress, client);
                } else {
                    ClientPool.backendPool.invalidateObject(tNetworkAddress, client);
                }
                throw th;
            }
        }
        LOG.info("finished to pull stream load records of all backends. record size: {}, cost: {} ms", Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        if (i > 0) {
            Env.getCurrentEnv().getEditLog().logFetchStreamLoadRecord(new FetchStreamLoadRecord(newHashMap));
        }
        if (Config.disable_show_stream_load) {
            Env.getCurrentEnv().getStreamLoadRecordMgr().clearStreamLoadRecord();
        }
    }

    public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) {
        ImmutableMap<Long, Backend> idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
        Map<Long, Long> beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad();
        UnmodifiableIterator it = idToBackend.values().iterator();
        while (it.hasNext()) {
            Backend backend = (Backend) it.next();
            if (beIdToLastStreamLoad.containsKey(Long.valueOf(backend.getId()))) {
                long longValue = beIdToLastStreamLoad.get(Long.valueOf(backend.getId())).longValue();
                LOG.info("Replay stream load bdbje. backend: {}, last stream load time: {}", backend.getHost(), Long.valueOf(longValue));
                backend.setLastStreamLoadTime(longValue);
            }
        }
    }
}
