/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.execution;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.Page;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.spark.SparkErrorCode;
import com.facebook.presto.spark.classloader_interface.PrestoSparkStorageHandle;
import com.facebook.presto.spark.classloader_interface.PrestoSparkTaskOutput;
import com.facebook.presto.spark.execution.PrestoSparkBroadcastTableCacheManager;
import com.facebook.presto.spark.execution.PrestoSparkPageInput;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.PagesSerdeUtil;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.storage.TempDataOperationContext;
import com.facebook.presto.spi.storage.TempStorage;
import com.facebook.presto.spi.storage.TempStorageHandle;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.SliceInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.zip.CRC32;
import javax.annotation.concurrent.GuardedBy;

public class PrestoSparkDiskPageInput
implements PrestoSparkPageInput {
    private static final Logger log = Logger.get(PrestoSparkDiskPageInput.class);
    private final PagesSerde pagesSerde;
    private final TempStorage tempStorage;
    private final TempDataOperationContext tempDataOperationContext;
    private final PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager;
    private final StageId stageId;
    private final PlanNodeId planNodeId;
    private final List<List<PrestoSparkStorageHandle>> broadcastTableFilesInfo;
    @GuardedBy(value="this")
    private List<Iterator<Page>> pageIterators;
    @GuardedBy(value="this")
    private int currentIteratorIndex;

    public PrestoSparkDiskPageInput(PagesSerde pagesSerde, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, StageId stageId, PlanNodeId planNodeId, List<List<PrestoSparkStorageHandle>> broadcastTableFilesInfo) {
        this.pagesSerde = Objects.requireNonNull(pagesSerde, "pagesSerde is null");
        this.tempStorage = Objects.requireNonNull(tempStorage, "tempStorage is null");
        this.tempDataOperationContext = Objects.requireNonNull(tempDataOperationContext, "tempDataOperationContext is null");
        this.prestoSparkBroadcastTableCacheManager = Objects.requireNonNull(prestoSparkBroadcastTableCacheManager, "prestoSparkBroadcastTableCacheManager is null");
        this.stageId = Objects.requireNonNull(stageId, "stageId is null");
        this.planNodeId = Objects.requireNonNull(planNodeId, "planNodeId is null");
        this.broadcastTableFilesInfo = Objects.requireNonNull(broadcastTableFilesInfo, "broadcastTableFilesInfo is null");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Page getNextPage() {
        Page page = null;
        PrestoSparkDiskPageInput prestoSparkDiskPageInput = this;
        synchronized (prestoSparkDiskPageInput) {
            while (page == null) {
                if (this.currentIteratorIndex >= this.getPageIterators().size()) {
                    return null;
                }
                Iterator<Page> currentIterator = this.getPageIterators().get(this.currentIteratorIndex);
                if (currentIterator.hasNext()) {
                    page = currentIterator.next();
                    continue;
                }
                ++this.currentIteratorIndex;
            }
        }
        return page;
    }

    private List<Iterator<Page>> getPageIterators() {
        if (this.pageIterators == null) {
            this.pageIterators = this.getPages(this.broadcastTableFilesInfo, this.tempStorage, this.tempDataOperationContext, this.prestoSparkBroadcastTableCacheManager, this.stageId, this.planNodeId);
        }
        return this.pageIterators;
    }

    private List<Iterator<Page>> getPages(List<List<PrestoSparkStorageHandle>> broadcastTableFilesInfo, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext, PrestoSparkBroadcastTableCacheManager prestoSparkBroadcastTableCacheManager, StageId stageId, PlanNodeId planNodeId) {
        List pages = prestoSparkBroadcastTableCacheManager.getCachedBroadcastTable(stageId, planNodeId);
        if (pages == null) {
            pages = (List)broadcastTableFilesInfo.stream().map(tableFiles -> {
                List<SerializedPage> serializedPages = this.loadBroadcastTable((List<PrestoSparkStorageHandle>)tableFiles, tempStorage, tempDataOperationContext);
                return (ImmutableList)serializedPages.stream().map(serializedPage -> this.pagesSerde.deserialize(serializedPage)).collect(ImmutableList.toImmutableList());
            }).collect(ImmutableList.toImmutableList());
            prestoSparkBroadcastTableCacheManager.cache(stageId, planNodeId, pages);
        }
        return (List)pages.stream().map(List::iterator).collect(ImmutableList.toImmutableList());
    }

    private List<SerializedPage> loadBroadcastTable(List<PrestoSparkStorageHandle> broadcastTaskFilesInfo, TempStorage tempStorage, TempDataOperationContext tempDataOperationContext) {
        try {
            CRC32 checksum = new CRC32();
            ImmutableList.Builder pages = ImmutableList.builder();
            ArrayList<PrestoSparkStorageHandle> broadcastTaskFilesInfoCopy = new ArrayList<PrestoSparkStorageHandle>(broadcastTaskFilesInfo);
            Collections.shuffle(broadcastTaskFilesInfoCopy);
            for (PrestoSparkTaskOutput prestoSparkTaskOutput : broadcastTaskFilesInfoCopy) {
                checksum.reset();
                PrestoSparkStorageHandle prestoSparkStorageHandle = (PrestoSparkStorageHandle)prestoSparkTaskOutput;
                TempStorageHandle tempStorageHandle = tempStorage.deserialize(prestoSparkStorageHandle.getSerializedStorageHandle());
                log.info("Reading path: " + tempStorageHandle.toString());
                try (InputStream inputStream = tempStorage.open(tempDataOperationContext, tempStorageHandle);
                     InputStreamSliceInput inputStreamSliceInput = new InputStreamSliceInput(inputStream);){
                    Iterator pagesIterator = PagesSerdeUtil.readSerializedPages((SliceInput)inputStreamSliceInput);
                    while (pagesIterator.hasNext()) {
                        SerializedPage page = (SerializedPage)pagesIterator.next();
                        checksum.update(page.getSlice().byteArray(), page.getSlice().byteArrayOffset(), page.getSlice().length());
                        pages.add((Object)page);
                    }
                }
                if (checksum.getValue() == prestoSparkStorageHandle.getChecksum()) continue;
                throw new PrestoException((ErrorCodeSupplier)SparkErrorCode.STORAGE_ERROR, "Disk page checksum does not match. Data seems to be corrupted on disk for file " + tempStorageHandle.toString());
            }
            return pages.build();
        }
        catch (IOException | UncheckedIOException e) {
            throw new PrestoException((ErrorCodeSupplier)SparkErrorCode.STORAGE_ERROR, "Unable to read data from disk: ", (Throwable)e);
        }
    }

    public long getRetainedSizeInBytes() {
        return this.prestoSparkBroadcastTableCacheManager.getBroadcastTableSizeInBytes(this.stageId, this.planNodeId);
    }
}

