/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.execution.Lifespan;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.DevNullOperator;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.OperationTimer;
import com.facebook.presto.operator.Operator;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.OperatorFactory;
import com.facebook.presto.operator.PageSinkCommitStrategy;
import com.facebook.presto.operator.TableCommitContext;
import com.facebook.presto.operator.TableFinishInfo;
import com.facebook.presto.operator.TableWriterUtils;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class TableFinishOperator
implements Operator {
    /** Output layout of this operator: one BIGINT channel (see {@code getOutput}, which writes the final row count into it). */
    public static final List<Type> TYPES = ImmutableList.of(BigintType.BIGINT);

    private final OperatorContext operatorContext;
    private final TableFinisher tableFinisher;
    // Nested operator that receives the statistics pages and produces the aggregated statistics rows.
    private final Operator statisticsAggregationOperator;
    // Maps grouping / table statistic / column statistic entries to channel indexes of the aggregation output.
    private final StatisticAggregationsDescriptor<Integer> descriptor;
    private State state = State.RUNNING;
    // Set once finishTable() has returned; read by the info supplier.
    private final AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata = new AtomicReference<>(Optional.empty());
    private final ImmutableList.Builder<ComputedStatistics> computedStatisticsBuilder = ImmutableList.builder();
    // Accumulates the time spent inside the statistics aggregation operator.
    private final OperationTimer.OperationTiming statisticsTiming = new OperationTimer.OperationTiming();
    private final boolean statisticsCpuTimerEnabled;
    private final boolean memoryTrackingEnabled;
    private final JsonCodec<TableCommitContext> tableCommitContextCodec;
    private final LifespanAndStageStateTracker lifespanAndStageStateTracker;
    private final LocalMemoryContext systemMemoryContext;
    // Shared with the tracker, which adjusts it as fragments and statistics pages are retained or released.
    private final AtomicLong operatorRetainedMemoryBytes = new AtomicLong();
    private final Supplier<TableFinishInfo> tableFinishInfoSupplier;

    /**
     * Creates the operator and registers its info supplier on the operator context.
     *
     * @param operatorContext context of this operator; also the source of the system memory context
     * @param tableFinisher callback invoked once with all fragments and computed statistics
     * @param pageSinkCommitter callback used by the internal tracker to commit fragments asynchronously
     * @param statisticsAggregationOperator nested operator that aggregates statistics pages
     * @param descriptor channel mapping used to decode the aggregation output
     * @param statisticsCpuTimerEnabled flag passed to every {@link OperationTimer} created around the nested operator
     * @param memoryTrackingEnabled whether retained bytes are reported to the memory context in {@code addInput}
     * @param tableCommitContextCodec codec used to read the {@link TableCommitContext} from input pages
     * @throws NullPointerException if any reference argument is null
     */
    public TableFinishOperator(OperatorContext operatorContext, TableFinisher tableFinisher, PageSinkCommitter pageSinkCommitter, Operator statisticsAggregationOperator, StatisticAggregationsDescriptor<Integer> descriptor, boolean statisticsCpuTimerEnabled, boolean memoryTrackingEnabled, JsonCodec<TableCommitContext> tableCommitContextCodec) {
        this.operatorContext = Objects.requireNonNull(operatorContext, "operatorContext is null");
        // The message names the actual parameter, matching the check in TableFinishOperatorFactory.
        this.tableFinisher = Objects.requireNonNull(tableFinisher, "tableFinisher is null");
        this.statisticsAggregationOperator = Objects.requireNonNull(statisticsAggregationOperator, "statisticsAggregationOperator is null");
        this.descriptor = Objects.requireNonNull(descriptor, "descriptor is null");
        this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled;
        this.memoryTrackingEnabled = memoryTrackingEnabled;
        this.tableCommitContextCodec = Objects.requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
        // The tracker null-checks pageSinkCommitter itself.
        this.lifespanAndStageStateTracker = new LifespanAndStageStateTracker(pageSinkCommitter, this.operatorRetainedMemoryBytes);
        this.systemMemoryContext = operatorContext.localSystemMemoryContext();
        this.tableFinishInfoSupplier = TableFinishOperator.createTableFinishInfoSupplier(this.outputMetadata, this.statisticsTiming);
        operatorContext.setInfoSupplier(this.tableFinishInfoSupplier);
    }

    /** Returns the operator context supplied at construction time. */
    @Override
    public OperatorContext getOperatorContext() {
        return operatorContext;
    }

    /**
     * Forwards {@code finish()} to the statistics aggregation operator (timed into
     * {@code statisticsTiming}) and moves this operator from RUNNING to FINISHING.
     * A state other than RUNNING is left untouched.
     */
    @Override
    public void finish() {
        OperationTimer statisticsTimer = new OperationTimer(statisticsCpuTimerEnabled);
        statisticsAggregationOperator.finish();
        statisticsTimer.end(statisticsTiming);

        boolean stillRunning = state == State.RUNNING;
        if (stillRunning) {
            state = State.FINISHING;
        }
    }

    /**
     * Reports whether the operator has reached the FINISHED state. When it has, the
     * nested statistics aggregation operator is verified to be finished as well.
     */
    @Override
    public boolean isFinished() {
        if (state != State.FINISHED) {
            return false;
        }
        Verify.verify(statisticsAggregationOperator.isFinished());
        return true;
    }

    /** This operator is blocked exactly when the statistics aggregation operator is; its future is returned as-is. */
    @Override
    public ListenableFuture<?> isBlocked() {
        ListenableFuture<?> aggregationBlocked = statisticsAggregationOperator.isBlocked();
        return aggregationBlocked;
    }

    /** Input is accepted only while RUNNING, and only if the statistics aggregation operator accepts input too. */
    @Override
    public boolean needsInput() {
        return state == State.RUNNING && statisticsAggregationOperator.needsInput();
    }

    /**
     * Consumes one input page: decodes its {@link TableCommitContext}, records the page in the
     * lifespan/stage tracker, and feeds whatever statistics pages the tracker releases into the
     * statistics aggregation operator (each call timed into {@code statisticsTiming}).
     *
     * @param page the page to consume; must not be null
     * @throws IllegalStateException if the operator is not in the RUNNING state
     */
    @Override
    public void addInput(Page page) {
        Objects.requireNonNull(page, "page is null");
        Preconditions.checkState(state == State.RUNNING, "Operator is %s", state);

        TableCommitContext commitContext = TableWriterUtils.getTableCommitContext(page, tableCommitContextCodec);
        lifespanAndStageStateTracker.update(page, commitContext);

        List<Page> releasedStatisticsPages = lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, commitContext);
        for (Page statisticsPage : releasedStatisticsPages) {
            OperationTimer statisticsTimer = new OperationTimer(statisticsCpuTimerEnabled);
            statisticsAggregationOperator.addInput(statisticsPage);
            statisticsTimer.end(statisticsTiming);
        }

        // Publish the tracker's retained-bytes counter only when memory tracking is switched on.
        if (memoryTrackingEnabled) {
            systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get());
        }
    }

    /**
     * Drives the operator towards completion.
     *
     * <p>While the statistics aggregation operator is unfinished, each call pulls at most one page
     * from it, converts every position into a {@link ComputedStatistics} entry and returns
     * {@code null}. Once the aggregation is finished and this operator is FINISHING, the pending
     * commits are awaited, the table is finished, and a single-row page holding the final row
     * count is returned.
     *
     * @return the one-row result page on the finishing call, otherwise {@code null}
     */
    @Override
    public Page getOutput() {
        if (!isBlocked().isDone()) {
            return null;
        }

        if (!statisticsAggregationOperator.isFinished()) {
            Verify.verify(statisticsAggregationOperator.isBlocked().isDone(), "aggregation operator should not be blocked");

            OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
            Page statisticsPage = statisticsAggregationOperator.getOutput();
            timer.end(statisticsTiming);

            if (statisticsPage == null) {
                return null;
            }
            for (int position = 0; position < statisticsPage.getPositionCount(); position++) {
                // No cast to Object: the builder is typed and only accepts ComputedStatistics.
                computedStatisticsBuilder.add(getComputedStatistics(statisticsPage, position));
            }
            return null;
        }

        if (state != State.FINISHING) {
            return null;
        }
        // The state is flipped before committing, so a failure below does not lead to a second commit attempt.
        state = State.FINISHED;

        lifespanAndStageStateTracker.commit();
        outputMetadata.set(tableFinisher.finishTable(lifespanAndStageStateTracker.getFinalFragments(), computedStatisticsBuilder.build()));

        PageBuilder resultBuilder = new PageBuilder(1, TYPES);
        resultBuilder.declarePosition();
        BigintType.BIGINT.writeLong(resultBuilder.getBlockBuilder(0), lifespanAndStageStateTracker.getFinalRowCount());
        return resultBuilder.build();
    }

    /**
     * Builds the {@link ComputedStatistics} for one row of the aggregation output.
     *
     * <p>The descriptor maps each grouping column, table statistic and column statistic to a
     * channel index; the value for each entry is the single-value block taken from that channel
     * at {@code position}.
     *
     * @param page aggregation output page
     * @param position row within {@code page}
     * @return the statistics described by that row
     */
    private ComputedStatistics getComputedStatistics(Page page, int position) {
        // Typed builders replace the raw ones, so no unchecked casts are needed.
        ImmutableList.Builder<String> groupingColumns = ImmutableList.builder();
        ImmutableList.Builder<Block> groupingValues = ImmutableList.builder();
        descriptor.getGrouping().forEach((column, channel) -> {
            groupingColumns.add(column);
            groupingValues.add(page.getBlock(channel).getSingleValueBlock(position));
        });

        ComputedStatistics.Builder statistics = ComputedStatistics.builder(groupingColumns.build(), groupingValues.build());
        descriptor.getTableStatistics().forEach((type, channel) ->
                statistics.addTableStatistic(type, page.getBlock(channel).getSingleValueBlock(position)));
        descriptor.getColumnStatistics().forEach((metadata, channel) ->
                statistics.addColumnStatistic(metadata, page.getBlock(channel).getSingleValueBlock(position)));
        return statistics.build();
    }

    /** Returns a fresh {@link TableFinishInfo} snapshot from the supplier registered on the operator context. */
    @VisibleForTesting
    TableFinishInfo getInfo() {
        TableFinishInfo snapshot = tableFinishInfoSupplier.get();
        return snapshot;
    }

    /**
     * Creates a supplier that, on every call, reads the current output metadata and the
     * accumulated statistics wall/CPU time and wraps them in a new {@link TableFinishInfo}.
     *
     * @param outputMetadata holder for the connector output metadata
     * @param statisticsTiming timing accumulator for the statistics aggregation
     * @throws NullPointerException if either argument is null
     */
    private static Supplier<TableFinishInfo> createTableFinishInfoSupplier(AtomicReference<Optional<ConnectorOutputMetadata>> outputMetadata, OperationTimer.OperationTiming statisticsTiming) {
        Objects.requireNonNull(outputMetadata, "outputMetadata is null");
        Objects.requireNonNull(statisticsTiming, "statisticsTiming is null");
        return () -> {
            Optional<ConnectorOutputMetadata> metadata = outputMetadata.get();
            Duration wallTime = Duration.succinctNanos(statisticsTiming.getWallNanos());
            Duration cpuTime = Duration.succinctNanos(statisticsTiming.getCpuNanos());
            return new TableFinishInfo(metadata, wallTime, cpuTime);
        };
    }

    /**
     * Closes the statistics aggregation operator and resets this operator's system memory
     * reservation to zero.
     *
     * @throws Exception whatever the nested operator's {@code close()} throws
     */
    @Override
    public void close() throws Exception {
        try {
            statisticsAggregationOperator.close();
        }
        finally {
            // Release the reservation even when the nested close fails, so the bytes are not leaked.
            systemMemoryContext.setBytes(0L);
        }
    }

    private static class LifespanAndStageStateTracker {
        // States for pages written with the NO_COMMIT strategy.
        private final Map<LifespanAndStage, LifespanAndStageState> noCommitUnrecoverableLifespanAndStageStates = new HashMap<>();
        // States for pages written with the TASK_COMMIT strategy; their fragments are committed in commit().
        private final Map<LifespanAndStage, LifespanAndStageState> taskCommitUnrecoverableLifespanAndStageStates = new HashMap<>();
        // LIFESPAN_COMMIT states that have not yet seen a last page, kept per producing task.
        private final Map<LifespanAndStage, Map<TaskId, LifespanAndStageState>> uncommittedRecoverableLifespanAndStageStates = new HashMap<>();
        // LIFESPAN_COMMIT states of the task whose last page arrived first.
        private final Map<LifespanAndStage, LifespanAndStageState> committedRecoverableLifespanAndStages = new HashMap<>();
        private final PageSinkCommitter pageSinkCommitter;
        // Every asynchronous commit started so far; awaited in commit().
        private final List<ListenableFuture<Void>> commitFutures = new ArrayList<>();
        private final AtomicLong operatorRetainedMemoryBytes;

        /**
         * @param pageSinkCommitter callback used to commit fragments asynchronously
         * @param operatorRetainedMemoryBytes counter handed to every state created by this tracker
         * @throws NullPointerException if either argument is null
         */
        LifespanAndStageStateTracker(PageSinkCommitter pageSinkCommitter, AtomicLong operatorRetainedMemoryBytes) {
            Objects.requireNonNull(pageSinkCommitter, "pageSinkCommitter is null");
            Objects.requireNonNull(operatorRetainedMemoryBytes, "operatorRetainedMemoryBytes is null");
            this.pageSinkCommitter = pageSinkCommitter;
            this.operatorRetainedMemoryBytes = operatorRetainedMemoryBytes;
        }

        /**
         * Starts an asynchronous commit for every TASK_COMMIT state, then blocks until all commit
         * futures (including those started earlier by {@code update}) have completed.
         *
         * <p>On interruption the combined future is cancelled, the interrupt flag is restored and a
         * {@link RuntimeException} wrapping the interruption is thrown. On failure the combined
         * future is cancelled and the cause is rethrown as-is when it is unchecked, an {@link Error}
         * or a {@link PrestoException}, otherwise wrapped in a {@link RuntimeException}.
         */
        public void commit() {
            taskCommitUnrecoverableLifespanAndStageStates.values().stream()
                    .map(LifespanAndStageState::getFragments)
                    .map(pageSinkCommitter::commitAsync)
                    .forEach(commitFutures::add);

            ListenableFuture<Void> allCommits = Futures.whenAllSucceed(commitFutures).call(() -> null, MoreExecutors.directExecutor());
            try {
                allCommits.get();
            }
            catch (InterruptedException interrupted) {
                allCommits.cancel(true);
                Thread.currentThread().interrupt();
                throw new RuntimeException(interrupted);
            }
            catch (ExecutionException failed) {
                allCommits.cancel(true);
                Throwable cause = failed.getCause();
                Throwables.propagateIfPossible(cause, PrestoException.class);
                throw new RuntimeException(cause);
            }
        }

        /**
         * Records {@code page} in the state matching its lifespan/stage and commit strategy.
         *
         * <ul>
         * <li>NO_COMMIT and TASK_COMMIT: the page is accumulated into the per-lifespan/stage state.</li>
         * <li>LIFESPAN_COMMIT: pages are accumulated per task. When a task delivers its last page,
         * its state becomes the committed state for that lifespan/stage, the other tasks' states
         * are dropped, and an asynchronous commit of its fragments is started. Pages arriving
         * afterwards from other tasks for the same lifespan/stage are ignored.</li>
         * </ul>
         *
         * @param page input page
         * @param tableCommitContext commit context decoded from {@code page}
         * @throws IllegalStateException if a LIFESPAN_COMMIT page is task-wide, or comes from the
         *         task that already committed the lifespan/stage
         * @throws IllegalArgumentException if the commit strategy is not one of the three above
         */
        public void update(Page page, TableCommitContext tableCommitContext) {
            LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext);
            PageSinkCommitStrategy commitStrategy = tableCommitContext.getPageSinkCommitStrategy();
            switch (commitStrategy) {
                case NO_COMMIT: {
                    this.noCommitUnrecoverableLifespanAndStageStates.computeIfAbsent(lifespanAndStage, ignored -> new LifespanAndStageState(tableCommitContext.getTaskId(), this.operatorRetainedMemoryBytes, false)).update(page);
                    return;
                }
                case TASK_COMMIT: {
                    this.taskCommitUnrecoverableLifespanAndStageStates.computeIfAbsent(lifespanAndStage, ignored -> new LifespanAndStageState(tableCommitContext.getTaskId(), this.operatorRetainedMemoryBytes, false)).update(page);
                    return;
                }
                case LIFESPAN_COMMIT: {
                    // Compare by value, consistent with LifespanAndStage.equals, not by reference.
                    Preconditions.checkState(!Lifespan.taskWide().equals(lifespanAndStage.getLifespan()), "Recoverable lifespan cannot be TASK_WIDE");
                    if (this.committedRecoverableLifespanAndStages.containsKey(lifespanAndStage)) {
                        // Already committed by another task: this page is ignored.
                        Preconditions.checkState(!this.committedRecoverableLifespanAndStages.get(lifespanAndStage).getTaskId().equals(tableCommitContext.getTaskId()), "Received page from same task of committed lifespan and stage combination");
                        return;
                    }
                    Map<TaskId, LifespanAndStageState> lifespanStageStatesPerTask = this.uncommittedRecoverableLifespanAndStageStates.computeIfAbsent(lifespanAndStage, ignored -> new HashMap<>());
                    lifespanStageStatesPerTask.computeIfAbsent(tableCommitContext.getTaskId(), ignored -> new LifespanAndStageState(tableCommitContext.getTaskId(), this.operatorRetainedMemoryBytes, true)).update(page);
                    if (tableCommitContext.isLastPage()) {
                        Preconditions.checkState(!this.committedRecoverableLifespanAndStages.containsKey(lifespanAndStage), "LifespanAndStage already finished");
                        LifespanAndStageState lifespanAndStageState = lifespanStageStatesPerTask.get(tableCommitContext.getTaskId());
                        this.committedRecoverableLifespanAndStages.put(lifespanAndStage, lifespanAndStageState);
                        // Drops the states of every other task that was writing this lifespan/stage.
                        this.uncommittedRecoverableLifespanAndStageStates.remove(lifespanAndStage);
                        this.commitFutures.add(this.pageSinkCommitter.commitAsync(lifespanAndStageState.getFragments()));
                    }
                    return;
                }
                default:
                    throw new IllegalArgumentException("unexpected commit strategy: " + commitStrategy);
            }
        }

        /**
         * Decides which statistics pages may be fed to the aggregation for the given input page.
         *
         * <p>For strategies other than LIFESPAN_COMMIT the statistics rows of {@code page} itself
         * are returned (or an empty list when it has none). For LIFESPAN_COMMIT nothing is returned
         * until the lifespan/stage is committed; then the statistics pages buffered in the committed
         * state are returned and that state's buffer is reset.
         *
         * @param page input page
         * @param tableCommitContext commit context decoded from {@code page}
         * @return statistics pages to process now, possibly empty
         */
        List<Page> getStatisticsPagesToProcess(Page page, TableCommitContext tableCommitContext) {
            LifespanAndStage key = LifespanAndStage.fromTableCommitContext(tableCommitContext);

            boolean recoverable = tableCommitContext.getPageSinkCommitStrategy() == PageSinkCommitStrategy.LIFESPAN_COMMIT;
            if (!recoverable) {
                Optional<Page> statisticsRows = TableWriterUtils.extractStatisticsRows(page);
                if (statisticsRows.isPresent()) {
                    return ImmutableList.of(statisticsRows.get());
                }
                return ImmutableList.of();
            }

            LifespanAndStageState committedState = committedRecoverableLifespanAndStages.get(key);
            if (committedState == null) {
                return ImmutableList.of();
            }

            Preconditions.checkState(!uncommittedRecoverableLifespanAndStageStates.containsKey(key), "lifespanAndStage %s is already committed", key);
            List<Page> bufferedPages = committedState.getStatisticsPages();
            committedState.resetStatisticsPages();
            return bufferedPages;
        }

        /**
         * Sums the row counts of all NO_COMMIT, TASK_COMMIT and committed LIFESPAN_COMMIT states.
         *
         * @return total number of rows recorded by those states
         * @throws IllegalStateException if any recoverable lifespan/stage is still uncommitted
         */
        public long getFinalRowCount() {
            Preconditions.checkState(uncommittedRecoverableLifespanAndStageStates.isEmpty(), "All recoverable LifespanAndStage should be committed when fetching final row count");
            // Typed varargs (not a raw Stream[]) keep the element type, so the method reference binds.
            return Streams.concat(
                    noCommitUnrecoverableLifespanAndStageStates.values().stream(),
                    taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
                    committedRecoverableLifespanAndStages.values().stream())
                    .mapToLong(LifespanAndStageState::getRowCount)
                    .sum();
        }

        /**
         * Collects the fragments of all NO_COMMIT, TASK_COMMIT and committed LIFESPAN_COMMIT states,
         * in that order, into one immutable list.
         *
         * @return every fragment recorded by those states
         * @throws IllegalStateException if any recoverable lifespan/stage is still uncommitted
         */
        public List<Slice> getFinalFragments() {
            Preconditions.checkState(uncommittedRecoverableLifespanAndStageStates.isEmpty(), "All recoverable LifespanAndStage should be committed when fetching final fragments");
            // Typed varargs (not a raw Stream[]) keep the element type, so the method references bind.
            return Streams.concat(
                    noCommitUnrecoverableLifespanAndStageStates.values().stream(),
                    taskCommitUnrecoverableLifespanAndStageStates.values().stream(),
                    committedRecoverableLifespanAndStages.values().stream())
                    .map(LifespanAndStageState::getFragments)
                    .flatMap(Collection::stream)
                    .collect(ImmutableList.toImmutableList());
        }

        /**
         * Accumulated output of one task for one lifespan/stage: row count, fragments and,
         * optionally, buffered statistics pages. Retained sizes of fragments and buffered
         * statistics pages are added to the shared {@code operatorRetainedMemoryBytes} counter.
         */
        private static class LifespanAndStageState {
            private final ImmutableList.Builder<Slice> fragmentBuilder = ImmutableList.builder();
            private final TaskId taskId;
            private final AtomicLong operatorRetainedMemoryBytes;
            // Bytes currently accounted for the buffered statistics pages; released on reset.
            private long retainedMemoryBytesForStatisticsPages;
            private long rowCount;
            // Present only when statistics pages are tracked (requested at construction).
            private Optional<ImmutableList.Builder<Page>> statisticsPages;

            /**
             * @param taskId task that produced the pages of this state
             * @param operatorRetainedMemoryBytes shared counter updated with retained sizes
             * @param trackStatisticsPages whether statistics rows of incoming pages are buffered
             * @throws NullPointerException if {@code taskId} or the counter is null
             */
            public LifespanAndStageState(TaskId taskId, AtomicLong operatorRetainedMemoryBytes, boolean trackStatisticsPages) {
                this.taskId = Objects.requireNonNull(taskId, "taskId is null");
                this.operatorRetainedMemoryBytes = Objects.requireNonNull(operatorRetainedMemoryBytes, "operatorRetainedMemoryBytes is null");
                this.statisticsPages = trackStatisticsPages ? Optional.of(ImmutableList.builder()) : Optional.empty();
            }

            /**
             * Folds one page into this state: channel 0 is read as BIGINT row counts, channel 1 as
             * VARBINARY fragments; null positions are skipped. When tracking is enabled, the page's
             * statistics rows (if any) are buffered.
             *
             * @param page page to accumulate
             */
            public void update(Page page) {
                long memoryBytesDelta = 0L;
                Block rowCountBlock = page.getBlock(0);
                Block fragmentBlock = page.getBlock(1);
                for (int position = 0; position < page.getPositionCount(); position++) {
                    if (!rowCountBlock.isNull(position)) {
                        rowCount += BigintType.BIGINT.getLong(rowCountBlock, position);
                    }
                    if (!fragmentBlock.isNull(position)) {
                        Slice fragment = VarbinaryType.VARBINARY.getSlice(fragmentBlock, position);
                        fragmentBuilder.add(fragment);
                        memoryBytesDelta += fragment.getRetainedSize();
                    }
                }

                if (statisticsPages.isPresent()) {
                    Optional<Page> statisticsPage = TableWriterUtils.extractStatisticsRows(page);
                    if (statisticsPage.isPresent()) {
                        statisticsPages.get().add(statisticsPage.get());
                        long retainedSizeForStatisticsPage = statisticsPage.get().getRetainedSizeInBytes();
                        retainedMemoryBytesForStatisticsPages += retainedSizeForStatisticsPage;
                        memoryBytesDelta += retainedSizeForStatisticsPage;
                    }
                }
                operatorRetainedMemoryBytes.addAndGet(memoryBytesDelta);
            }

            /** Returns the sum of all non-null row counts seen so far. */
            public long getRowCount() {
                return rowCount;
            }

            /** Returns a snapshot of all fragments seen so far. */
            public List<Slice> getFragments() {
                return fragmentBuilder.build();
            }

            /**
             * Returns a snapshot of the buffered statistics pages.
             *
             * @throws IllegalStateException if this state does not track statistics pages
             */
            public List<Page> getStatisticsPages() {
                Preconditions.checkState(statisticsPages.isPresent(), "statisticsPages is present for recoverable grouped execution only");
                return statisticsPages.get().build();
            }

            /** Returns the task this state belongs to. */
            public TaskId getTaskId() {
                return taskId;
            }

            /**
             * Discards the buffered statistics pages and releases their bytes from the shared counter.
             *
             * <p>A tracking state gets a fresh, empty buffer rather than no buffer at all, so that a
             * later {@link #getStatisticsPages()} returns an empty list instead of throwing.
             * {@code getStatisticsPagesToProcess} makes such a repeated call when a page from another
             * task arrives for an already committed lifespan/stage.
             */
            public void resetStatisticsPages() {
                if (statisticsPages.isPresent()) {
                    statisticsPages = Optional.of(ImmutableList.builder());
                }
                operatorRetainedMemoryBytes.addAndGet(-retainedMemoryBytesForStatisticsPages);
                retainedMemoryBytesForStatisticsPages = 0L;
            }
        }

        /** Immutable map key pairing a {@link Lifespan} with a stage id; equality is by value on both. */
        private static class LifespanAndStage {
            private final Lifespan lifespan;
            private final int stageId;

            private LifespanAndStage(Lifespan lifespan, int stageId) {
                Objects.requireNonNull(lifespan, "lifespan is null");
                this.lifespan = lifespan;
                this.stageId = stageId;
            }

            /** Builds the key from the context's lifespan and the stage id found under its task id. */
            public static LifespanAndStage fromTableCommitContext(TableCommitContext operatorExecutionContext) {
                Lifespan contextLifespan = operatorExecutionContext.getLifespan();
                int contextStageId = operatorExecutionContext.getTaskId().getStageExecutionId().getStageId().getId();
                return new LifespanAndStage(contextLifespan, contextStageId);
            }

            public Lifespan getLifespan() {
                return lifespan;
            }

            public int getStageId() {
                return stageId;
            }

            @Override
            public boolean equals(Object o) {
                if (o == this) {
                    return true;
                }
                if (o instanceof LifespanAndStage) {
                    LifespanAndStage other = (LifespanAndStage) o;
                    return stageId == other.stageId && Objects.equals(lifespan, other.lifespan);
                }
                return false;
            }

            @Override
            public int hashCode() {
                return Objects.hash(lifespan, stageId);
            }

            @Override
            public String toString() {
                return MoreObjects.toStringHelper(this)
                        .add("lifespan", lifespan)
                        .add("stageId", stageId)
                        .toString();
            }
        }
    }

    /**
     * Callback the operator uses to commit fragments asynchronously. The returned futures are
     * awaited before the table is finished.
     */
    public interface PageSinkCommitter {
        /**
         * @param fragments fragments to commit
         * @return future that completes when the commit is done
         */
        ListenableFuture<Void> commitAsync(Collection<Slice> fragments);
    }

    /** Callback invoked once by the operator, after all commits have completed, to finish the table. */
    public interface TableFinisher {
        /**
         * @param fragments all fragments collected by the operator
         * @param computedStatistics statistics produced by the statistics aggregation
         * @return connector output metadata, if any
         */
        Optional<ConnectorOutputMetadata> finishTable(Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics);
    }

    private static enum State {
        RUNNING,
        FINISHING,
        FINISHED;

    }

    /** Factory producing {@link TableFinishOperator} instances that share one finisher, committer and descriptor. */
    public static class TableFinishOperatorFactory
    implements OperatorFactory {
        private final int operatorId;
        private final PlanNodeId planNodeId;
        private final TableFinisher tableFinisher;
        private final PageSinkCommitter pageSinkCommitter;
        private final OperatorFactory statisticsAggregationOperatorFactory;
        private final StatisticAggregationsDescriptor<Integer> descriptor;
        private final Session session;
        private final JsonCodec<TableCommitContext> tableCommitContextCodec;
        private final boolean memoryTrackingEnabled;
        // Set by noMoreOperators(); createOperator() refuses to run afterwards.
        private boolean closed;

        /**
         * @throws NullPointerException if any reference argument is null
         */
        public TableFinishOperatorFactory(int operatorId, PlanNodeId planNodeId, TableFinisher tableFinisher, PageSinkCommitter pageSinkCommitter, OperatorFactory statisticsAggregationOperatorFactory, StatisticAggregationsDescriptor<Integer> descriptor, Session session, JsonCodec<TableCommitContext> tableCommitContextCodec, boolean memoryTrackingEnabled) {
            Objects.requireNonNull(planNodeId, "planNodeId is null");
            Objects.requireNonNull(tableFinisher, "tableFinisher is null");
            Objects.requireNonNull(pageSinkCommitter, "pageSinkCommitter is null");
            Objects.requireNonNull(statisticsAggregationOperatorFactory, "statisticsAggregationOperatorFactory is null");
            Objects.requireNonNull(descriptor, "descriptor is null");
            Objects.requireNonNull(session, "session is null");
            Objects.requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
            this.operatorId = operatorId;
            this.planNodeId = planNodeId;
            this.tableFinisher = tableFinisher;
            this.pageSinkCommitter = pageSinkCommitter;
            this.statisticsAggregationOperatorFactory = statisticsAggregationOperatorFactory;
            this.descriptor = descriptor;
            this.session = session;
            this.tableCommitContextCodec = tableCommitContextCodec;
            this.memoryTrackingEnabled = memoryTrackingEnabled;
        }

        /**
         * Creates the operator together with its nested statistics aggregation operator.
         * Statistics CPU timing is enabled only when the session property is on and the nested
         * operator is not a {@link DevNullOperator}.
         *
         * @throws IllegalStateException if {@link #noMoreOperators()} was already called
         */
        @Override
        public Operator createOperator(DriverContext driverContext) {
            Preconditions.checkState(!closed, "Factory is already closed");
            OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableFinishOperator.class.getSimpleName());
            Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);

            boolean statisticsCpuTimerEnabled;
            if (statisticsAggregationOperator instanceof DevNullOperator) {
                statisticsCpuTimerEnabled = false;
            }
            else {
                statisticsCpuTimerEnabled = SystemSessionProperties.isStatisticsCpuTimerEnabled(session);
            }

            return new TableFinishOperator(
                    context,
                    tableFinisher,
                    pageSinkCommitter,
                    statisticsAggregationOperator,
                    descriptor,
                    statisticsCpuTimerEnabled,
                    memoryTrackingEnabled,
                    tableCommitContextCodec);
        }

        /** Marks the factory closed; subsequent {@link #createOperator} calls fail. */
        @Override
        public void noMoreOperators() {
            closed = true;
        }

        /** Returns a new, open factory configured with the same arguments as this one. */
        @Override
        public OperatorFactory duplicate() {
            return new TableFinishOperatorFactory(
                    operatorId,
                    planNodeId,
                    tableFinisher,
                    pageSinkCommitter,
                    statisticsAggregationOperatorFactory,
                    descriptor,
                    session,
                    tableCommitContextCodec,
                    memoryTrackingEnabled);
        }
    }
}

