/*
 * Decompiled with CFR 0.152.
 */
package com.snowflake.client.jdbc;

import com.snowflake.client.core.HttpUtil;
import com.snowflake.client.jdbc.ErrorCode;
import com.snowflake.client.jdbc.RestRequest;
import com.snowflake.client.jdbc.SnowflakeResultChunk;
import com.snowflake.client.jdbc.SnowflakeSQLException;
import com.snowflake.client.jdbc.SnowflakeUtil;
import com.snowflake.client.jdbc.internal.apache.http.Header;
import com.snowflake.client.jdbc.internal.apache.http.HttpEntity;
import com.snowflake.client.jdbc.internal.apache.http.HttpResponse;
import com.snowflake.client.jdbc.internal.apache.http.client.HttpClient;
import com.snowflake.client.jdbc.internal.apache.http.client.methods.HttpGet;
import com.snowflake.client.jdbc.internal.apache.http.client.utils.URIBuilder;
import com.snowflake.client.jdbc.internal.apache.http.entity.ContentType;
import com.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonFactory;
import com.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonParser;
import com.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonToken;
import com.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import com.snowflake.client.jdbc.internal.fasterxml.jackson.databind.MappingJsonFactory;
import com.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import com.snowflake.client.jdbc.internal.google.common.base.Charsets;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.SequenceInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;

public class SnowflakeChunkDownloader {
    private static final String SSE_C_ALGORITHM = "x-amz-server-side-encryption-customer-algorithm";
    private static final String SSE_C_KEY = "x-amz-server-side-encryption-customer-key";
    private static final String SSE_C_AES = "AES256";
    static final ObjectMapper mapper = new ObjectMapper();
    private static final JsonFactory jsonFactory = new MappingJsonFactory();
    static final Logger logger = Logger.getLogger(SnowflakeChunkDownloader.class.getName());
    private List<SnowflakeResultChunk> chunks = null;
    private int nextChunkToConsume = 0;
    private int nextChunkToDownload = 0;
    private static int DEFAULT_PREFETCH_THREADS = 1;
    private int prefetchThreads = DEFAULT_PREFETCH_THREADS;
    private static int DEFAULT_PREFETCH_SLOTS = 2;
    private int prefetchSlots = DEFAULT_PREFETCH_SLOTS;
    private boolean useJsonParser = false;
    private ThreadPoolExecutor executor;
    private long numberMillisWaitingForChunks = 0L;
    private boolean terminated = false;
    private long totalMillisDownloadingChunks = 0L;
    private long totalMillisParsingChunks = 0L;
    private String qrmk = null;
    private Map chunkHeadersMap = null;
    private int networkTimeoutInMilli;

    public static ThreadPoolExecutor createChunkDownloaderExecutorService(final String threadNamePrefix, int parallel) {
        ThreadFactory threadFactory = new ThreadFactory(){
            private int threadCount = 1;

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(threadNamePrefix + this.threadCount++);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        logger.log(Level.SEVERE, "uncaughtException in thread: " + t + " {0}", e);
                    }
                });
                return thread;
            }
        };
        return (ThreadPoolExecutor)Executors.newFixedThreadPool(parallel, threadFactory);
    }

    public SnowflakeChunkDownloader(int colCount, JsonNode chunksData, Integer prefetchSlots, Integer prefetchThreads, String qrmk, JsonNode chunkHeaders, int networkTimeoutInMilli, Boolean useJsonParser) {
        this.qrmk = qrmk;
        this.networkTimeoutInMilli = networkTimeoutInMilli;
        logger.log(Level.FINE, "qrmk = {0}", qrmk);
        if (chunkHeaders != null && !chunkHeaders.isMissingNode()) {
            this.chunkHeadersMap = new HashMap(2);
            Iterator<Map.Entry<String, JsonNode>> chunkHeadersIter = chunkHeaders.fields();
            while (chunkHeadersIter.hasNext()) {
                Map.Entry<String, JsonNode> chunkHeader = chunkHeadersIter.next();
                logger.log(Level.FINE, "add header key={0}, value={1}", new Object[]{chunkHeader.getKey(), chunkHeader.getValue().asText()});
                this.chunkHeadersMap.put(chunkHeader.getKey(), chunkHeader.getValue().asText());
            }
        }
        if (chunksData == null) {
            logger.log(Level.INFO, "no chunk data");
            return;
        }
        int numChunks = chunksData.size();
        this.chunks = new ArrayList<SnowflakeResultChunk>(numChunks);
        for (int idx = 0; idx < numChunks; ++idx) {
            JsonNode chunkNode = chunksData.get(idx);
            String url = chunkNode.path("url").asText();
            int rowCount = chunkNode.path("rowCount").asInt();
            logger.log(Level.INFO, "add chunk, url={0} rowCount={1}", new Object[]{url, rowCount});
            this.chunks.add(new SnowflakeResultChunk(url, rowCount, colCount));
        }
        if (prefetchSlots != null) {
            this.prefetchSlots = prefetchSlots;
        }
        if (prefetchThreads != null) {
            this.prefetchThreads = prefetchThreads;
        }
        if (useJsonParser != null) {
            this.useJsonParser = useJsonParser;
        }
        int effectiveThreads = Math.min(this.prefetchThreads, numChunks);
        logger.log(Level.INFO, "#chunks: {0} #threads:{1} #slots:{2} -> pool:{3}", new Object[]{numChunks, prefetchThreads, prefetchSlots, effectiveThreads});
        this.executor = SnowflakeChunkDownloader.createChunkDownloaderExecutorService("result-chunk-downloader-", effectiveThreads);
        int numberChunksToPrefetch = Math.min(this.prefetchSlots, this.chunks.size());
        logger.log(Level.INFO, "Submit {0} chunks to be pre-fetched", numberChunksToPrefetch);
        while (this.nextChunkToDownload < numberChunksToPrefetch) {
            logger.log(Level.INFO, "submit chunk #{0} for downloading, url={1}", new Object[]{this.nextChunkToDownload, this.chunks.get(this.nextChunkToDownload).getUrl()});
            this.executor.submit(SnowflakeChunkDownloader.getDownloadChunkCallable(this, this.chunks.get(this.nextChunkToDownload), qrmk, this.nextChunkToDownload, this.chunkHeadersMap, networkTimeoutInMilli));
            ++this.nextChunkToDownload;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SnowflakeResultChunk getNextChunkToConsume() throws InterruptedException, SnowflakeSQLException {
        int currentChunkIdx;
        SnowflakeResultChunk currentChunk;
        if (this.nextChunkToConsume > 0) {
            int prevChunk = this.nextChunkToConsume - 1;
            logger.log(Level.INFO, "free chunk data for chunk #{0}", prevChunk);
            if (this.nextChunkToDownload < this.chunks.size()) {
                logger.log(Level.INFO, "submit chunk #{0} for downloading", new Object[]{this.nextChunkToDownload, this.chunks.get(this.nextChunkToDownload).getUrl()});
                this.chunks.get(this.nextChunkToDownload).setRowSet(this.chunks.get(prevChunk).getRowset());
                this.executor.submit(SnowflakeChunkDownloader.getDownloadChunkCallable(this, this.chunks.get(this.nextChunkToDownload), this.qrmk, this.nextChunkToDownload, this.chunkHeadersMap, this.networkTimeoutInMilli));
                ++this.nextChunkToDownload;
            }
            this.chunks.get(prevChunk).setResultData(null);
            this.chunks.get(prevChunk).setRowSet(null);
        }
        if (this.nextChunkToConsume >= this.chunks.size()) {
            logger.log(Level.INFO, "no more chunk");
            return null;
        }
        if ((currentChunk = this.chunks.get(currentChunkIdx = this.nextChunkToConsume++)).getDownloadState() == SnowflakeResultChunk.DownloadState.SUCCESS) {
            logger.log(Level.INFO, "chunk #{0} is ready to consume", currentChunkIdx);
            return currentChunk;
        }
        try {
            logger.log(Level.INFO, "chunk #{0} is not ready to consume", currentChunkIdx);
            currentChunk.getLock().lock();
            logger.log(Level.INFO, "consumer get lock to check chunk state");
            while (currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.SUCCESS && currentChunk.getDownloadState() != SnowflakeResultChunk.DownloadState.FAILURE) {
                logger.log(Level.INFO, "wait for chunk #{0} to be ready, currentchunk state is: {1}", new Object[]{currentChunkIdx, currentChunk.getDownloadState()});
                long startTime = System.currentTimeMillis();
                currentChunk.getDownloadCondition().await();
                this.numberMillisWaitingForChunks += System.currentTimeMillis() - startTime;
                logger.log(Level.INFO, "woken up from waiting for chunk #{0} to be ready", currentChunkIdx);
            }
            if (currentChunk.getDownloadState() == SnowflakeResultChunk.DownloadState.FAILURE) {
                logger.log(Level.SEVERE, "downloader encountered error: {0}", currentChunk.getDownloadError());
                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), currentChunk.getDownloadError());
            }
            logger.log(Level.INFO, "chunk #{0} is ready to consume", currentChunkIdx);
            SnowflakeResultChunk snowflakeResultChunk = currentChunk;
            return snowflakeResultChunk;
        }
        finally {
            logger.log(Level.INFO, "consumer free lock");
            currentChunk.getLock().unlock();
        }
    }

    public void terminate() {
        if (!this.terminated) {
            logger.log(Level.INFO, "Total milliseconds waiting for chunks: {0}, Total memory used: {1}, total download time: {2} millisec, total parsing time: {3} milliseconds, total chunks: {4}", new Object[]{this.numberMillisWaitingForChunks, Runtime.getRuntime().totalMemory(), this.totalMillisDownloadingChunks, this.totalMillisParsingChunks, this.chunks.size()});
            this.executor.shutdown();
            this.terminated = true;
        }
    }

    public void addDownloadTime(long downloadTime) {
        this.totalMillisDownloadingChunks += downloadTime;
    }

    public void addParsingTime(long parsingTime) {
        this.totalMillisParsingChunks += parsingTime;
    }

    public static Callable<Void> getDownloadChunkCallable(final SnowflakeChunkDownloader downloader, final SnowflakeResultChunk resultChunk, final String qrmk, final int chunkIndex, final Map<String, String> chunkHeadersMap, final int networkTimeoutInMilli) {
        return new Callable<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void call() throws Exception {
                try {
                    try {
                        resultChunk.getLock().lock();
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.IN_PROGRESS);
                    }
                    finally {
                        resultChunk.getLock().unlock();
                    }
                    logger.log(Level.INFO, "Downloading chunk {0}, url={1}", new Object[]{chunkIndex, resultChunk.getUrl()});
                    long startTime = System.currentTimeMillis();
                    URIBuilder uriBuilder = new URIBuilder(resultChunk.getUrl());
                    HttpGet httpRequest = new HttpGet(uriBuilder.build());
                    if (chunkHeadersMap != null && chunkHeadersMap.size() != 0) {
                        for (Map.Entry entry : chunkHeadersMap.entrySet()) {
                            logger.log(Level.FINE, "Adding header key={0}, value={1}", new Object[]{entry.getKey(), entry.getValue()});
                            httpRequest.addHeader((String)entry.getKey(), (String)entry.getValue());
                        }
                    } else if (qrmk != null) {
                        httpRequest.addHeader(SnowflakeChunkDownloader.SSE_C_ALGORITHM, SnowflakeChunkDownloader.SSE_C_AES);
                        httpRequest.addHeader(SnowflakeChunkDownloader.SSE_C_KEY, qrmk);
                        logger.log(Level.FINE, "Adding SSE-C headers");
                    }
                    logger.log(Level.FINE, "Fetching result: {0}", resultChunk.getUrl());
                    HttpClient httpClient = HttpUtil.getHttpClient();
                    HttpResponse response = RestRequest.execute(httpClient, httpRequest, networkTimeoutInMilli / 1000, 0, null);
                    logger.log(Level.INFO, "Call returned for URL: {0}", resultChunk.getUrl());
                    if (response == null || response.getStatusLine().getStatusCode() != 200) {
                        logger.log(Level.SEVERE, "Error fetching chunk from: {0}", resultChunk.getUrl());
                        SnowflakeUtil.logResponseDetails(response, logger);
                        throw new SnowflakeSQLException("58030", ErrorCode.NETWORK_ERROR.getMessageCode(), "Error encountered when downloading a result chunk: HTTP status=" + (response != null ? Integer.valueOf(response.getStatusLine().getStatusCode()) : "null response"));
                    }
                    BufferedReader bufferedReader = null;
                    HttpEntity entity = response.getEntity();
                    try {
                        ContentType contentType = ContentType.getOrDefault(response.getEntity());
                        Charset charSet = contentType.getCharset();
                        if (charSet == null) {
                            charSet = Charsets.UTF_8;
                        }
                        InputStream is = new HttpUtil.HttpInputStream(entity.getContent());
                        Header encoding = response.getFirstHeader("Content-Encoding");
                        if (encoding != null) {
                            if (encoding.getValue().equalsIgnoreCase("gzip")) {
                                is = new GZIPInputStream(is, 65536);
                            } else {
                                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: unexpected compression got " + encoding.getValue());
                            }
                        }
                        bufferedReader = new BufferedReader(new InputStreamReader((InputStream)new SequenceInputStream(Collections.enumeration(Arrays.asList(new ByteArrayInputStream("[".getBytes()), is, new ByteArrayInputStream("]".getBytes())))), charSet));
                    }
                    catch (Exception ex) {
                        logger.log(Level.SEVERE, "Failed to uncompress data: {0}", response);
                        throw ex;
                    }
                    resultChunk.setDownloadTime(System.currentTimeMillis() - startTime);
                    downloader.addDownloadTime(resultChunk.getDownloadTime());
                    startTime = System.currentTimeMillis();
                    if (logger.isLoggable(Level.FINER)) {
                        logger.log(Level.FINER, "Time: {0} Json response: {1}", new Object[]{System.currentTimeMillis(), response});
                    }
                    JsonNode resultData = null;
                    try {
                        if (downloader.useJsonParser) {
                            JsonParser jp = jsonFactory.createParser(bufferedReader);
                            if (resultChunk.rowSet == null) {
                                resultChunk.rowSet = new ArrayList(resultChunk.getRowCount());
                            } else if (resultChunk.rowSet.size() < resultChunk.getRowCount()) {
                                resultChunk.rowSet.ensureCapacity(resultChunk.getRowCount());
                            }
                            JsonToken curTok = jp.nextToken();
                            if (curTok != JsonToken.START_ARRAY) {
                                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception1: expected '[' got " + curTok.asString());
                            }
                            int currentRow = 0;
                            while (jp.nextToken() != JsonToken.END_ARRAY) {
                                Object[] row;
                                if (currentRow == resultChunk.rowSet.size()) {
                                    row = new Object[resultChunk.getColCount()];
                                    resultChunk.rowSet.add(row);
                                } else {
                                    row = resultChunk.rowSet.get(currentRow);
                                }
                                ++currentRow;
                                curTok = jp.getCurrentToken();
                                if (curTok != JsonToken.START_ARRAY) {
                                    throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception2: expected '[' got " + curTok.asString());
                                }
                                int colNo = 0;
                                while (jp.nextToken() != JsonToken.END_ARRAY) {
                                    curTok = jp.getCurrentToken();
                                    if (JsonToken.VALUE_STRING == curTok) {
                                        row[colNo++] = jp.getText();
                                        continue;
                                    }
                                    if (JsonToken.VALUE_NULL == curTok) {
                                        row[colNo++] = null;
                                        continue;
                                    }
                                    throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: unexpected data type got " + curTok.asString());
                                }
                                if (resultChunk.getColCount() == colNo) continue;
                                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: expected " + resultChunk.getColCount() + " columns and received " + colNo);
                            }
                            if (resultChunk.getRowCount() != currentRow) {
                                throw new SnowflakeSQLException("XX000", ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: expected " + resultChunk.getRowCount() + " rows and received " + currentRow);
                            }
                        } else {
                            resultData = mapper.readTree(bufferedReader);
                        }
                    }
                    catch (Exception ex) {
                        logger.log(Level.SEVERE, "Exception when parsing result", ex);
                        throw new SnowflakeSQLException(ex, "XX000", (int)ErrorCode.INTERNAL_ERROR.getMessageCode(), "Exception: " + ex.getLocalizedMessage() + "\nBad result json: " + response.toString());
                    }
                    finally {
                        if (bufferedReader != null) {
                            bufferedReader.close();
                        }
                    }
                    resultChunk.setParseTime(System.currentTimeMillis() - startTime);
                    downloader.addParsingTime(resultChunk.getParseTime());
                    resultChunk.setResultData(resultData);
                    if (logger.isLoggable(Level.INFO)) {
                        logger.log(Level.INFO, "Finished preparing chunk data for {0}, total download time={1}ms, total parse time={2}ms", new Object[]{resultChunk.getUrl(), resultChunk.getDownloadTime(), resultChunk.getParseTime()});
                    }
                    try {
                        resultChunk.getLock().lock();
                        logger.log(Level.FINE, "get lock to change the chunk to be ready to consume");
                        logger.log(Level.FINE, "wake up consumer if it is waiting for a chunk to be ready");
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.SUCCESS);
                        resultChunk.getDownloadCondition().signal();
                    }
                    finally {
                        logger.log(Level.FINE, "Downloaded chunk {0}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                    }
                }
                catch (Throwable ex) {
                    try {
                        logger.log(Level.INFO, "get lock to set chunk download error");
                        resultChunk.getLock().lock();
                        resultChunk.setDownloadState(SnowflakeResultChunk.DownloadState.FAILURE);
                        resultChunk.setDownloadError(ex.getLocalizedMessage());
                        logger.log(Level.INFO, "wake up consumer if it is waiting for a chunk to be ready");
                        resultChunk.getDownloadCondition().signal();
                    }
                    finally {
                        logger.log(Level.INFO, "Failed to download chunk {0}, free lock", chunkIndex);
                        resultChunk.getLock().unlock();
                    }
                    logger.log(Level.SEVERE, "Exception encountered ({0}:{1}) fetching chunk from: {2}", new Object[]{ex.getClass().getName(), ex.getLocalizedMessage(), resultChunk.getUrl()});
                    logger.log(Level.SEVERE, "Exception: ", ex);
                }
                return null;
            }
        };
    }
}

