/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.copy.publisher;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
import org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyDataPublisher
extends DataPublisher
implements UnpublishedHandling {
    private static final Logger log = LoggerFactory.getLogger(CopyDataPublisher.class);
    private final Path writerOutputDir;
    private final FileSystem fs;
    protected final EventSubmitter eventSubmitter;
    protected final RecoveryHelper recoveryHelper;
    protected final Optional<LineageInfo> lineageInfo;

    public boolean isThreadSafe() {
        return ((Object)((Object)this)).getClass() == CopyDataPublisher.class;
    }

    public CopyDataPublisher(State state) throws IOException {
        super(state);
        this.lineageInfo = state instanceof SourceState ? LineageInfo.getLineageInfo((SharedResourcesBroker)((SourceState)state).getBroker()) : (state instanceof WorkUnitState ? LineageInfo.getLineageInfo((SharedResourcesBroker)((WorkUnitState)state).getTaskBrokerNullable()) : Optional.absent());
        String uri = this.state.getProp("writer.fs.uri", "file:///");
        this.fs = FileSystem.get((URI)URI.create(uri), (Configuration)WriterUtils.getFsConfiguration((State)state));
        FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
        this.writerOutputDir = new Path(state.getProp("writer.output.dir"));
        MetricContext metricContext = Instrumented.getMetricContext((State)state, CopyDataPublisher.class, (List)GobblinMetrics.getCustomTagsFromState((State)state));
        this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.copy.CopyDataPublisher").build();
        this.recoveryHelper = new RecoveryHelper(this.fs, state);
        this.recoveryHelper.purgeOldPersistedFile();
    }

    public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
        Multimap<CopyEntity.DatasetAndPartition, WorkUnitState> datasets = CopyDataPublisher.groupByFileSet(states);
        boolean allDatasetsPublished = true;
        for (CopyEntity.DatasetAndPartition datasetAndPartition : datasets.keySet()) {
            try {
                this.publishFileSet(datasetAndPartition, datasets.get((Object)datasetAndPartition));
            }
            catch (Throwable e) {
                CopyEventSubmitterHelper.submitFailedDatasetPublish(this.eventSubmitter, datasetAndPartition);
                log.error("Failed to publish " + datasetAndPartition.getDataset().getDatasetURN(), e);
                allDatasetsPublished = false;
            }
        }
        if (!allDatasetsPublished) {
            throw new IOException("Not all datasets published successfully");
        }
    }

    public void handleUnpublishedWorkUnits(Collection<? extends WorkUnitState> states) throws IOException {
        int filesPersisted = this.persistFailedFileSet(states);
        log.info(String.format("Successfully persisted %d work units.", filesPersisted));
    }

    private static Multimap<CopyEntity.DatasetAndPartition, WorkUnitState> groupByFileSet(Collection<? extends WorkUnitState> states) {
        ArrayListMultimap datasetRoots = ArrayListMultimap.create();
        for (WorkUnitState workUnitState : states) {
            CopyEntity file = CopySource.deserializeCopyEntity((State)workUnitState);
            CopyEntity.DatasetAndPartition datasetAndPartition = file.getDatasetAndPartition(CopyableDatasetMetadata.deserialize(workUnitState.getProp("gobblin.copy.serialized.copyable.datasets")));
            datasetRoots.put((Object)datasetAndPartition, (Object)workUnitState);
        }
        return datasetRoots;
    }

    private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, Collection<WorkUnitState> datasetWorkUnitStates) throws IOException {
        HashMap additionalMetadata = Maps.newHashMap();
        Preconditions.checkArgument((!datasetWorkUnitStates.isEmpty() ? 1 : 0) != 0, (Object)"publishFileSet received an empty collection work units. This is an error in code.");
        CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(datasetWorkUnitStates.iterator().next().getProp("gobblin.copy.serialized.copyable.datasets"));
        Path datasetWriterOutputPath = new Path(this.writerOutputDir, datasetAndPartition.identifier());
        log.info("Merging all split work units.");
        DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
        log.info(String.format("[%s] Publishing fileSet from %s for dataset %s", datasetAndPartition.identifier(), datasetWriterOutputPath, metadata.getDatasetURN()));
        List<CommitStep> prePublish = CopyDataPublisher.getCommitSequence(datasetWorkUnitStates, PrePublishStep.class);
        List<CommitStep> postPublish = CopyDataPublisher.getCommitSequence(datasetWorkUnitStates, PostPublishStep.class);
        log.info(String.format("[%s] Found %d prePublish steps and %d postPublish steps.", datasetAndPartition.identifier(), prePublish.size(), postPublish.size()));
        CopyDataPublisher.executeCommitSequence(prePublish);
        if (CopyDataPublisher.hasCopyableFiles(datasetWorkUnitStates)) {
            HadoopUtils.renameRecursively((FileSystem)this.fs, (Path)datasetWriterOutputPath, (Path)new Path("/"));
        } else {
            log.info(String.format("[%s] No copyable files in dataset. Proceeding to postpublish steps.", datasetAndPartition.identifier()));
        }
        CopyDataPublisher.executeCommitSequence(postPublish);
        this.fs.delete(datasetWriterOutputPath, true);
        long datasetOriginTimestamp = Long.MAX_VALUE;
        long datasetUpstreamTimestamp = Long.MAX_VALUE;
        Optional fileSetRoot = Optional.absent();
        for (WorkUnitState wus : datasetWorkUnitStates) {
            CopyEntity copyEntity;
            if (wus.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
                wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
            }
            if (!((copyEntity = CopySource.deserializeCopyEntity((State)wus)) instanceof CopyableFile)) continue;
            CopyableFile copyableFile = (CopyableFile)copyEntity;
            if (wus.getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
                CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus);
                if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) {
                    fileSetRoot = Optional.of((Object)copyableFile.getDatasetOutputPath());
                }
                if (this.lineageInfo.isPresent()) {
                    ((LineageInfo)this.lineageInfo.get()).putDestination(copyableFile.getDestinationData(), 0, (State)wus);
                }
            }
            if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
                datasetOriginTimestamp = copyableFile.getOriginTimestamp();
            }
            if (datasetUpstreamTimestamp <= copyableFile.getUpstreamTimestamp()) continue;
            datasetUpstreamTimestamp = copyableFile.getUpstreamTimestamp();
        }
        if (Long.MAX_VALUE == datasetOriginTimestamp) {
            datasetOriginTimestamp = 0L;
        }
        if (Long.MAX_VALUE == datasetUpstreamTimestamp) {
            datasetUpstreamTimestamp = 0L;
        }
        additionalMetadata.put("sourceCluster", this.state.getProp("sourceCluster"));
        additionalMetadata.put("destinationCluster", this.state.getProp("destinationCluster"));
        additionalMetadata.put("datasetOutputPath", fileSetRoot.or((Object)"Unknown"));
        CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, datasetAndPartition, Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata);
    }

    private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits) throws IOException {
        for (WorkUnitState wus : workUnits) {
            if (!CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass((State)wus))) continue;
            return true;
        }
        return false;
    }

    private static List<CommitStep> getCommitSequence(Collection<WorkUnitState> workUnits, Class<?> baseClass) throws IOException {
        ArrayList steps = Lists.newArrayList();
        for (WorkUnitState wus : workUnits) {
            if (!baseClass.isAssignableFrom(CopySource.getCopyEntityClass((State)wus))) continue;
            CommitStepCopyEntity step = (CommitStepCopyEntity)CopySource.deserializeCopyEntity((State)wus);
            steps.add(step);
        }
        Comparator<CommitStepCopyEntity> commitStepSorter = new Comparator<CommitStepCopyEntity>(){

            @Override
            public int compare(CommitStepCopyEntity o1, CommitStepCopyEntity o2) {
                return Integer.compare(o1.getPriority(), o2.getPriority());
            }
        };
        Collections.sort(steps, commitStepSorter);
        ArrayList sequence = Lists.newArrayList();
        for (CommitStepCopyEntity entity : steps) {
            sequence.add(entity.getStep());
        }
        return sequence;
    }

    private static void executeCommitSequence(List<CommitStep> steps) throws IOException {
        for (CommitStep step : steps) {
            step.execute();
        }
    }

    private int persistFailedFileSet(Collection<? extends WorkUnitState> workUnitStates) throws IOException {
        int filesPersisted = 0;
        for (WorkUnitState workUnitState : workUnitStates) {
            CopyableDatasetMetadata metadata;
            Path outputDir;
            Path outputPath;
            CopyableFile file;
            CopyEntity entity;
            if (workUnitState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL || !((entity = CopySource.deserializeCopyEntity((State)workUnitState)) instanceof CopyableFile) || !this.recoveryHelper.persistFile((State)workUnitState, file = (CopyableFile)entity, outputPath = FileAwareInputStreamDataWriter.getOutputFilePath(file, outputDir = FileAwareInputStreamDataWriter.getOutputDir((State)workUnitState), file.getDatasetAndPartition(metadata = CopySource.deserializeCopyableDataset((State)workUnitState))))) continue;
            ++filesPersisted;
        }
        return filesPersisted;
    }

    public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
    }

    public void close() throws IOException {
    }

    public void initialize() throws IOException {
    }
}

