/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerDatabaseChangeSetPoller;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import java.util.Arrays;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(value=JUnit4.class)
public class ITSpannerDatabaseChangeSetPollerTest {
    private static final Logger logger = Logger.getLogger(ITSpannerDatabaseChangeSetPollerTest.class.getName());
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();
    private static Database database;
    private final Queue<Struct> receivedChanges = new ConcurrentLinkedQueue<Struct>();
    private volatile Timestamp lastCommitTimestamp;
    private volatile CountDownLatch latch = new CountDownLatch(0);

    @BeforeClass
    public static void setup() throws Exception {
        SpannerTestHelper.setupSpanner(env);
        database = env.createTestDb(Arrays.asList("CREATE TABLE NUMBERS1 (ID INT64 NOT NULL, NAME STRING(100), CHANGE_SET_ID STRING(MAX)) PRIMARY KEY (ID)", "CREATE TABLE NUMBERS2 (ID INT64 NOT NULL, NAME STRING(100), CHANGE_SET_ID STRING(MAX)) PRIMARY KEY (ID)", "CREATE TABLE CHANGE_SETS (CHANGE_SET_ID STRING(MAX), COMMIT_TIMESTAMP TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (CHANGE_SET_ID)", "CREATE INDEX IDX_NUMBERS1_CHANGE_SET ON NUMBERS1 (CHANGE_SET_ID)", "CREATE INDEX IDX_NUMBERS2_CHANGE_SET ON NUMBERS2 (CHANGE_SET_ID)"));
        logger.info(String.format("Created database %s", database.getId().toString()));
    }

    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    @Test
    public void testSpannerTailer() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerDatabaseChangeSetPoller watcher = SpannerDatabaseChangeSetPoller.newBuilder((Spanner)spanner, (DatabaseId)database.getId()).allTables().setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)database.getId()).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                logger.info(String.format("Received changed for table %s: %s", table, row.asStruct().toString()));
                ITSpannerDatabaseChangeSetPollerTest.this.receivedChanges.add(row.asStruct());
                ITSpannerDatabaseChangeSetPollerTest.this.lastCommitTimestamp = commitTimestamp;
                ITSpannerDatabaseChangeSetPollerTest.this.latch.countDown();
            }
        });
        watcher.startAsync().awaitRunning();
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        this.latch = new CountDownLatch(3);
        String changeSetId = UUID.randomUUID().toString();
        Timestamp commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"CHANGE_SETS").set("CHANGE_SET_ID").to(changeSetId)).set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS1").set("ID").to(1L)).set("NAME").to("ONE")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS2").set("ID").to(2L)).set("NAME").to("TWO")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS1").set("ID").to(3L)).set("NAME").to("THREE")).set("CHANGE_SET_ID").to(changeSetId)).build()));
        ImmutableList<Struct> inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(3);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("ONE")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(2L)).set("NAME").to("TWO")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(3L)).set("NAME").to("THREE")).set("CHANGE_SET_ID").to(changeSetId)).build()});
        Truth.assertThat((Comparable)this.lastCommitTimestamp).isEqualTo((Object)commitTs);
        this.latch = new CountDownLatch(2);
        changeSetId = UUID.randomUUID().toString();
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"CHANGE_SETS").set("CHANGE_SET_ID").to(changeSetId)).set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS2").set("ID").to(4L)).set("NAME").to("FOUR")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS1").set("ID").to(5L)).set("NAME").to("FIVE")).set("CHANGE_SET_ID").to(changeSetId)).build()));
        inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(2);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(4L)).set("NAME").to("FOUR")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("FIVE")).set("CHANGE_SET_ID").to(changeSetId)).build()});
        Truth.assertThat((Comparable)this.lastCommitTimestamp).isEqualTo((Object)commitTs);
        this.latch = new CountDownLatch(2);
        changeSetId = UUID.randomUUID().toString();
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"CHANGE_SETS").set("CHANGE_SET_ID").to(changeSetId)).set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS1").set("ID").to(1L)).set("NAME").to("one")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS1").set("ID").to(5L)).set("NAME").to("five")).set("CHANGE_SET_ID").to(changeSetId)).build()));
        ImmutableList<Struct> updates = this.drainChanges();
        Truth.assertThat(updates).hasSize(2);
        Truth.assertThat(updates).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("one")).set("CHANGE_SET_ID").to(changeSetId)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("five")).set("CHANGE_SET_ID").to(changeSetId)).build()});
        Truth.assertThat((Comparable)this.lastCommitTimestamp).isEqualTo((Object)commitTs);
        commitTs = client.writeAtLeastOnce(Arrays.asList(Mutation.delete((String)"NUMBERS2", (Key)Key.of((Object[])new Object[]{2L})), Mutation.delete((String)"NUMBERS1", (Key)Key.of((Object[])new Object[]{3L}))));
        Thread.sleep(500L);
        Truth.assertThat(this.receivedChanges).isEmpty();
        watcher.stopAsync().awaitTerminated();
    }

    private ImmutableList<Struct> drainChanges() throws Exception {
        Truth.assertThat((Boolean)this.latch.await(5L, TimeUnit.SECONDS)).isTrue();
        ImmutableList changes = ImmutableList.copyOf(this.receivedChanges);
        this.receivedChanges.clear();
        return changes;
    }
}

