package org.apache.doris.qe;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;

/* loaded from: input_file:org/apache/doris/qe/ResultReceiver.class */
public class ResultReceiver {
    private static final Logger LOG = LogManager.getLogger(ResultReceiver.class);
    private boolean isDone = false;
    private boolean isCancel = false;
    private long packetIdx = 0;
    private long timeoutTs;
    private TNetworkAddress address;
    private Types.PUniqueId queryId;
    private Types.PUniqueId finstId;
    private Long backendId;
    private Thread currentThread;

    public ResultReceiver(TUniqueId tUniqueId, TUniqueId tUniqueId2, Long l, TNetworkAddress tNetworkAddress, long j) {
        this.timeoutTs = 0L;
        this.queryId = Types.PUniqueId.newBuilder().setHi(tUniqueId.hi).setLo(tUniqueId.lo).m9713build();
        this.finstId = Types.PUniqueId.newBuilder().setHi(tUniqueId2.hi).setLo(tUniqueId2.lo).m9713build();
        this.backendId = l;
        this.address = tNetworkAddress;
        this.timeoutTs = j;
    }

    public RowBatch getNext(Status status) throws TException {
        if (this.isDone) {
            return null;
        }
        RowBatch rowBatch = new RowBatch();
        while (!this.isDone && !this.isCancel) {
            try {
                try {
                    try {
                        InternalService.PFetchDataRequest build = InternalService.PFetchDataRequest.newBuilder().setFinstId(this.finstId).setRespInAttachment(false).build();
                        this.currentThread = Thread.currentThread();
                        Future<InternalService.PFetchDataResult> fetchDataAsync = BackendServiceProxy.getInstance().fetchDataAsync(this.address, build);
                        InternalService.PFetchDataResult pFetchDataResult = null;
                        while (pFetchDataResult == null) {
                            long currentTimeMillis = System.currentTimeMillis();
                            if (currentTimeMillis >= this.timeoutTs) {
                                throw new TimeoutException("query timeout, query id = " + DebugUtil.printId(this.queryId));
                            }
                            try {
                                pFetchDataResult = fetchDataAsync.get(this.timeoutTs - currentTimeMillis, TimeUnit.MILLISECONDS);
                            } catch (InterruptedException e) {
                                LOG.info("future get interrupted Exception", e);
                                if (this.isCancel) {
                                    status.setStatus(Status.CANCELLED);
                                    synchronized (this) {
                                        this.currentThread = null;
                                        return null;
                                    }
                                }
                            }
                        }
                        if (TStatusCode.findByValue(pFetchDataResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                            status.setPstatus(pFetchDataResult.getStatus());
                            synchronized (this) {
                                this.currentThread = null;
                            }
                            return null;
                        }
                        rowBatch.setQueryStatistics(pFetchDataResult.getQueryStatistics());
                        if (this.packetIdx != pFetchDataResult.getPacketSeq()) {
                            LOG.warn("finistId={}, receive packet failed, expect={}, receive={}", DebugUtil.printId(this.finstId), Long.valueOf(this.packetIdx), Long.valueOf(pFetchDataResult.getPacketSeq()));
                            status.setRpcStatus("receive error packet");
                            synchronized (this) {
                                this.currentThread = null;
                            }
                            return null;
                        }
                        this.packetIdx++;
                        this.isDone = pFetchDataResult.getEos();
                        if (pFetchDataResult.hasEmptyBatch() && pFetchDataResult.getEmptyBatch()) {
                            LOG.info("finistId={}, get first empty rowbatch", DebugUtil.printId(this.finstId));
                            rowBatch.setEos(false);
                            synchronized (this) {
                                this.currentThread = null;
                            }
                            return rowBatch;
                        }
                        if (pFetchDataResult.hasRowBatch() && pFetchDataResult.getRowBatch().size() > 0) {
                            byte[] byteArray = pFetchDataResult.getRowBatch().toByteArray();
                            TResultBatch tResultBatch = new TResultBatch();
                            new TDeserializer().deserialize(tResultBatch, byteArray);
                            rowBatch.setBatch(tResultBatch);
                            rowBatch.setEos(pFetchDataResult.getEos());
                            synchronized (this) {
                                this.currentThread = null;
                            }
                            return rowBatch;
                        }
                    } catch (RpcException e2) {
                        LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(this.finstId), e2);
                        status.setRpcStatus(e2.getMessage());
                        SimpleScheduler.addToBlacklist(this.backendId, e2.getMessage());
                        synchronized (this) {
                            this.currentThread = null;
                        }
                    }
                } catch (ExecutionException e3) {
                    LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(this.finstId), e3);
                    if (e3.getMessage().contains("time out")) {
                        status.setStatus(new Status(TStatusCode.TIMEOUT, e3.getMessage()));
                    } else {
                        status.setRpcStatus(e3.getMessage());
                        SimpleScheduler.addToBlacklist(this.backendId, e3.getMessage());
                    }
                    synchronized (this) {
                        this.currentThread = null;
                    }
                } catch (TimeoutException e4) {
                    LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(this.finstId), e4);
                    status.setStatus(new Status(TStatusCode.TIMEOUT, "query timeout"));
                    synchronized (this) {
                        this.currentThread = null;
                    }
                }
            } catch (Throwable th) {
                synchronized (this) {
                    this.currentThread = null;
                    throw th;
                }
            }
        }
        synchronized (this) {
            this.currentThread = null;
        }
        if (this.isCancel) {
            status.setStatus(Status.CANCELLED);
        }
        return rowBatch;
    }

    public void cancel() {
        this.isCancel = true;
        synchronized (this) {
            if (this.currentThread != null) {
            }
        }
    }
}
