/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.workflow.scheduler;

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders;
import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableParameters;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableResponseBody;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.workflow.WorkflowInfo;
import org.apache.flink.table.gateway.workflow.scheduler.QuartzSchedulerUtils;
import org.apache.flink.table.gateway.workflow.scheduler.SchedulerException;
import org.apache.flink.table.shaded.org.quartz.CronScheduleBuilder;
import org.apache.flink.table.shaded.org.quartz.CronTrigger;
import org.apache.flink.table.shaded.org.quartz.Job;
import org.apache.flink.table.shaded.org.quartz.JobBuilder;
import org.apache.flink.table.shaded.org.quartz.JobDataMap;
import org.apache.flink.table.shaded.org.quartz.JobDetail;
import org.apache.flink.table.shaded.org.quartz.JobExecutionContext;
import org.apache.flink.table.shaded.org.quartz.JobExecutionException;
import org.apache.flink.table.shaded.org.quartz.JobKey;
import org.apache.flink.table.shaded.org.quartz.Scheduler;
import org.apache.flink.table.shaded.org.quartz.TriggerBuilder;
import org.apache.flink.table.shaded.org.quartz.TriggerKey;
import org.apache.flink.table.shaded.org.quartz.impl.StdSchedulerFactory;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class EmbeddedQuartzScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedQuartzScheduler.class);
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private Scheduler quartzScheduler;

    public void start() {
        Properties properties = QuartzSchedulerUtils.initializeQuartzSchedulerConfig();
        try {
            this.quartzScheduler = new StdSchedulerFactory(properties).getScheduler();
            this.quartzScheduler.start();
            LOG.info("Start quartz scheduler successfully.");
        }
        catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            String msg = String.format("Failed to start quartz scheduler with config: %s.", properties);
            LOG.error(msg);
            throw new SchedulerException(msg, e);
        }
    }

    public void stop() {
        try {
            this.quartzScheduler.shutdown();
        }
        catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to shutdown quartz schedule.");
            throw new SchedulerException("Failed to shutdown quartz scheduler.", e);
        }
    }

    public JobDetail createScheduleWorkflow(WorkflowInfo workflowInfo, String cronExpression) throws SchedulerException {
        String materializedTableIdentifier = workflowInfo.getMaterializedTableIdentifier();
        JobKey jobKey = QuartzSchedulerUtils.getJobKey(materializedTableIdentifier);
        this.lock.writeLock().lock();
        try {
            if (this.quartzScheduler.checkExists(jobKey)) {
                LOG.error("Materialized table {} quartz schedule job already exist, job info: {}.", (Object)materializedTableIdentifier, (Object)jobKey);
                throw new SchedulerException(String.format("Materialized table %s quartz schedule job already exist, job info: %s.", materializedTableIdentifier, jobKey));
            }
            JobDetail jobDetail = JobBuilder.newJob(EmbeddedSchedulerJob.class).withIdentity(jobKey).build();
            jobDetail.getJobDataMap().put("workflowInfo", QuartzSchedulerUtils.toJson(workflowInfo));
            TriggerKey triggerKey = TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup());
            CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionIgnoreMisfires()).forJob(jobDetail).build();
            this.quartzScheduler.scheduleJob(jobDetail, cronTrigger);
            LOG.info("Create quartz schedule job for materialized table {} successfully, job info: {}, cron expression: {}.", new Object[]{materializedTableIdentifier, jobKey, cronExpression});
            JobDetail jobDetail2 = jobDetail;
            return jobDetail2;
        }
        catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to create quartz schedule job for materialized table {}.", (Object)materializedTableIdentifier, (Object)e);
            throw new SchedulerException(String.format("Failed to create quartz schedule job for materialized table %s.", materializedTableIdentifier), e);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public void suspendScheduleWorkflow(String workflowName, String workflowGroup) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(workflowName, workflowGroup);
        this.lock.writeLock().lock();
        try {
            String errorMsg = String.format("Failed to suspend a non-existent quartz schedule job: %s.", jobKey);
            this.checkJobExists(jobKey, errorMsg);
            this.quartzScheduler.pauseJob(jobKey);
        }
        catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to suspend quartz schedule job: {}.", (Object)jobKey, (Object)e);
            throw new SchedulerException(String.format("Failed to suspend quartz schedule job: %s.", jobKey), e);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public void resumeScheduleWorkflow(String workflowName, String workflowGroup, Map<String, String> dynamicOptions) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(workflowName, workflowGroup);
        this.lock.writeLock().lock();
        try {
            String errorMsg = String.format("Failed to resume a non-existent quartz schedule job: %s.", jobKey);
            this.checkJobExists(jobKey, errorMsg);
            if (dynamicOptions.isEmpty()) {
                this.quartzScheduler.resumeJob(jobKey);
            } else {
                JobDetail jobDetail = this.quartzScheduler.getJobDetail(jobKey);
                WorkflowInfo workflowInfo = QuartzSchedulerUtils.fromJson(jobDetail.getJobDataMap().getString("workflowInfo"), WorkflowInfo.class);
                WorkflowInfo newWorkflowInfo = new WorkflowInfo(workflowInfo.getMaterializedTableIdentifier(), dynamicOptions, workflowInfo.getInitConfig(), workflowInfo.getExecutionConfig(), workflowInfo.getRestEndpointUrl());
                CronTrigger trigger = (CronTrigger)this.quartzScheduler.getTrigger(TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup()));
                String cronExpression = trigger.getCronExpression();
                this.quartzScheduler.deleteJob(jobKey);
                this.createScheduleWorkflow(newWorkflowInfo, cronExpression);
            }
        }
        catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to resume quartz schedule job: {}.", (Object)jobKey, (Object)e);
            throw new SchedulerException(String.format("Failed to resume quartz schedule job: %s.", jobKey), e);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public void deleteScheduleWorkflow(String workflowName, String workflowGroup) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(workflowName, workflowGroup);
        this.lock.writeLock().lock();
        try {
            this.quartzScheduler.deleteJob(jobKey);
        }
        catch (org.apache.flink.table.shaded.org.quartz.SchedulerException e) {
            LOG.error("Failed to delete quartz schedule job: {}.", (Object)jobKey, (Object)e);
            throw new SchedulerException(String.format("Failed to delete quartz schedule job: %s.", jobKey), e);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private void checkJobExists(JobKey jobKey, String errorMsg) throws org.apache.flink.table.shaded.org.quartz.SchedulerException {
        if (!this.quartzScheduler.checkExists(jobKey)) {
            LOG.error(errorMsg);
            throw new SchedulerException(errorMsg);
        }
    }

    @VisibleForTesting
    public Scheduler getQuartzScheduler() {
        return this.quartzScheduler;
    }

    public static class EmbeddedSchedulerJob
    implements Job {
        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            SessionHandle sessionHandle = null;
            OperationHandle operationHandle = null;
            SqlGatewayRestClient gatewayRestClient = null;
            try {
                JobDataMap dataMap = context.getJobDetail().getJobDataMap();
                String workflowJsonStr = dataMap.getString("workflowInfo");
                WorkflowInfo workflowInfo = QuartzSchedulerUtils.fromJson(workflowJsonStr, WorkflowInfo.class);
                LOG.info("Execute refresh operation for workflow: {}.", (Object)workflowInfo);
                String schedulerTime = QuartzSchedulerUtils.dateToString(context.getScheduledFireTime());
                gatewayRestClient = new SqlGatewayRestClient(workflowInfo.getRestEndpointUrl());
                sessionHandle = gatewayRestClient.openSession(String.format("%s-quartz-refresh-session-%s", workflowInfo.getMaterializedTableIdentifier(), schedulerTime), workflowInfo.getInitConfig());
                operationHandle = gatewayRestClient.refreshMaterializedTable(sessionHandle, workflowInfo.getMaterializedTableIdentifier(), schedulerTime, workflowInfo.getDynamicOptions(), Collections.emptyMap(), workflowInfo.getExecutionConfig());
                List results = gatewayRestClient.fetchOperationAllResults(sessionHandle, operationHandle);
                String jobId = ((RowData)results.get(0)).getString(0).toString();
                LOG.info("Successfully execute refresh operation for materialized table: {} with job id: {}.", (Object)workflowInfo.getMaterializedTableIdentifier(), (Object)jobId);
                context.setResult("Successfully execute refresh operation for materialized table: " + workflowInfo.getMaterializedTableIdentifier() + " with job id: " + jobId);
            }
            catch (Exception e) {
                try {
                    LOG.error("Failed to execute refresh operation for workflow.", (Throwable)e);
                    throw new JobExecutionException(e.getMessage(), e);
                }
                catch (Throwable throwable) {
                    try {
                        if (gatewayRestClient == null) throw throwable;
                        if (operationHandle != null) {
                            gatewayRestClient.closeOperation(sessionHandle, operationHandle);
                        }
                        if (sessionHandle != null) {
                            gatewayRestClient.closeSession(sessionHandle);
                        }
                        gatewayRestClient.close();
                        throw throwable;
                    }
                    catch (Exception e2) {
                        LOG.error("Failed to close session.", (Throwable)e2);
                    }
                    throw throwable;
                }
            }
            try {
                if (gatewayRestClient == null) return;
                if (operationHandle != null) {
                    gatewayRestClient.closeOperation(sessionHandle, operationHandle);
                }
                if (sessionHandle != null) {
                    gatewayRestClient.closeSession(sessionHandle);
                }
                gatewayRestClient.close();
                return;
            }
            catch (Exception e) {
                LOG.error("Failed to close session.", (Throwable)e);
                return;
            }
        }

        private static class SqlGatewayRestClient
        implements AutoCloseable {
            private final String address;
            private final int port;
            private final RestClient restClient;

            private SqlGatewayRestClient(String endpointUrl) throws Exception {
                URL url = new URL(endpointUrl);
                this.address = url.getHost();
                this.port = url.getPort();
                this.restClient = RestClient.forUrl(new Configuration(), Executors.directExecutor(), url);
            }

            private SessionHandle openSession(String sessionName, Map<String, String> initConfig) throws Exception {
                OpenSessionRequestBody requestBody = new OpenSessionRequestBody(sessionName, initConfig);
                OpenSessionHeaders headers = OpenSessionHeaders.getInstance();
                OpenSessionResponseBody responseBody = (OpenSessionResponseBody)this.restClient.sendRequest(this.address, this.port, headers, EmptyMessageParameters.getInstance(), requestBody).get();
                return new SessionHandle(UUID.fromString(responseBody.getSessionHandle()));
            }

            private void closeSession(SessionHandle sessionHandle) throws Exception {
                CloseSessionHeaders closeSessionHeaders = CloseSessionHeaders.getInstance();
                SessionMessageParameters sessionMessageParameters = new SessionMessageParameters(sessionHandle);
                this.restClient.sendRequest(this.address, this.port, closeSessionHeaders, sessionMessageParameters, EmptyRequestBody.getInstance()).get();
            }

            private void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
                CloseOperationHeaders closeOperationHeaders = CloseOperationHeaders.getInstance();
                OperationMessageParameters operationMessageParameters = new OperationMessageParameters(sessionHandle, operationHandle);
                this.restClient.sendRequest(this.address, this.port, closeOperationHeaders, operationMessageParameters, EmptyRequestBody.getInstance()).get();
            }

            private OperationHandle refreshMaterializedTable(SessionHandle sessionHandle, String materializedTableIdentifier, String schedulerTime, Map<String, String> dynamicOptions, Map<String, String> staticPartitions, Map<String, String> executionConfig) throws Exception {
                RefreshMaterializedTableRequestBody requestBody = new RefreshMaterializedTableRequestBody(true, schedulerTime, dynamicOptions, staticPartitions, executionConfig);
                RefreshMaterializedTableHeaders headers = RefreshMaterializedTableHeaders.getInstance();
                RefreshMaterializedTableParameters parameters = new RefreshMaterializedTableParameters(sessionHandle, materializedTableIdentifier);
                RefreshMaterializedTableResponseBody responseBody = (RefreshMaterializedTableResponseBody)this.restClient.sendRequest(this.address, this.port, headers, parameters, requestBody).get();
                return new OperationHandle(UUID.fromString(responseBody.getOperationHandle()));
            }

            private List<RowData> fetchOperationAllResults(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
                Long token = 0L;
                ArrayList<RowData> results = new ArrayList<RowData>();
                while (token != null) {
                    FetchResultsResponseBody responseBody = this.fetchOperationResults(sessionHandle, operationHandle, token);
                    if (responseBody instanceof NotReadyFetchResultResponse) {
                        Thread.sleep(10L);
                        continue;
                    }
                    responseBody.getNextResultUri();
                    results.addAll(responseBody.getResults().getData());
                    token = SqlGatewayRestEndpointUtils.parseToken(responseBody.getNextResultUri());
                }
                return results;
            }

            private FetchResultsResponseBody fetchOperationResults(SessionHandle sessionHandle, OperationHandle operationHandle, Long token) throws Exception {
                FetchResultsMessageParameters fetchResultsMessageParameters = new FetchResultsMessageParameters(sessionHandle, operationHandle, token, RowFormat.JSON);
                FetchResultsHeaders fetchResultsHeaders = FetchResultsHeaders.getDefaultInstance();
                CompletableFuture response = this.restClient.sendRequest(this.address, this.port, fetchResultsHeaders, fetchResultsMessageParameters, EmptyRequestBody.getInstance());
                return (FetchResultsResponseBody)response.get();
            }

            @Override
            public void close() {
                try {
                    this.restClient.close();
                }
                catch (Exception e) {
                    LOG.error("Failed to close rest client.", (Throwable)e);
                }
            }
        }
    }
}

