/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.shuffle.common.impl;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;

@InterfaceAudience.Private
public class SimpleFetchedInputAllocator
implements FetchedInputAllocator,
FetchedInputCallback {
    private static final Log LOG = LogFactory.getLog(SimpleFetchedInputAllocator.class);
    private final Configuration conf;
    private final TezTaskOutputFiles fileNameAllocator;
    private final LocalDirAllocator localDirAllocator;
    private final long memoryLimit;
    private final long maxSingleShuffleLimit;
    private final long maxAvailableTaskMemory;
    private final long initialMemoryAvailable;
    private volatile long usedMemory = 0L;

    public SimpleFetchedInputAllocator(String uniqueIdentifier, Configuration conf, long maxTaskAvailableMemory, long memoryAvailable) {
        this.conf = conf;
        this.maxAvailableTaskMemory = maxTaskAvailableMemory;
        this.initialMemoryAvailable = memoryAvailable;
        this.fileNameAllocator = new TezTaskOutputFiles(conf, uniqueIdentifier);
        this.localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        float maxInMemCopyUse = conf.getFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.fetch.buffer.percent: " + maxInMemCopyUse);
        }
        long memReq = (long)((float)conf.getLong("tez.runtime.task.memory", Math.min(this.maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
        this.memoryLimit = memReq <= this.initialMemoryAvailable ? memReq : this.initialMemoryAvailable;
        LOG.info((Object)("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit));
        float singleShuffleMemoryLimitPercent = conf.getFloat("tez.runtime.shuffle.memory.limit.percent", 0.25f);
        if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.memory.limit.percent: " + singleShuffleMemoryLimitPercent);
        }
        this.maxSingleShuffleLimit = (long)((float)this.memoryLimit * singleShuffleMemoryLimitPercent);
        LOG.info((Object)("SimpleInputManager -> MemoryLimit: " + this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit));
    }

    @InterfaceAudience.Private
    public static long getInitialMemoryReq(Configuration conf, long maxAvailableTaskMemory) {
        float maxInMemCopyUse = conf.getFloat("tez.runtime.shuffle.fetch.buffer.percent", 0.9f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.fetch.buffer.percent: " + maxInMemCopyUse);
        }
        long memReq = (long)((float)conf.getLong("tez.runtime.task.memory", Math.min(maxAvailableTaskMemory, Integer.MAX_VALUE)) * maxInMemCopyUse);
        return memReq;
    }

    @Override
    public synchronized FetchedInput allocate(long actualSize, long compressedSize, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
        if (actualSize > this.maxSingleShuffleLimit || this.usedMemory + actualSize > this.memoryLimit) {
            return new DiskFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this, this.conf, this.localDirAllocator, this.fileNameAllocator);
        }
        this.usedMemory += actualSize;
        LOG.info((Object)("Used memory after allocating " + actualSize + " : " + this.usedMemory));
        return new MemoryFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this);
    }

    @Override
    public synchronized FetchedInput allocateType(FetchedInput.Type type, long actualSize, long compressedSize, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
        switch (type) {
            case DISK: {
                return new DiskFetchedInput(actualSize, compressedSize, inputAttemptIdentifier, this, this.conf, this.localDirAllocator, this.fileNameAllocator);
            }
        }
        return this.allocate(actualSize, compressedSize, inputAttemptIdentifier);
    }

    @Override
    public synchronized void fetchComplete(FetchedInput fetchedInput) {
        switch (fetchedInput.getType()) {
            case DISK: 
            case DISK_DIRECT: 
            case MEMORY: {
                break;
            }
            default: {
                throw new TezUncheckedException("InputType: " + (Object)((Object)fetchedInput.getType()) + " not expected for Broadcast fetch");
            }
        }
    }

    @Override
    public synchronized void fetchFailed(FetchedInput fetchedInput) {
        this.cleanup(fetchedInput);
    }

    @Override
    public synchronized void freeResources(FetchedInput fetchedInput) {
        this.cleanup(fetchedInput);
    }

    private void cleanup(FetchedInput fetchedInput) {
        switch (fetchedInput.getType()) {
            case DISK: {
                break;
            }
            case MEMORY: {
                this.unreserve(fetchedInput.getActualSize());
                break;
            }
            default: {
                throw new TezUncheckedException("InputType: " + (Object)((Object)fetchedInput.getType()) + " not expected for Broadcast fetch");
            }
        }
    }

    private synchronized void unreserve(long size) {
        this.usedMemory -= size;
        LOG.info((Object)("Used memory after freeing " + size + " : " + this.usedMemory));
    }
}

