/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.server;

import com.facebook.presto.client.Column;
import com.facebook.presto.client.FailureInfo;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.client.StageStats;
import com.facebook.presto.client.StatementStats;
import com.facebook.presto.execution.BufferInfo;
import com.facebook.presto.execution.QueryId;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStats;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.operator.Page;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.TimeZoneKey;
import com.facebook.presto.spi.type.TimeZoneNotSupportedException;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.util.Failures;
import com.facebook.presto.util.IterableTransformer;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpUriBuilder;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.Closeable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

@Path(value="/v1/statement")
public class StatementResource {
    /** Logger shared by this resource and its nested classes. */
    private static final Logger log = Logger.get(StatementResource.class);
    /** Upper bound (1 second) on how long a single results request may wait; see {@code getQueryResults}. */
    private static final Duration MAX_WAIT_TIME = new Duration(1.0, TimeUnit.SECONDS);
    /** Natural ordering that sorts {@code null} last, so the minimum of a null and a non-null wait is the non-null one. */
    private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
    /** Byte threshold (1 megabyte) at which {@code Query.getData} stops pulling further pages for one response. */
    private static final long DESIRED_RESULT_BYTES = new DataSize(1.0, DataSize.Unit.MEGABYTE).toBytes();
    /** Used to create, inspect and cancel queries. */
    private final QueryManager queryManager;
    /** Supplies one {@link ExchangeClient} per created query. */
    private final Supplier<ExchangeClient> exchangeClientSupplier;
    /** Queries created through this resource, keyed by query id; entries are removed by the purge task. */
    private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<QueryId, Query>();
    /** Single-threaded scheduler (thread name "query-purger") that runs {@code PurgeQueriesRunnable}. */
    private final ScheduledExecutorService queryPurger = Executors.newSingleThreadScheduledExecutor(Threads.threadsNamed((String)"query-purger"));

    @Inject
    public StatementResource(QueryManager queryManager, Supplier<ExchangeClient> exchangeClientSupplier) {
        this.queryManager = (QueryManager)Preconditions.checkNotNull((Object)queryManager, (Object)"queryManager is null");
        this.exchangeClientSupplier = (Supplier)Preconditions.checkNotNull(exchangeClientSupplier, (Object)"exchangeClientSupplier is null");
        this.queryPurger.scheduleWithFixedDelay(new PurgeQueriesRunnable(this.queries, queryManager), 200L, 200L, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts down the purge scheduler via {@code shutdownNow()}.
     */
    @PreDestroy
    public void stop() {
        this.queryPurger.shutdownNow();
    }

    /**
     * Creates a query from the posted SQL text and returns its first batch of results.
     *
     * <p>The statement, user, catalog and schema must be non-empty, otherwise a 400 response is
     * raised. A missing time zone header falls back to the JVM default time zone and a missing
     * language header falls back to the JVM default locale.
     *
     * @return a 200 response whose entity is the first {@code QueryResults} for the new query
     * @throws InterruptedException if interrupted while fetching the first results
     */
    @POST
    @Produces(value={"application/json"})
    public Response createQuery(
            String statement,
            @HeaderParam(value="X-Presto-User") String user,
            @HeaderParam(value="X-Presto-Source") String source,
            @HeaderParam(value="X-Presto-Catalog") String catalog,
            @HeaderParam(value="X-Presto-Schema") String schema,
            @HeaderParam(value="X-Presto-Time-Zone") String timeZoneId,
            @HeaderParam(value="X-Presto-Language") String language,
            @HeaderParam(value="User-Agent") String userAgent,
            @Context HttpServletRequest requestContext,
            @Context UriInfo uriInfo) throws InterruptedException {
        StatementResource.assertRequest(!Strings.isNullOrEmpty(statement), "SQL statement is empty");
        StatementResource.assertRequest(!Strings.isNullOrEmpty(user), "User (%s) is empty", "X-Presto-User");
        StatementResource.assertRequest(!Strings.isNullOrEmpty(catalog), "Catalog (%s) is empty", "X-Presto-Catalog");
        StatementResource.assertRequest(!Strings.isNullOrEmpty(schema), "Schema (%s) is empty", "X-Presto-Schema");

        String effectiveTimeZoneId = timeZoneId != null ? timeZoneId : TimeZone.getDefault().getID();
        Locale locale = language != null ? Locale.forLanguageTag(language) : Locale.getDefault();
        String remoteUserAddress = requestContext.getRemoteAddr();
        TimeZoneKey timeZoneKey = StatementResource.getTimeZoneKey(effectiveTimeZoneId);

        ConnectorSession session = new ConnectorSession(user, source, catalog, schema, timeZoneKey, locale, remoteUserAddress, userAgent);
        ExchangeClient exchangeClient = this.exchangeClientSupplier.get();
        Query query = new Query(session, statement, this.queryManager, exchangeClient);
        this.queries.put(query.getQueryId(), query);

        // The first response only waits 1 ms for data.
        Duration initialWait = new Duration(1.0, TimeUnit.MILLISECONDS);
        QueryResults firstResults = query.getNextResults(uriInfo, initialWait);
        return Response.ok(firstResults).build();
    }

    /**
     * Raises a 400 (bad request) error when {@code expression} is false.
     *
     * @param expression condition that must hold for the request to be accepted
     * @param format {@link String#format} template for the error message
     * @param args arguments for {@code format}
     */
    static void assertRequest(boolean expression, String format, Object ... args) {
        if (expression) {
            return;
        }
        String message = String.format(format, args);
        throw StatementResource.badRequest(message);
    }

    /**
     * Resolves a time zone id to a {@link TimeZoneKey}.
     *
     * @param timeZoneId the time zone id to look up
     * @return the key for {@code timeZoneId}
     * @throws WebApplicationException with status 400 carrying the lookup failure message
     *         when the time zone is not supported
     */
    static TimeZoneKey getTimeZoneKey(String timeZoneId) {
        try {
            return TimeZoneKey.getTimeZoneKey(timeZoneId);
        }
        catch (TimeZoneNotSupportedException unsupported) {
            String message = unsupported.getMessage();
            throw StatementResource.badRequest(message);
        }
    }

    /**
     * Builds a 400 (bad request) exception with a plain-text body.
     *
     * <p>The exception is returned rather than thrown so that call sites can write
     * {@code throw badRequest(...)}, which lets the compiler see that control flow ends there.
     *
     * @param message text used as the response entity
     * @return the exception for the caller to throw
     */
    private static WebApplicationException badRequest(String message) {
        return new WebApplicationException(Response.status(Response.Status.BAD_REQUEST).type("text/plain").entity(message).build());
    }

    /**
     * Returns the batch of results identified by {@code token} for a known query.
     *
     * @param queryId id of the query
     * @param token result token taken from the request path
     * @param maxWait requested maximum wait; may be null, and is capped at {@code MAX_WAIT_TIME}
     * @param uriInfo request URI information
     * @return 404 if the query is not registered with this resource, otherwise 200 with the results
     * @throws InterruptedException if interrupted while waiting for results
     */
    @GET
    @Path(value="{queryId}/{token}")
    @Produces(value={"application/json"})
    public Response getQueryResults(@PathParam(value="queryId") QueryId queryId, @PathParam(value="token") long token, @QueryParam(value="maxWait") Duration maxWait, @Context UriInfo uriInfo) throws InterruptedException {
        Query query = this.queries.get(queryId);
        if (query == null) {
            return Response.status(Response.Status.NOT_FOUND).build();
        }
        // WAIT_ORDERING sorts nulls last, so a missing maxWait resolves to MAX_WAIT_TIME.
        Duration wait = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait);
        return Response.ok(query.getResults(token, uriInfo, wait)).build();
    }

    /**
     * Cancels a query by closing it.
     *
     * @param queryId id of the query
     * @param token result token from the request path; not used by this method
     * @return 404 if the query is not registered with this resource, otherwise 204 (no content)
     */
    @DELETE
    @Path(value="{queryId}/{token}")
    @Produces(value={"application/json"})
    public Response cancelQuery(@PathParam(value="queryId") QueryId queryId, @PathParam(value="token") long token) {
        Query target = this.queries.get(queryId);
        if (target != null) {
            target.close();
            return Response.noContent().build();
        }
        return Response.status(Response.Status.NOT_FOUND).build();
    }

    /**
     * Periodic task that drops queries the {@link QueryManager} no longer reports.
     *
     * <p>Each run compares the ids held in the resource's query map with the ids returned by
     * {@code queryManager.getAllQueryInfo()}; every id present only in the map is removed and
     * its {@link Query} is closed.
     */
    private static class PurgeQueriesRunnable
    implements Runnable {
        private final ConcurrentMap<QueryId, Query> queries;
        private final QueryManager queryManager;

        public PurgeQueriesRunnable(ConcurrentMap<QueryId, Query> queries, QueryManager queryManager) {
            this.queries = queries;
            this.queryManager = queryManager;
        }

        @Override
        public void run() {
            try {
                // Snapshot the tracked ids first so queries registered after this point are not purged by this run.
                Set<QueryId> queryIdsSnapshot = ImmutableSet.copyOf(this.queries.keySet());
                Set<QueryId> liveQueries = ImmutableSet.copyOf(Iterables.transform(this.queryManager.getAllQueryInfo(), QueryInfo.queryIdGetter()));
                Set<QueryId> deadQueries = Sets.difference(queryIdsSnapshot, liveQueries);
                for (QueryId deadQueryId : deadQueries) {
                    Query query = this.queries.remove(deadQueryId);
                    if (query == null) {
                        // Already removed elsewhere; nothing to close.
                        continue;
                    }
                    query.close();
                    log.info("Removed expired query %s", deadQueryId);
                }
            }
            catch (Throwable e) {
                // Deliberately broad: an exception escaping a task scheduled with
                // scheduleWithFixedDelay suppresses all subsequent executions.
                log.warn(e, "Error removing old queries");
            }
        }
    }

    @ThreadSafe
    public static class Query
    implements Closeable {
        /** Manager that owns the underlying query. */
        private final QueryManager queryManager;
        /** Id assigned by the query manager when the query was created. */
        private final QueryId queryId;
        /** Client used to pull result pages from the output stage's tasks. */
        private final ExchangeClient exchangeClient;
        /** Counter incremented for each "next results" URI handed out; see {@code createNextResultsUri}. */
        private final AtomicLong resultId = new AtomicLong();
        /** Session the query runs under; also passed to types when converting values to objects. */
        private final ConnectorSession session;
        /** Most recent results returned by {@code getNextResults}; null until the first call completes. */
        @GuardedBy(value="this")
        private QueryResults lastResult;
        /** Path of the request that produced {@code lastResult}, used to answer repeated requests for it. */
        @GuardedBy(value="this")
        private String lastResultPath;
        /** Output columns, populated lazily once the query has an output stage (or a synthetic boolean result). */
        @GuardedBy(value="this")
        private List<Column> columns;

        /**
         * Registers a new query with the query manager and remembers its id.
         *
         * @param session session the query runs under; must not be null
         * @param query SQL text; must not be null
         * @param queryManager manager used to create and later inspect/cancel the query; must not be null
         * @param exchangeClient client used to fetch result pages; must not be null
         */
        public Query(ConnectorSession session, String query, QueryManager queryManager, ExchangeClient exchangeClient) {
            // Validate every argument before the query is created.
            this.session = Preconditions.checkNotNull(session, "session is null");
            Preconditions.checkNotNull(query, "query is null");
            this.queryManager = Preconditions.checkNotNull(queryManager, "queryManager is null");
            this.exchangeClient = Preconditions.checkNotNull(exchangeClient, "exchangeClient is null");

            QueryInfo created = queryManager.createQuery(session, query);
            this.queryId = created.getQueryId();
        }

        /**
         * Cancels the query through the query manager.
         */
        // NOTE(review): exchangeClient is not closed here; it is only closed from getNextResults.
        // Confirm whether an abandoned or cancelled query should also release it.
        @Override
        public void close() {
            this.queryManager.cancelQuery(this.queryId);
        }

        /**
         * @return the id assigned to this query when it was created
         */
        public QueryId getQueryId() {
            return this.queryId;
        }

        /**
         * Returns the results for the requested path.
         *
         * <p>A repeat of the most recently served request gets the cached result back. Otherwise
         * the request must be for the "next" URI advertised by the last result.
         *
         * @param token result token from the request path
         * @param uriInfo request URI information; its absolute path identifies the requested batch
         * @param maxWaitTime maximum time to wait for new data
         * @return the cached last result, or the next batch of results
         * @throws WebApplicationException 410 (gone) if {@code token} is older than the current
         *         result id; 404 if no result has been produced yet or the path is not the
         *         advertised next URI
         * @throws InterruptedException if interrupted while waiting for data
         */
        public synchronized QueryResults getResults(long token, UriInfo uriInfo, Duration maxWaitTime) throws InterruptedException {
            String requestedPath = uriInfo.getAbsolutePath().getPath();
            if (this.lastResultPath != null && requestedPath.equals(this.lastResultPath)) {
                // Result is discarded; the call is kept for its effect on the query manager.
                // NOTE(review): presumably this signals continued interest in the query — confirm.
                this.queryManager.getQueryInfo(this.queryId);
                return this.lastResult;
            }
            if (token < this.resultId.get()) {
                throw new WebApplicationException(Response.Status.GONE);
            }
            // lastResult stays null until the first getNextResults call completes (the query is
            // registered in the resource map before that call), so guard against it instead of
            // failing with a NullPointerException.
            if (this.lastResult == null) {
                throw new WebApplicationException(Response.Status.NOT_FOUND);
            }
            URI expectedNextUri = this.lastResult.getNextUri();
            if (expectedNextUri == null || !requestedPath.equals(expectedNextUri.getPath())) {
                throw new WebApplicationException(Response.Status.NOT_FOUND);
            }
            return this.getNextResults(uriInfo, maxWaitTime);
        }

        /**
         * Produces the next batch of results and caches it as the last result.
         *
         * <p>After fetching data, the exchange client is closed when the query ended in a state
         * other than FINISHED, or when it finished without an output stage; in the latter case a
         * single boolean "result" column with one {@code true} row is reported. A next-results
         * URI is included unless the query is done and the exchange client is closed.
         *
         * @param uriInfo request URI information used to build the info and next-results URIs
         * @param maxWaitTime maximum time to wait for data or for a query state change
         * @return the newly built results
         * @throws InterruptedException if interrupted while waiting
         */
        public synchronized QueryResults getNextResults(UriInfo uriInfo, Duration maxWaitTime) throws InterruptedException {
            Iterable<List<Object>> data = this.getData(maxWaitTime);
            QueryInfo queryInfo = this.queryManager.getQueryInfo(this.queryId);

            // No more data can arrive but the query is still running: wait for its state to change.
            if (this.exchangeClient.isClosed() && !queryInfo.getState().isDone()) {
                this.queryManager.waitForStateChange(this.queryId, queryInfo.getState(), maxWaitTime);
                queryInfo = this.queryManager.getQueryInfo(this.queryId);
            }

            if (queryInfo.getState().isDone()) {
                if (queryInfo.getState() != QueryState.FINISHED) {
                    this.exchangeClient.close();
                } else if (queryInfo.getOutputStage() == null) {
                    // Finished without an output stage: report a synthetic single-row boolean result.
                    this.exchangeClient.close();
                    this.columns = ImmutableList.of(new Column("result", "boolean"));
                    data = ImmutableSet.<List<Object>>of(ImmutableList.<Object>of(true));
                }
            }

            URI nextResultsUri = null;
            if (!queryInfo.getState().isDone() || !this.exchangeClient.isClosed()) {
                nextResultsUri = this.createNextResultsUri(uriInfo);
            }

            URI infoUri = uriInfo.getRequestUriBuilder().replaceQuery("").replacePath(queryInfo.getSelf().getPath()).build();
            QueryResults queryResults = new QueryResults(this.queryId.toString(), infoUri, Query.findCancelableLeafStage(queryInfo), nextResultsUri, this.columns, data, Query.toStatementStats(queryInfo), Query.toQueryError(queryInfo));

            // Remember the path that led to this result so a retried request can be answered from
            // the cache. The previous result may have carried no next URI, so guard against null.
            URI previousNextUri = this.lastResult != null ? this.lastResult.getNextUri() : null;
            this.lastResultPath = previousNextUri != null ? previousNextUri.getPath() : null;
            this.lastResult = queryResults;
            return queryResults;
        }

        /**
         * Pulls result pages from the exchange client and exposes them as rows.
         *
         * <p>First waits (within {@code maxWait}) for the query to leave its pre-start states.
         * Pages are then read until about {@code DESIRED_RESULT_BYTES} have been collected or no
         * page is available; only the first page read is allowed to block.
         *
         * @param maxWait maximum time to wait for the query to start and for the first page
         * @return the rows of the collected pages, or {@code null} when the query has not started,
         *         has no output stage, or no bytes were collected
         * @throws InterruptedException if interrupted while waiting
         */
        private synchronized Iterable<List<Object>> getData(Duration maxWait) throws InterruptedException {
            QueryInfo queryInfo = this.queryManager.getQueryInfo(this.queryId);
            while (maxWait.toMillis() > 1L && !Query.isQueryStarted(queryInfo)) {
                // The value returned by waitForStateChange replaces maxWait for the next iteration.
                maxWait = this.queryManager.waitForStateChange(this.queryId, queryInfo.getState(), maxWait);
                queryInfo = this.queryManager.getQueryInfo(this.queryId);
            }
            if (!Query.isQueryStarted(queryInfo) || queryInfo.getOutputStage() == null) {
                return null;
            }
            if (this.columns == null) {
                this.columns = Query.createColumnsList(queryInfo);
            }
            List<Type> types = queryInfo.getOutputStage().getTypes();
            this.updateExchangeClient(queryInfo.getOutputStage());

            ImmutableList.Builder<RowIterable> pages = ImmutableList.builder();
            // Accumulate in a long: page sizes are reported as long and an int could overflow.
            long bytes = 0L;
            while (bytes < DESIRED_RESULT_BYTES) {
                Page page = this.exchangeClient.getNextPage(maxWait);
                if (page == null) {
                    break;
                }
                bytes += page.getDataSize().toBytes();
                pages.add(new RowIterable(this.session, types, page));
                // Only the first fetch may wait; later fetches take what is already available.
                maxWait = new Duration(0.0, TimeUnit.MILLISECONDS);
            }
            if (bytes == 0L) {
                return null;
            }
            return Iterables.concat(pages.build());
        }

        /**
         * Tells whether the query has moved past its pre-start states.
         *
         * @param queryInfo query snapshot to inspect
         * @return {@code false} while the state is QUEUED, PLANNING or STARTING; {@code true} otherwise
         */
        private static boolean isQueryStarted(QueryInfo queryInfo) {
            // Read the state once so all three comparisons see the same value.
            QueryState state = queryInfo.getState();
            return state != QueryState.QUEUED && state != QueryState.PLANNING && state != QueryState.STARTING;
        }

        /**
         * Feeds the output stage's task result locations to the exchange client.
         *
         * <p>While the stage is not done, one location per task is added, built from the task URI
         * plus {@code results/<bufferId>}. Once the stage is neither PLANNED nor SCHEDULING, the
         * client is told that no more locations will follow.
         *
         * @param outputStage the query's output stage
         * @throws IllegalStateException if a task does not expose exactly one output buffer
         */
        private synchronized void updateExchangeClient(StageInfo outputStage) {
            if (!outputStage.getState().isDone()) {
                for (TaskInfo task : outputStage.getTasks()) {
                    List<BufferInfo> taskBuffers = task.getOutputBuffers().getBuffers();
                    Preconditions.checkState(taskBuffers.size() == 1, "Expected a single output buffer for task %s, but found %s", task.getTaskId(), taskBuffers);
                    String bufferId = taskBuffers.get(0).getBufferId();
                    URI resultsLocation = HttpUriBuilder.uriBuilderFrom(task.getSelf())
                            .appendPath("results")
                            .appendPath(bufferId)
                            .build();
                    this.exchangeClient.addLocation(resultsLocation);
                }
            }

            StageState currentState = outputStage.getState();
            boolean stillScheduling = currentState == StageState.PLANNED || currentState == StageState.SCHEDULING;
            if (!stillScheduling) {
                this.exchangeClient.noMoreLocations();
            }
        }

        /**
         * Builds the URI for the next batch: {@code /v1/statement/<queryId>/<token>} on the base
         * URI with an empty query string. The token is obtained by incrementing {@code resultId}.
         *
         * @param uriInfo request URI information supplying the base URI
         * @return the next-results URI
         */
        private synchronized URI createNextResultsUri(UriInfo uriInfo) {
            return uriInfo.getBaseUriBuilder()
                    .replacePath("/v1/statement")
                    .path(this.queryId.toString())
                    .path(String.valueOf(this.resultId.incrementAndGet()))
                    .replaceQuery("")
                    .build();
        }

        /**
         * Pairs the query's field names with the output stage's type names.
         *
         * @param queryInfo query snapshot; must not be null and must have an output stage
         * @return one {@link Column} per output field, in field order
         * @throws IllegalArgumentException if the number of names and types differ
         */
        private static List<Column> createColumnsList(QueryInfo queryInfo) {
            Preconditions.checkNotNull(queryInfo, "queryInfo is null");
            StageInfo outputStage = queryInfo.getOutputStage();
            Preconditions.checkNotNull(outputStage, "outputStage is null");

            List<String> fieldNames = queryInfo.getFieldNames();
            List<Type> fieldTypes = outputStage.getTypes();
            Preconditions.checkArgument(fieldNames.size() == fieldTypes.size(), "names and types size mismatch");

            // Sizes are equal, so walking both lists in lockstep visits every (name, type) pair.
            ImmutableList.Builder<Column> columns = ImmutableList.builder();
            Iterator<Type> typeCursor = fieldTypes.iterator();
            for (String fieldName : fieldNames) {
                String typeName = typeCursor.next().getName();
                columns.add(new Column(fieldName, typeName));
            }
            return columns.build();
        }

        /**
         * Converts a query snapshot into client-facing statement statistics.
         *
         * @param queryInfo query snapshot to summarize
         * @return statistics built from the query's state, stats and output stage
         */
        private static StatementStats toStatementStats(QueryInfo queryInfo) {
            QueryStats stats = queryInfo.getQueryStats();
            return StatementStats.builder()
                    .setState(queryInfo.getState().toString())
                    .setScheduled(Query.isScheduled(queryInfo))
                    .setNodes(Query.globalUniqueNodes(queryInfo.getOutputStage()).size())
                    .setTotalSplits(stats.getTotalDrivers())
                    .setQueuedSplits(stats.getQueuedDrivers())
                    .setRunningSplits(stats.getRunningDrivers())
                    .setCompletedSplits(stats.getCompletedDrivers())
                    .setUserTimeMillis(stats.getTotalUserTime().toMillis())
                    .setCpuTimeMillis(stats.getTotalCpuTime().toMillis())
                    .setWallTimeMillis(stats.getTotalScheduledTime().toMillis())
                    .setProcessedRows(stats.getRawInputPositions())
                    .setProcessedBytes(stats.getRawInputDataSize().toBytes())
                    .setRootStage(Query.toStageStats(queryInfo.getOutputStage()))
                    .build();
        }

        /**
         * Recursively converts a stage and its sub-stages into client-facing stage statistics.
         *
         * @param stageInfo stage to convert; may be null
         * @return the converted statistics, or {@code null} when {@code stageInfo} is null
         */
        private static StageStats toStageStats(StageInfo stageInfo) {
            if (stageInfo == null) {
                return null;
            }
            com.facebook.presto.execution.StageStats stats = stageInfo.getStageStats();

            ImmutableList.Builder<StageStats> children = ImmutableList.builder();
            for (StageInfo child : stageInfo.getSubStages()) {
                children.add(Query.toStageStats(child));
            }

            // Count distinct host:port pairs among this stage's own tasks.
            Set<String> nodeAddresses = new HashSet<String>();
            for (TaskInfo task : stageInfo.getTasks()) {
                URI taskUri = task.getSelf();
                nodeAddresses.add(taskUri.getHost() + ":" + taskUri.getPort());
            }

            return StageStats.builder()
                    .setStageId(String.valueOf(stageInfo.getStageId().getId()))
                    .setState(stageInfo.getState().toString())
                    .setDone(stageInfo.getState().isDone())
                    .setNodes(nodeAddresses.size())
                    .setTotalSplits(stats.getTotalDrivers())
                    .setQueuedSplits(stats.getQueuedDrivers())
                    .setRunningSplits(stats.getRunningDrivers())
                    .setCompletedSplits(stats.getCompletedDrivers())
                    .setUserTimeMillis(stats.getTotalUserTime().toMillis())
                    .setCpuTimeMillis(stats.getTotalCpuTime().toMillis())
                    .setWallTimeMillis(stats.getTotalScheduledTime().toMillis())
                    .setProcessedRows(stats.getRawInputPositions())
                    .setProcessedBytes(stats.getRawInputDataSize().toBytes())
                    .setSubStages(children.build())
                    .build();
        }

        /**
         * Collects the distinct {@code host:port} addresses of all tasks in a stage tree.
         *
         * @param stageInfo root stage; may be null
         * @return the set of addresses for the stage and all of its sub-stages; empty for null
         */
        private static Set<String> globalUniqueNodes(StageInfo stageInfo) {
            if (stageInfo == null) {
                return ImmutableSet.of();
            }
            ImmutableSet.Builder<String> addresses = ImmutableSet.builder();
            for (TaskInfo task : stageInfo.getTasks()) {
                URI taskUri = task.getSelf();
                String address = taskUri.getHost() + ":" + taskUri.getPort();
                addresses.add(address);
            }
            for (StageInfo child : stageInfo.getSubStages()) {
                addresses.addAll(Query.globalUniqueNodes(child));
            }
            return addresses.build();
        }

        /**
         * Tells whether every stage of the query is running or done.
         *
         * @param queryInfo query snapshot to inspect
         * @return {@code false} when there is no output stage; otherwise whether all stages
         *         reachable from the output stage are RUNNING or in a done state
         */
        private static boolean isScheduled(QueryInfo queryInfo) {
            StageInfo outputStage = queryInfo.getOutputStage();
            if (outputStage != null) {
                return IterableTransformer.on(StageInfo.getAllStages(outputStage))
                        .transform(StageInfo.stageStateGetter())
                        .all(Query.isStageRunningOrDone());
            }
            return false;
        }

        /**
         * @return a predicate that accepts a stage state that is RUNNING or reports itself as done
         */
        private static Predicate<StageState> isStageRunningOrDone() {
            return new Predicate<StageState>(){

                @Override
                public boolean apply(StageState state) {
                    if (state == StageState.RUNNING) {
                        return true;
                    }
                    return state.isDone();
                }
            };
        }

        /**
         * Finds the URI of a cancelable leaf stage for the query.
         *
         * @param queryInfo query snapshot to inspect
         * @return the stage URI found from the output stage, or {@code null} when the query has
         *         no output stage or no stage qualifies
         */
        private static URI findCancelableLeafStage(QueryInfo queryInfo) {
            StageInfo outputStage = queryInfo.getOutputStage();
            return outputStage == null ? null : Query.findCancelableLeafStage(outputStage);
        }

        /**
         * Depth-first search for the deepest stage that is not done, visiting sub-stages in
         * reverse order.
         *
         * @param stage stage to search from
         * @return {@code null} if {@code stage} is done; otherwise the URI found in the first
         *         sub-stage (in reverse order) that yields one, or this stage's own URI if none do
         */
        private static URI findCancelableLeafStage(StageInfo stage) {
            if (stage.getState().isDone()) {
                return null;
            }
            for (StageInfo child : Lists.reverse(stage.getSubStages())) {
                URI cancelable = Query.findCancelableLeafStage(child);
                if (cancelable != null) {
                    return cancelable;
                }
            }
            // No sub-stage is cancelable, so this stage itself is the leaf.
            return stage.getSelf();
        }

        /**
         * Builds the client-facing error for a query, if it has one.
         *
         * <p>When the query carries no failure info but ended in a done state other than
         * FINISHED, a warning is logged and a generic "reason unknown" failure is synthesized.
         *
         * @param queryInfo query snapshot to inspect
         * @return the error, or {@code null} when there is no failure info and the query is
         *         either still running or FINISHED
         */
        private static QueryError toQueryError(QueryInfo queryInfo) {
            FailureInfo failure = queryInfo.getFailureInfo();
            if (failure == null) {
                QueryState state = queryInfo.getState();
                boolean endedAbnormally = state.isDone() && state != QueryState.FINISHED;
                if (!endedAbnormally) {
                    return null;
                }
                log.warn("Query %s in state %s has no failure info", queryInfo.getQueryId(), state);
                String reason = String.format("Query is %s (reason unknown)", state);
                failure = Failures.toFailure(new RuntimeException(reason)).toFailureInfo();
            }
            return new QueryError(failure.getMessage(), null, 0, failure.getErrorLocation(), failure);
        }

        private static class RowIterator
        extends AbstractIterator<List<Object>> {
            private final ConnectorSession session;
            private final List<Type> types;
            private final Page page;
            private int position = -1;

            private RowIterator(ConnectorSession session, List<Type> types, Page page) {
                this.session = session;
                this.types = types;
                this.page = page;
            }

            protected List<Object> computeNext() {
                ++this.position;
                if (this.position >= this.page.getPositionCount()) {
                    return (List)this.endOfData();
                }
                ArrayList<Object> values = new ArrayList<Object>(this.page.getChannelCount());
                for (int channel = 0; channel < this.page.getChannelCount(); ++channel) {
                    Type type = this.types.get(channel);
                    Block block = this.page.getBlock(channel);
                    values.add(type.getObjectValue(this.session, block, this.position));
                }
                return Collections.unmodifiableList(values);
            }
        }

        /**
         * Exposes a {@link Page} as an iterable of rows; each call to {@link #iterator()} starts
         * a fresh pass over the page.
         */
        private static class RowIterable
        implements Iterable<List<Object>> {
            private final ConnectorSession session;
            private final List<Type> types;
            private final Page page;

            private RowIterable(ConnectorSession session, List<Type> types, Page page) {
                this.session = session;
                // Copy the type list so later changes to the caller's list are not observed.
                List<Type> checkedTypes = Preconditions.checkNotNull(types, "types is null");
                this.types = ImmutableList.copyOf(checkedTypes);
                this.page = Preconditions.checkNotNull(page, "page is null");
            }

            @Override
            public Iterator<List<Object>> iterator() {
                return new RowIterator(this.session, this.types, this.page);
            }
        }
    }
}

