/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spark.classloader_interface;

import com.facebook.presto.spark.classloader_interface.ScalaUtils;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.shuffle.ShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleHandle;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import scala.Option;
import scala.Product2;
import scala.collection.Iterator;

public class PrestoSparkNativeExecutionShuffleManager
implements ShuffleManager {
    private final Map<Integer, ShuffleHandle> shuffleDependencyMap = new ConcurrentHashMap<Integer, ShuffleHandle>();
    private final Map<Integer, Integer> shuffleNumMaps = new ConcurrentHashMap<Integer, Integer>();
    private final ShuffleManager fallbackShuffleManager;
    private static final String FALLBACK_SPARK_SHUFFLE_MANAGER = "spark.fallback.shuffle.manager";

    public PrestoSparkNativeExecutionShuffleManager(SparkConf conf) {
        this.fallbackShuffleManager = (ShuffleManager)PrestoSparkNativeExecutionShuffleManager.instantiateClass(conf.get(FALLBACK_SPARK_SHUFFLE_MANAGER), conf);
    }

    private static <T> T instantiateClass(String className, SparkConf conf) {
        try {
            return (T)Class.forName(className).getConstructor(SparkConf.class).newInstance(conf);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(String.format("%s class not found", className), e);
        }
    }

    public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
        this.shuffleNumMaps.put(shuffleId, numMaps);
        return this.fallbackShuffleManager.registerShuffle(shuffleId, numMaps, dependency);
    }

    public <K, V> ShuffleWriter<K, V> getWriter(ShuffleHandle handle, int mapId, TaskContext context) {
        this.shuffleDependencyMap.put(mapId, handle);
        int shuffleId = handle.shuffleId();
        return new EmptyShuffleWriter(this.getNumOfPartitions(shuffleId));
    }

    public <K, C> ShuffleReader<K, C> getReader(ShuffleHandle handle, int startPartition, int endPartition, TaskContext context) {
        return new EmptyShuffleReader();
    }

    public boolean unregisterShuffle(int shuffleId) {
        this.fallbackShuffleManager.unregisterShuffle(shuffleId);
        return true;
    }

    public ShuffleBlockResolver shuffleBlockResolver() {
        return this.fallbackShuffleManager.shuffleBlockResolver();
    }

    public void stop() {
        this.fallbackShuffleManager.stop();
    }

    public Optional<ShuffleHandle> getShuffleHandle(int partitionId) {
        return Optional.ofNullable(this.shuffleDependencyMap.getOrDefault(partitionId, null));
    }

    public int getNumOfPartitions(int shuffleId) {
        if (!this.shuffleNumMaps.containsKey(shuffleId)) {
            throw new RuntimeException(String.format("shuffleId=[%s] is not registered", shuffleId));
        }
        return this.shuffleNumMaps.get(shuffleId);
    }

    static class EmptyShuffleWriter<K, V>
    extends ShuffleWriter<K, V> {
        private final long[] mapStatus;

        public EmptyShuffleWriter(int totalMapStages) {
            this.mapStatus = new long[totalMapStages];
        }

        public void write(Iterator<Product2<K, V>> records) throws IOException {
            if (records.hasNext()) {
                throw new RuntimeException("EmptyShuffleWriter can only take empty write input.");
            }
        }

        public Option<MapStatus> stop(boolean success) {
            BlockManager blockManager = SparkEnv.get().blockManager();
            return Option.apply((Object)MapStatus$.MODULE$.apply(blockManager.blockManagerId(), this.mapStatus));
        }
    }

    static class EmptyShuffleReader<K, V>
    implements ShuffleReader<K, V> {
        EmptyShuffleReader() {
        }

        public Iterator<Product2<K, V>> read() {
            return ScalaUtils.emptyScalaIterator();
        }
    }
}

