/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.spark.PrestoSparkBroadcastDependency;
import com.facebook.presto.spark.PrestoSparkServiceWaitTimeMetrics;
import com.facebook.presto.spark.RddAndMore;
import com.facebook.presto.spark.SparkErrorCode;
import com.facebook.presto.spark.classloader_interface.PrestoSparkStorageHandle;
import com.facebook.presto.spark.util.PrestoSparkUtils;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageHandle;
import io.airlift.units.DataSize;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

/**
 * Broadcast dependency whose broadcast value is a list of
 * {@link PrestoSparkStorageHandle}s, i.e. references to data kept in
 * {@link TempStorage}, rather than the data itself.
 *
 * <p>{@link #executeBroadcast(JavaSparkContext)} collects the handles from the
 * wrapped {@link RddAndMore}, validates the reported deserialized size against
 * the configured limits and publishes the handles as a Spark broadcast variable.
 * {@link #destroy()} removes the referenced temp storage files and destroys the
 * broadcast variable.
 *
 * <p>This class performs no synchronization of its own on its mutable fields.
 */
public class PrestoSparkStorageBasedBroadcastDependency
implements PrestoSparkBroadcastDependency<PrestoSparkStorageHandle> {
    private static final Logger log = Logger.get(PrestoSparkStorageBasedBroadcastDependency.class);

    // Cleared once the handles have been collected so this object no longer keeps the RDD reachable.
    private RddAndMore<PrestoSparkStorageHandle> broadcastDependency;
    private final DataSize maxBroadcastSize;
    private final DataSize queryMaxTotalMemoryPerNode;
    private final long queryCompletionDeadline;
    private final TempStorage tempStorage;
    private final TempDataOperationContext tempDataOperationContext;
    private final Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics;

    // Null until executeBroadcast succeeds, and again after destroy has run.
    private Broadcast<List<PrestoSparkStorageHandle>> broadcastVariable;

    /**
     * Creates the dependency.
     *
     * @param broadcastDependency source of the storage handles to broadcast
     * @param maxBroadcastSize upper bound for the summed deserialized size of the broadcast
     * @param queryMaxTotalMemoryPerNode second upper bound checked against the same summed size
     * @param queryCompletionDeadline deadline value handed to {@code PrestoSparkUtils.computeNextTimeout}
     *        (presumably an epoch timestamp in milliseconds; confirm against the caller)
     * @param tempStorage storage used to deserialize and remove the broadcast spill files
     * @param tempDataOperationContext context passed to {@link TempStorage} when removing files
     * @param waitTimeMetrics metrics forwarded to the collect call
     * @throws NullPointerException if any reference argument is null
     */
    public PrestoSparkStorageBasedBroadcastDependency(RddAndMore<PrestoSparkStorageHandle> broadcastDependency, DataSize maxBroadcastSize, DataSize queryMaxTotalMemoryPerNode, long queryCompletionDeadline, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext, Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics) {
        this.broadcastDependency = Objects.requireNonNull(broadcastDependency, "broadcastDependency cannot be null");
        this.maxBroadcastSize = Objects.requireNonNull(maxBroadcastSize, "maxBroadcastSize cannot be null");
        this.queryMaxTotalMemoryPerNode = Objects.requireNonNull(queryMaxTotalMemoryPerNode, "queryMaxTotalMemoryPerNode cannot be null");
        this.queryCompletionDeadline = queryCompletionDeadline;
        this.tempStorage = Objects.requireNonNull(tempStorage, "tempStorage cannot be null");
        this.tempDataOperationContext = Objects.requireNonNull(tempDataOperationContext, "tempDataOperationContext cannot be null");
        this.waitTimeMetrics = Objects.requireNonNull(waitTimeMetrics, "waitTimeMetrics cannot be null");
    }

    /**
     * Collects the storage handles, checks the size limits and broadcasts the handles.
     *
     * <p>The wrapped dependency reference is cleared after collection, so this
     * method is effectively single-use.
     *
     * @param sparkContext context used to create the broadcast variable
     * @return the broadcast variable holding the collected storage handles
     * @throws SparkException propagated from the collect call
     * @throws TimeoutException propagated from the collect call
     */
    @Override
    public Broadcast<List<PrestoSparkStorageHandle>> executeBroadcast(JavaSparkContext sparkContext) throws SparkException, TimeoutException {
        List<PrestoSparkStorageHandle> broadcastValue = this.broadcastDependency
                .collectAndDestroyDependenciesWithTimeout(PrestoSparkUtils.computeNextTimeout(this.queryCompletionDeadline), TimeUnit.MILLISECONDS, this.waitTimeMetrics)
                .stream()
                .map(Tuple2::_2)
                .collect(Collectors.toList());
        // Drop the reference now that the handles are collected.
        this.broadcastDependency = null;

        long compressedBroadcastSizeInBytes = broadcastValue.stream().mapToLong(PrestoSparkStorageHandle::getCompressedSizeInBytes).sum();
        long uncompressedBroadcastSizeInBytes = broadcastValue.stream().mapToLong(PrestoSparkStorageHandle::getUncompressedSizeInBytes).sum();
        long deserializedBroadcastSizeInBytes = broadcastValue.stream().mapToLong(PrestoSparkStorageHandle::getDeserializedRetainedSizeInBytes).sum();
        log.info("Got back %d pages. compressedBroadcastSizeInBytes: %d; uncompressedBroadcastSizeInBytes: %d; deserializedBroadcastObjectSizeInBytes: %d", broadcastValue.size(), compressedBroadcastSizeInBytes, uncompressedBroadcastSizeInBytes, deserializedBroadcastSizeInBytes);

        // NOTE(review): when either limit below is exceeded the collected handles are never
        // broadcast, so destroy() returns early and does not remove their temp storage files.
        // Confirm whether those files are cleaned up elsewhere.
        long maxBroadcastSizeInBytes = this.maxBroadcastSize.toBytes();
        if (deserializedBroadcastSizeInBytes > maxBroadcastSizeInBytes) {
            throw ExceededMemoryLimitException.exceededLocalBroadcastMemoryLimit(this.maxBroadcastSize, String.format("Broadcast size: %s", DataSize.succinctBytes(deserializedBroadcastSizeInBytes)));
        }
        if (deserializedBroadcastSizeInBytes > this.queryMaxTotalMemoryPerNode.toBytes()) {
            throw ExceededMemoryLimitException.exceededLocalTotalMemoryLimit(this.queryMaxTotalMemoryPerNode, String.format("Broadcast size: %s", DataSize.succinctBytes(deserializedBroadcastSizeInBytes)), false, Optional.empty());
        }

        this.broadcastVariable = sparkContext.broadcast(broadcastValue);
        return this.broadcastVariable;
    }

    /**
     * Removes every temp storage file referenced by the broadcast handles and then
     * destroys the broadcast variable. Does nothing if no broadcast variable exists.
     *
     * <p>Deletion is attempted for every file even if an earlier one fails with an
     * {@link IOException}, and the broadcast variable is destroyed regardless of the
     * outcome, so a storage failure no longer leaks it. Calling this method again
     * after it has run is a no-op.
     *
     * @throws PrestoException with {@code STORAGE_ERROR} if at least one file could not be
     *         deleted; the first failure is the cause, later ones are attached as suppressed
     */
    @Override
    public void destroy() {
        Broadcast<List<PrestoSparkStorageHandle>> broadcast = this.broadcastVariable;
        if (broadcast == null) {
            return;
        }

        PrestoException deletionFailure = null;
        try {
            for (PrestoSparkStorageHandle diskPage : broadcast.getValue()) {
                try {
                    TempStorageHandle storageHandle = this.tempStorage.deserialize(diskPage.getSerializedStorageHandle());
                    this.tempStorage.remove(this.tempDataOperationContext, storageHandle);
                    log.info("Deleted broadcast spill file: " + storageHandle.toString());
                }
                catch (IOException e) {
                    // Keep going so one bad file does not strand the remaining spill files.
                    if (deletionFailure == null) {
                        deletionFailure = new PrestoException(SparkErrorCode.STORAGE_ERROR, "Unable to delete broadcast spill file", e);
                    }
                    else {
                        deletionFailure.addSuppressed(e);
                    }
                }
            }
        }
        finally {
            // Clear the field first: the value is unusable once destroyed, and a repeated
            // destroy() call must not try to read it again.
            this.broadcastVariable = null;
            broadcast.destroy();
        }

        if (deletionFailure != null) {
            throw deletionFailure;
        }
    }
}

