/*
 * Decompiled with CFR 0.152.
 */
package com.databricks.jdbc.api.impl.arrow;

import com.databricks.internal.sdk.service.sql.BaseChunkInfo;
import com.databricks.jdbc.api.impl.arrow.AbstractArrowResultChunk;
import com.databricks.jdbc.api.impl.arrow.AbstractRemoteChunkProvider;
import com.databricks.jdbc.api.impl.arrow.ArrowResultChunk;
import com.databricks.jdbc.api.impl.arrow.ChunkDownloadTask;
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
import com.databricks.jdbc.model.client.thrift.generated.TSparkArrowResultLink;
import com.databricks.jdbc.model.core.ResultData;
import com.databricks.jdbc.model.core.ResultManifest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Chunk provider that builds {@link ArrowResultChunk} instances and schedules their download on a
 * dedicated fixed-size pool of daemon threads.
 *
 * <p>The download pool is created lazily on the first call to {@link #downloadNextChunks()} and is
 * torn down in {@link #doClose()}.
 */
public class RemoteChunkProvider
extends AbstractRemoteChunkProvider<ArrowResultChunk> {
    /** Name prefix for download worker threads; a per-pool counter starting at 1 is appended. */
    private static final String CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX = "databricks-jdbc-chunks-downloader-";

    /**
     * Pool that runs {@link ChunkDownloadTask}s; {@code null} until the first download is scheduled.
     *
     * <p>NOTE(review): intentionally left without an explicit initializer. If the superclass
     * constructor ends up calling {@link #downloadNextChunks()} (not visible from this file), an
     * initializer here would run afterwards and discard the pool created during construction.
     */
    private ExecutorService chunkDownloaderExecutorService;

    /**
     * Creates a provider from a result manifest and result data, taking the compression codec from
     * {@code resultManifest.getResultCompression()}.
     *
     * @throws DatabricksSQLException if the superclass constructor fails
     */
    RemoteChunkProvider(StatementId statementId, ResultManifest resultManifest, ResultData resultData, IDatabricksSession session, IDatabricksHttpClient httpClient, int chunksDownloaderThreadPoolSize) throws DatabricksSQLException {
        super(statementId, resultManifest, resultData, session, httpClient, chunksDownloaderThreadPoolSize, resultManifest.getResultCompression());
    }

    /**
     * Creates a provider from a Thrift fetch-results response with an explicitly supplied
     * compression codec.
     *
     * @throws DatabricksSQLException if the superclass constructor fails
     */
    RemoteChunkProvider(IDatabricksStatementInternal parentStatement, TFetchResultsResp resultsResp, IDatabricksSession session, IDatabricksHttpClient httpClient, int chunksDownloaderThreadPoolSize, CompressionCodec compressionCodec) throws DatabricksSQLException {
        super(parentStatement, resultsResp, session, httpClient, chunksDownloaderThreadPoolSize, compressionCodec);
    }

    /**
     * Builds a chunk from chunk metadata, applying this provider's chunk-ready timeout.
     *
     * @param statementId statement the chunk belongs to
     * @param chunkIndex not used by this overload (presumably {@code chunkInfo} already carries the
     *     index; verify against {@code ArrowResultChunk.Builder})
     * @param chunkInfo metadata handed to the chunk builder
     * @return the newly built chunk
     * @throws DatabricksSQLException if the chunk cannot be built
     */
    @Override
    protected ArrowResultChunk createChunk(StatementId statementId, long chunkIndex, BaseChunkInfo chunkInfo) throws DatabricksSQLException {
        return ArrowResultChunk.builder()
            .withStatementId(statementId)
            .withChunkInfo(chunkInfo)
            .withChunkReadyTimeoutSeconds(this.chunkReadyTimeoutSeconds)
            .build();
    }

    /**
     * Builds a chunk from a Thrift Arrow result link, applying this provider's chunk-ready timeout.
     *
     * @param statementId statement the chunk belongs to
     * @param chunkIndex index passed to the builder together with {@code resultLink}
     * @param resultLink Thrift result link describing the chunk
     * @return the newly built chunk
     * @throws DatabricksSQLException if the chunk cannot be built
     */
    @Override
    protected ArrowResultChunk createChunk(StatementId statementId, long chunkIndex, TSparkArrowResultLink resultLink) throws DatabricksSQLException {
        return ArrowResultChunk.builder()
            .withStatementId(statementId)
            .withThriftChunkInfo(chunkIndex, resultLink)
            .withChunkReadyTimeoutSeconds(this.chunkReadyTimeoutSeconds)
            .build();
    }

    /**
     * Submits download tasks for consecutive chunks, starting at {@code nextChunkToDownload}, until
     * the provider is closed, every chunk has been scheduled, or the in-memory chunk limit is
     * reached. Creates the download pool on first use.
     */
    @Override
    public void downloadNextChunks() {
        if (this.chunkDownloaderExecutorService == null) {
            this.chunkDownloaderExecutorService = this.createChunksDownloaderExecutorService();
        }
        while (!this.isClosed && this.nextChunkToDownload < this.chunkCount && this.totalChunksInMemory < this.allowedChunksInMemory) {
            ArrowResultChunk chunk = (ArrowResultChunk)this.chunkIndexToChunksMap.get(this.nextChunkToDownload);
            this.chunkDownloaderExecutorService.submit(new ChunkDownloadTask(chunk, this.httpClient, this, this.linkDownloadService));
            ++this.totalChunksInMemory;
            ++this.nextChunkToDownload;
        }
    }

    /**
     * Marks the provider closed, stops the download pool if one was created, releases every chunk
     * in the chunk map and clears the statement info held in the thread context.
     */
    @Override
    protected void doClose() {
        this.isClosed = true;
        // The pool is created lazily, so it may never have been started; skipping the shutdown in
        // that case avoids an NPE that would otherwise prevent the cleanup below from running.
        ExecutorService executor = this.chunkDownloaderExecutorService;
        if (executor != null) {
            executor.shutdownNow();
        }
        this.chunkIndexToChunksMap.values().forEach(AbstractArrowResultChunk::releaseChunk);
        DatabricksThreadContextHolder.clearStatementInfo();
    }

    /**
     * Creates a fixed-size pool of {@code maxParallelChunkDownloadsPerQuery} daemon threads named
     * with {@link #CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX} followed by a counter starting at 1.
     *
     * @return the new executor service
     */
    private ExecutorService createChunksDownloaderExecutorService() {
        AtomicInteger threadCount = new AtomicInteger(1);
        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(CHUNKS_DOWNLOADER_THREAD_POOL_PREFIX + threadCount.getAndIncrement());
            // Daemon threads so in-flight downloads never keep the JVM alive on exit.
            thread.setDaemon(true);
            return thread;
        };
        return Executors.newFixedThreadPool(this.maxParallelChunkDownloadsPerQuery, threadFactory);
    }
}

