/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.data.management.copy.writer;

import com.codahale.metrics.Meter;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.collect.UnmodifiableIterator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.SharedResourceFactory;
import org.apache.gobblin.broker.iface.SharedResourceKey;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.crypto.EncryptionConfigParser;
import org.apache.gobblin.crypto.EncryptionFactory;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriter;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.io.StreamCopier;
import org.apache.gobblin.util.io.StreamThrottler;
import org.apache.gobblin.util.io.ThrottledInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Data writer that copies the content of a {@link FileAwareInputStream} into a file under the
 * writer staging directory and, on {@link #commit()}, renames that staging file to its location
 * under the writer output directory.
 *
 * <p>An instance handles a single file: once {@link #writeImpl(FileAwareInputStream)} has completed
 * successfully, any further call to it throws an {@link IOException}.
 */
public class FileAwareInputStreamDataWriter
extends InstrumentedDataWriter<FileAwareInputStream>
implements FinalState,
SpeculativeAttemptAwareConstruct {
    private static final Logger log = LoggerFactory.getLogger(FileAwareInputStreamDataWriter.class);
    /** Name of the meter that tracks the number of bytes copied. */
    public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
    /** Property enabling the check that the number of bytes copied matches the expected size. */
    public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
    public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
    /** Property selecting {@link Options.Rename#OVERWRITE} for the rename performed on commit. */
    public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit";
    public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false;
    protected final AtomicLong bytesWritten = new AtomicLong();
    protected final AtomicLong filesWritten = new AtomicLong();
    protected final WorkUnitState state;
    protected final FileSystem fs;
    protected final Path stagingDir;
    protected final Path outputDir;
    private final Map<String, Object> encryptionConfig;
    protected CopyableDatasetMetadata copyableDatasetMetadata;
    protected final RecoveryHelper recoveryHelper;
    protected final SharedResourcesBroker<GobblinScopeTypes> taskBroker;
    protected final int bufferSize;
    private final boolean checkFileSize;
    private final Options.Rename renameOptions;
    private final FileContext fileContext;
    protected final Meter copySpeedMeter;
    protected final Optional<String> writerAttemptIdOptional;
    /** The file written by this writer; absent until a write has completed successfully. */
    protected Optional<CopyableFile> actualProcessedCopyableFile;

    /**
     * Creates a writer whose target {@link FileSystem} is resolved from the writer file system URI
     * property of {@code state}.
     *
     * @param state must be a {@link WorkUnitState}
     * @param numBranches number of branches; values greater than one are rejected
     * @param branchId id of the branch this writer belongs to
     * @param writerAttemptId attempt id used to derive the staging directory, may be {@code null}
     * @throws IOException if {@code numBranches > 1} or the file system cannot be obtained
     */
    public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId, String writerAttemptId) throws IOException {
        this(state, null, numBranches, branchId, writerAttemptId);
    }

    /**
     * Creates a writer.
     *
     * @param state must be a {@link WorkUnitState}
     * @param fileSystem target file system; when {@code null} it is resolved from the writer file
     *     system URI property (default {@code file:///})
     * @param numBranches number of branches; values greater than one are rejected
     * @param branchId id of the branch this writer belongs to
     * @param writerAttemptId attempt id used to derive the staging directory, may be {@code null}
     * @throws IOException if {@code numBranches > 1} or the file system cannot be obtained
     * @throws RuntimeException if {@code state} is not a {@link WorkUnitState}
     */
    public FileAwareInputStreamDataWriter(State state, FileSystem fileSystem, int numBranches, int branchId, String writerAttemptId) throws IOException {
        super(state);
        if (numBranches > 1) {
            throw new IOException("Distcp can only operate with one branch.");
        }
        if (!(state instanceof WorkUnitState)) {
            throw new RuntimeException(String.format("Distcp requires a %s on construction.", WorkUnitState.class.getSimpleName()));
        }
        this.state = (WorkUnitState) state;
        this.taskBroker = this.state.getTaskBroker();
        this.writerAttemptIdOptional = Optional.fromNullable(writerAttemptId);
        String uriStr = this.state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.fs.uri", numBranches, branchId), "file:///");
        Configuration conf = WriterUtils.getFsConfiguration(state);
        URI uri = URI.create(uriStr);
        this.fs = fileSystem != null ? fileSystem : FileSystem.get(uri, conf);
        this.fileContext = FileContext.getFileContext(uri, conf);
        // Staging directory: a user-defined static directory wins; otherwise it is derived from the
        // writer configuration, including the attempt id when one was supplied.
        if (state.getPropAsBoolean("user.defined.staging.dir.flag", false)) {
            this.stagingDir = new Path(state.getProp("user.defined.static.staging.dir"));
        } else if (this.writerAttemptIdOptional.isPresent()) {
            this.stagingDir = WriterUtils.getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get());
        } else {
            this.stagingDir = WriterUtils.getWriterStagingDir(state, numBranches, branchId);
        }
        this.copyableDatasetMetadata = CopyableDatasetMetadata.deserialize(state.getProp("gobblin.copy.serialized.copyable.datasets"));
        this.outputDir = FileAwareInputStreamDataWriter.getOutputDir(state);
        this.recoveryHelper = new RecoveryHelper(this.fs, state);
        this.actualProcessedCopyableFile = Optional.absent();
        this.copySpeedMeter = this.getMetricContext().meter(GOBBLIN_COPY_BYTES_COPIED_METER);
        this.bufferSize = state.getPropAsInt("gobblin.copy.bufferSize", 32768);
        this.encryptionConfig = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.WRITER, (State) this.state, numBranches, branchId);
        this.checkFileSize = state.getPropAsBoolean(GOBBLIN_COPY_CHECK_FILESIZE, DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE);
        boolean taskOverwriteOnCommit = state.getPropAsBoolean(GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT, DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT);
        this.renameOptions = taskOverwriteOnCommit ? Options.Rename.OVERWRITE : Options.Rename.NONE;
    }

    /**
     * Creates a writer without a writer attempt id.
     *
     * @param state must be a {@link WorkUnitState}
     * @param numBranches number of branches; values greater than one are rejected
     * @param branchId id of the branch this writer belongs to
     * @throws IOException if {@code numBranches > 1} or the file system cannot be obtained
     */
    public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId) throws IOException {
        this(state, numBranches, branchId, null);
    }

    /**
     * Writes the given stream to its staging file and records the file as processed.
     *
     * <p>When an encryption configuration is present, the encryption type is appended as an
     * extension to the file's destination before the staging path is computed.
     *
     * @param fileAwareInputStream the stream and file metadata to write
     * @throws IOException if this writer has already processed a file, or if the write fails
     */
    public final void writeImpl(FileAwareInputStream fileAwareInputStream) throws IOException {
        // Reject reuse before touching the incoming file, so a rejected call leaves it unmodified.
        if (this.actualProcessedCopyableFile.isPresent()) {
            throw new IOException(this.getClass().getCanonicalName() + " can only process one file and cannot be reused.");
        }
        CopyableFile copyableFile = fileAwareInputStream.getFile();
        if (this.encryptionConfig != null) {
            copyableFile.setDestination(PathUtils.addExtension(copyableFile.getDestination(), new String[]{"." + EncryptionConfigParser.getEncryptionType(this.encryptionConfig)}));
        }
        Path stagingFile = this.getStagingFilePath(copyableFile);
        this.fs.mkdirs(stagingFile.getParent());
        this.writeImpl(fileAwareInputStream.getInputStream(), stagingFile, copyableFile, fileAwareInputStream);
        this.actualProcessedCopyableFile = Optional.of(copyableFile);
        this.filesWritten.incrementAndGet();
    }

    /**
     * Materializes {@code copyableFile} at {@code writeAt}.
     *
     * <ul>
     *   <li>If the recovery helper finds a previously persisted file with the same replication and
     *       block size, that file is renamed to {@code writeAt} and nothing is copied.</li>
     *   <li>If the source is a directory, {@code writeAt} is created as a directory.</li>
     *   <li>Otherwise the stream is copied (throttled, optionally encrypted) into a new file at
     *       {@code writeAt}; both streams are closed afterwards.</li>
     * </ul>
     *
     * @param inputStream source bytes; closed by this method only on the copy path
     * @param writeAt path of the staging file to create
     * @param copyableFile metadata of the file being copied
     * @param record the record being written; its split, if present, bounds the bytes copied
     * @throws IOException if recovery or the copy fails, or if the number of bytes copied does not
     *     match the expected size when that check applies
     */
    protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile, FileAwareInputStream record) throws IOException {
        final short replication = this.state.getPropAsShort("writer.file.replication.factor", copyableFile.getReplication(this.fs));
        final long blockSize = copyableFile.getBlockSize(this.fs);
        final long fileSize = copyableFile.getFileStatus().getLen();
        long expectedBytes = fileSize;
        Long maxBytes = null;
        boolean mustMatchMaxBytes = false;
        if (record.getSplit().isPresent()) {
            DistcpFileSplitter.Split split = (DistcpFileSplitter.Split) record.getSplit().get();
            maxBytes = split.getHighPosition() - split.getLowPosition();
            if (split.isLastSplit()) {
                // NOTE(review): this evaluates to 0 when fileSize is an exact multiple of blockSize;
                // it is only compared when checkFileSize is enabled. Confirm against how splits are sized.
                expectedBytes = fileSize % blockSize;
                mustMatchMaxBytes = false;
            } else {
                expectedBytes = maxBytes;
                mustMatchMaxBytes = true;
            }
        }

        // A persisted file is only reusable if it was written with the same storage attributes.
        Predicate<FileStatus> fileStatusAttributesFilter = new Predicate<FileStatus>() {
            @Override
            public boolean apply(FileStatus input) {
                return input.getReplication() == replication && input.getBlockSize() == blockSize;
            }
        };
        Optional<FileStatus> persistedFile = this.recoveryHelper.findPersistedFile((State) this.state, copyableFile, fileStatusAttributesFilter);
        if (persistedFile.isPresent()) {
            Path persistedPath = persistedFile.get().getPath();
            log.info(String.format("Recovering persisted file %s to %s.", persistedPath, writeAt));
            // FileSystem.rename signals failure through its return value; surface it here instead of
            // letting commit() fail later on a missing staging file.
            if (!this.fs.rename(persistedPath, writeAt)) {
                throw new IOException(String.format("Failed to move persisted file %s to %s.", persistedPath, writeAt));
            }
            return;
        }

        if (copyableFile.getFileStatus().isDirectory()) {
            this.fs.mkdirs(writeAt);
            return;
        }

        OutputStream os = this.fs.create(writeAt, true, this.fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
        try {
            // Wrapped inside the try so the raw stream is still closed if building the encoder fails.
            if (this.encryptionConfig != null) {
                os = EncryptionFactory.buildStreamCryptoProvider(this.encryptionConfig).encodeOutputStream(os);
            }
            FileSystem defaultFS = FileSystem.get(new Configuration());
            StreamThrottler throttler = (StreamThrottler) this.taskBroker.getSharedResource((SharedResourceFactory) new StreamThrottler.Factory(), (SharedResourceKey) new EmptyKey());
            ThrottledInputStream throttledInputStream = throttler.throttleInputStream().inputStream(inputStream).sourceURI(copyableFile.getOrigin().getPath().makeQualified(defaultFS.getUri(), defaultFS.getWorkingDirectory()).toUri()).targetURI(this.fs.makeQualified(writeAt).toUri()).build();
            StreamCopier copier = new StreamCopier((InputStream) throttledInputStream, os, maxBytes).withBufferSize(this.bufferSize);
            log.info("File {}: Starting copy", copyableFile.getOrigin().getPath());
            if (this.isInstrumentationEnabled()) {
                copier.withCopySpeedMeter(this.copySpeedMeter);
            }
            long numBytes = copier.copy();
            if ((this.checkFileSize || mustMatchMaxBytes) && numBytes != expectedBytes) {
                throw new IOException(String.format("Incomplete write: expected %d, wrote %d bytes.", expectedBytes, numBytes));
            }
            this.bytesWritten.addAndGet(numBytes);
            if (this.isInstrumentationEnabled()) {
                log.info("File {}: copied {} bytes, average rate: {} B/s", new Object[]{copyableFile.getOrigin().getPath(), this.copySpeedMeter.getCount(), this.copySpeedMeter.getMeanRate()});
            } else {
                log.info("File {} copied.", copyableFile.getOrigin().getPath());
            }
        }
        catch (NotConfiguredException nce) {
            // NOTE(review): if this is raised before copier.copy() runs, nothing has been copied yet
            // the method still returns normally — confirm that this is the intended behaviour.
            log.warn("Broker error. Some features of stream copier may not be available.", nce);
        }
        finally {
            // Close the input even when closing the output fails.
            try {
                os.close();
            }
            finally {
                inputStream.close();
            }
        }
    }

    /**
     * Applies the destination owner and permission of {@code file} to its staging path, recursively.
     *
     * @param file the file whose staging path is updated
     * @throws IOException if the staging path cannot be listed
     */
    protected void setFilePermissions(CopyableFile file) throws IOException {
        this.setRecursivePermission(this.getStagingFilePath(file), file.getDestinationOwnerAndPermission());
    }

    /**
     * Returns the staging path for {@code file}: the split part name when the work unit is a split
     * work unit, otherwise the name of the file's destination, both under the staging directory.
     */
    protected Path getStagingFilePath(CopyableFile file) {
        if (DistcpFileSplitter.isSplitWorkUnit((State) this.state)) {
            return new Path(this.stagingDir, ((DistcpFileSplitter.Split) DistcpFileSplitter.getSplit((State) this.state).get()).getPartName());
        }
        return new Path(this.stagingDir, file.getDestination().getName());
    }

    /** Returns the directory under {@code outputDir} named after the dataset/partition identifier. */
    protected static Path getPartitionOutputRoot(Path outputDir, CopyEntity.DatasetAndPartition datasetAndPartition) {
        return new Path(outputDir, datasetAndPartition.identifier());
    }

    /**
     * Returns the output path of {@code file}: its destination, stripped of scheme, authority and
     * leading separator, resolved under the partition output root.
     */
    public static Path getOutputFilePath(CopyableFile file, Path outputDir, CopyEntity.DatasetAndPartition datasetAndPartition) {
        Path destinationWithoutSchemeAndAuthority = PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination());
        return new Path(FileAwareInputStreamDataWriter.getPartitionOutputRoot(outputDir, datasetAndPartition), PathUtils.withoutLeadingSeparator(destinationWithoutSchemeAndAuthority));
    }

    /**
     * Like {@link #getOutputFilePath}, except that for a split work unit the file name is replaced
     * by the split's part name (in the same parent directory).
     */
    public static Path getSplitOutputFilePath(CopyableFile file, Path outputDir, CopyEntity.DatasetAndPartition datasetAndPartition, State workUnit) {
        Path outputFilePath = FileAwareInputStreamDataWriter.getOutputFilePath(file, outputDir, datasetAndPartition);
        if (DistcpFileSplitter.isSplitWorkUnit(workUnit)) {
            return new Path(outputFilePath.getParent(), ((DistcpFileSplitter.Split) DistcpFileSplitter.getSplit(workUnit).get()).getPartName());
        }
        return outputFilePath;
    }

    /** Returns the writer output directory configured in {@code state} for branch 0 of 1. */
    public static Path getOutputDir(State state) {
        return new Path(state.getProp(ForkOperatorUtils.getPropertyNameForBranch("writer.output.dir", 1, 0)));
    }

    /**
     * Sets permission and owner/group on {@code path}. Failures are logged and not propagated.
     * Null permission and null/empty owner or group are skipped.
     */
    private void safeSetPathPermission(Path path, OwnerAndPermission ownerAndPermission) {
        try {
            if (ownerAndPermission.getFsPermission() != null) {
                this.fs.setPermission(path, ownerAndPermission.getFsPermission());
            }
        }
        catch (IOException ioe) {
            log.warn("Failed to set permission for directory " + path, ioe);
        }
        String owner = Strings.isNullOrEmpty(ownerAndPermission.getOwner()) ? null : ownerAndPermission.getOwner();
        String group = Strings.isNullOrEmpty(ownerAndPermission.getGroup()) ? null : ownerAndPermission.getGroup();
        try {
            if (owner != null || group != null) {
                this.fs.setOwner(path, owner, group);
            }
        }
        catch (IOException ioe) {
            log.warn("Failed to set owner and/or group for path " + path + " to " + owner + ":" + group, ioe);
        }
    }

    /**
     * Applies {@code ownerAndPermission} to every path listed recursively under {@code path},
     * visiting the listing in reverse order. Directories additionally get owner-execute permission.
     */
    private void setRecursivePermission(Path path, OwnerAndPermission ownerAndPermission) throws IOException {
        List<FileStatus> files = FileListUtils.listPathsRecursively(this.fs, path, FileListUtils.NO_OP_PATH_FILTER);
        Collections.reverse(files);
        for (FileStatus file : files) {
            this.safeSetPathPermission(file.getPath(), FileAwareInputStreamDataWriter.addExecutePermissionsIfRequired(file, ownerAndPermission));
        }
    }

    /**
     * Returns {@code ownerAndPermission} unchanged for regular files or when it carries no
     * permission; for directories returns a copy whose permission also grants execute to the owner.
     */
    private static OwnerAndPermission addExecutePermissionsIfRequired(FileStatus file, OwnerAndPermission ownerAndPermission) {
        if (ownerAndPermission.getFsPermission() == null || !file.isDirectory()) {
            return ownerAndPermission;
        }
        return new OwnerAndPermission(ownerAndPermission.getOwner(), ownerAndPermission.getGroup(), FileAwareInputStreamDataWriter.addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
    }

    /** Returns a copy of {@code fsPermission} with {@link FsAction#EXECUTE} added to the user action. */
    static FsPermission addExecutePermissionToOwner(FsPermission fsPermission) {
        FsAction newOwnerAction = fsPermission.getUserAction().or(FsAction.EXECUTE);
        return new FsPermission(newOwnerAction, fsPermission.getGroupAction(), fsPermission.getOtherAction());
    }

    /** Returns the number of files written by this writer. */
    public long recordsWritten() {
        return this.filesWritten.get();
    }

    /** Returns the number of bytes copied by this writer. */
    public long bytesWritten() throws IOException {
        return this.bytesWritten.get();
    }

    /**
     * Moves the staged file to its output location. Does nothing if no file has been written.
     *
     * <p>Permissions are applied to the staged file, missing ancestor directories of the output
     * path are created, and the staged file is renamed using the configured rename option. If any
     * of this fails the staged file is handed to the recovery helper and the exception is rethrown.
     * The staging directory is deleted in all cases.
     *
     * @throws IOException if the file cannot be committed
     */
    public void commit() throws IOException {
        if (!this.actualProcessedCopyableFile.isPresent()) {
            return;
        }
        CopyableFile copyableFile = this.actualProcessedCopyableFile.get();
        Path stagingFilePath = this.getStagingFilePath(copyableFile);
        Path outputFilePath = FileAwareInputStreamDataWriter.getSplitOutputFilePath(copyableFile, this.outputDir, copyableFile.getDatasetAndPartition(this.copyableDatasetMetadata), (State) this.state);
        log.info(String.format("Committing data from %s to %s", stagingFilePath, outputFilePath));
        try {
            this.setFilePermissions(copyableFile);
            Iterator<OwnerAndPermission> ancestorOwnerAndPermissionIt = copyableFile.getAncestorsOwnerAndPermission() == null
                    ? Collections.<OwnerAndPermission>emptyList().iterator()
                    : copyableFile.getAncestorsOwnerAndPermission().iterator();
            this.ensureDirectoryExists(this.fs, outputFilePath.getParent(), ancestorOwnerAndPermissionIt);
            this.fileContext.rename(stagingFilePath, outputFilePath, new Options.Rename[]{this.renameOptions});
        }
        catch (IOException ioe) {
            log.error("Could not commit file {}.", outputFilePath, ioe);
            this.recoveryHelper.persistFile((State) this.state, copyableFile, stagingFilePath);
            throw ioe;
        }
        finally {
            try {
                this.fs.delete(this.stagingDir, true);
            }
            catch (IOException ioe) {
                log.warn("Failed to delete staging path at " + this.stagingDir, ioe);
            }
        }
    }

    /**
     * Creates {@code path} and any missing ancestors. Each directory created while the iterator
     * still has elements receives the next owner/permission from it (the deepest directory takes
     * the first element); once the iterator is exhausted the remaining ancestors are created with
     * a plain {@code mkdirs}.
     */
    private void ensureDirectoryExists(FileSystem fs, Path path, Iterator<OwnerAndPermission> ownerAndPermissionIterator) throws IOException {
        if (fs.exists(path)) {
            return;
        }
        if (!ownerAndPermissionIterator.hasNext()) {
            fs.mkdirs(path);
            return;
        }
        OwnerAndPermission ownerAndPermission = ownerAndPermissionIterator.next();
        if (path.getParent() != null) {
            this.ensureDirectoryExists(fs, path.getParent(), ownerAndPermissionIterator);
        }
        if (!fs.mkdirs(path)) {
            return;
        }
        if (ownerAndPermission.getFsPermission() != null) {
            log.debug("Applying permissions {} to path {}.", ownerAndPermission.getFsPermission(), path);
            fs.setPermission(path, FileAwareInputStreamDataWriter.addExecutePermissionToOwner(ownerAndPermission.getFsPermission()));
        }
        String group = ownerAndPermission.getGroup();
        String owner = ownerAndPermission.getOwner();
        if (group != null || owner != null) {
            log.debug("Applying owner {} and group {} to path {}.", new Object[]{owner, group, path});
            fs.setOwner(path, owner, group);
        }
    }

    /** No-op. */
    public void cleanup() throws IOException {
    }

    /**
     * Returns a {@link ConstructState} carrying the serialized copy entity of the processed file,
     * or an empty one if no file has been processed.
     */
    public State getFinalState() {
        State state = new State();
        if (this.actualProcessedCopyableFile.isPresent()) {
            CopySource.serializeCopyEntity(state, (CopyEntity) this.actualProcessedCopyableFile.get());
        }
        ConstructState constructState = new ConstructState();
        constructState.addOverwriteProperties(state);
        return constructState;
    }

    /**
     * Returns {@code true} only when a writer attempt id was supplied and this instance is exactly
     * {@link FileAwareInputStreamDataWriter} (not a subclass).
     */
    public boolean isSpeculativeAttemptSafe() {
        return this.writerAttemptIdOptional.isPresent() && this.getClass() == FileAwareInputStreamDataWriter.class;
    }
}

