/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.runtime.asyncprocessing.AsyncRequestContainer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.state.forst.ForStDBGetRequest;
import org.apache.flink.state.forst.ForStDBIterRequest;
import org.apache.flink.state.forst.ForStDBOperation;
import org.apache.flink.state.forst.ForStDBPutRequest;
import org.apache.flink.state.forst.ForStGeneralMultiGetOperation;
import org.apache.flink.state.forst.ForStIterateOperation;
import org.apache.flink.state.forst.ForStStateRequestClassifier;
import org.apache.flink.state.forst.ForStWriteBatchOperation;
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.forstdb.RocksDB;
import org.forstdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateExecutor} implementation that runs batches of ForSt state requests against a
 * {@link RocksDB} instance.
 *
 * <p>Each batch is handed to a coordinator executor, which splits it into get, iterate and put
 * operations and dispatches those to the read and write executors. The first failure of any
 * request is remembered, and every later call into this executor is rejected with an
 * {@link IllegalStateException} carrying that failure as its cause.
 */
public class ForStStateExecutor implements StateExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ForStStateExecutor.class);

    /** Executor on which each batch is split into operations and its completion is reported. */
    private final ExecutorService coordinatorThread;

    /** Executor handed to the get and iterate operations. */
    private final ExecutorService readThreads;

    /** Executor handed to the write-batch operation. */
    private final ExecutorService writeThreads;

    /** Size of the read pool; also the threshold used by {@link #fullyLoaded()}. */
    private final int readThreadCount;

    /**
     * {@code true} when {@link #writeThreads} is not a dedicated pool (it is either the direct
     * executor or the very same pool as {@link #readThreads}) and therefore must not be shut
     * down separately.
     */
    private final boolean sharedWriteThread;

    private final RocksDB db;
    private final WriteOptions writeOptions;

    /**
     * First failure observed while executing requests. It is assigned inside future callbacks,
     * which may run on one of the executor threads, and read by {@link #checkState()} on the
     * calling thread, hence {@code volatile}.
     */
    private volatile Throwable executionError;

    /** Number of read sub-processes that have been scheduled but have not reported completion. */
    private final AtomicLong ongoing;

    /** Direct executor service used for inline coordination/writes and for synchronous requests. */
    private final ExecutorService directExecutor = org.apache.flink.util.concurrent.Executors.newDirectExecutorService();

    /**
     * Creates the executor and the thread pools it needs.
     *
     * @param coordinatorInline if {@code true}, batches are coordinated on the direct executor
     *     instead of a dedicated single-thread executor
     * @param isWriteInline if {@code true}, writes are executed on the direct executor and only a
     *     read pool is created; {@code readIoParallelism} must then be positive
     * @param readIoParallelism number of read threads; if it is not positive (and writes are not
     *     inline) reads and writes share one pool sized by the larger of the two parallelisms
     * @param writeIoParallelism number of write threads; same sharing rule as above
     * @param db the database the operations are executed against
     * @param writeOptions the options passed to every write-batch operation
     */
    public ForStStateExecutor(boolean coordinatorInline, boolean isWriteInline, int readIoParallelism, int writeIoParallelism, RocksDB db, WriteOptions writeOptions) {
        if (isWriteInline) {
            Preconditions.checkState(readIoParallelism > 0);
            this.coordinatorThread = coordinatorInline
                    ? this.directExecutor
                    : Executors.newSingleThreadExecutor(newThreadFactory("ForSt-StateExecutor-Coordinator-And-Write"));
            this.readThreadCount = readIoParallelism;
            this.readThreads = Executors.newFixedThreadPool(readIoParallelism, newThreadFactory("ForSt-StateExecutor-read-IO"));
            this.writeThreads = this.directExecutor;
            this.sharedWriteThread = true;
        } else {
            Preconditions.checkState(readIoParallelism > 0 || writeIoParallelism > 0);
            this.coordinatorThread = coordinatorInline
                    ? this.directExecutor
                    : Executors.newSingleThreadExecutor(newThreadFactory("ForSt-StateExecutor-Coordinator"));
            if (readIoParallelism <= 0 || writeIoParallelism <= 0) {
                // Only one of the two parallelisms is usable: reads and writes share one pool.
                this.readThreadCount = Math.max(readIoParallelism, writeIoParallelism);
                ExecutorService sharedPool = Executors.newFixedThreadPool(this.readThreadCount, newThreadFactory("ForSt-StateExecutor-IO"));
                this.readThreads = sharedPool;
                this.writeThreads = sharedPool;
                this.sharedWriteThread = true;
            } else {
                this.readThreadCount = readIoParallelism;
                this.readThreads = Executors.newFixedThreadPool(readIoParallelism, newThreadFactory("ForSt-StateExecutor-read-IO"));
                this.writeThreads = Executors.newFixedThreadPool(writeIoParallelism, newThreadFactory("ForSt-StateExecutor-write-IO"));
                this.sharedWriteThread = false;
            }
        }
        this.db = db;
        this.writeOptions = writeOptions;
        this.ongoing = new AtomicLong();
    }

    /** Builds the thread factory used by all pools; every thread first runs {@code FileBasedCache::setFlinkThread}. */
    private static ThreadFactory newThreadFactory(String name) {
        return (ThreadFactory) new ForStExecutorThreadFactory(name, FileBasedCache::setFlinkThread);
    }

    /**
     * Executes all requests collected in the given container.
     *
     * @param asyncRequestContainer must be a {@link ForStStateRequestClassifier}
     * @return a future that completes normally once every get, iterate and put operation of the
     *     batch has finished, or exceptionally with the failure of any of them
     * @throws IllegalStateException if an earlier request already failed
     */
    public CompletableFuture<Void> executeBatchRequests(AsyncRequestContainer asyncRequestContainer) {
        checkState();
        Preconditions.checkArgument(asyncRequestContainer instanceof ForStStateRequestClassifier);
        ForStStateRequestClassifier stateRequestClassifier = (ForStStateRequestClassifier) asyncRequestContainer;
        List<ForStDBGetRequest<?, ?, ?, ?>> getRequests = stateRequestClassifier.pollDbGetRequests();
        List<ForStDBIterRequest<?, ?, ?, ?, ?>> iterRequests = stateRequestClassifier.pollDbIterRequests();

        // Account for each non-empty read batch right away, before the coordinator picks it up;
        // the count is refined to the real number of sub-processes once the operation exists.
        if (!getRequests.isEmpty()) {
            this.ongoing.addAndGet(1L);
        }
        if (!iterRequests.isEmpty()) {
            this.ongoing.addAndGet(1L);
        }

        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        this.coordinatorThread.execute(() -> {
            long startTime = System.currentTimeMillis();
            List<CompletableFuture<Void>> futures = new ArrayList<>(3);
            if (!getRequests.isEmpty()) {
                ForStGeneralMultiGetOperation getOperations = new ForStGeneralMultiGetOperation(this.db, getRequests, this.readThreads, this.readThreadCount, this.ongoing::decrementAndGet);
                // One unit was already added above for this batch.
                this.ongoing.addAndGet(getOperations.subProcessCount() - 1);
                futures.add(getOperations.process());
            }
            if (!iterRequests.isEmpty()) {
                ForStIterateOperation iterOperations = new ForStIterateOperation(this.db, iterRequests, this.readThreads, this.ongoing::decrementAndGet);
                // One unit was already added above for this batch.
                this.ongoing.addAndGet(iterOperations.subProcessCount() - 1);
                futures.add(iterOperations.process());
            }
            // Put requests are polled only after the read operations have been started.
            List<ForStDBPutRequest<?, ?, ?>> putRequests = stateRequestClassifier.pollDbPutRequests();
            if (!putRequests.isEmpty()) {
                ForStWriteBatchOperation writeOperations = new ForStWriteBatchOperation(this.db, putRequests, this.writeOptions, this.writeThreads);
                futures.add(writeOperations.process());
            }
            FutureUtils.combineAll(futures).thenAcceptAsync(ignored -> {
                long duration = System.currentTimeMillis() - startTime;
                LOG.debug("Complete executing a batch of state requests, putRequest size {}, getRequest size {}, iterRequest size {}, duration {} ms", putRequests.size(), getRequests.size(), iterRequests.size(), duration);
                resultFuture.complete(null);
            }, this.coordinatorThread).exceptionally(e -> {
                // Close every iterator request individually: a failure while closing one of them
                // must neither leave the remaining ones open nor prevent the result future from
                // being completed below.
                for (ForStDBIterRequest<?, ?, ?, ?, ?> iterRequest : iterRequests) {
                    try {
                        iterRequest.close();
                    } catch (IOException | RuntimeException closeError) {
                        LOG.error("Close iterRequests fail", closeError);
                    }
                }
                this.executionError = e;
                resultFuture.completeExceptionally(e);
                return null;
            });
        });
        return resultFuture;
    }

    /**
     * Creates an empty container that collects requests for {@link #executeBatchRequests}.
     *
     * @throws IllegalStateException if an earlier request already failed
     */
    public AsyncRequestContainer<StateRequest<?, ?, ?, ?>> createRequestContainer() {
        checkState();
        return new ForStStateRequestClassifier();
    }

    /**
     * Executes a single request on the direct executor instead of the thread pools.
     *
     * <p>Any failure, including an unsupported request type, is recorded and then reported by
     * the final state check.
     *
     * @param stateRequest the request to execute
     * @throws IllegalStateException if an earlier request or this request failed
     */
    public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
        checkState();
        Object forstRequest = ForStStateRequestClassifier.convertRequests(stateRequest);
        try {
            ForStDBOperation operation;
            if (forstRequest instanceof ForStDBGetRequest) {
                operation = new ForStGeneralMultiGetOperation(this.db, Collections.singletonList((ForStDBGetRequest<?, ?, ?, ?>) forstRequest), this.directExecutor, 1, null);
            } else if (forstRequest instanceof ForStDBIterRequest) {
                operation = new ForStIterateOperation(this.db, Collections.singletonList((ForStDBIterRequest<?, ?, ?, ?, ?>) forstRequest), this.directExecutor, null);
            } else if (forstRequest instanceof ForStDBPutRequest) {
                operation = new ForStWriteBatchOperation(this.db, Collections.singletonList((ForStDBPutRequest<?, ?, ?>) forstRequest), this.writeOptions, this.directExecutor);
            } else {
                throw new IllegalArgumentException("Unknown request type: " + forstRequest);
            }
            operation.process().exceptionally(throwable -> {
                this.executionError = throwable;
                return null;
            });
        } catch (Exception e) {
            this.executionError = e;
        }
        // Surface a failure of this very request to the caller.
        checkState();
    }

    /**
     * @return {@code true} when the number of ongoing read sub-processes has reached the size
     *     of the read pool
     */
    public boolean fullyLoaded() {
        return this.ongoing.get() >= (long) this.readThreadCount;
    }

    /** Throws if any earlier request failed, attaching that failure as the cause. */
    private void checkState() {
        Throwable error = this.executionError;
        if (error != null) {
            throw new IllegalStateException("previous state request already failed : ", error);
        }
    }

    /**
     * Shuts down the coordinator first, then the read pool, then the write pool if it is a
     * dedicated one, waiting for each to terminate.
     */
    public void shutdown() {
        shutdownAndWait(this.coordinatorThread);
        shutdownAndWait(this.readThreads);
        if (!this.sharedWriteThread) {
            shutdownAndWait(this.writeThreads);
        }
        LOG.info("Shutting down the ForStStateExecutor.");
    }

    /**
     * Initiates an orderly shutdown and blocks until the executor has terminated. If the waiting
     * thread is interrupted, the executor is shut down forcibly and the interrupt flag is
     * restored so that callers can still observe the interruption.
     */
    private void shutdownAndWait(ExecutorService executorService) {
        executorService.shutdown();
        try {
            while (!executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                // Not terminated yet; keep waiting.
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Thread factory whose threads run an initializer before executing their actual task. */
    private static class ForStExecutorThreadFactory
    extends ExecutorThreadFactory {
        /** Action executed once at the start of every thread created by this factory. */
        private final Runnable initializer;

        public ForStExecutorThreadFactory(String name, Runnable initializer) {
            super(name);
            this.initializer = initializer;
        }

        public Thread newThread(Runnable runnable) {
            return super.newThread(() -> {
                this.initializer.run();
                runnable.run();
            });
        }
    }
}

