/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher;

import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.watcher.AbstractMockServerTest;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.RandomResultSetGenerator;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerDatabaseTailer;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.SpannerTableTailer;
import com.google.cloud.spanner.watcher.TableId;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(value=JUnit4.class)
public class SpannerDatabaseTailerTest
extends AbstractMockServerTest {
    @Test
    public void testReceiveChanges() throws Exception {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).allTables().setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)db).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(30);
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                receivedRows.incrementAndGet();
                latch.countDown();
            }
        });
        tailer.startAsync().awaitRunning();
        latch.await(5L, TimeUnit.SECONDS);
        tailer.stopAsync().awaitTerminated();
        Truth.assertThat((Integer)receivedRows.get()).isEqualTo((Object)30);
    }

    @Test
    public void testTableNotFoundDuringInitialization() throws Exception {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).includeTables("Foo", new String[]{"Bar", "NonExistingTable"}).build();
        final SettableApiFuture res = SettableApiFuture.create();
        tailer.addListener(new ApiService.Listener(){

            public void failed(ApiService.State from, Throwable failure) {
                if (from != ApiService.State.STARTING) {
                    res.setException((Throwable)((Object)new AssertionError((Object)"expected from State to be STARTING")));
                }
                res.set((Object)Boolean.TRUE);
            }
        }, MoreExecutors.directExecutor());
        tailer.startAsync();
        Truth.assertThat((Boolean)((Boolean)res.get(5L, TimeUnit.SECONDS))).isTrue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTableDeleted() throws Exception {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).allTables().setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)db).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(30);
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                receivedRows.incrementAndGet();
                latch.countDown();
            }
        });
        final SettableApiFuture res = SettableApiFuture.create();
        tailer.addListener(new ApiService.Listener(){

            public void failed(ApiService.State from, Throwable failure) {
                if (from != ApiService.State.RUNNING) {
                    res.setException((Throwable)((Object)new AssertionError((Object)"expected from State to be RUNNING")));
                }
                res.set((Object)Boolean.TRUE);
            }
        }, MoreExecutors.directExecutor());
        tailer.startAsync().awaitRunning();
        latch.await(20L, TimeUnit.SECONDS);
        Truth.assertThat((Integer)receivedRows.get()).isEqualTo((Object)30);
        Level currentLevel = SpannerTableTailer.logger.getLevel();
        try {
            SpannerTableTailer.logger.setLevel(Level.OFF);
            mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.exception((Statement)this.getCurrentBarPollStatement(), (StatusRuntimeException)Status.NOT_FOUND.withDescription("Table not found").asRuntimeException()));
            Truth.assertThat((Boolean)((Boolean)res.get(5L, TimeUnit.SECONDS))).isTrue();
        }
        finally {
            SpannerTableTailer.logger.setLevel(currentLevel);
        }
        Truth.assertThat((Comparable)tailer.state()).isEqualTo((Object)ApiService.State.FAILED);
    }

    @Test
    public void testCustomExecutor() throws Exception {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).allTables().setExecutor(executor).setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)db).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(30);
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                receivedRows.incrementAndGet();
                latch.countDown();
            }
        });
        tailer.startAsync().awaitRunning();
        latch.await(5L, TimeUnit.SECONDS);
        tailer.stopAsync().awaitTerminated();
        Truth.assertThat((Integer)receivedRows.get()).isEqualTo((Object)30);
        Truth.assertThat((Boolean)executor.isShutdown()).isFalse();
        executor.shutdown();
    }

    @Test
    public void testBuildWithoutTables() {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        try {
            SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).includeTables(null, new String[0]).build();
            Assert.fail((String)"missing expected exception");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void testTableNotFound() {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).includeTables("NonExistingTable", new String[0]).build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                Assert.fail((String)"Received unexpected row change");
            }
        });
        try {
            tailer.startAsync().awaitRunning();
            Assert.fail((String)"missing expected exception");
        }
        catch (IllegalStateException e) {
            Truth.assertThat((Throwable)e.getCause()).isInstanceOf(SpannerException.class);
            SpannerException se = (SpannerException)e.getCause();
            Truth.assertThat((Comparable)se.getErrorCode()).isEqualTo((Object)ErrorCode.NOT_FOUND);
        }
    }

    @Test
    public void testNoTablesFound() {
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).allTables().except(new String[]{"Foo", "Bar"}).build();
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                Assert.fail((String)"Received unexpected row change");
            }
        });
        try {
            tailer.startAsync().awaitRunning();
            Assert.fail((String)"missing expected exception");
        }
        catch (IllegalStateException e) {
            Truth.assertThat((Throwable)e.getCause()).isInstanceOf(SpannerException.class);
            SpannerException se = (SpannerException)e.getCause();
            Truth.assertThat((Comparable)se.getErrorCode()).isEqualTo((Object)ErrorCode.NOT_FOUND);
        }
    }

    @Test
    public void testCustomCommitTimestampColumn() throws Exception {
        Timestamp ts = Timestamp.now();
        ResultSetMetadata metadata = RandomResultSetGenerator.METADATA.toBuilder().setRowType(RandomResultSetGenerator.METADATA.getRowType().toBuilder().setFields(RandomResultSetGenerator.METADATA.getRowType().getFieldsCount() - 1, StructType.Field.newBuilder().setName("AlternativeCommitTS").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build()).build()).build()).build();
        Statement statement = ((Statement.Builder)Statement.newBuilder((String)"SELECT *\nFROM `Foo`\nWHERE `AlternativeCommitTS`>@prevCommitTimestamp\nORDER BY `AlternativeCommitTS`").bind("prevCommitTimestamp").to(Timestamp.MIN_VALUE)).build();
        mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.query((Statement)statement, (ResultSet)new RandomResultSetGenerator(1).generateWithFixedCommitTimestamp(ts).toBuilder().setMetadata(metadata).build()));
        mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.query((Statement)((Statement.Builder)statement.toBuilder().bind("prevCommitTimestamp").to(ts)).build(), (ResultSet)new RandomResultSetGenerator(0).generate().toBuilder().setMetadata(metadata).build()));
        Spanner spanner = this.getSpanner();
        DatabaseId db = DatabaseId.of((String)"p", (String)"i", (String)"d");
        SpannerDatabaseTailer tailer = SpannerDatabaseTailer.newBuilder((Spanner)spanner, (DatabaseId)db).allTables().setPollInterval(Duration.ofMillis((long)10L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)db).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).setCommitTimestampColumnFunction(tableId -> tableId.getTable().equals("Foo") ? "AlternativeCommitTS" : null).build();
        final AtomicInteger receivedRows = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(21);
        tailer.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                receivedRows.incrementAndGet();
                latch.countDown();
            }
        });
        tailer.startAsync().awaitRunning();
        latch.await(5L, TimeUnit.SECONDS);
        tailer.stopAsync().awaitTerminated();
        Truth.assertThat((Integer)receivedRows.get()).isEqualTo((Object)21);
    }
}

