/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher.it;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.AsyncRunner;
import com.google.cloud.spanner.AsyncTransactionManager;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.DatabaseClientWithChangeSets;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerTableChangeSetPoller;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(value=JUnit4.class)
public class ITSpannerTableChangeSetPollerTest {
    // Logger used to report the created database and every row change received by a callback.
    private static final Logger logger = Logger.getLogger(ITSpannerTableChangeSetPollerTest.class.getName());
    // Shared test environment: handed to SpannerTestHelper for setup/teardown and used to
    // obtain the Spanner instance and to create the test database.
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();
    // Test database created once in setup() and shared by all tests in this class.
    private static Database database;
    // Rows delivered to the row-change callbacks; read and cleared by drainChanges().
    private final Queue<Struct> receivedChanges = new ConcurrentLinkedQueue<Struct>();
    // Commit timestamp passed to the most recent callback invocation.
    private volatile Timestamp lastCommitTimestamp;
    // Counted down once per received row. Each scenario replaces it with a latch sized to the
    // number of rows it expects before writing; drainChanges() waits on it.
    // NOTE(review): volatile presumably because callbacks run on a poller thread - confirm.
    private volatile CountDownLatch latch = new CountDownLatch(0);

    /**
     * Prepares the shared Spanner environment and creates the test database.
     *
     * <p>The database contains the watched table NUMBERS (with a CHANGE_SET_ID column), the
     * CHANGE_SETS table holding the commit timestamp per change set, and a secondary index on
     * NUMBERS.CHANGE_SET_ID.
     *
     * @throws Exception if the environment or the database cannot be created
     */
    @BeforeClass
    public static void setup() throws Exception {
        SpannerTestHelper.setupSpanner(env);
        // Keep the DDL statements typed as strings: casting each element to Object yields an
        // ImmutableList<Object>, which cannot be cast to Iterable<String>.
        database = env.createTestDb(ImmutableList.of(
                "CREATE TABLE NUMBERS (ID INT64 NOT NULL, NAME STRING(100), CHANGE_SET_ID STRING(MAX)) PRIMARY KEY (ID)",
                "CREATE TABLE CHANGE_SETS (CHANGE_SET_ID STRING(MAX), COMMIT_TIMESTAMP TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (CHANGE_SET_ID)",
                "CREATE INDEX IDX_NUMBERS_CHANGE_SET ON NUMBERS (CHANGE_SET_ID)"));
        logger.info(String.format("Created database %s", database.getId().toString()));
    }

    /** Releases the shared Spanner test environment once all tests in this class have run. */
    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    /** Empties the NUMBERS table before each test so every test starts without rows. */
    @Before
    public void deleteRowsInNumbers() {
        DatabaseClient numbersClient = env.getSpanner().getDatabaseClient(database.getId());
        Mutation deleteEverything = Mutation.delete("NUMBERS", KeySet.all());
        numbersClient.writeAtLeastOnce(ImmutableList.of(deleteEverything));
    }

    /**
     * Verifies that a {@link SpannerTableChangeSetPoller} watching NUMBERS reports rows that are
     * written together with a CHANGE_SETS row.
     *
     * <p>The test writes two batches of inserts and one batch of updates, each with its own
     * change set id, and asserts after every write that exactly the written rows were reported
     * and that the last reported commit timestamp equals the commit timestamp of the write. It
     * finally deletes two rows and asserts that no change is reported for them within 500 ms.
     *
     * @throws Exception if a write fails or the test thread is interrupted while waiting
     */
    @Test
    public void testSpannerChangeSetPoller() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerTableChangeSetPoller poller =
                SpannerTableChangeSetPoller.newBuilder(spanner, TableId.of(database.getId(), "NUMBERS"))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, database.getId())
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        poller.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                logger.info(String.format("Received changed for table %s: %s", table, row.asStruct().toString()));
                ITSpannerTableChangeSetPollerTest.this.receivedChanges.add(row.asStruct());
                ITSpannerTableChangeSetPollerTest.this.lastCommitTimestamp = commitTimestamp;
                // Count down last, so the row and timestamp are recorded before the test thread wakes up.
                ITSpannerTableChangeSetPollerTest.this.latch.countDown();
            }
        });
        poller.startAsync().awaitRunning();
        try {
            DatabaseClient client = spanner.getDatabaseClient(database.getId());

            // First batch: insert rows 1-3 in one change set.
            this.latch = new CountDownLatch(3);
            String changeSetId = UUID.randomUUID().toString();
            Timestamp commitTs = client.writeAtLeastOnce(Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("CHANGE_SETS")
                            .set("CHANGE_SET_ID").to(changeSetId)
                            .set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(1L).set("NAME").to("ONE").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(2L).set("NAME").to("TWO").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(3L).set("NAME").to("THREE").set("CHANGE_SET_ID").to(changeSetId)
                            .build()));
            ImmutableList<Struct> inserts = this.drainChanges();
            Truth.assertThat(inserts).hasSize(3);
            Truth.assertThat(inserts).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L).set("NAME").to("ONE").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(2L).set("NAME").to("TWO").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(3L).set("NAME").to("THREE").set("CHANGE_SET_ID").to(changeSetId)
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);

            // Second batch: insert rows 4-5 in a new change set.
            this.latch = new CountDownLatch(2);
            changeSetId = UUID.randomUUID().toString();
            commitTs = client.writeAtLeastOnce(Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("CHANGE_SETS")
                            .set("CHANGE_SET_ID").to(changeSetId)
                            .set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(4L).set("NAME").to("FOUR").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(5L).set("NAME").to("FIVE").set("CHANGE_SET_ID").to(changeSetId)
                            .build()));
            inserts = this.drainChanges();
            Truth.assertThat(inserts).hasSize(2);
            Truth.assertThat(inserts).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(4L).set("NAME").to("FOUR").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L).set("NAME").to("FIVE").set("CHANGE_SET_ID").to(changeSetId)
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);

            // Third batch: update rows 1 and 5 in a new change set.
            this.latch = new CountDownLatch(2);
            changeSetId = UUID.randomUUID().toString();
            commitTs = client.writeAtLeastOnce(Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("CHANGE_SETS")
                            .set("CHANGE_SET_ID").to(changeSetId)
                            .set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)
                            .build(),
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(1L).set("NAME").to("one").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(5L).set("NAME").to("five").set("CHANGE_SET_ID").to(changeSetId)
                            .build()));
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L).set("NAME").to("one").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L).set("NAME").to("five").set("CHANGE_SET_ID").to(changeSetId)
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);

            // Deletes must not be reported: wait a while and check that nothing arrived.
            client.writeAtLeastOnce(Arrays.asList(
                    Mutation.delete("NUMBERS", Key.of(2L)),
                    Mutation.delete("NUMBERS", Key.of(3L))));
            Thread.sleep(500L);
            Truth.assertThat(this.receivedChanges).isEmpty();
        } finally {
            // Always stop the poller, also when an assertion above fails, so it does not keep polling.
            poller.stopAsync().awaitTerminated();
        }
    }

    /**
     * Verifies the poller when all writes go through a {@link DatabaseClientWithChangeSets}.
     *
     * <p>Two insert batches and one update batch are written with
     * {@code writeAtLeastOnce(changeSetId, mutations)}; after each write the reported rows and
     * the last reported commit timestamp are asserted. The same check is then repeated for
     * mutations and DML through the transaction runner, transaction manager, async runner and
     * async transaction manager. Finally a row is deleted and the test asserts that no change
     * is reported for it within 500 ms.
     *
     * @throws Exception if a write fails or the test thread is interrupted while waiting
     */
    @Test
    public void testSpannerChangeSetPollerUsingChangeSetDatabaseClient() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerTableChangeSetPoller poller =
                SpannerTableChangeSetPoller.newBuilder(spanner, TableId.of(database.getId(), "NUMBERS"))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, database.getId())
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        poller.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                logger.info(String.format("Received changed for table %s: %s", table, row.asStruct().toString()));
                ITSpannerTableChangeSetPollerTest.this.receivedChanges.add(row.asStruct());
                ITSpannerTableChangeSetPollerTest.this.lastCommitTimestamp = commitTimestamp;
                // Count down last, so the row and timestamp are recorded before the test thread wakes up.
                ITSpannerTableChangeSetPollerTest.this.latch.countDown();
            }
        });
        poller.startAsync().awaitRunning();
        try {
            DatabaseClientWithChangeSets client =
                    DatabaseClientWithChangeSets.of(spanner.getDatabaseClient(database.getId()));

            // First batch: insert rows 1-3 in one change set.
            this.latch = new CountDownLatch(3);
            String changeSetId = client.newChangeSetId();
            Timestamp commitTs = client.writeAtLeastOnce(changeSetId, Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(1L).set("NAME").to("ONE").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(2L).set("NAME").to("TWO").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(3L).set("NAME").to("THREE").set("CHANGE_SET_ID").to(changeSetId)
                            .build()));
            ImmutableList<Struct> inserts = this.drainChanges();
            Truth.assertThat(inserts).hasSize(3);
            Truth.assertThat(inserts).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L).set("NAME").to("ONE").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(2L).set("NAME").to("TWO").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(3L).set("NAME").to("THREE").set("CHANGE_SET_ID").to(changeSetId)
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);

            // Second batch: insert rows 4-5 in a new change set.
            this.latch = new CountDownLatch(2);
            changeSetId = client.newChangeSetId();
            commitTs = client.writeAtLeastOnce(changeSetId, Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(4L).set("NAME").to("FOUR").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(5L).set("NAME").to("FIVE").set("CHANGE_SET_ID").to(changeSetId)
                            .build()));
            inserts = this.drainChanges();
            Truth.assertThat(inserts).hasSize(2);
            Truth.assertThat(inserts).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(4L).set("NAME").to("FOUR").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L).set("NAME").to("FIVE").set("CHANGE_SET_ID").to(changeSetId)
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);

            // Third batch: update rows 1 and 5 in a new change set.
            this.latch = new CountDownLatch(2);
            changeSetId = client.newChangeSetId();
            commitTs = client.writeAtLeastOnce(changeSetId, Arrays.asList(
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(1L).set("NAME").to("one").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(5L).set("NAME").to("five").set("CHANGE_SET_ID").to(changeSetId)
                            .build()));
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L).set("NAME").to("one").set("CHANGE_SET_ID").to(changeSetId)
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L).set("NAME").to("five").set("CHANGE_SET_ID").to(changeSetId)
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);

            // Repeat the update scenario through every transaction API of the wrapper client.
            this.testMutationsWithTransactionRunner(client);
            this.testDmlWithTransactionRunner(client);
            this.testMutationsWithTransactionManager(client);
            this.testDmlWithTransactionManager(client);
            this.testMutationsWithAsyncRunner(client);
            this.testDmlWithAsyncRunner(client);
            this.testMutationsWithAsyncTransactionManager(client);
            this.testDmlWithAsyncTransactionManager(client);

            // Deletes must not be reported: wait a while and check that nothing arrived.
            client.writeAtLeastOnce(Arrays.asList(Mutation.delete("NUMBERS", Key.of(2L))));
            Thread.sleep(500L);
            Truth.assertThat(this.receivedChanges).isEmpty();
        } finally {
            // Always stop the poller, also when an assertion above fails, so it does not keep polling.
            poller.stopAsync().awaitTerminated();
        }
    }

    /**
     * Updates rows 1 and 5 with buffered mutations inside a read/write transaction runner and
     * asserts that both rows are reported with the runner's commit timestamp.
     *
     * @param client the change-set aware client used to start the transaction
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithTransactionRunner(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        final DatabaseClientWithChangeSets.TransactionRunnerWithChangeSet runner = client.readWriteTransaction();
        runner.run(new TransactionRunner.TransactionCallable<Void>() {
            @Override
            public Void run(TransactionContext transaction) throws Exception {
                Mutation updateOne = Mutation.newUpdateBuilder("NUMBERS")
                        .set("ID").to(1L)
                        .set("NAME").to("En")
                        .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                        .build();
                Mutation updateFive = Mutation.newUpdateBuilder("NUMBERS")
                        .set("ID").to(5L)
                        .set("NAME").to("Fem")
                        .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                        .build();
                transaction.buffer(Arrays.asList(updateOne, updateFive));
                return null;
            }
        });
        Timestamp expectedTimestamp = runner.getCommitTimestamp();
        ImmutableList<Struct> reported = this.drainChanges();
        Struct expectedOne = Struct.newBuilder()
                .set("ID").to(1L)
                .set("NAME").to("En")
                .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                .build();
        Struct expectedFive = Struct.newBuilder()
                .set("ID").to(5L)
                .set("NAME").to("Fem")
                .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                .build();
        Truth.assertThat(reported).hasSize(2);
        Truth.assertThat(reported).containsExactly(expectedOne, expectedFive);
        Truth.assertThat(this.lastCommitTimestamp).isEqualTo(expectedTimestamp);
    }

    /**
     * Updates rows 3 and 4 with a DML batch inside a read/write transaction runner and asserts
     * that both rows are reported with the runner's commit timestamp.
     *
     * @param client the change-set aware client used to start the transaction
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithTransactionRunner(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        final DatabaseClientWithChangeSets.TransactionRunnerWithChangeSet runner = client.readWriteTransaction();
        runner.run(new TransactionRunner.TransactionCallable<Void>() {
            @Override
            public Void run(TransactionContext transaction) throws Exception {
                String sql = "UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id";
                Statement updateThree = Statement.newBuilder(sql)
                        .bind("name").to("Tre")
                        .bind("id").to(3L)
                        .bind("changeSet").to(runner.getChangeSetId())
                        .build();
                Statement updateFour = Statement.newBuilder(sql)
                        .bind("name").to("Fire")
                        .bind("id").to(4L)
                        .bind("changeSet").to(runner.getChangeSetId())
                        .build();
                transaction.batchUpdate(Arrays.asList(updateThree, updateFour));
                return null;
            }
        });
        Timestamp expectedTimestamp = runner.getCommitTimestamp();
        ImmutableList<Struct> reported = this.drainChanges();
        Struct expectedThree = Struct.newBuilder()
                .set("ID").to(3L)
                .set("NAME").to("Tre")
                .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                .build();
        Struct expectedFour = Struct.newBuilder()
                .set("ID").to(4L)
                .set("NAME").to("Fire")
                .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                .build();
        Truth.assertThat(reported).hasSize(2);
        Truth.assertThat(reported).containsExactly(expectedThree, expectedFour);
        Truth.assertThat(this.lastCommitTimestamp).isEqualTo(expectedTimestamp);
    }

    /**
     * Updates rows 1 and 5 with buffered mutations through a transaction manager, retrying when
     * the transaction is aborted, and asserts that both rows are reported with the manager's
     * commit timestamp.
     *
     * @param client the change-set aware client used to create the transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithTransactionManager(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.TransactionManagerWithChangeSet manager = client.transactionManager()) {
            TransactionContext txn = manager.begin();
            while (true) {
                try {
                    txn.buffer(Arrays.asList(
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(1L)
                                    .set("NAME").to("Uno")
                                    .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                    .build(),
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(5L)
                                    .set("NAME").to("Cinque")
                                    .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                    .build()));
                    manager.commit();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds, so it must not be divided by 1000.
                    // Clamp at 0 because the getter returns -1 when no retry delay is provided.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    txn = manager.resetForRetry();
                }
            }
            Timestamp commitTs = manager.getCommitTimestamp();
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L)
                            .set("NAME").to("Uno")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L)
                            .set("NAME").to("Cinque")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);
        }
    }

    /**
     * Updates rows 3 and 4 with a DML batch through a transaction manager, retrying when the
     * transaction is aborted, and asserts that both rows are reported with the manager's commit
     * timestamp.
     *
     * @param client the change-set aware client used to create the transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithTransactionManager(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.TransactionManagerWithChangeSet manager = client.transactionManager()) {
            TransactionContext txn = manager.begin();
            while (true) {
                try {
                    String sql = "UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id";
                    txn.batchUpdate(Arrays.asList(
                            Statement.newBuilder(sql)
                                    .bind("name").to("Tres")
                                    .bind("id").to(3L)
                                    .bind("changeSet").to(manager.getChangeSetId())
                                    .build(),
                            Statement.newBuilder(sql)
                                    .bind("name").to("Cuatro")
                                    .bind("id").to(4L)
                                    .bind("changeSet").to(manager.getChangeSetId())
                                    .build()));
                    manager.commit();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds, so it must not be divided by 1000.
                    // Clamp at 0 because the getter returns -1 when no retry delay is provided.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    txn = manager.resetForRetry();
                }
            }
            Timestamp commitTs = manager.getCommitTimestamp();
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(3L)
                            .set("NAME").to("Tres")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(4L)
                            .set("NAME").to("Cuatro")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);
        }
    }

    /**
     * Updates rows 1 and 5 with buffered mutations through an async runner and asserts that both
     * rows are reported with the runner's commit timestamp.
     *
     * @param client the change-set aware client used to create the async runner
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithAsyncRunner(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            final DatabaseClientWithChangeSets.AsyncRunnerWithChangeSet runner = client.runAsync();
            runner.runAsync(new AsyncRunner.AsyncWork<Void>() {
                @Override
                public ApiFuture<Void> doWorkAsync(TransactionContext txn) {
                    txn.buffer(Arrays.asList(
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(1L)
                                    .set("NAME").to("En")
                                    .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                                    .build(),
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(5L)
                                    .set("NAME").to("Fem")
                                    .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                                    .build()));
                    return ApiFutures.immediateFuture(null);
                }
            }, exec);
            ApiFuture<Timestamp> commitTs = runner.getCommitTimestamp();
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L)
                            .set("NAME").to("En")
                            .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L)
                            .set("NAME").to("Fem")
                            .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs.get());
        } finally {
            // Shut the executor down even when an assertion fails, so its thread is not leaked.
            exec.shutdown();
        }
    }

    /**
     * Updates rows 3 and 4 with an async DML batch through an async runner and asserts that both
     * rows are reported with the runner's commit timestamp.
     *
     * @param client the change-set aware client used to create the async runner
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithAsyncRunner(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            final DatabaseClientWithChangeSets.AsyncRunnerWithChangeSet runner = client.runAsync();
            runner.runAsync(new AsyncRunner.AsyncWork<Void>() {
                @Override
                public ApiFuture<Void> doWorkAsync(TransactionContext txn) {
                    String sql = "UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id";
                    // NOTE(review): the future returned by batchUpdateAsync is not awaited here;
                    // presumably the commit waits for pending statements - confirm.
                    txn.batchUpdateAsync(Arrays.asList(
                            Statement.newBuilder(sql)
                                    .bind("name").to("Tre")
                                    .bind("id").to(3L)
                                    .bind("changeSet").to(runner.getChangeSetId())
                                    .build(),
                            Statement.newBuilder(sql)
                                    .bind("name").to("Fire")
                                    .bind("id").to(4L)
                                    .bind("changeSet").to(runner.getChangeSetId())
                                    .build()));
                    return ApiFutures.immediateFuture(null);
                }
            }, exec);
            ApiFuture<Timestamp> commitTs = runner.getCommitTimestamp();
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(3L)
                            .set("NAME").to("Tre")
                            .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(4L)
                            .set("NAME").to("Fire")
                            .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs.get());
        } finally {
            // Shut the executor down even when an assertion fails, so its thread is not leaked.
            exec.shutdown();
        }
    }

    /**
     * Updates rows 1 and 5 with buffered mutations through an async transaction manager,
     * retrying when the transaction is aborted, and asserts that both rows are reported with
     * the timestamp returned by the commit.
     *
     * @param client the change-set aware client used to create the async transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithAsyncTransactionManager(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.AsyncTransactionManagerWithChangeSet manager = client.transactionManagerAsync()) {
            Timestamp commitTs;
            AsyncTransactionManager.TransactionContextFuture txn = manager.beginAsync();
            while (true) {
                try {
                    commitTs = txn.then((context, v) -> {
                        context.buffer(Arrays.asList(
                                Mutation.newUpdateBuilder("NUMBERS")
                                        .set("ID").to(1L)
                                        .set("NAME").to("Uno")
                                        .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                        .build(),
                                Mutation.newUpdateBuilder("NUMBERS")
                                        .set("ID").to(5L)
                                        .set("NAME").to("Cinque")
                                        .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                        .build()));
                        return ApiFutures.immediateFuture(null);
                    }, MoreExecutors.directExecutor()).commitAsync().get();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds, so it must not be divided by 1000.
                    // Clamp at 0 because the getter returns -1 when no retry delay is provided.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    txn = manager.resetForRetryAsync();
                }
            }
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(1L)
                            .set("NAME").to("Uno")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(5L)
                            .set("NAME").to("Cinque")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);
        }
    }

    /**
     * Updates rows 3 and 4 with an async DML batch through an async transaction manager,
     * retrying when the transaction is aborted, and asserts that both rows are reported with
     * the timestamp returned by the commit.
     *
     * @param client the change-set aware client used to create the async transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithAsyncTransactionManager(DatabaseClientWithChangeSets client) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.AsyncTransactionManagerWithChangeSet manager = client.transactionManagerAsync()) {
            Timestamp commitTs;
            AsyncTransactionManager.TransactionContextFuture txn = manager.beginAsync();
            while (true) {
                try {
                    String sql = "UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id";
                    commitTs = txn.then((context, v) -> context.batchUpdateAsync(Arrays.asList(
                            Statement.newBuilder(sql)
                                    .bind("name").to("Tres")
                                    .bind("id").to(3L)
                                    .bind("changeSet").to(manager.getChangeSetId())
                                    .build(),
                            Statement.newBuilder(sql)
                                    .bind("name").to("Cuatro")
                                    .bind("id").to(4L)
                                    .bind("changeSet").to(manager.getChangeSetId())
                                    .build())), MoreExecutors.directExecutor()).commitAsync().get();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds, so it must not be divided by 1000.
                    // Clamp at 0 because the getter returns -1 when no retry delay is provided.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    txn = manager.resetForRetryAsync();
                }
            }
            ImmutableList<Struct> updates = this.drainChanges();
            Truth.assertThat(updates).hasSize(2);
            Truth.assertThat(updates).containsExactly(
                    Struct.newBuilder()
                            .set("ID").to(3L)
                            .set("NAME").to("Tres")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build(),
                    Struct.newBuilder()
                            .set("ID").to(4L)
                            .set("NAME").to("Cuatro")
                            .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                            .build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTs);
        }
    }

    /**
     * Waits up to 10 seconds for the current latch to reach zero, asserts that it did, and then
     * returns a snapshot of the received rows after clearing the queue.
     *
     * @return the rows received so far, in the order they were added to the queue
     * @throws Exception if the thread is interrupted while waiting on the latch
     */
    private ImmutableList<Struct> drainChanges() throws Exception {
        boolean allChangesArrived = this.latch.await(10L, TimeUnit.SECONDS);
        Truth.assertThat(allChangesArrived).isTrue();
        ImmutableList<Struct> snapshot = ImmutableList.copyOf(this.receivedChanges);
        this.receivedChanges.clear();
        return snapshot;
    }
}

