/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.sql.operation.impl;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.AbstractJobOperation;
import org.apache.linkis.engineconnplugin.flink.client.sql.operation.result.ColumnInfo;
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InsertOperation
extends AbstractJobOperation {
    private static final Logger LOG = LoggerFactory.getLogger(InsertOperation.class);
    private final String statement;
    private final List<ColumnInfo> columnInfos;
    private boolean fetched = false;

    public InsertOperation(FlinkEngineConnContext context, String statement, String tableIdentifier) {
        super(context);
        this.statement = statement;
        this.columnInfos = Collections.singletonList(ColumnInfo.create(tableIdentifier, (LogicalType)new BigIntType(false)));
    }

    @Override
    protected JobID submitJob() throws SqlExecutionException {
        return this.executeUpdateInternal(this.context.getExecutionContext());
    }

    @Override
    protected Optional<Tuple2<List<Row>, List<Boolean>>> fetchJobResults() {
        if (this.fetched) {
            return Optional.empty();
        }
        this.fetched = true;
        return Optional.of(Tuple2.of(Collections.singletonList(Row.of((Object[])new Object[]{-2L})), null));
    }

    @Override
    protected List<ColumnInfo> getColumnInfos() {
        return this.columnInfos;
    }

    private JobID executeUpdateInternal(ExecutionContext executionContext) throws SqlExecutionException {
        TableResult tableResult;
        TableEnvironment tableEnv = executionContext.getTableEnvironment();
        try {
            tableResult = executionContext.wrapClassLoader(() -> tableEnv.executeSql(this.statement));
        }
        catch (Exception t) {
            LOG.error(String.format("Invalid SQL query, sql is: %s.", this.statement), (Throwable)t);
            throw new SqlExecutionException(FlinkErrorCodeSummary.INVALID_SQL_STATEMENT.getErrorDesc(), (Throwable)t);
        }
        this.asyncNotify(tableResult);
        return ((JobClient)tableResult.getJobClient().get()).getJobID();
    }

    protected void asyncNotify(TableResult tableResult) {
        ((CompletableFuture)CompletableFuture.completedFuture(tableResult).thenApply(result -> {
            CloseableIterator iterator = result.collect();
            int affected = 0;
            while (iterator.hasNext()) {
                Row row = (Row)iterator.next();
                affected = Integer.parseInt(row.getField(0).toString());
            }
            return affected;
        })).whenComplete((affected, throwable) -> {
            if (throwable != null) {
                this.getFlinkStatusListeners().forEach(listener -> listener.onFailed("Error while submitting job.", (Throwable)throwable, RowsType.Affected()));
            } else {
                this.getFlinkStatusListeners().forEach(listener -> listener.onSuccess((int)affected, RowsType.Affected()));
            }
        });
    }
}

