/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore;

import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.utils.ParamChecker;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.ConsumedCapacity;
import com.alicloud.openservices.tablestore.model.DescribeTableRequest;
import com.alicloud.openservices.tablestore.model.DescribeTableResponse;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.writer.DefaultWriterStatistics;
import com.alicloud.openservices.tablestore.writer.RowChangeEvent;
import com.alicloud.openservices.tablestore.writer.RowChangeEventHandler;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterStatistics;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link TableStoreWriter} implementation that buffers row changes in an
 * LMAX Disruptor ring buffer.
 *
 * <p>Row changes and flush signals are published as {@link RowChangeEvent}s and are
 * consumed by a single {@link RowChangeEventHandler} running on a dedicated
 * single-thread executor. A {@link Timer} publishes a flush signal every
 * {@link WriterConfig#getFlushInterval()} time units (as interpreted by
 * {@link Timer#scheduleAtFixedRate(TimerTask, long, long)}, i.e. milliseconds).
 *
 * <p>Producers that find the ring buffer full retry once per millisecond until a slot
 * becomes available.
 */
public class DefaultTableStoreWriter
implements TableStoreWriter {
    // Logger category is deliberately kept as the interface, matching the existing log output.
    private static final Logger LOGGER = LoggerFactory.getLogger(TableStoreWriter.class);

    private final AsyncClientInterface ots;
    private final Executor executor;
    private final WriterConfig writerConfig;
    private TableStoreCallback<RowChange, ConsumedCapacity> callback;
    private final String tableName;
    private TableMeta tableMeta;
    /** Created by {@link #startFlushTimer(int)}; {@code null} until the first call. */
    private Timer flushTimer;
    private final ReentrantLock lock;
    private Disruptor<RowChangeEvent> disruptor;
    private RingBuffer<RowChangeEvent> ringBuffer;
    private RowChangeEventHandler eventHandler;
    private ExecutorService disruptorExecutor;
    private final DefaultWriterStatistics writerStatistics;
    /** Set by {@link #close()}; guarded by this object's monitor. */
    private boolean closed;

    /**
     * Creates a writer for the given table and starts its background machinery.
     *
     * <p>The constructor blocks on a {@code describeTable} call to fetch the table meta,
     * then starts the disruptor and the periodic flush timer.
     *
     * @param ots       client used to describe the table and handed to the event handler
     * @param tableName name of the target table; must not be null or empty
     * @param config    writer configuration (buffer size, flush interval, schema check); must not be null
     * @param callback  callback handed to the event handler
     * @param executor  executor handed to the event handler
     * @throws ClientException if fetching the table meta fails or the calling thread is interrupted
     */
    public DefaultTableStoreWriter(AsyncClientInterface ots, String tableName, WriterConfig config, TableStoreCallback<RowChange, ConsumedCapacity> callback, Executor executor) {
        Preconditions.checkNotNull(ots, "The ots client can not be null.");
        Preconditions.checkArgument(tableName != null && !tableName.isEmpty(), "The table name can not be null or empty.");
        // Fail fast instead of hitting a bare NPE after the remote describeTable call.
        Preconditions.checkNotNull(config, "The writer config can not be null.");
        Preconditions.checkNotNull(executor, "The executor service can not be null.");
        this.writerStatistics = new DefaultWriterStatistics();
        this.ots = ots;
        this.tableName = tableName;
        this.writerConfig = config;
        this.callback = callback;
        this.executor = executor;
        // The flush timer is created in startFlushTimer(), so no timer thread is
        // spawned (and left behind) if initialization fails.
        this.lock = new ReentrantLock();
        this.initialize();
    }

    /**
     * Fetches the table meta, wires up the disruptor with its single event handler and
     * starts the periodic flush timer.
     *
     * @throws ClientException if the describeTable call fails or is interrupted
     */
    private void initialize() {
        LOGGER.info("Start initialize ots writer, table name: {}.", (Object)this.tableName);
        DescribeTableRequest request = new DescribeTableRequest();
        request.setTableName(this.tableName);
        Future<DescribeTableResponse> result = this.ots.describeTable(request, null);
        DescribeTableResponse res;
        try {
            res = result.get();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            throw new ClientException(e);
        }
        catch (Exception e) {
            throw new ClientException(e);
        }
        this.tableMeta = res.getTableMeta();
        LOGGER.info("End initialize with table meta: {}.", (Object)this.tableMeta);
        RowChangeEvent.RowChangeEventFactory factory = new RowChangeEvent.RowChangeEventFactory();
        this.disruptorExecutor = Executors.newFixedThreadPool(1);
        this.disruptor = new Disruptor<RowChangeEvent>(factory, this.writerConfig.getBufferSize(), this.disruptorExecutor);
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.eventHandler = new RowChangeEventHandler(this.ots, this.writerConfig, this.callback, this.executor, this.writerStatistics);
        this.disruptor.handleEventsWith(this.eventHandler);
        this.disruptor.start();
        this.startFlushTimer(this.writerConfig.getFlushInterval());
    }

    /**
     * (Re)starts the periodic flush timer. Any previously running timer is cancelled.
     *
     * @param flushInterval initial delay and period passed to
     *                      {@link Timer#scheduleAtFixedRate(TimerTask, long, long)}
     */
    public void startFlushTimer(int flushInterval) {
        if (this.flushTimer != null) {
            this.flushTimer.cancel();
        }
        this.flushTimer = new Timer();
        this.flushTimer.scheduleAtFixedRate(new TimerTask(){

            @Override
            public void run() {
                DefaultTableStoreWriter.this.triggerFlush();
            }
        }, flushInterval, (long)flushInterval);
    }

    /**
     * Claims the next ring buffer slot, retrying once per millisecond while the buffer is full.
     *
     * <p>An interrupt received while waiting does not abort the wait (the original
     * contract is to block until a slot is available), but the thread's interrupt
     * status is restored before returning so callers can still observe it.
     *
     * @return the claimed sequence, which the caller must publish
     */
    private long claimSequence() {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return this.ringBuffer.tryNext();
                }
                catch (InsufficientCapacityException e) {
                    try {
                        Thread.sleep(1L);
                    }
                    catch (InterruptedException ie) {
                        // sleep() cleared the flag; remember it and keep waiting.
                        interrupted = true;
                    }
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Publishes a single row change to the ring buffer, blocking while the buffer is full.
     *
     * @param rowChange the row change to enqueue
     * @throws ClientException presumably raised by the schema check when it is enabled and
     *                         the row is rejected (the list overload relies on this) —
     *                         confirm against {@code ParamChecker.checkRowChange}
     */
    @Override
    public void addRowChange(RowChange rowChange) {
        if (this.writerConfig.isEnableSchemaCheck()) {
            ParamChecker.checkRowChange(this.tableMeta, rowChange, this.writerConfig);
        }
        long sequence = this.claimSequence();
        RowChangeEvent event = this.ringBuffer.get(sequence);
        event.setValue(rowChange);
        this.ringBuffer.publish(sequence);
    }

    /**
     * Publishes a flush signal event carrying the given lock and condition, blocking while
     * the buffer is full.
     */
    private void addSignal(ReentrantLock lock, Condition condition) {
        long sequence = this.claimSequence();
        RowChangeEvent event = this.ringBuffer.get(sequence);
        event.setValue(lock, condition);
        this.ringBuffer.publish(sequence);
    }

    /**
     * Publishes every row in {@code rowChanges}; rows whose enqueue attempt throws a
     * {@link ClientException} are collected into {@code dirtyRows} instead.
     *
     * @param rowChanges rows to enqueue
     * @param dirtyRows  output list; cleared first, then filled with the rejected rows
     * @throws ClientException if at least one row was rejected
     */
    @Override
    public void addRowChange(List<RowChange> rowChanges, List<RowChange> dirtyRows) throws ClientException {
        dirtyRows.clear();
        for (RowChange rowChange : rowChanges) {
            try {
                this.addRowChange(rowChange);
            }
            catch (ClientException e) {
                dirtyRows.add(rowChange);
            }
        }
        if (!dirtyRows.isEmpty()) {
            throw new ClientException("There is dirty rows.");
        }
    }

    /**
     * Replaces the callback stored on this writer.
     *
     * <p>NOTE(review): the event handler was handed the callback reference that was current
     * at construction time; this setter only updates the writer's own field. Confirm whether
     * the handler is expected to pick up the new callback.
     */
    @Override
    public void setCallback(TableStoreCallback<RowChange, ConsumedCapacity> callback) {
        this.callback = callback;
    }

    /** Returns the callback currently stored on this writer. */
    @Override
    public TableStoreCallback<RowChange, ConsumedCapacity> getCallback() {
        return this.callback;
    }

    /** Returns the configuration this writer was created with. */
    @Override
    public WriterConfig getWriterConfig() {
        return this.writerConfig;
    }

    /** Returns the statistics object shared with the event handler. */
    @Override
    public WriterStatistics getWriterStatistics() {
        return this.writerStatistics;
    }

    /**
     * Publishes a flush signal without waiting for it to be handled; used by the flush timer.
     */
    private void triggerFlush() {
        Condition cond = this.lock.newCondition();
        this.addSignal(this.lock, cond);
    }

    /**
     * Publishes a flush signal and blocks until the condition attached to it is signalled.
     *
     * <p>The lock is held from before the signal is published until {@code await()} releases
     * it, so a handler that acquires the lock before signalling cannot signal too early.
     * NOTE(review): {@code await()} is not guarded by a predicate loop, so a spurious wakeup
     * would return early; fixing that needs a completion flag shared with the handler.
     *
     * @throws ClientException if the calling thread is interrupted while waiting; the
     *                         thread's interrupt status is restored before throwing
     */
    @Override
    public synchronized void flush() throws ClientException {
        LOGGER.debug("trigger flush and waiting.");
        Condition cond = this.lock.newCondition();
        this.lock.lock();
        try {
            this.addSignal(this.lock, cond);
            cond.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientException(e);
        }
        finally {
            this.lock.unlock();
        }
        LOGGER.debug("user trigger flush finished.");
    }

    /**
     * Stops the flush timer, performs a final blocking flush and shuts down the disruptor
     * and its executor.
     *
     * <p>Calling {@code close()} more than once is a no-op after the first call. The
     * disruptor and executor are shut down even if the final flush fails.
     *
     * @throws ClientException if the final flush is interrupted
     */
    @Override
    public synchronized void close() {
        if (this.closed) {
            // A second flush against a halted disruptor would wait forever.
            return;
        }
        this.closed = true;
        this.flushTimer.cancel();
        try {
            this.flush();
        }
        finally {
            try {
                this.disruptor.shutdown();
            }
            finally {
                this.disruptorExecutor.shutdown();
            }
        }
    }
}

