/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore;

import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.TableStoreReader;
import com.alicloud.openservices.tablestore.model.DescribeTableRequest;
import com.alicloud.openservices.tablestore.model.DescribeTableResponse;
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.RowQueryCriteria;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.reader.PrimaryKeyWithTable;
import com.alicloud.openservices.tablestore.reader.ReaderBucket;
import com.alicloud.openservices.tablestore.reader.ReaderDispatcher;
import com.alicloud.openservices.tablestore.reader.ReaderEvent;
import com.alicloud.openservices.tablestore.reader.ReaderGroup;
import com.alicloud.openservices.tablestore.reader.ReaderResult;
import com.alicloud.openservices.tablestore.reader.ReaderStatistics;
import com.alicloud.openservices.tablestore.reader.ReaderUtils;
import com.alicloud.openservices.tablestore.reader.RowReadResult;
import com.alicloud.openservices.tablestore.reader.TableStoreReaderConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTableStoreReader
implements TableStoreReader {
    private static final int SCHEDULED_CORE_POOL_SIZE = 2;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new ThreadFactory(){
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "reader-scheduled-pool-%d" + this.counter.getAndIncrement());
        }
    });
    private final Logger logger = LoggerFactory.getLogger(DefaultTableStoreReader.class);
    private final AsyncClientInterface ots;
    private final TableStoreReaderConfig config;
    private final TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback;
    private final Executor executor;
    private final ReaderBucket[] buckets;
    private final Semaphore semaphore;
    private final ReaderStatistics statistics;
    private final ReaderDispatcher dispatcher;
    private final Map<String, TableMeta> metaMap;

    public DefaultTableStoreReader(AsyncClientInterface ots, TableStoreReaderConfig config, Executor executor, TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback) {
        this.ots = ots;
        this.config = config;
        this.executor = executor;
        this.callback = callback;
        this.statistics = new ReaderStatistics();
        this.semaphore = new Semaphore(config.getConcurrency());
        this.metaMap = new HashMap<String, TableMeta>();
        this.buckets = new ReaderBucket[config.getBucketCount()];
        for (int i = 0; i < this.buckets.length; ++i) {
            this.buckets[i] = new ReaderBucket(ots, this.semaphore, config, callback, executor, this.statistics);
        }
        this.dispatcher = new ReaderDispatcher(this.buckets.length);
        this.startFlushTimer(config.getFlushInterval());
        this.startLogTimer(config.getLogInterval());
    }

    @Override
    public void addPrimaryKey(String tableName, PrimaryKey primaryKey) {
        if (this.config.isCheckTableMeta()) {
            this.checkPrimaryKeyWithTable(tableName, primaryKey);
        }
        ReaderGroup group = new ReaderGroup(1);
        PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
        while (!this.addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    @Override
    public Future<ReaderResult> addPrimaryKeyWithFuture(String tableName, PrimaryKey primaryKey) {
        if (this.config.isCheckTableMeta()) {
            this.checkPrimaryKeyWithTable(tableName, primaryKey);
        }
        ReaderGroup group = new ReaderGroup(1);
        PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
        while (!this.addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException interruptedException) {}
        }
        return group.getFuture();
    }

    @Override
    public void addPrimaryKeys(String tableName, List<PrimaryKey> primaryKeys) {
        ReaderGroup group = new ReaderGroup(primaryKeys.size());
        for (PrimaryKey primaryKey : primaryKeys) {
            if (this.config.isCheckTableMeta()) {
                this.checkPrimaryKeyWithTable(tableName, primaryKey);
            }
            PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
            while (!this.addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    @Override
    public Future<ReaderResult> addPrimaryKeysWithFuture(String tableName, List<PrimaryKey> primaryKeys) {
        ReaderGroup group = new ReaderGroup(primaryKeys.size());
        for (PrimaryKey primaryKey : primaryKeys) {
            if (this.config.isCheckTableMeta()) {
                this.checkPrimaryKeyWithTable(tableName, primaryKey);
            }
            PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
            while (!this.addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        return group.getFuture();
    }

    @Override
    public void setRowQueryCriteria(RowQueryCriteria rowQueryCriteria) {
        for (ReaderBucket bucket : this.buckets) {
            bucket.setRowQueryCriteria(rowQueryCriteria);
        }
    }

    @Override
    public void send() {
        this.logger.debug("trigger send data.");
        if (this.closed.get()) {
            throw new ClientException("The reader has been closed.");
        }
        this.triggerEvent(ReaderEvent.EventType.SEND);
        this.logger.debug("user trigger send finished.");
    }

    @Override
    public void flush() {
        this.logger.debug("trigger flush and waiting.");
        if (this.closed.get()) {
            throw new ClientException("The reader has been closed.");
        }
        CountDownLatch latch = this.triggerEvent(ReaderEvent.EventType.FLUSH);
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            throw new ClientException(e);
        }
        this.logger.info("Reader statistics: " + this.statistics);
        this.logger.debug("user trigger flush finished.");
    }

    @Override
    public void setCallback(TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback) {
        for (ReaderBucket bucket : this.buckets) {
            bucket.setCallback(callback);
        }
    }

    private CountDownLatch triggerEvent(ReaderEvent.EventType type) {
        CountDownLatch latch = new CountDownLatch(1);
        for (ReaderBucket bucket : this.buckets) {
            bucket.addSignal(latch, type);
        }
        return latch;
    }

    @Override
    public synchronized void close() {
        if (this.closed.get()) {
            throw new ClientException("The reader has already been closed.");
        }
        this.flush();
        this.scheduledExecutorService.shutdown();
        for (ReaderBucket bucket : this.buckets) {
            bucket.close();
        }
        this.closed.set(true);
    }

    private boolean addPrimaryKeyWithTableInternal(PrimaryKeyWithTable primaryKeyWithTable, ReaderGroup readerGroup) {
        if (this.closed.get()) {
            throw new ClientException("The reader has been closed.");
        }
        int dispatchIndex = this.dispatcher.getDispatchIndex(primaryKeyWithTable.getPrimaryKey());
        return this.buckets[dispatchIndex].addPrimaryKeyWithTable(primaryKeyWithTable, readerGroup);
    }

    public void startFlushTimer(int flushInterval) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                DefaultTableStoreReader.this.triggerEvent(ReaderEvent.EventType.FLUSH);
            }
        }, 0L, flushInterval, TimeUnit.MILLISECONDS);
    }

    private void startLogTimer(int interval) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                StringBuilder ringBufferRemain = new StringBuilder("RingBuffer Remain: ");
                for (ReaderBucket bucket : DefaultTableStoreReader.this.buckets) {
                    ringBufferRemain.append(bucket.getRingBuffer().remainingCapacity());
                    ringBufferRemain.append(", ");
                }
                DefaultTableStoreReader.this.logger.debug(ringBufferRemain.toString());
            }
        }, 0L, interval, TimeUnit.MILLISECONDS);
    }

    private void checkPrimaryKeyWithTable(String tableName, PrimaryKey primaryKey) {
        if (this.metaMap.containsKey(tableName) && this.metaMap.get(tableName) == null) {
            throw new ClientException("The table : {" + tableName + "} does not exist.");
        }
        if (!this.metaMap.containsKey(tableName)) {
            try {
                DescribeTableResponse response = this.ots.asSyncClient().describeTable(new DescribeTableRequest(tableName));
                this.metaMap.put(tableName, response.getTableMeta());
            }
            catch (TableStoreException e) {
                this.metaMap.put(tableName, null);
                throw new ClientException("The table : {" + tableName + "} does not exist.");
            }
        }
        ReaderUtils.checkTableMeta(this.metaMap.get(tableName), primaryKey);
    }

    public ReaderStatistics getStatistics() {
        return this.statistics;
    }

    public TableStoreReaderConfig getConfig() {
        return this.config;
    }
}

