/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.benchmark.framework;

import com.facebook.airlift.event.client.EventClient;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.benchmark.event.BenchmarkPhaseEvent;
import com.facebook.presto.benchmark.event.BenchmarkQueryEvent;
import com.facebook.presto.benchmark.framework.AbstractPhaseExecutor;
import com.facebook.presto.benchmark.framework.BenchmarkQuery;
import com.facebook.presto.benchmark.framework.BenchmarkRunnerConfig;
import com.facebook.presto.benchmark.framework.BenchmarkSuite;
import com.facebook.presto.benchmark.framework.ConcurrentExecutionPhase;
import com.facebook.presto.benchmark.prestoaction.PrestoActionFactory;
import com.facebook.presto.sql.parser.ParsingOptions;
import com.facebook.presto.sql.parser.SqlParser;
import com.google.inject.Inject;
import java.util.EnumMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Runs all queries of a {@link ConcurrentExecutionPhase} on a fixed-size thread pool and
 * aggregates the per-query results into a single {@link BenchmarkPhaseEvent}.
 *
 * <p>The pool size is taken from the runner configuration when set there, otherwise from
 * the phase itself, otherwise {@link #DEFAULT_MAX_CONCURRENCY} is used.
 */
public class ConcurrentPhaseExecutor
extends AbstractPhaseExecutor<ConcurrentExecutionPhase> {
    /** Pool size used when neither the runner config nor the phase specifies one. */
    private static final int DEFAULT_MAX_CONCURRENCY = 70;
    /** Minimum increase, in percentage points, between two progress log lines. */
    private static final double PROGRESS_LOG_STEP_PERCENT = 0.5;
    private static final Logger log = Logger.get(ConcurrentPhaseExecutor.class);
    private final boolean continueOnFailure;
    private final Optional<Integer> maxConcurrency;

    /**
     * @param config supplies the test id, the continue-on-failure flag and the optional
     *        max-concurrency override
     * @throws NullPointerException if {@code config.getMaxConcurrency()} returns null
     */
    @Inject
    public ConcurrentPhaseExecutor(SqlParser sqlParser, ParsingOptions parsingOptions, PrestoActionFactory prestoActionFactory, Set<EventClient> eventClients, BenchmarkRunnerConfig config) {
        super(sqlParser, parsingOptions, prestoActionFactory, eventClients, config.getTestId());
        this.continueOnFailure = config.isContinueOnFailure();
        this.maxConcurrency = Objects.requireNonNull(config.getMaxConcurrency(), "maxConcurrency is null");
    }

    /**
     * Submits every query named by {@code phase} to a dedicated thread pool, waits for the
     * results and returns the resulting phase event.
     *
     * <p>The thread pool is always shut down via {@code shutdownNow()} before this method
     * returns, including when the phase is aborted early.
     *
     * @param phase the phase whose queries are executed
     * @param suite the suite providing the query definitions and session properties
     * @return the event describing the outcome of the phase
     */
    @Override
    public BenchmarkPhaseEvent runPhase(ConcurrentExecutionPhase phase, BenchmarkSuite suite) {
        // The runner-level setting wins over the phase-level one; the constant is the last resort.
        int maxConcurrency = this.maxConcurrency.orElseGet(() -> phase.getMaxConcurrency().orElse(DEFAULT_MAX_CONCURRENCY));
        log.info("Starting concurrent phase '%s' with max concurrency %s", phase.getName(), maxConcurrency);
        ExecutorService executor = Executors.newFixedThreadPool(maxConcurrency);
        try {
            CompletionService<BenchmarkQueryEvent> completionService = new ExecutorCompletionService<>(executor);
            for (String queryName : phase.getQueries()) {
                BenchmarkQuery benchmarkQuery = overrideSessionProperties(suite.getQueries().get(queryName), suite.getSessionProperties());
                completionService.submit(() -> runQuery(benchmarkQuery));
            }
            return reportProgressUntilFinished(phase, completionService);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Consumes query results from {@code completionService} until every submitted query has
     * been accounted for, posting each query event and logging progress along the way.
     *
     * <p>Outcome rules:
     * <ul>
     * <li>A FAILED query event, or a query task that threw, ends the phase immediately with
     *     a failed event unless {@code continueOnFailure} is set.</li>
     * <li>With {@code continueOnFailure} set, a task that threw is counted as a completed,
     *     failed query so the loop does not wait for a result that will never arrive.</li>
     * <li>Interruption of the waiting thread always ends the phase with a failed event; the
     *     interrupt flag is restored first. Retrying would make the next {@code take()} throw
     *     again immediately.</li>
     * </ul>
     *
     * @param phase the phase being executed; its query count is the number of expected results
     * @param completionService the service the phase queries were submitted to
     * @return succeeded, completed-with-failures or failed phase event
     */
    private BenchmarkPhaseEvent reportProgressUntilFinished(ConcurrentExecutionPhase phase, CompletionService<BenchmarkQueryEvent> completionService) {
        String phaseName = phase.getName();
        int completed = 0;
        double lastProgress = 0.0;
        int queriesSubmitted = phase.getQueries().size();
        EnumMap<BenchmarkQueryEvent.Status, Integer> statusCount = new EnumMap<>(BenchmarkQueryEvent.Status.class);
        while (completed < queriesSubmitted) {
            try {
                BenchmarkQueryEvent event = completionService.take().get();
                postEvent(event);
                completed++;
                statusCount.merge(event.getEventStatus(), 1, Integer::sum);
                if (event.getEventStatus() == BenchmarkQueryEvent.Status.FAILED && !continueOnFailure) {
                    return BenchmarkPhaseEvent.failed(phaseName, event.getErrorMessage());
                }
            }
            catch (InterruptedException e) {
                // Restore the flag for callers and stop: with the flag set, take() would
                // throw again right away, so continuing here could never make progress.
                Thread.currentThread().interrupt();
                return BenchmarkPhaseEvent.failed(phaseName, e.toString());
            }
            catch (ExecutionException e) {
                if (!continueOnFailure) {
                    return BenchmarkPhaseEvent.failed(phaseName, e.toString());
                }
                // The future has been consumed, so this task will never yield an event.
                // Count it as a finished, failed query; otherwise the loop would block
                // forever waiting for one more completion than can ever arrive.
                log.warn(e, "Query task in phase '%s' terminated with an exception", phaseName);
                completed++;
                statusCount.merge(BenchmarkQueryEvent.Status.FAILED, 1, Integer::sum);
            }
            double progress = (double) completed / (double) queriesSubmitted * 100.0;
            // Throttle logging, but always report the final state.
            if (progress - lastProgress > PROGRESS_LOG_STEP_PERCENT || completed == queriesSubmitted) {
                log.info("Progress: %s succeeded, %s failed, %s submitted, %.2f%% done", statusCount.getOrDefault(BenchmarkQueryEvent.Status.SUCCEEDED, 0), statusCount.getOrDefault(BenchmarkQueryEvent.Status.FAILED, 0), queriesSubmitted, progress);
                lastProgress = progress;
            }
        }
        int failedCount = statusCount.getOrDefault(BenchmarkQueryEvent.Status.FAILED, 0);
        if (failedCount > 0) {
            return BenchmarkPhaseEvent.completedWithFailures(phaseName, String.format("%s out of %s submitted queries failed", failedCount, queriesSubmitted));
        }
        return BenchmarkPhaseEvent.succeeded(phaseName);
    }
}

