/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.FixedShardProvider;
import com.google.cloud.spanner.watcher.ShardProvider;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.SpannerTableTailer;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.TimebasedShardProvider;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(value=JUnit4.class)
public class ITSpannerTableTailerTest {
    private static final Logger logger = Logger.getLogger(ITSpannerTableTailerTest.class.getName());
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();
    private static Database database;
    private final Queue<Struct> receivedChanges = new ConcurrentLinkedQueue<Struct>();
    private volatile CountDownLatch latch = new CountDownLatch(0);

    @BeforeClass
    public static void setup() throws Exception {
        SpannerTestHelper.setupSpanner(env);
        database = env.createTestDb((Iterable<String>)ImmutableList.of((Object)"CREATE TABLE NUMBERS (ID INT64 NOT NULL, NAME STRING(100), LAST_MODIFIED TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (ID)", (Object)"CREATE TABLE NUMBERS_WITH_SHARDS (ID INT64 NOT NULL, NAME STRING(100), SHARD_ID STRING(MAX), LAST_MODIFIED TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (ID)", (Object)"CREATE INDEX IDX_NUMBERS_SHARDS ON NUMBERS_WITH_SHARDS (SHARD_ID, LAST_MODIFIED DESC)"));
        logger.info(String.format("Created database %s", database.getId().toString()));
    }

    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    @Before
    public void deleteRowsInNumbersWithShards() {
        Spanner spanner = env.getSpanner();
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        client.writeAtLeastOnce((Iterable)ImmutableList.of((Object)Mutation.delete((String)"NUMBERS_WITH_SHARDS", (KeySet)KeySet.all())));
    }

    @Test
    public void testSpannerTailer() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerTableTailer tailer = SpannerTableTailer.newBuilder((Spanner)spanner, (TableId)TableId.of((DatabaseId)database.getId(), (String)"NUMBERS")).setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)database.getId()).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                logger.info(String.format("Received changed for table %s: %s", table, row.asStruct().toString()));
                ITSpannerTableTailerTest.this.receivedChanges.add(row.asStruct());
                ITSpannerTableTailerTest.this.latch.countDown();
            }
        });
        tailer.startAsync().awaitRunning();
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        this.latch = new CountDownLatch(3);
        Timestamp commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS").set("ID").to(1L)).set("NAME").to("ONE")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS").set("ID").to(2L)).set("NAME").to("TWO")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS").set("ID").to(3L)).set("NAME").to("THREE")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(3);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("ONE")).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(2L)).set("NAME").to("TWO")).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(3L)).set("NAME").to("THREE")).set("LAST_MODIFIED").to(commitTs)).build()});
        this.latch = new CountDownLatch(2);
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS").set("ID").to(4L)).set("NAME").to("FOUR")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS").set("ID").to(5L)).set("NAME").to("FIVE")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(2);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(4L)).set("NAME").to("FOUR")).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("FIVE")).set("LAST_MODIFIED").to(commitTs)).build()});
        this.latch = new CountDownLatch(2);
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS").set("ID").to(1L)).set("NAME").to("one")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS").set("ID").to(5L)).set("NAME").to("five")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> updates = this.drainChanges();
        Truth.assertThat(updates).hasSize(2);
        Truth.assertThat(updates).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("one")).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("five")).set("LAST_MODIFIED").to(commitTs)).build()});
        commitTs = client.writeAtLeastOnce(Arrays.asList(Mutation.delete((String)"NUMBERS", (Key)Key.of((Object[])new Object[]{2L})), Mutation.delete((String)"NUMBERS", (Key)Key.of((Object[])new Object[]{3L}))));
        Thread.sleep(500L);
        Truth.assertThat(this.receivedChanges).isEmpty();
        tailer.stopAsync().awaitTerminated();
    }

    @Test
    public void testSpannerTailerWithTimebasedShard() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerTableTailer tailer = SpannerTableTailer.newBuilder((Spanner)spanner, (TableId)TableId.of((DatabaseId)database.getId(), (String)"NUMBERS_WITH_SHARDS")).setShardProvider((ShardProvider)TimebasedShardProvider.create((String)"SHARD_ID", (TimebasedShardProvider.Interval)TimebasedShardProvider.Interval.DAY)).setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)database.getId()).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                logger.info(String.format("Received changed for table %s: %s", table, row.asStruct().toString()));
                ITSpannerTableTailerTest.this.receivedChanges.add(row.asStruct());
                ITSpannerTableTailerTest.this.latch.countDown();
            }
        });
        tailer.startAsync().awaitRunning();
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        this.latch = new CountDownLatch(3);
        TimebasedShardProvider.TimebasedShardId currentShard = TimebasedShardProvider.Interval.DAY.getCurrentShardId(client.singleUse());
        Timestamp commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(1L)).set("NAME").to("ONE")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(2L)).set("NAME").to("TWO")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(3L)).set("NAME").to("THREE")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(3);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("ONE")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(2L)).set("NAME").to("TWO")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(3L)).set("NAME").to("THREE")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build()});
        this.latch = new CountDownLatch(2);
        if (currentShard.shouldRefresh()) {
            currentShard = TimebasedShardProvider.Interval.DAY.getCurrentShardId(client.singleUse());
        }
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(4L)).set("NAME").to("FOUR")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(5L)).set("NAME").to("FIVE")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(2);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(4L)).set("NAME").to("FOUR")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("FIVE")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build()});
        this.latch = new CountDownLatch(2);
        if (currentShard.shouldRefresh()) {
            currentShard = TimebasedShardProvider.Interval.DAY.getCurrentShardId(client.singleUse());
        }
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(1L)).set("NAME").to("one")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(5L)).set("NAME").to("five")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> updates = this.drainChanges();
        Truth.assertThat(updates).hasSize(2);
        Truth.assertThat(updates).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("one")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("five")).set("SHARD_ID").to(currentShard.getValue())).set("LAST_MODIFIED").to(commitTs)).build()});
        commitTs = client.writeAtLeastOnce(Arrays.asList(Mutation.delete((String)"NUMBERS_WITH_SHARDS", (Key)Key.of((Object[])new Object[]{2L})), Mutation.delete((String)"NUMBERS_WITH_SHARDS", (Key)Key.of((Object[])new Object[]{3L}))));
        Thread.sleep(500L);
        Truth.assertThat(this.receivedChanges).isEmpty();
        tailer.stopAsync().awaitTerminated();
    }

    @Test
    public void testSpannerTailerWithFixedShard() throws Exception {
        ImmutableList shards = ImmutableList.of((Object)"EAST", (Object)"WEST");
        Spanner spanner = env.getSpanner();
        ArrayList<SpannerTableTailer> tailers = new ArrayList<SpannerTableTailer>();
        for (String shard : shards) {
            SpannerTableTailer tailer = SpannerTableTailer.newBuilder((Spanner)spanner, (TableId)TableId.of((DatabaseId)database.getId(), (String)"NUMBERS_WITH_SHARDS")).setShardProvider((ShardProvider)FixedShardProvider.create((String)"SHARD_ID", (String)shard)).setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)database.getId()).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
            tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

                public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                    logger.info(String.format("Received changed for table %s: %s", table, row.asStruct().toString()));
                    ITSpannerTableTailerTest.this.receivedChanges.add(row.asStruct());
                    ITSpannerTableTailerTest.this.latch.countDown();
                }
            });
            tailer.startAsync();
            tailers.add(tailer);
        }
        for (SpannerTableTailer tailer : tailers) {
            tailer.awaitRunning();
        }
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        this.latch = new CountDownLatch(3);
        Timestamp commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(1L)).set("NAME").to("ONE")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(2L)).set("NAME").to("TWO")).set("SHARD_ID").to((String)shards.get(1))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(3L)).set("NAME").to("THREE")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(3);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("ONE")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(2L)).set("NAME").to("TWO")).set("SHARD_ID").to((String)shards.get(1))).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(3L)).set("NAME").to("THREE")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(commitTs)).build()});
        this.latch = new CountDownLatch(2);
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(4L)).set("NAME").to("FOUR")).set("SHARD_ID").to((String)shards.get(1))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(5L)).set("NAME").to("FIVE")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        inserts = this.drainChanges();
        Truth.assertThat(inserts).hasSize(2);
        Truth.assertThat(inserts).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(4L)).set("NAME").to("FOUR")).set("SHARD_ID").to((String)shards.get(1))).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("FIVE")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(commitTs)).build()});
        this.latch = new CountDownLatch(2);
        commitTs = client.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(1L)).set("NAME").to("one")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newUpdateBuilder((String)"NUMBERS_WITH_SHARDS").set("ID").to(5L)).set("NAME").to("five")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> updates = this.drainChanges();
        Truth.assertThat(updates).hasSize(2);
        Truth.assertThat(updates).containsExactly(new Object[]{((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(1L)).set("NAME").to("one")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(commitTs)).build(), ((Struct.Builder)((Struct.Builder)((Struct.Builder)((Struct.Builder)Struct.newBuilder().set("ID").to(5L)).set("NAME").to("five")).set("SHARD_ID").to((String)shards.get(0))).set("LAST_MODIFIED").to(commitTs)).build()});
        commitTs = client.writeAtLeastOnce(Arrays.asList(Mutation.delete((String)"NUMBERS_WITH_SHARDS", (Key)Key.of((Object[])new Object[]{2L})), Mutation.delete((String)"NUMBERS_WITH_SHARDS", (Key)Key.of((Object[])new Object[]{3L}))));
        Thread.sleep(500L);
        Truth.assertThat(this.receivedChanges).isEmpty();
        for (SpannerTableTailer tailer : tailers) {
            tailer.stopAsync();
        }
        for (SpannerTableTailer tailer : tailers) {
            tailer.awaitTerminated();
        }
    }

    private ImmutableList<Struct> drainChanges() throws Exception {
        Truth.assertThat((Boolean)this.latch.await(5L, TimeUnit.SECONDS)).isTrue();
        ImmutableList changes = ImmutableList.copyOf(this.receivedChanges);
        this.receivedChanges.clear();
        return changes;
    }
}

