/*
 * Decompiled with CFR 0.152.
 */
package net.snowflake.client.loader;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.snowflake.client.jdbc.SnowflakeType;
import net.snowflake.client.jdbc.internal.apache.commons.lang3.RandomStringUtils;
import net.snowflake.client.loader.BufferStage;
import net.snowflake.client.loader.LoadResultListener;
import net.snowflake.client.loader.Loader;
import net.snowflake.client.loader.LoaderProperty;
import net.snowflake.client.loader.LoadingError;
import net.snowflake.client.loader.Operation;
import net.snowflake.client.loader.ProcessQueue;
import net.snowflake.client.loader.PutQueue;
import net.snowflake.client.loader.Utils;

public class StreamLoader
implements Loader,
Runnable {
    private static final Logger LOGGER = Logger.getLogger(StreamLoader.class.getName());
    public static final String FILE_PREFIX = "stream_";
    public static final String FILE_SUFFIX = ".gz";
    public static final long DEFAULT_BATCH_ROW_SIZE = -1L;
    public static DatabaseMetaData metadata;
    private BufferStage _stage = null;
    private Operation _op = null;
    private boolean _startTransaction = false;
    private boolean _is_first_start_call = true;
    private boolean _is_last_finish_call = true;
    private boolean _oneBatch = false;
    private boolean _truncate = false;
    private String _before = null;
    private String _after = null;
    private ArrayBlockingQueue<byte[]> _queueData;
    private Thread _thread;
    private ArrayBlockingQueue<BufferStage> _queuePut;
    private PutQueue _put;
    private ArrayBlockingQueue<BufferStage> _queueProcess;
    private ProcessQueue _process;
    private String _remoteStage = "~";
    private String _table;
    private String _schema;
    private String _database;
    private List<String> _columns;
    private List<String> _keys;
    private long _batchRowSize = -1L;
    private long _csvFileBucketSize = 64L;
    private long _csvFileSize = 50000000L;
    boolean _testRemoteBadCSV = false;
    boolean _preserveStageFile = false;
    boolean _useLocalTimezone = false;
    boolean _testMode = false;
    private final Connection _putConn;
    private final Connection _processConn;
    private final String _noise;
    private AtomicBoolean _active = new AtomicBoolean(false);
    private AtomicBoolean _aborted = new AtomicBoolean(false);
    private RuntimeException _abortCause = new Loader.ConnectionError("Unknown exception");
    AtomicInteger _throttleCounter = new AtomicInteger(0);
    private LoadResultListener _listener = new LoadResultListener(){
        private final AtomicInteger errorCount = new AtomicInteger(0);
        private final AtomicInteger errorRecordCount = new AtomicInteger(0);
        private final AtomicInteger submittedRowCount = new AtomicInteger(0);

        @Override
        public boolean needErrors() {
            return false;
        }

        @Override
        public boolean needSuccessRecords() {
            return false;
        }

        @Override
        public void addError(LoadingError error) {
        }

        @Override
        public boolean throwOnError() {
            return false;
        }

        @Override
        public void recordProvided(Operation op, Object[] record) {
        }

        @Override
        public void addProcessedRecordCount(Operation op, int i) {
        }

        @Override
        public void addOperationRecordCount(Operation op, int i) {
        }

        @Override
        public int getErrorCount() {
            return this.errorCount.get();
        }

        @Override
        public int getErrorRecordCount() {
            return this.errorRecordCount.get();
        }

        @Override
        public void resetErrorCount() {
            this.errorCount.set(0);
        }

        @Override
        public void resetErrorRecordCount() {
            this.errorRecordCount.set(0);
        }

        @Override
        public void addErrorCount(int count) {
            this.errorCount.addAndGet(count);
        }

        @Override
        public void addErrorRecordCount(int count) {
            this.errorRecordCount.addAndGet(count);
        }

        @Override
        public void resetSubmittedRowCount() {
            this.submittedRowCount.set(0);
        }

        @Override
        public void addSubmittedRowCount(int count) {
            this.submittedRowCount.addAndGet(count);
        }

        @Override
        public int getSubmittedRowCount() {
            return this.submittedRowCount.get();
        }
    };

    public StreamLoader(Map<LoaderProperty, Object> properties, Connection putConnection, Connection processConnection) {
        this._putConn = putConnection;
        this._processConn = processConnection;
        for (Map.Entry<LoaderProperty, Object> e : properties.entrySet()) {
            this.setProperty(e.getKey(), e.getValue());
        }
        this._noise = RandomStringUtils.randomAlphanumeric(6);
    }

    @Override
    public void setProperty(LoaderProperty property, Object value) {
        switch (property) {
            case tableName: {
                this._table = (String)value;
                break;
            }
            case schemaName: {
                this._schema = (String)value;
                break;
            }
            case databaseName: {
                this._database = (String)value;
                break;
            }
            case remoteStage: {
                this._remoteStage = (String)value;
                break;
            }
            case columns: {
                this._columns = (List)value;
                break;
            }
            case keys: {
                this._keys = (List)value;
                break;
            }
            case operation: {
                this._op = (Operation)((Object)value);
                break;
            }
            case startTransaction: {
                this._startTransaction = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case oneBatch: {
                this._oneBatch = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case truncateTable: {
                this._truncate = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case executeBefore: {
                this._before = String.valueOf(value);
                break;
            }
            case executeAfter: {
                this._after = String.valueOf(value);
                break;
            }
            case isFirstStartCall: {
                this._is_first_start_call = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case isLastFinishCall: {
                this._is_last_finish_call = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case batchRowSize: {
                this._batchRowSize = new Long((String)value);
                break;
            }
            case csvFileBucketSize: {
                this._csvFileBucketSize = new Long((String)value);
                break;
            }
            case csvFileSize: {
                this._csvFileSize = new Long((String)value);
                break;
            }
            case preserveStageFile: {
                this._preserveStageFile = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case useLocalTimezone: {
                this._useLocalTimezone = Boolean.valueOf(String.valueOf(value));
                break;
            }
            case testRemoteBadCSV: {
                this._testRemoteBadCSV = Boolean.valueOf(String.valueOf(value));
                break;
            }
        }
    }

    @Override
    public void start() {
        LOGGER.log(Level.FINER, "Start Loading");
        this.validateParameters();
        if (this._op == null) {
            this.abort(new Loader.ConnectionError("Loader started with no operation"));
            return;
        }
        this.initQueues();
        if (this._is_first_start_call) {
            try {
                if (this._startTransaction) {
                    LOGGER.log(Level.INFO, "Begin Transaction");
                    this._processConn.createStatement().execute("begin transaction");
                } else {
                    LOGGER.log(Level.INFO, "No Transaction started");
                }
            }
            catch (SQLException ex) {
                this.abort(new Loader.ConnectionError("Failed to start Transaction", Utils.getCause(ex)));
            }
            if (this._truncate) {
                this.truncateTargetTable();
            }
            try {
                if (this._before != null) {
                    LOGGER.log(Level.INFO, "Running Execute Before SQL");
                    this._processConn.createStatement().execute(this._before);
                }
            }
            catch (SQLException ex) {
                this.abort(new Loader.ConnectionError(String.format("Execute Before SQL failed to run: %s", this._before), Utils.getCause(ex)));
            }
        }
    }

    private void validateParameters() {
        LOGGER.log(Level.FINER, "Validate Parameters");
        if (Operation.INSERT != this._op && (this._keys == null || this._keys.isEmpty())) {
            throw new Loader.ConnectionError("Updating operations require keys");
        }
    }

    public String getNoise() {
        return this._noise;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abort(RuntimeException t) {
        StreamLoader streamLoader = this;
        synchronized (streamLoader) {
            LOGGER.log(Level.WARNING, "Exception received. Aborting...", t);
            if (this._aborted.getAndSet(true)) {
                return;
            }
            if (t != null) {
                this._abortCause = t;
            }
            this.rollback();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isAborted() {
        StreamLoader streamLoader = this;
        synchronized (streamLoader) {
            return this._aborted.get();
        }
    }

    @Override
    public void rollback() {
        LOGGER.log(Level.FINER, "Rollback");
        try {
            this.terminate();
            LOGGER.log(Level.WARNING, "Rollback");
            this._processConn.createStatement().execute("rollback");
        }
        catch (SQLException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage(), ex);
        }
    }

    @Override
    public void submitRow(Object[] row) {
        try {
            if (this._aborted.get()) {
                if (this._listener.throwOnError()) {
                    throw this._abortCause;
                }
                return;
            }
        }
        catch (Exception ex) {
            this.abort(new Loader.ConnectionError("Thowing Error", Utils.getCause(ex)));
        }
        byte[] data = null;
        try {
            if (!this._active.get()) {
                LOGGER.log(Level.WARNING, "Inactive loader. Row ignored");
                return;
            }
            data = this.createCSVRecord(row);
        }
        catch (Exception ex) {
            this.abort(new Loader.ConnectionError("Creating data set for CSV", Utils.getCause(ex)));
        }
        try {
            this.writeBytes(data);
            this._listener.addSubmittedRowCount(1);
            if (this._listener.needSuccessRecords()) {
                this._listener.recordProvided(this._op, row);
            }
        }
        catch (Exception ex) {
            this.abort(new Loader.ConnectionError("Writing Bytes to CSV files", Utils.getCause(ex)));
        }
        if (this._batchRowSize > 0L && this._listener.getSubmittedRowCount() > 0 && (long)this._listener.getSubmittedRowCount() % this._batchRowSize == 0L) {
            LOGGER.log(Level.FINER, String.format("Flushing Queue: Submitted Row Count: %s, Batch Row Size: %s", this._listener.getSubmittedRowCount(), this._batchRowSize));
            try {
                this.flushQueues();
            }
            catch (Exception ex) {
                this.abort(new Loader.ConnectionError("Flush Queues", Utils.getCause(ex)));
            }
            try {
                this.initQueues();
            }
            catch (Exception ex) {
                this.abort(new Loader.ConnectionError("Init Queues", Utils.getCause(ex)));
            }
        }
    }

    private void initQueues() {
        LOGGER.log(Level.FINER, "Init Queues");
        if (this._active.getAndSet(true)) {
            return;
        }
        this._queuePut = new ArrayBlockingQueue(48);
        this._queueProcess = new ArrayBlockingQueue(48);
        this._put = new PutQueue(this);
        this._process = new ProcessQueue(this);
        this._queueData = new ArrayBlockingQueue(1024);
        this._thread = new Thread(this);
        this._thread.setName("StreamLoaderThread");
        this._thread.start();
        this._stage = new BufferStage(this, this._op, this._csvFileBucketSize, this._csvFileSize);
    }

    private void flushQueues() {
        LOGGER.log(Level.FINER, "Flush Queues");
        try {
            this._queueData.put(new byte[0]);
            this._thread.join(10000L);
            if (this._thread.isAlive()) {
                this._thread.interrupt();
            }
        }
        catch (Exception ex) {
            String msg = "Failed to join StreamLoader queue: " + ex.getMessage();
            LOGGER.log(Level.SEVERE, String.format(msg, new Object[0]), ex);
            throw new Loader.DataError(msg, Utils.getCause(ex));
        }
        this.terminate();
        this._put.join();
        this._process.join();
        if (this._aborted.get()) {
            throw this._abortCause;
        }
    }

    private void writeBytes(byte[] data) throws IOException, InterruptedException {
        if (this._aborted.get()) {
            return;
        }
        boolean full = this._stage.stageData(data, true);
        if (full && !this._oneBatch) {
            this.queuePut(this._stage);
            this._stage = new BufferStage(this, this._op, this._csvFileBucketSize, this._csvFileSize);
        }
    }

    void truncateTargetTable() {
        try {
            this._processConn.createStatement().execute("DELETE FROM " + this.getFullTableName());
        }
        catch (SQLException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage(), ex);
            this.abort(new Loader.ConnectionError(Utils.getCause(ex)));
        }
    }

    @Override
    public void run() {
        try {
            byte[] data;
            while ((data = this._queueData.take()).length != 0) {
                this.writeBytes(data);
            }
        }
        catch (Exception ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage(), ex);
            this.abort(new Loader.ConnectionError(Utils.getCause(ex)));
        }
    }

    private byte[] createCSVRecord(Object[] data) throws Exception {
        StringBuilder sb = new StringBuilder(1024);
        for (int i = 0; i < data.length; ++i) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(SnowflakeType.escapeForCSV(SnowflakeType.lexicalValue(data[i], this._useLocalTimezone)));
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void finish() throws Exception {
        LOGGER.log(Level.FINER, "Finish Loading");
        this.flushQueues();
        if (this._is_last_finish_call) {
            try {
                if (this._after != null) {
                    LOGGER.log(Level.INFO, "Running Execute After SQL");
                    this._processConn.createStatement().execute(this._after);
                }
                this._processConn.createStatement().execute("commit");
                LOGGER.log(Level.INFO, "Committed");
            }
            catch (SQLException ex) {
                try {
                    this._processConn.createStatement().execute("rollback");
                }
                catch (SQLException ex0) {
                    LOGGER.log(Level.WARNING, "Failed to rollback");
                }
                LOGGER.log(Level.WARNING, String.format("Execute After SQL failed to run: %s", this._after), ex);
                throw new Loader.ConnectionError(Utils.getCause(ex));
            }
        }
    }

    @Override
    public void close() {
        LOGGER.log(Level.INFO, "Close Loader");
        try {
            this._processConn.close();
            this._putConn.close();
        }
        catch (SQLException ex) {
            LOGGER.log(Level.SEVERE, ex.getMessage(), ex);
            throw new Loader.ConnectionError(Utils.getCause(ex));
        }
    }

    private void terminate() {
        LOGGER.log(Level.FINER, "Terminate Loader");
        boolean active = this._active.getAndSet(false);
        if (!active) {
            return;
        }
        if (this._stage == null) {
            this._stage = new BufferStage(this, Operation.INSERT, this._csvFileBucketSize, this._csvFileSize);
        }
        this._stage.setTerminate(true);
        try {
            this.queuePut(this._stage);
        }
        catch (InterruptedException ex) {
            LOGGER.log(Level.SEVERE, "Unknown Error", ex);
        }
        LOGGER.log(Level.FINE, "Snowflake loader terminating");
    }

    @Override
    public void resetOperation(Operation op) {
        LOGGER.log(Level.FINER, "Reset Loader");
        if (op.equals((Object)this._op)) {
            return;
        }
        LOGGER.finer(String.format("Operation is changing from %s to %s", new Object[]{this._op, op}));
        this._op = op;
        if (this._stage != null) {
            try {
                this.queuePut(this._stage);
            }
            catch (InterruptedException ex) {
                LOGGER.log(Level.SEVERE, this._stage.getId(), ex);
            }
        }
        this._stage = new BufferStage(this, this._op, this._csvFileBucketSize, this._csvFileSize);
    }

    String getTable() {
        return this._table;
    }

    String getBase() {
        return BASE;
    }

    Connection getPutConnection() {
        return this._putConn;
    }

    Connection getProcessConnection() {
        return this._processConn;
    }

    String getRemoteStage() {
        return this._remoteStage;
    }

    List<String> getKeys() {
        return this._keys;
    }

    List<String> getColumns() {
        return this._columns;
    }

    String getColumnsAsString() {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < this._columns.size(); ++i) {
            if (i > 0) {
                sb.append("\",\"");
            }
            sb.append(this._columns.get(i));
        }
        sb.append("\"");
        return sb.toString();
    }

    public String getFullTableName() {
        return (this._database == null ? "" : "\"" + this._database + "\".") + (this._schema == null ? "" : "\"" + this._schema + "\".") + "\"" + this._table + "\"";
    }

    @Override
    public LoadResultListener getListener() {
        return this._listener;
    }

    @Override
    public void setListener(LoadResultListener _listener) {
        this._listener = _listener;
    }

    void queuePut(BufferStage stage) throws InterruptedException {
        this._queuePut.put(stage);
    }

    BufferStage takePut() throws InterruptedException {
        return this._queuePut.take();
    }

    void queueProcess(BufferStage stage) throws InterruptedException {
        this._queueProcess.put(stage);
    }

    BufferStage takeProcess() throws InterruptedException {
        return this._queueProcess.take();
    }

    int throttleUp() {
        int open = this._throttleCounter.incrementAndGet();
        LOGGER.log(Level.FINER, String.format("Throttle Up: %s", open));
        if (open > 8) {
            LOGGER.log(Level.INFO, "Will retry scheduling file for upload after " + Math.pow(2.0, open - 7) + " seconds");
            try {
                Thread.sleep(1000 * (int)Math.pow(2.0, open - 7));
            }
            catch (InterruptedException ex) {
                LOGGER.log(Level.SEVERE, "Exception occurs while waiting", ex);
            }
        }
        return open;
    }

    int throttleDown() {
        int throttleLevel = this._throttleCounter.decrementAndGet();
        LOGGER.log(Level.FINER, String.format("Throttle Down: %s", throttleLevel));
        if (throttleLevel < 0) {
            LOGGER.log(Level.WARNING, "Unbalanced throttle");
            this._throttleCounter.set(0);
        }
        LOGGER.log(Level.FINER, String.format("Connector throttle %s", throttleLevel));
        return throttleLevel;
    }

    public void setTestMode(boolean mode) {
        this._testMode = mode;
    }
}

