/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.impl.JetClientInstanceImpl;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.test.TestBatchSqlConnector;
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.SqlService;
import java.util.List;
import java.util.stream.Stream;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class SqlJobManagementTest
extends SqlTestSupport {
    private static final String COMPLETED_JOB_NAME = "completedJob";
    private static SqlService sqlService;

    @BeforeClass
    public static void beforeClass() {
        SqlJobManagementTest.initializeWithClient((int)1, null, null);
        sqlService = SqlJobManagementTest.instance().getSql();
    }

    @Test
    public void when_streamingDmlWithoutCreateJob_then_fail() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        Assertions.assertThatThrownBy(() -> sqlService.execute("SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0])).hasMessageContaining("You must use CREATE JOB statement for a streaming DML query");
    }

    @Test
    public void when_ddlStatementWithCreateJob_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB job AS CREATE MAPPING src TYPE TestStream", new Object[0])).hasMessageContaining("Encountered \"CREATE\" at line 1, column 19");
    }

    @Test
    public void when_dqlStatementWithCreateJob_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB job AS SELECT 42 FROM my_map", new Object[0])).hasMessageContaining("Encountered \"SELECT\" at line 1, column 19." + System.lineSeparator() + "Was expecting one of:" + System.lineSeparator() + "    \"INSERT\" ..." + System.lineSeparator() + "    \"SINK\" ...");
    }

    @Test
    public void when_createOrReplaceJob_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE OR REPLACE JOB fooJob AS INSERT INTO t1 SELECT * FROM t2", new Object[0])).hasMessageContaining("OR REPLACE is not supported for CREATE JOB");
    }

    @Test
    public void when_createJobUnknownOption_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB foo OPTIONS ('badOption'='value') AS INSERT INTO t1 VALUES(1)", new Object[0])).hasMessage("From line 1, column 25 to line 1, column 35: Unknown job option: badOption");
    }

    @Test
    public void when_createJobDuplicateOption_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB foo OPTIONS ('autoScaling'='false', 'autoScaling'='false') AS INSERT INTO t1 VALUES(1)", new Object[0])).hasMessageContaining("Option 'autoScaling' specified more than once");
    }

    @Test
    public void when_snapshotIntervalNotNumber_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB foo OPTIONS ('snapshotIntervalMillis'='foo') AS INSERT INTO t1 VALUES(1)", new Object[0])).hasMessage("From line 1, column 50 to line 1, column 54: Invalid number for snapshotIntervalMillis: foo");
    }

    @Test
    public void when_badProcessingGuarantee_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB foo OPTIONS ('processingGuarantee'='foo') AS INSERT INTO t1 VALUES(1)", new Object[0])).hasMessage("From line 1, column 47 to line 1, column 51: Unsupported value for processingGuarantee: foo");
    }

    @Test
    public void when_wrongProcessingGuaranteeForBatchJob_then_fail() {
        SqlJobManagementTest.createMapping("t1", Long.class, Long.class);
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB foo OPTIONS ('processingGuarantee'='exactlyOnce') AS INSERT INTO t1 VALUES(1, 1)", new Object[0])).hasMessage("Only NONE guarantee is allowed for batch job");
    }

    @Test
    public void when_jobSubmitted_then_exists() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Job testJob = SqlJobManagementTest.instance().getJet().getJob("testJob");
        Assert.assertNotNull((String)"job doesn't exist", (Object)testJob);
        sqlService.execute("DROP JOB testJob", new Object[0]);
    }

    @Test
    public void when_jobDropped_then_markedAsUserCancelled() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        sqlService.execute("DROP JOB testJob", new Object[0]);
        Job testJob = SqlJobManagementTest.instance().getJet().getJob("testJob");
        Assert.assertNotNull((String)"job doesn't exist", (Object)testJob);
        Assert.assertTrue((String)"job not user cancelled", (boolean)testJob.isUserCancelled());
    }

    @Test
    public void when_duplicateName_then_fails() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0])).hasMessageContaining("Another active job with equal name (testJob) exists");
    }

    @Test
    public void when_duplicateName_and_ifNotExists_then_secondSubmissionIgnored() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assert.assertEquals((long)1L, (long)this.countActiveJobs());
        sqlService.execute("CREATE JOB IF NOT EXISTS testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assert.assertEquals((long)1L, (long)this.countActiveJobs());
    }

    @Test
    public void when_dropNonExistingJob_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("DROP JOB nonExistingJob", new Object[0])).hasMessageContaining("Job doesn't exist: nonExistingJob");
    }

    @Test
    public void when_dropNonExistingJob_and_ifExists_then_ignore() {
        sqlService.execute("DROP JOB IF EXISTS nonExistingJob", new Object[0]);
    }

    @Test
    public void when_dropCompletedJob_then_fail() {
        this.createCompletedJob();
        Assertions.assertThatThrownBy(() -> sqlService.execute("DROP JOB completedJob", new Object[0])).hasMessage("Job already terminated: completedJob");
    }

    @Test
    public void when_dropCompletedJob_and_ifExists_then_ignore() {
        this.createCompletedJob();
        sqlService.execute("DROP JOB IF EXISTS completedJob", new Object[0]);
    }

    @Test
    public void when_dropJobWithParameters_then_fail() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sqlService.execute("DROP JOB j", new Object[]{"param"})).isInstanceOf(HazelcastSqlException.class)).hasMessage("DROP JOB does not support dynamic parameters");
    }

    @Test
    public void test_jobOptions() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob OPTIONS ('processingGuarantee'='exactlyOnce','snapshotIntervalMillis'='6000','autoScaling'='false','splitBrainProtectionEnabled'='true','metricsEnabled'='false','initialSnapshotName'='fooSnapshot','storeMetricsAfterJobCompletion'='true','maxProcessorAccumulatedRecords'='10')AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        JobConfig config = SqlJobManagementTest.instance().getJet().getJob("testJob").getConfig();
        Assert.assertEquals((Object)ProcessingGuarantee.EXACTLY_ONCE, (Object)config.getProcessingGuarantee());
        Assert.assertEquals((long)6000L, (long)config.getSnapshotIntervalMillis());
        Assert.assertFalse((String)"isAutoScaling", (boolean)config.isAutoScaling());
        Assert.assertTrue((String)"isSplitBrainProtectionEnabled", (boolean)config.isSplitBrainProtectionEnabled());
        Assert.assertFalse((String)"isMetricsEnabled", (boolean)config.isMetricsEnabled());
        Assert.assertEquals((Object)"fooSnapshot", (Object)config.getInitialSnapshotName());
        Assert.assertEquals((long)10L, (long)config.getMaxProcessorAccumulatedRecords());
    }

    @Test
    public void test_insert() {
        TestBatchSqlConnector.create(sqlService, "src", 3);
        SqlJobManagementTest.createMapping("dest", Integer.class, String.class);
        sqlService.execute("CREATE JOB testJob AS INSERT INTO dest SELECT v * 2, 'value-' || v FROM src WHERE v < 2", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob("testJob");
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.COMPLETED);
        SqlJobManagementTest.assertMapEventually("dest", "SELECT * FROM dest", TestUtil.createMap((Object[])new Object[]{0, "value-0", 2, "value-1"}));
    }

    @Test
    public void test_insertFromValues() {
        SqlJobManagementTest.createMapping("dest", Integer.class, String.class);
        sqlService.execute("CREATE JOB testJob AS INSERT INTO dest SELECT * FROM (VALUES (1, '1'))", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob("testJob");
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.COMPLETED);
        SqlJobManagementTest.assertMapEventually("dest", "SELECT * FROM dest", TestUtil.createMap((Object[])new Object[]{1, "1"}));
    }

    @Test
    public void test_sink() {
        TestBatchSqlConnector.create(sqlService, "src", 3);
        SqlJobManagementTest.createMapping("dest", Integer.class, String.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v * 2, 'value-' || v FROM src WHERE v > 0", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob("testJob");
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.COMPLETED);
        SqlJobManagementTest.assertMapEventually("dest", "SELECT * FROM dest", TestUtil.createMap((Object[])new Object[]{2, "value-1", 4, "value-2"}));
    }

    @Test
    public void test_sinkFromValues() {
        SqlJobManagementTest.createMapping("dest", Integer.class, String.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT * FROM (VALUES (1, '1'), (2, '2'))", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob("testJob");
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.COMPLETED);
        SqlJobManagementTest.assertMapEventually("dest", "SELECT * FROM dest", TestUtil.createMap((Object[])new Object[]{1, "1", 2, "2"}));
    }

    @Test
    public void test_dynamicParameters() {
        TestBatchSqlConnector.create(sqlService, "src", 3);
        SqlJobManagementTest.createMapping("dest", Integer.class, String.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v * ?, ? || v FROM src WHERE v > ?", new Object[]{2, "value-", 0});
        SqlJobManagementTest.assertMapEventually("dest", "SELECT * FROM dest", TestUtil.createMap((Object[])new Object[]{2, "value-1", 4, "value-2"}));
    }

    @Test
    public void when_clientDisconnects_then_jobContinues() {
        HazelcastInstance client = SqlJobManagementTest.factory().newHazelcastClient();
        SqlService sqlService = client.getSql();
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob("testJob");
        Assert.assertNotNull((Object)job);
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.RUNNING);
        client.shutdown();
        SqlJobManagementTest.sleepSeconds((int)1);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)job.getStatus());
    }

    @Test
    public void test_suspendResume() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob("testJob");
        long executionId = JobAssertions.assertThat((Job)job).eventuallyJobRunning(SqlJobManagementTest.instance(), null);
        sqlService.execute("ALTER JOB testJob SUSPEND", new Object[0]);
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.SUSPENDED);
        sqlService.execute("ALTER JOB testJob RESUME", new Object[0]);
        executionId = JobAssertions.assertThat((Job)job).eventuallyJobRunning(SqlJobManagementTest.instance(), Long.valueOf(executionId));
        sqlService.execute("ALTER JOB testJob RESTART", new Object[0]);
        JobAssertions.assertThat((Job)job).eventuallyJobRunning(SqlJobManagementTest.instance(), Long.valueOf(executionId));
    }

    @Test
    public void when_suspendResumeNonExistingJob_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB foo SUSPEND", new Object[0])).hasMessageContaining("The job 'foo' doesn't exist");
        Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB foo RESUME", new Object[0])).hasMessageContaining("The job 'foo' doesn't exist");
        Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB foo RESTART", new Object[0])).hasMessageContaining("The job 'foo' doesn't exist");
    }

    @Test
    public void when_suspendResumeCompletedJob_then_fail() {
        this.createCompletedJob();
        Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB completedJob SUSPEND", new Object[0])).hasMessageMatching("Cannot SUSPEND_GRACEFUL job [0-9a-f\\-]{19} because it already has a result: .*");
        Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB completedJob RESUME", new Object[0])).hasMessage("Job already completed");
        Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB completedJob RESTART", new Object[0])).hasMessageMatching("Cannot RESTART_GRACEFUL job [0-9a-f\\-]{19} because it already has a result: .*");
    }

    @Test
    public void when_alterJobWithParameters_then_fail() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB j SUSPEND", new Object[]{"param"})).isInstanceOf(HazelcastSqlException.class)).hasMessage("ALTER JOB does not support dynamic parameters");
    }

    @Test
    public void when_alterJobWithUnsupportedOptions_then_fail() {
        Stream.of("processingGuarantee", "initialSnapshotName").forEach(option -> ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sqlService.execute("ALTER JOB j OPTIONS ('" + option + "'='value')", new Object[0])).isInstanceOf(HazelcastSqlException.class)).hasMessageContaining(option + " is not supported for ALTER JOB"));
    }

    @Test
    public void when_snapshotExportWithoutOrReplace_then_orReplaceRequired() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE SNAPSHOT mySnapshot FOR JOB testJob", new Object[0])).hasMessageContaining("The OR REPLACE option is required for CREATE SNAPSHOT");
    }

    @Test
    public void when_createSnapshotWithParameters_then_fail() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE OR REPLACE SNAPSHOT s FOR JOB j", new Object[]{"param"})).isInstanceOf(HazelcastSqlException.class)).hasMessage("CREATE SNAPSHOT does not support dynamic parameters");
    }

    @Test
    public void when_snapshotExport_then_failNotEnterprise() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE OR REPLACE SNAPSHOT mySnapshot FOR JOB testJob", new Object[0])).hasMessageContaining("You need Hazelcast Enterprise to use this feature");
    }

    @Test
    public void when_snapshotExport_jobDoesNotExist_then_fail() {
        Assertions.assertThatThrownBy(() -> sqlService.execute("CREATE OR REPLACE SNAPSHOT mySnapshot FOR JOB nonExistentJob", new Object[0])).hasMessageContaining("The job 'nonExistentJob' doesn't exist");
    }

    @Test
    public void when_dropSnapshotWithParameters_then_fail() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> sqlService.execute("DROP SNAPSHOT s", new Object[]{"param"})).isInstanceOf(HazelcastSqlException.class)).hasMessage("DROP SNAPSHOT does not support dynamic parameters");
    }

    @Test
    public void when_dropJobWithSnapshot_then_failNotEnterprise() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assertions.assertThatThrownBy(() -> sqlService.execute("DROP JOB testJob WITH SNAPSHOT mySnapshot", new Object[0])).hasMessageContaining("You need Hazelcast Enterprise to use this feature");
    }

    @Test
    public void test_createDropJobInQuickSuccession() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        for (int i = 0; i < 10; ++i) {
            sqlService.execute("CREATE JOB testJob AS SINK INTO dest SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
            sqlService.execute("DROP JOB testJob", new Object[0]);
        }
    }

    @Test
    public void test_planCache() {
        SqlJobManagementTest.createMapping("target", Long.class, Long.class);
        sqlService.execute("CREATE JOB job AS SINK INTO target SELECT v, v FROM TABLE(GENERATE_STREAM(100))", new Object[0]);
        Assertions.assertThat((int)SqlJobManagementTest.planCache(SqlJobManagementTest.instance()).size()).isEqualTo(1);
        sqlService.execute("DROP MAPPING target", new Object[0]);
        Assertions.assertThat((int)SqlJobManagementTest.planCache(SqlJobManagementTest.instance()).size()).isZero();
    }

    @Test
    public void test_listRunningJobWithSqlSummary() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        String sql = "CREATE JOB job as SINK INTO dest SELECT v, v from table(generate_stream(1))";
        sqlService.execute("CREATE JOB job as SINK INTO dest SELECT v, v from table(generate_stream(1))", new Object[0]);
        List jobSummaries = ((JetClientInstanceImpl)SqlJobManagementTest.client().getJet()).getJobAndSqlSummaryList();
        Assertions.assertThat((List)jobSummaries).hasOnlyOneElementSatisfying(jobSummary -> {
            Assertions.assertThat((Object)jobSummary.getSqlSummary()).isNotNull();
            Assert.assertEquals((Object)"CREATE JOB job as SINK INTO dest SELECT v, v from table(generate_stream(1))", (Object)jobSummary.getSqlSummary().getQuery());
            Assert.assertEquals((Object)Boolean.TRUE, (Object)jobSummary.getSqlSummary().isUnbounded());
        });
    }

    @Test
    public void test_listFinishedJobWithSqlSummary() {
        SqlJobManagementTest.createMapping("dest", Long.class, Long.class);
        String sql = "CREATE JOB job as SINK INTO dest SELECT v, v from table(generate_series(1, 2))";
        sqlService.execute("CREATE JOB job as SINK INTO dest SELECT v, v from table(generate_series(1, 2))", new Object[0]);
        SqlJobManagementTest.instance().getJet().getJob("job").join();
        List jobSummaries = ((JetClientInstanceImpl)SqlJobManagementTest.client().getJet()).getJobAndSqlSummaryList();
        Assertions.assertThat((List)jobSummaries).hasOnlyOneElementSatisfying(jobSummary -> {
            Assertions.assertThat((Object)jobSummary.getSqlSummary()).isNotNull();
            Assert.assertEquals((Object)"CREATE JOB job as SINK INTO dest SELECT v, v from table(generate_series(1, 2))", (Object)jobSummary.getSqlSummary().getQuery());
            Assert.assertEquals((Object)Boolean.FALSE, (Object)jobSummary.getSqlSummary().isUnbounded());
        });
    }

    private void createCompletedJob() {
        TestBatchSqlConnector.create(sqlService, "t", 1);
        SqlJobManagementTest.createMapping("m", Integer.class, Integer.class);
        sqlService.execute("create job completedJob as sink into m select v, v from t", new Object[0]);
        Job job = SqlJobManagementTest.instance().getJet().getJob(COMPLETED_JOB_NAME);
        Assert.assertNotNull((Object)job);
        job.join();
    }

    private long countActiveJobs() {
        return SqlJobManagementTest.instance().getJet().getJobs().stream().filter(j -> !j.getStatus().isTerminal()).count();
    }
}

