/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher;

import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.watcher.AbstractMockServerTest;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.RandomResultSetGenerator;
import com.google.cloud.spanner.watcher.ShardProvider;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.SpannerTableTailer;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.TimebasedShardProvider;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(value=JUnit4.class)
public class SpannerTableTailerTest
extends AbstractMockServerTest {
    private static final int STRESS_TEST_RUNS = 1;

    /**
     * Verifies that a tailer on table {@code Foo} hands polled rows to a registered callback.
     *
     * <p>The tailer polls every 10 ms starting from {@link Timestamp#MIN_VALUE}. The test waits up
     * to five seconds for ten callbacks, stops the tailer and then expects exactly ten rows to
     * have been counted. The rows themselves are presumably seeded by the mock-server fixture in
     * the base class.
     */
    @Test
    public void testReceiveChanges() throws Exception {
        final int expectedRowCount = 10;
        Spanner client = this.getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger rowCounter = new AtomicInteger();
        final CountDownLatch allRowsSeen = new CountDownLatch(expectedRowCount);

        SpannerTableTailer watcher =
            SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "Foo"))
                .setPollInterval(Duration.ofMillis(10L))
                .setCommitTimestampRepository(
                    SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                        .build())
                .build();
        watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                // Count first, then release one permit of the latch the test thread waits on.
                rowCounter.incrementAndGet();
                allRowsSeen.countDown();
            }
        });

        watcher.startAsync().awaitRunning();
        // The result of await is intentionally not checked; the row count below is the assertion.
        allRowsSeen.await(5L, TimeUnit.SECONDS);
        watcher.stopAsync().awaitTerminated();

        Truth.assertThat(rowCounter.get()).isEqualTo(expectedRowCount);
    }

    /**
     * Verifies that a tailer on {@code NonExistingTable} reports a failure while it is still in
     * the {@code STARTING} state.
     *
     * <p>A service listener completes a future from its {@code failed} hook; the test waits up to
     * five seconds for that future and expects it to hold {@code true}.
     */
    @Test
    public void testTableNotFoundDuringInitialization() throws Exception {
        Spanner client = this.getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        SpannerTableTailer watcher =
            SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "NonExistingTable"))
                .setCommitTimestampRepository(
                    SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                        .build())
                .build();

        final SettableApiFuture<Boolean> failedWhileStarting = SettableApiFuture.create();
        watcher.addListener(new ApiService.Listener() {
            @Override
            public void failed(ApiService.State from, Throwable failure) {
                boolean wasStarting = from == ApiService.State.STARTING;
                if (!wasStarting) {
                    failedWhileStarting.setException(new AssertionError("expected from State to be STARTING"));
                }
                // Always attempted, as in the original flow; presumably a no-op once the
                // future already holds the exception set above.
                failedWhileStarting.set(Boolean.TRUE);
            }
        }, MoreExecutors.directExecutor());

        // Only start the service; it is expected to fail rather than reach RUNNING.
        watcher.startAsync();
        Boolean outcome = failedWhileStarting.get(5L, TimeUnit.SECONDS);
        Truth.assertThat(outcome).isTrue();
    }

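    /*
     * Decompiler (CFR) warning: a try block that caught its own exception was removed from the
     * method below - possible behaviour change compared with the original source.
     */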
    /**
     * Verifies that a running tailer ends up {@code FAILED} once polling starts to return
     * {@code NOT_FOUND}.
     *
     * <p>The test first waits for ten row callbacks, then registers a {@code NOT_FOUND} error for
     * the current poll statement on the mock server and waits up to five seconds for the service
     * listener to observe a failure coming from the {@code RUNNING} state. The tailer's logger is
     * switched off while the failure is provoked and restored afterwards.
     */
    @Test
    public void testTableDeleted() throws Exception {
        Spanner client = this.getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger rowCounter = new AtomicInteger();
        final CountDownLatch initialRowsSeen = new CountDownLatch(10);

        SpannerTableTailer watcher =
            SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "Foo"))
                .setPollInterval(Duration.ofMillis(10L))
                .setCommitTimestampRepository(
                    SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                        .build())
                .build();
        watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                rowCounter.incrementAndGet();
                initialRowsSeen.countDown();
            }
        });

        final SettableApiFuture<Boolean> failedWhileRunning = SettableApiFuture.create();
        watcher.addListener(new ApiService.Listener() {
            @Override
            public void failed(ApiService.State from, Throwable failure) {
                String report = String.format("Database change watcher failed.%n    State before failure: %s%n    Error: %s%n", from, failure.getMessage());
                SpannerTableTailer.logger.warning(report);
                if (from != ApiService.State.RUNNING) {
                    failedWhileRunning.setException(new AssertionError("expected from State to be RUNNING"));
                }
                // Always attempted, as in the original flow; presumably a no-op once the
                // future already holds the exception set above.
                failedWhileRunning.set(Boolean.TRUE);
            }
        }, MoreExecutors.directExecutor());

        watcher.startAsync().awaitRunning();
        initialRowsSeen.await(5L, TimeUnit.SECONDS);
        Truth.assertThat(rowCounter.get()).isEqualTo(10);

        // Silence the tailer's logger while the failure is provoked, then put the level back.
        Level savedLevel = SpannerTableTailer.logger.getLevel();
        try {
            SpannerTableTailer.logger.setLevel(Level.OFF);
            StatusRuntimeException tableGone = Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
            mockSpanner.putStatementResult(
                MockSpannerServiceImpl.StatementResult.exception(this.getCurrentFooPollStatement(), tableGone));
            Boolean outcome = failedWhileRunning.get(5L, TimeUnit.SECONDS);
            Truth.assertThat(outcome).isTrue();
        } finally {
            SpannerTableTailer.logger.setLevel(savedLevel);
        }
        Truth.assertThat(watcher.state()).isEqualTo(ApiService.State.FAILED);
    }

    /**
     * Stress test: repeatedly feeds batches of random size to a tailer that polls every
     * millisecond and checks that the callback's running row total matches after every batch.
     *
     * <p>The whole scenario is executed {@code STRESS_TEST_RUNS} times. Between two runs (but not
     * after the last one) the mock server is stopped, restarted and re-seeded. With the constant
     * at its current value of 1 that restart never happens, which matches the previous behaviour
     * of this test.
     */
    @Test
    public void testStressReceiveMultipleChanges() throws Exception {
        Random random = new Random();
        for (int run = 0; run < STRESS_TEST_RUNS; ++run) {
            Spanner spanner = this.getSpanner();
            DatabaseId db = DatabaseId.of("p", "i", "d");
            TestChangeCallback callback = new TestChangeCallback(10);
            SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(spanner, TableId.of(db, "Foo"))
                    .setPollInterval(Duration.ofMillis(1L))
                    .setCommitTimestampRepository(
                        SpannerCommitTimestampRepository.newBuilder(spanner, db)
                            .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                            .build())
                    .build();
            tailer.addCallback(callback);
            tailer.startAsync();

            // Wait for the initial batch of ten rows.
            CountDownLatch latch = callback.getLatch();
            latch.await(5L, TimeUnit.SECONDS);
            Truth.assertThat(callback.receivedRows.get()).isEqualTo(10);

            int expectedTotalChangeCount = 10;
            for (int change = 0; change < 50; ++change) {
                int numChanges = random.nextInt(10) + 1;
                expectedTotalChangeCount += numChanges;
                // Arm a fresh latch before the new rows become visible to the tailer.
                callback.setCountDown(numChanges);

                Timestamp lastSeenCommitTimestamp = callback.lastSeenCommitTimestamp;
                Timestamp nextCommitTimestamp =
                    Timestamp.ofTimeSecondsAndNanos(
                        lastSeenCommitTimestamp.getSeconds() + 1L, lastSeenCommitTimestamp.getNanos());
                // Poll from the last seen timestamp: yields the new batch, stamped one second later.
                Statement pollStatement1 =
                    SELECT_FOO_STATEMENT.toBuilder().bind("prevCommitTimestamp").to(lastSeenCommitTimestamp).build();
                // Poll from the new timestamp: a generator of size 0, presumably an empty result.
                Statement pollStatement2 =
                    SELECT_FOO_STATEMENT.toBuilder().bind("prevCommitTimestamp").to(nextCommitTimestamp).build();
                mockSpanner.putStatementResults(new MockSpannerServiceImpl.StatementResult[] {
                    MockSpannerServiceImpl.StatementResult.query(
                        pollStatement1,
                        new RandomResultSetGenerator(numChanges).generateWithFixedCommitTimestamp(nextCommitTimestamp)),
                    MockSpannerServiceImpl.StatementResult.query(
                        pollStatement2, new RandomResultSetGenerator(0).generate())
                });

                latch = callback.getLatch();
                latch.await(5L, TimeUnit.SECONDS);
                Truth.assertThat(callback.receivedRows.get()).isEqualTo(expectedTotalChangeCount);
            }
            tailer.stopAsync().awaitTerminated();

            // Reset the mock server only when another run follows.
            if (run < STRESS_TEST_RUNS - 1) {
                SpannerTableTailerTest.stopServer();
                SpannerTableTailerTest.startStaticServer();
                this.setupResults();
            }
        }
    }

    /**
     * Verifies that a tailer configured with a caller-supplied executor delivers rows and does
     * not shut that executor down when the tailer itself is stopped.
     *
     * <p>The executor is created by this test, so this test is responsible for shutting it down.
     * That happens in a {@code finally} block so the pool's threads are released even when an
     * assertion or the tailer fails.
     */
    @Test
    public void testCustomExecutor() throws Exception {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of("p", "i", "d");
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(10);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);
        try {
            SpannerTableTailer tailer =
                SpannerTableTailer.newBuilder(spanner, TableId.of(db, "Foo"))
                    .setExecutor(executor)
                    .setPollInterval(Duration.ofMillis(10L))
                    .setCommitTimestampRepository(
                        SpannerCommitTimestampRepository.newBuilder(spanner, db)
                            .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                            .build())
                    .build();
            tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
                @Override
                public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                    receivedRows.incrementAndGet();
                    latch.countDown();
                }
            });
            tailer.startAsync().awaitRunning();
            latch.await(5L, TimeUnit.SECONDS);
            tailer.stopAsync().awaitTerminated();
            Truth.assertThat(receivedRows.get()).isEqualTo(10);
            // Stopping the tailer must leave an externally owned executor running.
            Truth.assertThat(executor.isShutdown()).isFalse();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Verifies row delivery for a tailer configured with a day-based
     * {@link TimebasedShardProvider} on column {@code ShardId}.
     *
     * <p>The test waits up to five seconds for five callbacks, stops the tailer and expects
     * exactly five rows to have been counted.
     */
    @Test
    public void testAutomaticSharding() throws Exception {
        final int expectedRowCount = 5;
        Spanner client = this.getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger rowCounter = new AtomicInteger();
        final CountDownLatch allRowsSeen = new CountDownLatch(expectedRowCount);

        SpannerTableTailer watcher =
            SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "Foo"))
                .setShardProvider(TimebasedShardProvider.create("ShardId", TimebasedShardProvider.Interval.DAY))
                .setPollInterval(Duration.ofMillis(10L))
                .setCommitTimestampRepository(
                    SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                        .build())
                .build();
        watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                rowCounter.incrementAndGet();
                allRowsSeen.countDown();
            }
        });

        watcher.startAsync().awaitRunning();
        allRowsSeen.await(5L, TimeUnit.SECONDS);
        watcher.stopAsync().awaitTerminated();

        Truth.assertThat(rowCounter.get()).isEqualTo(expectedRowCount);
    }

    /**
     * Verifies that a tailer configured with the commit timestamp column
     * {@code AlternativeCommitTS} polls with that column and delivers the resulting row.
     *
     * <p>Two results are registered on the mock server: the poll starting at
     * {@link Timestamp#MIN_VALUE} returns one generated row stamped with the current time, and
     * the follow-up poll starting at that time returns the output of a generator of size 0. Both
     * results carry metadata whose last field is replaced by a {@code TIMESTAMP} field named
     * {@code AlternativeCommitTS}.
     */
    @Test
    public void testFixedCommitTimestampColumn() throws Exception {
        Timestamp commitTime = Timestamp.now();

        // Swap the last column of the generator's row type for the alternative timestamp column.
        StructType generatedRowType = RandomResultSetGenerator.METADATA.getRowType();
        StructType.Field alternativeColumn =
            StructType.Field.newBuilder()
                .setName("AlternativeCommitTS")
                .setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
                .build();
        StructType adjustedRowType =
            generatedRowType.toBuilder()
                .setFields(generatedRowType.getFieldsCount() - 1, alternativeColumn)
                .build();
        ResultSetMetadata metadata =
            RandomResultSetGenerator.METADATA.toBuilder().setRowType(adjustedRowType).build();

        Statement firstPoll =
            Statement.newBuilder("SELECT *\nFROM `Foo`\nWHERE `AlternativeCommitTS`>@prevCommitTimestamp\nORDER BY `AlternativeCommitTS`")
                .bind("prevCommitTimestamp")
                .to(Timestamp.MIN_VALUE)
                .build();
        ResultSet singleRow =
            new RandomResultSetGenerator(1)
                .generateWithFixedCommitTimestamp(commitTime)
                .toBuilder()
                .setMetadata(metadata)
                .build();
        mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.query(firstPoll, singleRow));

        Statement followUpPoll = firstPoll.toBuilder().bind("prevCommitTimestamp").to(commitTime).build();
        ResultSet noRows = new RandomResultSetGenerator(0).generate().toBuilder().setMetadata(metadata).build();
        mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.query(followUpPoll, noRows));

        Spanner client = this.getSpanner();
        DatabaseId databaseId = DatabaseId.of("p", "i", "d");
        final AtomicInteger rowCounter = new AtomicInteger();
        final CountDownLatch rowSeen = new CountDownLatch(1);
        SpannerTableTailer watcher =
            SpannerTableTailer.newBuilder(client, TableId.of(databaseId, "Foo"))
                .setCommitTimestampRepository(
                    SpannerCommitTimestampRepository.newBuilder(client, databaseId)
                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                        .build())
                .setCommitTimestampColumn("AlternativeCommitTS")
                .build();
        watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                rowCounter.incrementAndGet();
                rowSeen.countDown();
            }
        });

        watcher.startAsync().awaitRunning();
        rowSeen.await(5L, TimeUnit.SECONDS);
        watcher.stopAsync().awaitTerminated();

        Truth.assertThat(rowCounter.get()).isEqualTo(1);
    }

    /**
     * Verifies that polling with a commit timestamp column the backend rejects makes the tailer
     * fail with a {@code NOT_FOUND} {@link SpannerException}.
     *
     * <p>The mock server is told to answer the poll statement for {@code AlternativeCommitTS}
     * with {@code NOT_FOUND}. The service listener then checks, in order, that the failure came
     * from the {@code RUNNING} state, that it is a {@link SpannerException}, and that its error
     * code is {@code NOT_FOUND}. The first violated expectation is reported through the future;
     * only when all three hold is the future completed with {@code true}.
     */
    @Test
    public void testInvalidCommitTimestampColumn() throws Exception {
        Statement statement =
            Statement.newBuilder("SELECT *\nFROM `Foo`\nWHERE `AlternativeCommitTS`>@prevCommitTimestamp\nORDER BY `AlternativeCommitTS`")
                .bind("prevCommitTimestamp")
                .to(Timestamp.MIN_VALUE)
                .build();
        mockSpanner.putStatementResult(
            MockSpannerServiceImpl.StatementResult.exception(
                statement, Status.NOT_FOUND.withDescription("Column not found").asRuntimeException()));
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of("p", "i", "d");
        SpannerTableTailer tailer =
            SpannerTableTailer.newBuilder(spanner, TableId.of(db, "Foo"))
                .setCommitTimestampRepository(
                    SpannerCommitTimestampRepository.newBuilder(spanner, db)
                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                        .build())
                .setCommitTimestampColumn("AlternativeCommitTS")
                .build();
        final SettableApiFuture<Boolean> res = SettableApiFuture.create();
        tailer.addListener(new ApiService.Listener() {
            @Override
            public void failed(ApiService.State from, Throwable failure) {
                if (from != ApiService.State.RUNNING) {
                    // The message names the state that is actually checked.
                    res.setException(new AssertionError("expected from State to be RUNNING"));
                } else if (!(failure instanceof SpannerException)) {
                    // Report the wrong type instead of running into a ClassCastException below.
                    res.setException(new AssertionError("expected SpannerException"));
                } else if (((SpannerException) failure).getErrorCode() != ErrorCode.NOT_FOUND) {
                    res.setException(new AssertionError("expected NOT_FOUND"));
                } else {
                    res.set(Boolean.TRUE);
                }
            }
        }, MoreExecutors.directExecutor());
        tailer.startAsync();
        // Same five-second budget as the other tests in this class.
        Truth.assertThat(res.get(5L, TimeUnit.SECONDS)).isTrue();
    }

    private static final class TestChangeCallback
    implements SpannerTableChangeWatcher.RowChangeCallback {
        private final AtomicInteger receivedRows = new AtomicInteger();
        private CountDownLatch latch;
        private Timestamp lastSeenCommitTimestamp = Timestamp.MIN_VALUE;

        private TestChangeCallback(int initialCountDown) {
            this.latch = new CountDownLatch(initialCountDown);
        }

        private void setCountDown(int count) {
            this.latch = new CountDownLatch(count);
        }

        private CountDownLatch getLatch() {
            return this.latch;
        }

        public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
            if (commitTimestamp.compareTo(this.lastSeenCommitTimestamp) > 0) {
                this.lastSeenCommitTimestamp = commitTimestamp;
            }
            this.receivedRows.incrementAndGet();
            this.latch.countDown();
        }
    }
}

