package org.apache.doris.qe;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.thrift.TException;

/* loaded from: input_file:org/apache/doris/qe/InsertStreamTxnExecutor.class */
public class InsertStreamTxnExecutor {
    private long txnId;
    private TUniqueId loadId;
    private TransactionEntry txnEntry;

    public InsertStreamTxnExecutor(TransactionEntry transactionEntry) {
        this.txnEntry = transactionEntry;
    }

    public void beginTransaction(TStreamLoadPutRequest tStreamLoadPutRequest) throws UserException, TException, TimeoutException, InterruptedException, ExecutionException {
        TTxnParams txnConf = this.txnEntry.getTxnConf();
        StreamLoadTask fromTStreamLoadPutRequest = StreamLoadTask.fromTStreamLoadPutRequest(tStreamLoadPutRequest);
        StreamLoadPlanner streamLoadPlanner = new StreamLoadPlanner(this.txnEntry.getDb(), (OlapTable) this.txnEntry.getTable(), fromTStreamLoadPutRequest);
        if (Config.enable_pipeline_load) {
            TPipelineFragmentParams planForPipeline = streamLoadPlanner.planForPipeline(fromTStreamLoadPutRequest.getId());
            BeSelectionPolicy build = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
            List<Long> selectBackendIdsByPolicy = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(build, 1);
            if (selectBackendIdsByPolicy.isEmpty()) {
                throw new UserException("No available backend to match the policy: " + build);
            }
            planForPipeline.setTxnConf(txnConf).setImportLabel(this.txnEntry.getLabel());
            Iterator it = ((TPipelineInstanceParams) planForPipeline.local_params.get(0)).per_node_scan_ranges.entrySet().iterator();
            while (it.hasNext()) {
                for (TScanRangeParams tScanRangeParams : (List) ((Map.Entry) it.next()).getValue()) {
                    tScanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(TFileFormatType.FORMAT_PROTO);
                    tScanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(TFileCompressType.PLAIN);
                }
            }
            txnConf.setFragmentInstanceId(((TPipelineInstanceParams) planForPipeline.local_params.get(0)).fragment_instance_id);
            this.loadId = tStreamLoadPutRequest.getLoadId();
            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder().setHi(this.loadId.getHi()).setLo(this.loadId.getLo()).m9713build());
            Backend backend = (Backend) Env.getCurrentSystemInfo().getIdToBackend().get(selectBackendIdsByPolicy.get(0));
            txnConf.setUserIp(backend.getHost());
            this.txnEntry.setBackend(backend);
            TNetworkAddress tNetworkAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
            try {
                TPipelineFragmentParamsList tPipelineFragmentParamsList = new TPipelineFragmentParamsList();
                tPipelineFragmentParamsList.addToParamsList(planForPipeline);
                InternalService.PExecPlanFragmentResult pExecPlanFragmentResult = BackendServiceProxy.getInstance().execPlanFragmentsAsync(tNetworkAddress, tPipelineFragmentParamsList, false).get(5L, TimeUnit.SECONDS);
                if (TStatusCode.findByValue(pExecPlanFragmentResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                    throw new TException("failed to execute plan fragment: " + pExecPlanFragmentResult.getStatus().mo9445getErrorMsgsList());
                }
                return;
            } catch (RpcException e) {
                throw new TException(e);
            }
        }
        TExecPlanFragmentParams plan = streamLoadPlanner.plan(fromTStreamLoadPutRequest.getId());
        BeSelectionPolicy build2 = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
        List<Long> selectBackendIdsByPolicy2 = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(build2, 1);
        if (selectBackendIdsByPolicy2.isEmpty()) {
            throw new UserException("No available backend to match the policy: " + build2);
        }
        plan.setTxnConf(txnConf).setImportLabel(this.txnEntry.getLabel());
        Iterator it2 = plan.params.per_node_scan_ranges.entrySet().iterator();
        while (it2.hasNext()) {
            for (TScanRangeParams tScanRangeParams2 : (List) ((Map.Entry) it2.next()).getValue()) {
                tScanRangeParams2.scan_range.ext_scan_range.file_scan_range.params.setFormatType(TFileFormatType.FORMAT_PROTO);
                tScanRangeParams2.scan_range.ext_scan_range.file_scan_range.params.setCompressType(TFileCompressType.PLAIN);
            }
        }
        txnConf.setFragmentInstanceId(plan.params.fragment_instance_id);
        this.loadId = tStreamLoadPutRequest.getLoadId();
        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder().setHi(this.loadId.getHi()).setLo(this.loadId.getLo()).m9713build());
        Backend backend2 = (Backend) Env.getCurrentSystemInfo().getIdToBackend().get(selectBackendIdsByPolicy2.get(0));
        txnConf.setUserIp(backend2.getHost());
        this.txnEntry.setBackend(backend2);
        TNetworkAddress tNetworkAddress2 = new TNetworkAddress(backend2.getHost(), backend2.getBrpcPort());
        try {
            TExecPlanFragmentParamsList tExecPlanFragmentParamsList = new TExecPlanFragmentParamsList();
            tExecPlanFragmentParamsList.addToParamsList(plan);
            InternalService.PExecPlanFragmentResult pExecPlanFragmentResult2 = BackendServiceProxy.getInstance().execPlanFragmentsAsync(tNetworkAddress2, tExecPlanFragmentParamsList, false).get(5L, TimeUnit.SECONDS);
            if (TStatusCode.findByValue(pExecPlanFragmentResult2.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new TException("failed to execute plan fragment: " + pExecPlanFragmentResult2.getStatus().mo9445getErrorMsgsList());
            }
        } catch (RpcException e2) {
            throw new TException(e2);
        }
    }

    public void commitTransaction() throws TException, TimeoutException, InterruptedException, ExecutionException {
        TTxnParams txnConf = this.txnEntry.getTxnConf();
        Types.PUniqueId m9713build = Types.PUniqueId.newBuilder().setHi(txnConf.getFragmentInstanceId().getHi()).setLo(txnConf.getFragmentInstanceId().getLo()).m9713build();
        Backend backend = this.txnEntry.getBackend();
        try {
            InternalService.PCommitResult pCommitResult = BackendServiceProxy.getInstance().commit(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), m9713build, this.txnEntry.getpLoadId()).get(5L, TimeUnit.SECONDS);
            if (TStatusCode.findByValue(pCommitResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new TException("failed to commit txn: " + pCommitResult.getStatus().mo9445getErrorMsgsList());
            }
        } catch (RpcException e) {
            throw new TException(e);
        }
    }

    public void abortTransaction() throws TException, TimeoutException, InterruptedException, ExecutionException {
        TTxnParams txnConf = this.txnEntry.getTxnConf();
        Types.PUniqueId m9713build = Types.PUniqueId.newBuilder().setHi(txnConf.getFragmentInstanceId().getHi()).setLo(txnConf.getFragmentInstanceId().getLo()).m9713build();
        Backend backend = this.txnEntry.getBackend();
        try {
            InternalService.PRollbackResult pRollbackResult = BackendServiceProxy.getInstance().rollback(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), m9713build, this.txnEntry.getpLoadId()).get(5L, TimeUnit.SECONDS);
            if (TStatusCode.findByValue(pRollbackResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                throw new TException("failed to rollback txn: " + pRollbackResult.getStatus().mo9445getErrorMsgsList());
            }
        } catch (RpcException e) {
            throw new TException(e);
        }
    }

    public void sendData() throws TException, TimeoutException, InterruptedException, ExecutionException {
        if (this.txnEntry.getDataToSend() == null || this.txnEntry.getDataToSend().isEmpty()) {
            return;
        }
        TTxnParams txnConf = this.txnEntry.getTxnConf();
        Types.PUniqueId m9713build = Types.PUniqueId.newBuilder().setHi(txnConf.getFragmentInstanceId().getHi()).setLo(txnConf.getFragmentInstanceId().getLo()).m9713build();
        Backend backend = this.txnEntry.getBackend();
        try {
            try {
                InternalService.PSendDataResult pSendDataResult = BackendServiceProxy.getInstance().sendData(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), m9713build, this.txnEntry.getpLoadId(), this.txnEntry.getDataToSend()).get(5L, TimeUnit.SECONDS);
                if (TStatusCode.findByValue(pSendDataResult.getStatus().getStatusCode()) != TStatusCode.OK) {
                    throw new TException("failed to insert data: " + pSendDataResult.getStatus().mo9445getErrorMsgsList());
                }
            } catch (RpcException e) {
                throw new TException(e);
            }
        } finally {
            this.txnEntry.clearDataToSend();
        }
    }

    public TUniqueId getLoadId() {
        return this.loadId;
    }

    public void setLoadId(TUniqueId tUniqueId) {
        this.loadId = tUniqueId;
        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder().setHi(tUniqueId.getHi()).setLo(tUniqueId.getLo()).m9713build());
    }

    public long getTxnId() {
        return this.txnId;
    }

    public void setTxnId(long j) {
        this.txnId = j;
    }

    public TransactionEntry getTxnEntry() {
        return this.txnEntry;
    }
}
