/*
 * Decompiled with CFR 0.152.
 */
package org.deeplearning4j.spark.parameterserver.functions;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.Iterator;
import org.apache.commons.io.LineIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.datavec.spark.util.SerializableHadoopConfig;
import org.deeplearning4j.core.loader.DataSetLoader;
import org.deeplearning4j.spark.api.TrainingResult;
import org.deeplearning4j.spark.api.TrainingWorker;
import org.deeplearning4j.spark.iterator.PathSparkDataSetIterator;
import org.deeplearning4j.spark.parameterserver.pw.SharedTrainingWrapper;
import org.deeplearning4j.spark.parameterserver.training.SharedTrainingResult;
import org.deeplearning4j.spark.parameterserver.training.SharedTrainingWorker;
import org.nd4j.linalg.dataset.DataSet;

public class SharedFlatMapPaths<R extends TrainingResult>
implements FlatMapFunction<Iterator<String>, R> {
    public static Configuration defaultConfig;
    protected final SharedTrainingWorker worker;
    protected final DataSetLoader loader;
    protected final Broadcast<SerializableHadoopConfig> hadoopConfig;

    public static File toTempFile(Iterator<String> dataSetIterator) throws IOException {
        File f = Files.createTempFile("SharedFlatMapPaths", ".txt", new FileAttribute[0]).toFile();
        f.deleteOnExit();
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(f));){
            while (dataSetIterator.hasNext()) {
                bw.write(dataSetIterator.next());
                bw.write("\n");
            }
        }
        return f;
    }

    public SharedFlatMapPaths(TrainingWorker<R> worker, DataSetLoader loader, Broadcast<SerializableHadoopConfig> hadoopConfig) {
        this.worker = (SharedTrainingWorker)worker;
        this.loader = loader;
        this.hadoopConfig = hadoopConfig;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Iterator<R> call(Iterator<String> dataSetIterator) throws Exception {
        if (!dataSetIterator.hasNext()) {
            return Collections.emptyIterator();
        }
        File f = SharedFlatMapPaths.toTempFile(dataSetIterator);
        LineIterator lineIter = new LineIterator((Reader)new FileReader(f));
        try {
            SharedTrainingWrapper.getInstance(this.worker.getInstanceId()).attachDS((Iterator<DataSet>)new PathSparkDataSetIterator((Iterator)lineIter, this.loader, this.hadoopConfig));
            SharedTrainingResult result = SharedTrainingWrapper.getInstance(this.worker.getInstanceId()).run(this.worker);
            Iterator<SharedTrainingResult> iterator = Collections.singletonList(result).iterator();
            return iterator;
        }
        finally {
            lineIter.close();
            f.delete();
        }
    }
}

