/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.watcher.it;

import com.google.api.client.util.Base64;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.StructReader;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.CommitTimestampRepository;
import com.google.cloud.spanner.watcher.ShardProvider;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.SpannerTableTailer;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.TimebasedShardProvider;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.cloud.spanner.watcher.it.StressIntegrationTest;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.threeten.bp.Duration;

@Category(value={StressIntegrationTest.class})
@RunWith(value=Parameterized.class)
public class ITSpannerTableTailerStressTest {
    static final String TABLE_NAME = "TEST_TABLE";
    static final String CREATE_TABLE = "CREATE TABLE %s (\n  ColInt64       INT64       NOT NULL,\n  ColFloat64     FLOAT64     NOT NULL,\n  ColBool        BOOL        NOT NULL,\n  ColString      STRING(100) NOT NULL,\n  ColStringMax   STRING(MAX) NOT NULL,\n  ColBytes       BYTES(100)  NOT NULL,\n  ColBytesMax    BYTES(MAX)  NOT NULL,\n  ColDate        DATE        NOT NULL,\n  ColTimestamp   TIMESTAMP   NOT NULL,\n  ColShardId     STRING(MAX)         ,\n  ShardInt64     INT64               ,\n  ShardFloat64   FLOAT64             ,\n  ShardBool      BOOL                ,\n  ShardString    STRING(100)         ,\n  ShardBytes     BYTES(100)          ,\n  ShardDate      DATE                ,\n  ShardTimestamp TIMESTAMP           ,\n  ColCommitTS    TIMESTAMP   NOT NULL OPTIONS (allow_commit_timestamp=true),\n  \n  ColInt64Array     ARRAY<INT64>,\n  ColFloat64Array   ARRAY<FLOAT64>,\n  ColBoolArray      ARRAY<BOOL>,\n  ColStringArray    ARRAY<STRING(100)>,\n  ColStringMaxArray ARRAY<STRING(MAX)>,\n  ColBytesArray     ARRAY<BYTES(100)>,\n  ColBytesMaxArray  ARRAY<BYTES(MAX)>,\n  ColDateArray      ARRAY<DATE>,\n  ColTimestampArray ARRAY<TIMESTAMP>\n) PRIMARY KEY (ColInt64)\n";
    static final String CREATE_SHARD_INDEX = "CREATE INDEX IDX_%s_SHARD ON %s (ColShardId)";
    private static final Random rnd = new Random();
    @Parameterized.Parameter(value=0)
    public int changeCount;
    @Parameterized.Parameter(value=1)
    public int changeRunners;
    private static final Logger logger = Logger.getLogger(ITSpannerTableTailerStressTest.class.getName());
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();
    private static Database database;
    private final Object lock = new Object();
    private final ConcurrentMap<Long, Timestamp> lastWrittenTimestamps = new ConcurrentHashMap<Long, Timestamp>();
    private final ConcurrentMap<Long, Timestamp> lastReceivedTimestamps = new ConcurrentHashMap<Long, Timestamp>();
    private final AtomicInteger sentChanges = new AtomicInteger();

    @Parameterized.Parameters(name="change count= {0}, runners= {1}")
    public static Collection<Object[]> parameters() {
        ArrayList<Object[]> params = new ArrayList<Object[]>();
        for (int runners = 8; runners <= 256; runners *= 2) {
            for (int changeCount = runners; changeCount <= 1024; changeCount *= 2) {
                params.add(new Object[]{changeCount, runners});
            }
        }
        return params;
    }

    @BeforeClass
    public static void setup() throws Exception {
        SpannerTestHelper.setupSpanner(env);
        database = env.createTestDb((Iterable<String>)ImmutableList.of((Object)String.format(CREATE_TABLE, TABLE_NAME), (Object)String.format(CREATE_SHARD_INDEX, TABLE_NAME, TABLE_NAME)));
        logger.info(String.format("Created database %s", database.getId().toString()));
    }

    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    @After
    public void deleteTestData() {
        Spanner spanner = env.getSpanner();
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        client.write(Collections.singleton(Mutation.delete((String)TABLE_NAME, (KeySet)KeySet.all())));
        this.sentChanges.set(0);
    }

    @Test
    public void testStressSpannerTailer() throws Exception {
        System.out.printf("Starting test (changeCount=%d, runners=%d)\n", this.changeCount, this.changeRunners);
        Spanner spanner = env.getSpanner();
        SpannerTableTailer watcher = SpannerTableTailer.newBuilder((Spanner)spanner, (TableId)TableId.of((DatabaseId)database.getId(), (String)TABLE_NAME)).setPollInterval(Duration.ofMillis((long)1L)).setCommitTimestampRepository((CommitTimestampRepository)SpannerCommitTimestampRepository.newBuilder((Spanner)spanner, (DatabaseId)database.getId()).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).setShardProvider((ShardProvider)TimebasedShardProvider.create((String)"ColShardId", (TimebasedShardProvider.Interval)TimebasedShardProvider.Interval.MINUTE_OF_HOUR)).build();
        final CountDownLatch latch = new CountDownLatch(1);
        watcher.addCallback(new SpannerTableChangeWatcher.RowChangeCallback(){

            public void rowChange(TableId table, SpannerTableChangeWatcher.Row row, Timestamp commitTimestamp) {
                ITSpannerTableTailerStressTest.this.lastReceivedTimestamps.put(row.getLong("ColInt64"), commitTimestamp);
                if (ITSpannerTableTailerStressTest.this.sentChanges.get() == ITSpannerTableTailerStressTest.this.changeCount && ITSpannerTableTailerStressTest.this.lastReceivedTimestamps.equals(ITSpannerTableTailerStressTest.this.lastWrittenTimestamps)) {
                    latch.countDown();
                }
            }
        });
        watcher.startAsync().awaitRunning();
        System.out.printf("Change watcher started (changeCount=%d, runners=%d)\n", this.changeCount, this.changeRunners);
        Stopwatch watch = Stopwatch.createStarted();
        DatabaseClient client = spanner.getDatabaseClient(database.getId());
        ListeningExecutorService executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(this.changeRunners));
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>(this.changeRunners);
        for (int i = 0; i < this.changeRunners; ++i) {
            futures.add(executor.submit((Callable)new GenerateChangesCallable(client, this.changeCount / this.changeRunners)));
        }
        Futures.allAsList(futures).get(300L, TimeUnit.SECONDS);
        System.out.printf("Finished writing changes in %d seconds (changeCount=%d, runners=%d)\n", watch.elapsed(TimeUnit.SECONDS), this.changeCount, this.changeRunners);
        latch.await(300L, TimeUnit.SECONDS);
        System.out.printf("Finished test in %d seconds (changeCount=%d, runners=%d)\n", watch.elapsed(TimeUnit.SECONDS), this.changeCount, this.changeRunners);
        Truth.assertThat(this.lastReceivedTimestamps).isEqualTo(this.lastWrittenTimestamps);
        watcher.stopAsync().awaitTerminated();
        executor.shutdown();
    }

    static Mutation createRandomMutation(String table, String shardColumn, Value shardValue, int changeCount) {
        return ITSpannerTableTailerStressTest.createRandomMutation(table, rnd.nextInt(changeCount / 2), shardColumn, shardValue);
    }

    static Mutation createRandomMutation(String table, long id, String shardColumn, Value shardValue) {
        return ((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)((Mutation.WriteBuilder)Mutation.newInsertOrUpdateBuilder((String)table).set("ColInt64").to(id)).set("ColFloat64").to(rnd.nextDouble())).set("ColBool").to(rnd.nextBoolean())).set("ColString").to(ITSpannerTableTailerStressTest.randomString(100))).set("ColStringMax").to(ITSpannerTableTailerStressTest.randomString(1000))).set("ColBytes").to(ITSpannerTableTailerStressTest.randomBytes(100))).set("ColBytesMax").to(ITSpannerTableTailerStressTest.randomBytes(1000))).set("ColDate").to(ITSpannerTableTailerStressTest.randomDate())).set("ColTimestamp").to(ITSpannerTableTailerStressTest.randomTimestamp())).set(shardColumn).to(shardValue)).set("ColCommitTS").to(Value.COMMIT_TIMESTAMP)).set("ColInt64Array").toInt64Array(ITSpannerTableTailerStressTest.randomLongs(1000))).set("ColFloat64Array").toFloat64Array(ITSpannerTableTailerStressTest.randomDoubles(1000))).set("ColBoolArray").toBoolArray(ITSpannerTableTailerStressTest.randomBooleans(1000))).set("ColStringArray").toStringArray(ITSpannerTableTailerStressTest.randomStrings(100))).set("ColStringMaxArray").toStringArray(ITSpannerTableTailerStressTest.randomStrings(100))).set("ColBytesArray").toBytesArray(ITSpannerTableTailerStressTest.randomBytesArray(100))).set("ColBytesMaxArray").toBytesArray(ITSpannerTableTailerStressTest.randomBytesArray(100))).set("ColDateArray").toDateArray(ITSpannerTableTailerStressTest.randomDates(100))).set("ColTimestampArray").toTimestampArray(ITSpannerTableTailerStressTest.randomTimestamps(100))).build();
    }

    static String randomString(int maxLength) {
        int length = rnd.nextInt(maxLength / 2) + 1;
        byte[] stringBytes = new byte[length];
        rnd.nextBytes(stringBytes);
        return Base64.encodeBase64String((byte[])stringBytes);
    }

    static ByteArray randomBytes(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        byte[] bytes = new byte[length];
        rnd.nextBytes(bytes);
        return ByteArray.copyFrom((byte[])bytes);
    }

    static Date randomDate() {
        return Date.fromYearMonthDay((int)(rnd.nextInt(2020) + 1), (int)(rnd.nextInt(11) + 1), (int)(rnd.nextInt(28) + 1));
    }

    static Timestamp randomTimestamp() {
        return Timestamp.ofTimeMicroseconds((long)(rnd.nextInt(100000000) + 1));
    }

    static long[] randomLongs(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return rnd.longs(length).toArray();
    }

    static double[] randomDoubles(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return rnd.doubles(length).toArray();
    }

    static Iterable<Boolean> randomBooleans(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return IntStream.range(0, length).mapToObj(i -> rnd.nextBoolean()).collect(Collectors.toList());
    }

    static Iterable<String> randomStrings(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return IntStream.range(0, length).mapToObj(i -> ITSpannerTableTailerStressTest.randomString(100)).collect(Collectors.toList());
    }

    static Iterable<ByteArray> randomBytesArray(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return IntStream.range(0, length).mapToObj(i -> ITSpannerTableTailerStressTest.randomBytes(100)).collect(Collectors.toList());
    }

    static Iterable<Date> randomDates(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return IntStream.range(0, length).mapToObj(i -> ITSpannerTableTailerStressTest.randomDate()).collect(Collectors.toList());
    }

    static Iterable<Timestamp> randomTimestamps(int maxLength) {
        int length = rnd.nextInt(maxLength) + 1;
        return IntStream.range(0, length).mapToObj(i -> ITSpannerTableTailerStressTest.randomTimestamp()).collect(Collectors.toList());
    }

    static final class MapStructReader
    implements StructReader {
        private final Map<String, Value> values;

        MapStructReader(Map<String, Value> values) {
            this.values = values;
        }

        public Type getType() {
            throw new UnsupportedOperationException();
        }

        public int getColumnCount() {
            return this.values.size();
        }

        public int getColumnIndex(String columnName) {
            throw new UnsupportedOperationException();
        }

        public Type getColumnType(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public Type getColumnType(String columnName) {
            throw new UnsupportedOperationException();
        }

        public boolean isNull(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public boolean isNull(String columnName) {
            return this.values.get(columnName).isNull();
        }

        public boolean getBoolean(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public boolean getBoolean(String columnName) {
            return this.values.get(columnName).getBool();
        }

        public long getLong(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public long getLong(String columnName) {
            return this.values.get(columnName).getInt64();
        }

        public double getDouble(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public double getDouble(String columnName) {
            return this.values.get(columnName).getFloat64();
        }

        public String getString(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public String getString(String columnName) {
            return this.values.get(columnName).getString();
        }

        public ByteArray getBytes(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public ByteArray getBytes(String columnName) {
            return this.values.get(columnName).getBytes();
        }

        public Timestamp getTimestamp(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public Timestamp getTimestamp(String columnName) {
            return this.values.get(columnName).getTimestamp();
        }

        public Date getDate(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public Date getDate(String columnName) {
            return this.values.get(columnName).getDate();
        }

        public boolean[] getBooleanArray(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public boolean[] getBooleanArray(String columnName) {
            throw new UnsupportedOperationException();
        }

        public List<Boolean> getBooleanList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<Boolean> getBooleanList(String columnName) {
            return this.values.get(columnName).getBoolArray();
        }

        public long[] getLongArray(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public long[] getLongArray(String columnName) {
            throw new UnsupportedOperationException();
        }

        public List<Long> getLongList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<Long> getLongList(String columnName) {
            return this.values.get(columnName).getInt64Array();
        }

        public double[] getDoubleArray(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public double[] getDoubleArray(String columnName) {
            throw new UnsupportedOperationException();
        }

        public List<Double> getDoubleList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<Double> getDoubleList(String columnName) {
            return this.values.get(columnName).getFloat64Array();
        }

        public List<String> getStringList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<String> getStringList(String columnName) {
            return this.values.get(columnName).getStringArray();
        }

        public List<ByteArray> getBytesList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<ByteArray> getBytesList(String columnName) {
            return this.values.get(columnName).getBytesArray();
        }

        public List<Timestamp> getTimestampList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<Timestamp> getTimestampList(String columnName) {
            return this.values.get(columnName).getTimestampArray();
        }

        public List<Date> getDateList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<Date> getDateList(String columnName) {
            return this.values.get(columnName).getDateArray();
        }

        public List<Struct> getStructList(int columnIndex) {
            throw new UnsupportedOperationException();
        }

        public List<Struct> getStructList(String columnName) {
            throw new UnsupportedOperationException();
        }
    }

    final class GenerateChangesCallable
    implements Callable<Void> {
        private final DatabaseClient client;
        private final int numChanges;
        private final TimebasedShardProvider.Interval shardInterval = TimebasedShardProvider.Interval.MINUTE_OF_HOUR;
        private TimebasedShardProvider.TimebasedShardId currentShardId;

        GenerateChangesCallable(DatabaseClient client, int numChanges) {
            this.client = client;
            this.numChanges = numChanges;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() {
            for (int i = 0; i < this.numChanges; ++i) {
                if (this.currentShardId == null || this.currentShardId.shouldRefresh()) {
                    this.currentShardId = this.shardInterval.getCurrentShardId(this.client.singleUse());
                }
                Mutation mutation = ITSpannerTableTailerStressTest.createRandomMutation(ITSpannerTableTailerStressTest.TABLE_NAME, "ColShardId", this.currentShardId.getValue(), ITSpannerTableTailerStressTest.this.changeCount);
                Timestamp ts = this.client.write(Collections.singleton(mutation));
                ITSpannerTableTailerStressTest.this.sentChanges.incrementAndGet();
                Long key = ((Value)mutation.asMap().get("ColInt64")).getInt64();
                Object object = ITSpannerTableTailerStressTest.this.lock;
                synchronized (object) {
                    Timestamp current = (Timestamp)ITSpannerTableTailerStressTest.this.lastWrittenTimestamps.get(key);
                    if (current == null || ts.compareTo(current) > 0) {
                        ITSpannerTableTailerStressTest.this.lastWrittenTimestamps.put(key, ts);
                    }
                    continue;
                }
            }
            return null;
        }
    }
}

