/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.rest.UrlPrefixDecorator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.table.api.SqlParserEOFException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.StatementResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeaders;
import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.util.GetApiVersionResponseBody;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorImpl
implements Executor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorImpl.class);
    private static final long HEARTBEAT_INTERVAL_MILLISECONDS = 60000L;
    private final AutoCloseableRegistry registry = new AutoCloseableRegistry();
    private final URL gatewayUrl;
    private final ExecutorService executorService;
    private final RestClient restClient;
    private final SqlGatewayRestAPIVersion connectionVersion;
    private final SessionHandle sessionHandle;
    private final RowFormat rowFormat;
    private final Collection<HttpHeader> customHttpHeaders;

    /**
     * Creates an executor for the gateway listening on the given socket address.
     *
     * <p>Delegates to the primary constructor with the default heartbeat interval
     * ({@code HEARTBEAT_INTERVAL_MILLISECONDS}) and the {@link RowFormat#PLAIN_TEXT} row format.
     *
     * @param defaultContext context passed through to the primary constructor
     * @param gatewayAddress socket address of the gateway; converted to a URL
     * @param sessionId session id passed through to the primary constructor
     */
    public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId) {
        this(
                defaultContext,
                NetUtils.socketToUrl(gatewayAddress),
                sessionId,
                HEARTBEAT_INTERVAL_MILLISECONDS,
                RowFormat.PLAIN_TEXT);
    }

    /**
     * Creates an executor for the gateway listening on the given socket address, using a
     * caller-chosen row format.
     *
     * <p>Delegates to the primary constructor with the default heartbeat interval
     * ({@code HEARTBEAT_INTERVAL_MILLISECONDS}).
     *
     * @param defaultContext context passed through to the primary constructor
     * @param gatewayAddress socket address of the gateway; converted to a URL
     * @param sessionId session id passed through to the primary constructor
     * @param rowFormat row format used when fetching results
     */
    public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId, RowFormat rowFormat) {
        this(
                defaultContext,
                NetUtils.socketToUrl(gatewayAddress),
                sessionId,
                HEARTBEAT_INTERVAL_MILLISECONDS,
                rowFormat);
    }

    /**
     * Creates an executor for the gateway reachable at the given URL.
     *
     * <p>Delegates to the primary constructor with the default heartbeat interval
     * ({@code HEARTBEAT_INTERVAL_MILLISECONDS}) and the {@link RowFormat#PLAIN_TEXT} row format.
     *
     * @param defaultContext context passed through to the primary constructor
     * @param gatewayUrl URL of the gateway
     * @param sessionId session id passed through to the primary constructor
     */
    public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String sessionId) {
        this(
                defaultContext,
                gatewayUrl,
                sessionId,
                HEARTBEAT_INTERVAL_MILLISECONDS,
                RowFormat.PLAIN_TEXT);
    }

    /**
     * Test-visible constructor that lets the caller choose the heartbeat interval.
     *
     * <p>Delegates to the primary constructor with the {@link RowFormat#PLAIN_TEXT} row format.
     *
     * @param defaultContext context passed through to the primary constructor
     * @param gatewayAddress socket address of the gateway; converted to a URL
     * @param sessionId session id passed through to the primary constructor
     * @param heartbeatInterval heartbeat period in milliseconds
     */
    @VisibleForTesting
    ExecutorImpl(DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId, long heartbeatInterval) {
        this(
                defaultContext,
                NetUtils.socketToUrl(gatewayAddress),
                sessionId,
                heartbeatInterval,
                RowFormat.PLAIN_TEXT);
    }

    /**
     * Primary constructor: connects to the gateway, negotiates the REST API version, opens a
     * session and starts a periodic session heartbeat.
     *
     * <p>Every resource created here (request thread pool, REST client, the opened session and
     * the heartbeat scheduler) is registered with {@code registry}. If any step fails, the
     * registry is closed before the failure is rethrown.
     *
     * <p>Custom HTTP headers are read from the {@code FLINK_REST_CLIENT_HEADERS} environment
     * variable and attached to every request.
     *
     * @param defaultContext supplies the Flink configuration used for the REST client and sent
     *     with the open-session request
     * @param gatewayUrl URL of the gateway
     * @param sessionId session id sent with the open-session request
     * @param heartbeatInterval initial delay and period of the heartbeat, in milliseconds
     * @param rowFormat row format used when fetching results
     * @throws SqlClientException if any step of the setup fails
     */
    @VisibleForTesting
    ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String sessionId, long heartbeatInterval, RowFormat rowFormat) {
        this.gatewayUrl = gatewayUrl;
        this.rowFormat = rowFormat;
        this.customHttpHeaders = ClientUtils.readHeadersFromEnvironmentVariable("FLINK_REST_CLIENT_HEADERS");
        try {
            this.executorService = Executors.newCachedThreadPool();
            this.registry.registerCloseable(this.executorService::shutdownNow);
            Configuration flinkConfig = defaultContext.getFlinkConfig();
            this.restClient = RestClient.forUrl(flinkConfig, this.executorService, gatewayUrl);
            this.registry.registerCloseable((AutoCloseable) this.restClient);
            this.connectionVersion = this.negotiateVersion();
            LOG.info("Open session to {} with connection version: {}.", gatewayUrl, this.connectionVersion);
            OpenSessionResponseBody response =
                    (OpenSessionResponseBody)
                            this.sendRequest(
                                            OpenSessionHeaders.getInstance(),
                                            EmptyMessageParameters.getInstance(),
                                            new OpenSessionRequestBody(sessionId, flinkConfig.toMap()))
                                    .get();
            this.sessionHandle = new SessionHandle(UUID.fromString(response.getSessionHandle()));
            this.registry.registerCloseable(this::closeSession);
            ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
            this.registry.registerCloseable(heartbeatScheduler::shutdownNow);
            heartbeatScheduler.scheduleAtFixedRate(
                    () -> {
                        // scheduleAtFixedRate suppresses all later executions once a run throws,
                        // so a single failed heartbeat must not escape from this task.
                        try {
                            ExecutorImpl.getResponse(
                                    this.sendRequest(
                                            TriggerSessionHeartbeatHeaders.getInstance(),
                                            new SessionMessageParameters(this.sessionHandle),
                                            EmptyRequestBody.getInstance()));
                        } catch (Exception heartbeatFailure) {
                            // A failure caused by shutting the scheduler down is expected; stay quiet.
                            if (!heartbeatScheduler.isShutdown()) {
                                LOG.warn(
                                        "Failed to send the heartbeat for session {}. Retrying at the next interval.",
                                        this.sessionHandle,
                                        heartbeatFailure);
                            }
                        }
                    },
                    heartbeatInterval,
                    heartbeatInterval,
                    TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Release whatever was already registered; keep the original failure as the primary one.
            try {
                this.registry.close();
            } catch (Throwable t) {
                e.addSuppressed(t);
            }
            throw new SqlClientException("Failed to create the executor.", e);
        }
    }

    /**
     * Sends {@code statement} to the gateway's configure-session endpoint for the current session
     * and blocks until the request completes.
     *
     * @param statement statement carried in the configure-session request body
     * @throws SqlExecutionException if the request cannot be sent, fails, or waiting for it is
     *     interrupted
     */
    @Override
    public void configureSession(String statement) {
        try {
            this.sendRequest(
                            ConfigureSessionHeaders.getInstance(),
                            new SessionMessageParameters(this.sessionHandle),
                            new ConfigureSessionRequestBody(statement))
                    .get();
        } catch (Exception e) {
            // The session is already open here; the failing step is the configuration request.
            throw new SqlExecutionException(
                    String.format("Failed to configure the session at %s", this.gatewayUrl), e);
        }
    }

    /**
     * Returns the current session configuration as a {@link ReadableConfig}, built from the map
     * returned by {@link #getSessionConfigMap()}.
     *
     * @return the session configuration
     * @throws SqlExecutionException if the configuration cannot be fetched or converted
     */
    @Override
    public ReadableConfig getSessionConfig() {
        try {
            return Configuration.fromMap(this.getSessionConfigMap());
        } catch (SqlExecutionException e) {
            // getSessionConfigMap() already reports the failure with session-config context;
            // wrapping it again would only duplicate the message in the cause chain.
            throw e;
        } catch (Exception e) {
            throw new SqlExecutionException("Failed to get the session config.", e);
        }
    }

    /**
     * Fetches the properties of the current session from the gateway and blocks until the
     * response arrives.
     *
     * @return the properties carried by the get-session-config response
     * @throws SqlExecutionException if the request fails or waiting for it is interrupted
     */
    @Override
    public Map<String, String> getSessionConfigMap() {
        try {
            GetSessionConfigResponseBody response =
                    (GetSessionConfigResponseBody)
                            ExecutorImpl.getResponse(
                                    this.sendRequest(
                                            GetSessionConfigHeaders.getInstance(),
                                            new SessionMessageParameters(this.sessionHandle),
                                            EmptyRequestBody.getInstance()));
            return response.getProperties();
        } catch (Exception e) {
            throw new SqlExecutionException("Failed to get the session config.", e);
        }
    }

    /**
     * Submits {@code statement} to the gateway and blocks until its first non-{@code NOT_READY}
     * batch of results is available.
     *
     * <p>If the calling thread is interrupted while waiting for the submit response, a background
     * task is queued on {@code executorService} that waits for the response and then asks the
     * gateway to close the resulting operation.
     *
     * @param statement the statement to execute
     * @return a result whose row iterator fetches further batches on demand
     * @throws SqlExecutionException if submitting or fetching fails, or the wait is interrupted
     */
    @Override
    public StatementResult executeStatement(String statement) {
        CompletableFuture<ExecuteStatementResponseBody> pending =
                this.sendRequest(
                        ExecuteStatementHeaders.getInstance(),
                        new SessionMessageParameters(this.sessionHandle),
                        new ExecuteStatementRequestBody(statement));

        Function<InterruptedException, SqlExecutionException> onInterrupt =
                interrupted -> {
                    // The operation handle is not known yet: wait for it off-thread, then close it.
                    this.executorService.submit(
                            () -> {
                                try {
                                    ExecuteStatementResponseBody late = pending.get();
                                    this.closeOperationAsync(this.getOperationHandle(() -> late.getOperationHandle()));
                                } catch (Exception closeFailure) {
                                    interrupted.addSuppressed(closeFailure);
                                    LOG.error("Failed to cancel the interrupted exception.", (Throwable) interrupted);
                                }
                            });
                    return new SqlExecutionException("Interrupted to get response.", interrupted);
                };

        OperationHandle operationHandle =
                this.getOperationHandle(() -> ExecutorImpl.getResponse(pending, onInterrupt).getOperationHandle());

        FetchResultsResponseBody firstFetch = this.fetchUtilResultsReady(operationHandle);
        ResultInfo firstResult = firstFetch.getResults();
        Long nextToken = SqlGatewayRestEndpointUtils.parseToken(firstFetch.getNextResultUri());
        return new StatementResult(
                firstResult.getResultSchema(),
                new RowDataInfoIterator(operationHandle, firstResult, nextToken),
                firstFetch.isQueryResult(),
                firstFetch.getResultKind(),
                firstFetch.getJobID());
    }

    /**
     * Asks the gateway for completion candidates for {@code statement} at {@code position} and
     * blocks until the response arrives.
     *
     * @param statement the (partial) statement to complete
     * @param position position within the statement, forwarded unchanged to the gateway
     * @return the candidates carried by the response
     * @throws SqlExecutionException if the request fails or waiting for it is interrupted
     */
    @Override
    public List<String> completeStatement(String statement, int position) {
        CompleteStatementResponseBody completion =
                (CompleteStatementResponseBody)
                        ExecutorImpl.getResponse(
                                this.sendRequest(
                                        CompleteStatementHeaders.getInstance(),
                                        new SessionMessageParameters(this.sessionHandle),
                                        new CompleteStatementRequestBody(statement, position)));
        return completion.getCandidates();
    }

    /**
     * Closes every resource registered with {@code registry}, unless the registry is already
     * closed.
     *
     * <p>Failures are logged and never propagated to the caller.
     */
    @Override
    public void close() {
        if (this.registry.isClosed()) {
            return;
        }
        try {
            this.registry.close();
        } catch (Throwable t) {
            // Log the throwable itself. Wrapping it in a new Exception and then merging that
            // wrapper back into its own cause adds nothing to the logged stack trace.
            LOG.error("Exception happens when closing the Executor.", t);
        }
    }

    /**
     * Returns the handle of the session opened by this executor.
     *
     * @return the session handle created in the constructor
     */
    @VisibleForTesting
    public SessionHandle getSessionHandle() {
        return sessionHandle;
    }

    /**
     * Sends a request using the negotiated connection version.
     *
     * <p>The message headers are first decorated with the path of {@code gatewayUrl} as URL
     * prefix and then with the custom HTTP headers of this executor.
     *
     * @param messageHeaders headers describing the endpoint
     * @param messageParameters parameters of the request
     * @param request body of the request
     * @return future completing with the response body
     * @throws NullPointerException if the connection version has not been negotiated yet
     * @throws SqlExecutionException if the request cannot be sent
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) {
        Preconditions.checkNotNull(this.connectionVersion, "The connection version should not be null.");
        UrlPrefixDecorator prefixed = new UrlPrefixDecorator(messageHeaders, this.gatewayUrl.getPath());
        CustomHeadersDecorator decorated = new CustomHeadersDecorator((MessageHeaders) prefixed);
        decorated.setCustomHeaders(this.customHttpHeaders);
        return this.sendRequest(decorated, messageParameters, request, this.connectionVersion);
    }

    /**
     * Sends a request to the host and port of {@code gatewayUrl} with an explicit REST API
     * version and no file uploads.
     *
     * @param messageHeaders headers describing the endpoint, passed to the REST client as-is
     * @param messageParameters parameters of the request
     * @param request body of the request
     * @param connectionVersion REST API version to use for this request
     * @return future completing with the response body
     * @throws SqlExecutionException if the REST client reports an {@link IOException}
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request, SqlGatewayRestAPIVersion connectionVersion) {
        String host = this.gatewayUrl.getHost();
        int port = this.gatewayUrl.getPort();
        try {
            return this.restClient.sendRequest(
                    host,
                    port,
                    messageHeaders,
                    messageParameters,
                    request,
                    Collections.emptyList(),
                    (RestAPIVersion) connectionVersion);
        } catch (IOException connectFailure) {
            throw new SqlExecutionException("Failed to connect to the SQL Gateway.", connectFailure);
        }
    }

    /**
     * Polls the gateway for the results at token {@code 0} until the response type is no longer
     * {@code NOT_READY}.
     *
     * <p>Each poll is preceded by the fetch interval applied in
     * {@code getFetchResultResponse}. If the thread is interrupted while polling, the operation
     * is asked to close asynchronously.
     *
     * @param operationHandle the operation whose results are awaited
     * @return the first response whose result type is not {@code NOT_READY}
     * @throws SqlExecutionException if fetching fails or the wait is interrupted
     */
    private FetchResultsResponseBody fetchUtilResultsReady(OperationHandle operationHandle) {
        Function<InterruptedException, SqlExecutionException> onInterrupt =
                interrupted -> {
                    this.closeOperationAsync(operationHandle);
                    return new SqlExecutionException("Interrupted to fetch results.", interrupted);
                };

        FetchResultsResponseBody latest;
        do {
            latest = this.getFetchResultResponse(operationHandle, 0L, true, onInterrupt);
        } while (latest.getResultType().equals(ResultSet.ResultType.NOT_READY));
        return latest;
    }

    /**
     * Fetches one batch of results for an operation and blocks until the response arrives.
     *
     * @param operationHandle the operation to fetch results for
     * @param token token identifying the batch to fetch
     * @param fetchResultWithInterval if {@code true}, sleep 100 ms before sending the request
     * @param interruptedExceptionHandler converts an interruption (during the sleep or while
     *     waiting for the response) into the exception to throw
     * @return the response body of the fetch request
     * @throws SqlExecutionException if the request fails or the wait is interrupted; when the
     *     failure is a {@link RestClientException} whose message contains
     *     {@code Encountered "<EOF>"}, the cause is a {@link SqlParserEOFException}
     */
    private FetchResultsResponseBody getFetchResultResponse(OperationHandle operationHandle, long token, boolean fetchResultWithInterval, Function<InterruptedException, SqlExecutionException> interruptedExceptionHandler) {
        try {
            if (fetchResultWithInterval) {
                Thread.sleep(100L);
            }
            return (FetchResultsResponseBody)
                    this.sendRequest(
                                    FetchResultsHeaders.getDefaultInstance(),
                                    new FetchResultsMessageParameters(this.sessionHandle, operationHandle, Long.valueOf(token), this.rowFormat),
                                    EmptyRequestBody.getInstance())
                            .get();
        } catch (InterruptedException e) {
            throw interruptedExceptionHandler.apply(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Neither the cause nor its message is guaranteed to be present; inspecting them
            // unguarded would replace the real failure with a NullPointerException.
            String causeMessage = cause == null ? null : cause.getMessage();
            boolean incompleteStatement =
                    cause instanceof RestClientException
                            && causeMessage != null
                            && causeMessage.contains("Encountered \"<EOF>\"");
            if (incompleteStatement) {
                throw new SqlExecutionException(
                        "The SQL statement is incomplete.",
                        (Throwable) new SqlParserEOFException(causeMessage, cause));
            }
            throw new SqlExecutionException(
                    String.format("Failed to get response for the operation %s.", operationHandle), cause);
        }
    }

    /**
     * Blocks until {@code future} completes and returns its value, using the default handling
     * for interruptions.
     *
     * @param future the future to wait for
     * @return the value the future completed with
     * @throws SqlExecutionException if the future fails or the wait is interrupted
     */
    private static <T> T getResponse(CompletableFuture<T> future) {
        Function<InterruptedException, SqlExecutionException> defaultHandler =
                interrupted -> new SqlExecutionException("Interrupted to get response.", interrupted);
        return ExecutorImpl.getResponse(future, defaultHandler);
    }

    /**
     * Blocks until {@code future} completes and returns its value.
     *
     * @param future the future to wait for
     * @param interruptedExceptionHandler converts an interruption of the wait into the exception
     *     to throw
     * @return the value the future completed with
     * @throws SqlExecutionException with the future's failure cause if the future completed
     *     exceptionally, or the handler's exception if the wait was interrupted
     */
    private static <T> T getResponse(CompletableFuture<T> future, Function<InterruptedException, SqlExecutionException> interruptedExceptionHandler) {
        try {
            return future.get();
        } catch (InterruptedException interrupted) {
            throw interruptedExceptionHandler.apply(interrupted);
        } catch (ExecutionException failed) {
            // Surface the underlying failure rather than the ExecutionException wrapper.
            throw new SqlExecutionException("Failed to get response.", failed.getCause());
        }
    }

    /**
     * Sends a close-operation request for the given operation of the current session without
     * waiting for the response.
     *
     * @param operationHandle the operation to close
     * @return future completing with the operation status response
     */
    private CompletableFuture<OperationStatusResponseBody> closeOperationAsync(OperationHandle operationHandle) {
        OperationMessageParameters target = new OperationMessageParameters(this.sessionHandle, operationHandle);
        return this.sendRequest(CloseOperationHeaders.getInstance(), target, EmptyRequestBody.getInstance());
    }

    /**
     * Builds an {@link OperationHandle} from the UUID string produced by {@code handleSupplier}.
     *
     * @param handleSupplier supplies the textual UUID of the operation; invoked exactly once
     * @return the handle wrapping the parsed UUID
     */
    private OperationHandle getOperationHandle(Supplier<String> handleSupplier) {
        String rawHandle = handleSupplier.get();
        UUID identifier = UUID.fromString(rawHandle);
        return new OperationHandle(identifier);
    }

    /**
     * Determines the REST API version to use for this connection.
     *
     * <p>The versions offered by the gateway are requested using the minimum stable version.
     * If the client's default version is among them it is chosen; otherwise the latest version
     * offered by the gateway is chosen, unless that is {@code V1}.
     *
     * @return the version to use for all subsequent requests
     * @throws SqlExecutionException if the request fails, or the gateway's latest version is
     *     {@code V1}
     * @throws Exception declared for failures while sending the version request
     */
    private SqlGatewayRestAPIVersion negotiateVersion() throws Exception {
        CustomHeadersDecorator headers =
                new CustomHeadersDecorator(
                        (MessageHeaders) new UrlPrefixDecorator((MessageHeaders) GetApiVersionHeaders.getInstance(), this.gatewayUrl.getPath()));
        headers.setCustomHeaders(this.customHttpHeaders);

        // NOTE(review): presumably the minimum stable version is used because every gateway is
        // expected to serve it - confirm against the gateway endpoint.
        GetApiVersionResponseBody versionResponse =
                (GetApiVersionResponseBody)
                        ExecutorImpl.getResponse(
                                this.restClient.sendRequest(
                                        this.gatewayUrl.getHost(),
                                        this.gatewayUrl.getPort(),
                                        (MessageHeaders) headers,
                                        (MessageParameters) EmptyMessageParameters.getInstance(),
                                        (RequestBody) EmptyRequestBody.getInstance(),
                                        Collections.emptyList(),
                                        (RestAPIVersion) Collections.min(SqlGatewayRestAPIVersion.getStableVersions())));
        List<SqlGatewayRestAPIVersion> gatewayVersions =
                versionResponse.getVersions().stream()
                        .map(SqlGatewayRestAPIVersion::valueOf)
                        .collect(Collectors.toList());

        SqlGatewayRestAPIVersion clientVersion = SqlGatewayRestAPIVersion.getDefaultVersion();
        if (!gatewayVersions.contains(clientVersion)) {
            SqlGatewayRestAPIVersion latestVersion =
                    (SqlGatewayRestAPIVersion) RestAPIVersion.getLatestVersion(gatewayVersions);
            if (latestVersion.equals(SqlGatewayRestAPIVersion.V1)) {
                throw new SqlExecutionException("Currently, SQL Client only supports to connect to the REST endpoint with API version larger than V1.");
            }
            return latestVersion;
        }
        return clientVersion;
    }

    /**
     * Asks the gateway to close the current session and waits for the response.
     *
     * <p>This is best-effort: a status other than {@code CLOSED} and any failure are only
     * logged as warnings. Nothing is done if no session handle exists.
     */
    private void closeSession() throws SqlExecutionException {
        if (this.sessionHandle != null) {
            final String expectedStatus = "CLOSED";
            try {
                CloseSessionResponseBody closeResponse =
                        (CloseSessionResponseBody)
                                this.sendRequest(
                                                CloseSessionHeaders.getInstance(),
                                                new SessionMessageParameters(this.sessionHandle),
                                                EmptyRequestBody.getInstance())
                                        .get();
                boolean closed = closeResponse.getStatus().equals(expectedStatus);
                if (!closed) {
                    LOG.warn("The status of close session response isn't {}.", (Object) expectedStatus);
                }
            } catch (Exception e) {
                LOG.warn(String.format("Unexpected error occurs when closing session %s.", this.sessionHandle), (Throwable) e);
            }
        }
    }

    /**
     * Returns the custom HTTP headers this executor attaches to its requests.
     *
     * @return the headers read from the environment in the constructor
     */
    @VisibleForTesting
    Collection<HttpHeader> getCustomHttpHeaders() {
        return customHttpHeaders;
    }

    private class RowDataInfoIterator
    implements CloseableIterator<RowData> {
        private final OperationHandle operationHandle;
        private Iterator<RowData> current;
        @Nullable
        private Long nextToken;

        public RowDataInfoIterator(OperationHandle operationHandle, @Nullable ResultInfo resultInfo, Long nextToken) {
            this.operationHandle = operationHandle;
            this.current = resultInfo.getData().iterator();
            this.nextToken = nextToken;
        }

        public void close() throws Exception {
            ExecutorImpl.getResponse(ExecutorImpl.this.closeOperationAsync(this.operationHandle));
        }

        public boolean hasNext() {
            if (!this.current.hasNext()) {
                while (this.nextToken != null && !this.current.hasNext()) {
                    FetchResultsResponseBody fetchResultsResponseBody = this.fetchResults(this.operationHandle, this.nextToken);
                    this.nextToken = SqlGatewayRestEndpointUtils.parseToken((String)fetchResultsResponseBody.getNextResultUri());
                    this.current = fetchResultsResponseBody.getResults().getData().iterator();
                }
            }
            return this.current.hasNext();
        }

        public RowData next() {
            return this.current.next();
        }

        private FetchResultsResponseBody fetchResults(OperationHandle operationHandle, long token) {
            return ExecutorImpl.this.getFetchResultResponse(operationHandle, token, false, e -> {
                ExecutorImpl.this.sendRequest(CancelOperationHeaders.getInstance(), new OperationMessageParameters(ExecutorImpl.this.sessionHandle, operationHandle), EmptyRequestBody.getInstance());
                return new SqlExecutionException("Interrupted to fetch results.", (Throwable)e);
            });
        }
    }
}

