/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher.it;

import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.FixedShardProvider;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerDatabaseChangeWatcher;
import com.google.cloud.spanner.watcher.SpannerDatabaseTailer;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.it.ITSpannerTableTailerStressTest;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.cloud.spanner.watcher.it.StressIntegrationTest;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.threeten.bp.Duration;

@Category(value={StressIntegrationTest.class})
@RunWith(value=Parameterized.class)
public class ITSpannerDatabaseTailerStressTest {
    /** Names of the test tables. Table {@code i} is indexed and sharded on {@code FIXED_SHARD_COLS[i]}. */
    private static final String[] TABLE_NAMES = new String[]{"TABLE1", "TABLE2", "TABLE3", "TABLE4", "TABLE5", "TABLE6", "TABLE7"};
    /** DDL template for the shard-column index; arguments are (index suffix, table name, column name). */
    private static final String CREATE_INDEX = "CREATE INDEX IDX_SHARD_%s ON %s (%s)";
    /** Shard column used for each table, positionally aligned with {@link #TABLE_NAMES}. */
    private static final String[] FIXED_SHARD_COLS = new String[]{"ShardInt64", "ShardFloat64", "ShardBool", "ShardString", "ShardBytes", "ShardDate", "ShardTimestamp"};
    /**
     * The fixed shard values. Each array holds one value per entry of {@link #FIXED_SHARD_COLS},
     * in the same order. The elements are passed as {@code Value[]} (not {@code Object}) so that
     * the list is inferred as {@code ImmutableList<Value[]>}.
     */
    private static final ImmutableList<Value[]> SHARD_VALUES = ImmutableList.of(
        new Value[]{
            Value.int64(1L),
            Value.float64(3.14),
            Value.bool(true),
            Value.string("EAST"),
            Value.bytes(ByteArray.copyFrom("EAST")),
            Value.date(Date.fromYearMonthDay(2020, 6, 5)),
            Value.timestamp(Timestamp.now())},
        new Value[]{
            Value.int64(-1L),
            Value.float64(6.662),
            Value.bool(false),
            Value.string("WEST"),
            Value.bytes(ByteArray.copyFrom("WEST")),
            Value.date(Date.fromYearMonthDay(2019, 6, 5)),
            Value.timestamp(Timestamp.ofTimeSecondsAndNanos(10000L, 0))});
    /** Total number of write transactions executed by one test run (summed over all runners). */
    @Parameterized.Parameter(value=0)
    public int changeCount;
    /** Number of concurrent writer tasks. */
    @Parameterized.Parameter(value=1)
    public int changeRunners;
    // Named after this class so that log records are attributed to the test that emitted them.
    private static final Logger logger = Logger.getLogger(ITSpannerDatabaseTailerStressTest.class.getName());
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();
    /** Test database created in {@code setup()}. */
    private static Database database;
    /** Guards the id-to-shard mapping and the compound updates/comparisons of the timestamp maps. */
    private final Object lock = new Object();
    /** Latest commit timestamp written per key, maintained by the writer tasks. */
    private final ConcurrentMap<TestKey, Timestamp> lastWrittenTimestamps = new ConcurrentHashMap<TestKey, Timestamp>();
    /** Latest commit timestamp reported per key by the row change callbacks. */
    private final ConcurrentMap<TestKey, Timestamp> lastReceivedTimestamps = new ConcurrentHashMap<TestKey, Timestamp>();
    /** Number of row mutations committed so far by the writer tasks. */
    private final AtomicInteger sentChanges = new AtomicInteger();
    /** Index into {@link #SHARD_VALUES} assigned to each row id; access only while holding {@link #lock}. */
    private final Map<Long, Integer> idToShardValueMapping = new HashMap<Long, Integer>();

    @Parameterized.Parameters(name="change count= {0}, runners= {1}")
    // Builds the parameter matrix for the parameterized runner. Every entry is
    // {changeCount, runners}: runners doubles from 8 up to 256 and, for each runner
    // count, changeCount doubles from that runner count up to 1024.
    public static Collection<Object[]> parameters() {
        ArrayList<Object[]> matrix = new ArrayList<Object[]>();
        int runners = 8;
        while (runners <= 256) {
            int changeCount = runners;
            while (changeCount <= 1024) {
                matrix.add(new Object[]{changeCount, runners});
                changeCount += changeCount;
            }
            runners += runners;
        }
        return matrix;
    }

    /**
     * Creates the test database: one table per entry of {@code TABLE_NAMES}, followed by one
     * index per table on that table's shard column.
     *
     * @throws Exception if the Spanner environment or the database cannot be set up
     * @throws IllegalStateException if the table, shard column and shard value arrays do not
     *     all have the same length
     */
    @BeforeClass
    public static void setup() throws Exception {
        // The three arrays are indexed in parallel, so their lengths must agree.
        Preconditions.checkState(TABLE_NAMES.length == FIXED_SHARD_COLS.length);
        for (Value[] shardValues : SHARD_VALUES) {
            Preconditions.checkState(FIXED_SHARD_COLS.length == shardValues.length);
        }
        SpannerTestHelper.setupSpanner(env);
        final String createTable =
            "CREATE TABLE %s (\n"
            + "  ColInt64       INT64       NOT NULL,\n"
            + "  ColFloat64     FLOAT64     NOT NULL,\n"
            + "  ColBool        BOOL        NOT NULL,\n"
            + "  ColString      STRING(100) NOT NULL,\n"
            + "  ColStringMax   STRING(MAX) NOT NULL,\n"
            + "  ColJson        JSON        NOT NULL,\n"
            + "  ColBytes       BYTES(100)  NOT NULL,\n"
            + "  ColBytesMax    BYTES(MAX)  NOT NULL,\n"
            + "  ColDate        DATE        NOT NULL,\n"
            + "  ColTimestamp   TIMESTAMP   NOT NULL,\n"
            + "  ColShardId     STRING(MAX)         ,\n"
            + "  ShardInt64     INT64               ,\n"
            + "  ShardFloat64   FLOAT64             ,\n"
            + "  ShardBool      BOOL                ,\n"
            + "  ShardString    STRING(100)         ,\n"
            + "  ShardBytes     BYTES(100)          ,\n"
            + "  ShardDate      DATE                ,\n"
            + "  ShardTimestamp TIMESTAMP           ,\n"
            + "  ColCommitTS    TIMESTAMP   NOT NULL OPTIONS (allow_commit_timestamp=true),\n"
            + "  \n"
            + "  ColInt64Array     ARRAY<INT64>,\n"
            + "  ColFloat64Array   ARRAY<FLOAT64>,\n"
            + "  ColBoolArray      ARRAY<BOOL>,\n"
            + "  ColStringArray    ARRAY<STRING(100)>,\n"
            + "  ColStringMaxArray ARRAY<STRING(MAX)>,\n"
            + "  ColJsonArray      ARRAY<JSON>,\n"
            + "  ColBytesArray     ARRAY<BYTES(100)>,\n"
            + "  ColBytesMaxArray  ARRAY<BYTES(MAX)>,\n"
            + "  ColDateArray      ARRAY<DATE>,\n"
            + "  ColTimestampArray ARRAY<TIMESTAMP>\n"
            + ") PRIMARY KEY (ColInt64)\n";
        LinkedList<String> statements = new LinkedList<String>();
        // All CREATE TABLE statements come first, then the indexes that reference them.
        for (String table : TABLE_NAMES) {
            statements.add(String.format(createTable, table));
        }
        for (int i = 0; i < TABLE_NAMES.length; ++i) {
            String table = TABLE_NAMES[i];
            statements.add(String.format(CREATE_INDEX, table, table, FIXED_SHARD_COLS[i]));
        }
        database = env.createTestDb(statements);
        logger.info(String.format("Created database %s", database.getId().toString()));
    }

    /**
     * Runs once after all parameterized runs and hands the shared test environment to
     * {@code SpannerTestHelper.teardownSpanner}.
     */
    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    /**
     * Looks up the position of a table name in {@code TABLE_NAMES}.
     *
     * @param table the table name to look for
     * @return the index of the first matching entry, or {@code -1} if there is none
     */
    private static int getTableIndex(String table) {
        int found = -1;
        // Stop scanning as soon as the first match has been recorded.
        for (int idx = 0; idx < TABLE_NAMES.length && found < 0; ++idx) {
            if (TABLE_NAMES[idx].equals(table)) {
                found = idx;
            }
        }
        return found;
    }

    /**
     * Runs after each test: deletes every row from all test tables and resets the counters and
     * timestamp maps kept by this test instance.
     */
    @After
    public void deleteTestData() {
        DatabaseClient dbClient = env.getSpanner().getDatabaseClient(database.getId());
        for (int t = 0; t < TABLE_NAMES.length; ++t) {
            // One write per table, each deleting the table's full key range.
            Mutation deleteAll = Mutation.delete(TABLE_NAMES[t], KeySet.all());
            dbClient.write(Collections.singleton(deleteAll));
        }
        this.sentChanges.set(0);
        this.lastReceivedTimestamps.clear();
        this.lastWrittenTimestamps.clear();
    }

    /**
     * Starts one database tailer per shard value set, lets {@code changeRunners} concurrent
     * writers commit {@code changeCount} transactions in total (one mutation per table each),
     * and verifies that the last commit timestamp reported for every key equals the last commit
     * timestamp written for it.
     *
     * <p>The watchers and the executor are released in a {@code finally} block so that a timeout
     * or a failed assertion does not leave pollers and threads running into the next
     * parameterized run.
     *
     * @throws Exception if starting the watchers, writing the changes or waiting for them fails
     */
    @Test
    public void testStressSpannerTailer() throws Exception {
        System.out.printf("Starting test (changeCount=%d, runners=%d)\n", this.changeCount, this.changeRunners);
        Spanner spanner = env.getSpanner();
        // One thread per writer plus one for the delayed latch release scheduled by the callback.
        final ListeningScheduledExecutorService executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(this.changeRunners + 1));
        final CountDownLatch latch = new CountDownLatch(1);
        LinkedList<SpannerDatabaseTailer> watchers = new LinkedList<SpannerDatabaseTailer>();
        boolean watchersStopped = false;
        try {
            for (final Value[] shardValues : SHARD_VALUES) {
                HashMap<TableId, FixedShardProvider> shardProviders = new HashMap<TableId, FixedShardProvider>();
                for (int i = 0; i < TABLE_NAMES.length; ++i) {
                    shardProviders.put(TableId.of(database.getId(), TABLE_NAMES[i]), FixedShardProvider.create(FIXED_SHARD_COLS[i], shardValues[i]));
                }
                SpannerDatabaseTailer watcher = SpannerDatabaseTailer.newBuilder(spanner, database.getId())
                    .allTables()
                    .setPollInterval(Duration.ofMillis(1L))
                    .setCommitTimestampRepository(
                        SpannerCommitTimestampRepository.newBuilder(spanner, database.getId())
                            .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                            .build())
                    .setShardProviders(shardProviders)
                    .build();
                watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){
                    @Override
                    public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                        int tableIndex = getTableIndex(table.getTable());
                        TestKey key = new TestKey(table, shardValues[tableIndex], row.getLong("ColInt64"));
                        synchronized (lock) {
                            lastReceivedTimestamps.put(key, commitTimestamp);
                            boolean allSent = sentChanges.get() == changeCount * TABLE_NAMES.length;
                            if (allSent && latch.getCount() > 0L) {
                                if (lastReceivedTimestamps.equals(lastWrittenTimestamps)) {
                                    latch.countDown();
                                } else {
                                    // Everything has been written but not yet received: allow
                                    // five more seconds before releasing the main thread anyway.
                                    executor.schedule(new Runnable(){
                                        @Override
                                        public void run() {
                                            latch.countDown();
                                        }
                                    }, 5L, TimeUnit.SECONDS);
                                }
                            }
                        }
                    }
                });
                watchers.add(watcher);
                watcher.startAsync();
            }
            for (SpannerDatabaseTailer watcher : watchers) {
                watcher.awaitRunning();
            }
            System.out.printf("Change watcher started (changeCount=%d, runners=%d)\n", this.changeCount, this.changeRunners);
            Stopwatch watch = Stopwatch.createStarted();
            DatabaseClient client = spanner.getDatabaseClient(database.getId());
            ArrayList<ListenableFuture<Void>> futures = new ArrayList<ListenableFuture<Void>>(this.changeRunners);
            for (int i = 0; i < this.changeRunners; ++i) {
                futures.add(executor.submit(new GenerateChangesCallable(client, this.changeCount / this.changeRunners)));
            }
            Futures.allAsList(futures).get(300L, TimeUnit.SECONDS);
            System.out.printf("Finished writing changes in %d seconds (changeCount=%d, runners=%d)\n", watch.elapsed(TimeUnit.SECONDS), this.changeCount, this.changeRunners);
            // The latch is otherwise only released from a row callback. If the final callback
            // ran before the writers had published their bookkeeping, no further callback would
            // arrive and the await below would block for its whole timeout. All writes are
            // recorded at this point, so matching maps mean every change has been received.
            synchronized (this.lock) {
                if (this.lastReceivedTimestamps.equals(this.lastWrittenTimestamps)) {
                    latch.countDown();
                }
            }
            // A timeout is not checked here; the assertion below reports any missing changes.
            latch.await(300L, TimeUnit.SECONDS);
            System.out.printf("Finished test in %d seconds (changeCount=%d, runners=%d)\n", watch.elapsed(TimeUnit.SECONDS), this.changeCount, this.changeRunners);
            Truth.assertThat(this.lastReceivedTimestamps).isEqualTo(this.lastWrittenTimestamps);
            for (SpannerDatabaseTailer watcher : watchers) {
                watcher.stopAsync();
            }
            for (SpannerDatabaseTailer watcher : watchers) {
                watcher.awaitTerminated();
            }
            watchersStopped = true;
        } finally {
            if (!watchersStopped) {
                // Failure path: only request the stop. Waiting for termination here could throw
                // and hide the exception that is already propagating.
                for (SpannerDatabaseTailer watcher : watchers) {
                    watcher.stopAsync();
                }
            }
            executor.shutdown();
        }
    }

    /**
     * Writer task: commits {@code numChanges} transactions, each containing one random mutation
     * per test table, and records the commit timestamp of the latest write for every touched key.
     *
     * <p>This is an inner (non-static) class because it updates the bookkeeping fields of the
     * enclosing test instance.
     */
    final class GenerateChangesCallable
    implements Callable<Void> {
        private final Random rnd = new Random();
        private final DatabaseClient client;
        private final int numChanges;

        /**
         * @param client the client used to write the mutations
         * @param numChanges the number of transactions this task commits
         */
        GenerateChangesCallable(DatabaseClient client, int numChanges) {
            this.client = client;
            this.numChanges = numChanges;
        }

        /**
         * Executes the writes.
         *
         * @return always {@code null}
         */
        @Override
        public Void call() {
            for (int i = 0; i < this.numChanges; ++i) {
                ArrayList<Mutation> mutations = new ArrayList<Mutation>(TABLE_NAMES.length);
                ArrayList<Integer> shardValueIndexes = new ArrayList<Integer>(TABLE_NAMES.length);
                for (int tableIndex = 0; tableIndex < TABLE_NAMES.length; ++tableIndex) {
                    String table = TABLE_NAMES[tableIndex];
                    String shardColumn = FIXED_SHARD_COLS[tableIndex];
                    long id = this.rnd.nextInt(changeCount / 2);
                    Integer shardValueIndex;
                    // The first writer to use an id picks its shard; that choice is then reused
                    // by every later write of the same id.
                    synchronized (lock) {
                        shardValueIndex = idToShardValueMapping.get(id);
                        if (shardValueIndex == null) {
                            shardValueIndex = this.rnd.nextInt(SHARD_VALUES.size());
                            idToShardValueMapping.put(id, shardValueIndex);
                        }
                    }
                    shardValueIndexes.add(shardValueIndex);
                    Value shardValue = SHARD_VALUES.get(shardValueIndex.intValue())[tableIndex];
                    mutations.add(ITSpannerTableTailerStressTest.createRandomMutation(table, id, shardColumn, shardValue));
                }
                Timestamp ts = this.client.write(mutations);
                synchronized (lock) {
                    // mutations holds exactly one entry per table, in TABLE_NAMES order.
                    int tableIndex = 0;
                    for (Mutation mutation : mutations) {
                        Long id = mutation.asMap().get("ColInt64").getInt64();
                        int shardValueIndex = shardValueIndexes.get(tableIndex);
                        TestKey key = new TestKey(TableId.of(database.getId(), TABLE_NAMES[tableIndex]), SHARD_VALUES.get(shardValueIndex)[tableIndex], id);
                        Timestamp current = lastWrittenTimestamps.get(key);
                        // Concurrent writers may reach this point out of commit order; keep the newest.
                        if (current == null || ts.compareTo(current) > 0) {
                            lastWrittenTimestamps.put(key, ts);
                        }
                        ++tableIndex;
                    }
                    // Publish the counter only after the timestamps above are recorded and while
                    // still holding the lock. Otherwise a row callback could observe "all changes
                    // sent" together with a stale lastWrittenTimestamps map and release the
                    // test latch before the final writes have been received.
                    sentChanges.addAndGet(TABLE_NAMES.length);
                }
            }
            return null;
        }
    }

    /**
     * Map key identifying one test row: the table it lives in, the shard value it was written
     * with, and its {@code ColInt64} id.
     */
    static class TestKey {
        private final TableId tableId;
        private final Value shardId;
        private final Long id;

        TestKey(TableId tableId, Value shardId, Long id) {
            this.tableId = tableId;
            this.shardId = shardId;
            this.id = id;
        }

        /** Renders the key as {@code table:shard:id}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(this.tableId.getTable());
            text.append(":");
            text.append(this.shardId.toString());
            text.append(":");
            text.append(this.id);
            return text.toString();
        }

        /** Two keys are equal when table, shard value and id are all equal. */
        @Override
        public boolean equals(Object o) {
            if (o instanceof TestKey) {
                TestKey that = (TestKey)o;
                boolean sameTable = Objects.equals(this.tableId, that.tableId);
                boolean sameShard = sameTable && Objects.equals(this.shardId, that.shardId);
                return sameShard && Objects.equals(this.id, that.id);
            }
            return false;
        }

        /** Hash over the same three fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.tableId, this.shardId, this.id);
        }
    }
}

