/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.util.logs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractScheduledService;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.filesystem.FileSystemSupplier;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogCopier
extends AbstractScheduledService {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogCopier.class);
    private static final long DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL = 120L;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
    private static final int DEFAULT_LINES_WRITTEN_BEFORE_FLUSH = 100;
    private static final int DEFAULT_NUM_COPY_THREADS = 10;
    private FileSystem srcFs;
    private FileSystem destFs;
    private final List<Path> srcLogDirs;
    private final Path destLogDir;
    private final long sourceLogFileMonitorInterval;
    private final TimeUnit timeUnit;
    private final FileSystemSupplier destFsSupplier;
    private final FileSystemSupplier srcFsSupplier;
    private final Set<String> logFileExtensions;
    private final int numCopyThreads;
    private final String currentLogFileName;
    private final Optional<List<Pattern>> includingRegexPatterns;
    private final Optional<List<Pattern>> excludingRegexPatterns;
    private final Optional<String> logFileNamePrefix;
    private final int linesWrittenBeforeFlush;
    private final ExecutorService executorService;
    private boolean needToUpdateDestFs;
    private boolean needToUpdateSrcFs;
    private final Set<String> copiedFileNames = Sets.newConcurrentHashSet();
    private boolean shouldCopyCurrentLogFile;

    private LogCopier(Builder builder) throws IOException {
        this.destFsSupplier = builder.destFsSupplier;
        this.srcFsSupplier = builder.srcFsSupplier;
        this.srcFs = this.srcFsSupplier != null ? this.srcFsSupplier.getFileSystem() : builder.srcFs;
        Preconditions.checkArgument((this.srcFs != null ? 1 : 0) != 0, (Object)"srcFs or srcFsSupplier has not been set");
        this.destFs = this.destFsSupplier != null ? this.destFsSupplier.getFileSystem() : builder.destFs;
        Preconditions.checkArgument((this.destFs != null ? 1 : 0) != 0, (Object)"destFs or destFsSupplier has not been set");
        this.srcLogDirs = builder.srcLogDirs.stream().map(d -> this.srcFs.makeQualified(d)).collect(Collectors.toList());
        this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
        this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
        this.timeUnit = builder.timeUnit;
        this.logFileExtensions = builder.logFileExtensions;
        this.currentLogFileName = builder.currentLogFileName;
        this.shouldCopyCurrentLogFile = false;
        this.needToUpdateDestFs = false;
        this.needToUpdateSrcFs = false;
        this.includingRegexPatterns = Optional.fromNullable((Object)builder.includingRegexPatterns);
        this.excludingRegexPatterns = Optional.fromNullable((Object)builder.excludingRegexPatterns);
        this.logFileNamePrefix = Optional.fromNullable((Object)builder.logFileNamePrefix);
        this.linesWrittenBeforeFlush = builder.linesWrittenBeforeFlush;
        this.numCopyThreads = builder.numCopyThreads;
        this.executorService = Executors.newFixedThreadPool(this.numCopyThreads);
    }

    protected void shutDown() throws Exception {
        try {
            this.shouldCopyCurrentLogFile = true;
            this.runOneIteration();
            LOGGER.debug("Closing FileSystem objects...");
            this.destFs.close();
            this.srcFs.close();
        }
        finally {
            super.shutDown();
        }
    }

    protected void runOneIteration() throws IOException {
        this.checkSrcLogFiles();
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule((long)0L, (long)this.sourceLogFileMonitorInterval, (TimeUnit)this.timeUnit);
    }

    private boolean shouldIncludeLogFile(FileStatus logFile) {
        Path logFilePath = logFile.getPath();
        if (this.currentLogFileName.equals(Files.getNameWithoutExtension((String)logFilePath.getName()))) {
            return this.shouldCopyCurrentLogFile;
        }
        if (this.copiedFileNames.contains(logFilePath.getName())) {
            return false;
        }
        if (this.logFileExtensions.isEmpty()) {
            return true;
        }
        return this.logFileExtensions.contains(Files.getFileExtension((String)logFilePath.getName()));
    }

    @VisibleForTesting
    void pruneCopiedFileNames(Set<String> srcLogFileNames) {
        Iterator<String> copiedFilesIterator = this.copiedFileNames.iterator();
        while (copiedFilesIterator.hasNext()) {
            String fileName = copiedFilesIterator.next();
            if (srcLogFileNames.contains(fileName)) continue;
            copiedFilesIterator.remove();
        }
    }

    @VisibleForTesting
    void checkSrcLogFiles() throws IOException {
        ArrayList<FileStatus> srcLogFiles = new ArrayList<FileStatus>();
        HashSet<String> srcLogFileNames = new HashSet<String>();
        HashSet<Path> newLogFiles = new HashSet<Path>();
        for (Path logDirPath : this.srcLogDirs) {
            srcLogFiles.addAll(FileListUtils.listFilesRecursively(this.srcFs, logDirPath));
            for (FileStatus srcLogFile : srcLogFiles) {
                if (this.shouldIncludeLogFile(srcLogFile)) {
                    newLogFiles.add(srcLogFile.getPath());
                }
                srcLogFileNames.add(srcLogFile.getPath().getName());
            }
        }
        if (newLogFiles.isEmpty()) {
            LOGGER.warn("No log file found under directories " + this.srcLogDirs);
            return;
        }
        ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (Path path : newLogFiles) {
            String destLogFileName = this.logFileNamePrefix.isPresent() ? (String)this.logFileNamePrefix.get() + "." + path.getName() : path.getName();
            Path destLogFile = new Path(this.destLogDir, destLogFileName);
            futures.add(this.executorService.submit(new LogCopyTask(path, destLogFile)));
        }
        for (Future future : futures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                LOGGER.error("LogCopyTask was interrupted - {}", (Throwable)e);
            }
            catch (ExecutionException e) {
                LOGGER.error("Failed LogCopyTask - {}", (Throwable)e);
            }
        }
        if (this.needToUpdateDestFs) {
            if (this.destFsSupplier == null) {
                throw new IOException("Try to update dest fileSystem but destFsSupplier has not been set");
            }
            this.destFs.close();
            this.destFs = this.destFsSupplier.getFileSystem();
            LOGGER.info("Dest fs updated" + this.destFs.toString());
            this.needToUpdateDestFs = false;
        }
        if (this.needToUpdateSrcFs) {
            if (this.srcFsSupplier == null) {
                throw new IOException("Try to update source fileSystem but srcFsSupplier has not been set");
            }
            this.srcFs.close();
            this.srcFs = this.srcFsSupplier.getFileSystem();
            LOGGER.info("Src fs updated" + this.srcFs.toString());
            this.needToUpdateSrcFs = false;
        }
        this.pruneCopiedFileNames(srcLogFileNames);
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public void setNeedToUpdateDestFs(boolean needToUpdateDestFs) {
        this.needToUpdateDestFs = needToUpdateDestFs;
    }

    public void setNeedToUpdateSrcFs(boolean needToUpdateSrcFs) {
        this.needToUpdateSrcFs = needToUpdateSrcFs;
    }

    public Set<String> getCopiedFileNames() {
        return this.copiedFileNames;
    }

    static /* synthetic */ TimeUnit access$1500() {
        return DEFAULT_TIME_UNIT;
    }

    private class LogCopyTask
    implements Callable<Void> {
        private final Path srcLogFile;
        private final Path destLogFile;

        public LogCopyTask(Path srcLogFile, Path destLogFile) {
            this.srcLogFile = srcLogFile;
            this.destLogFile = destLogFile;
        }

        @Override
        public Void call() {
            try {
                this.copyChangesOfLogFile(LogCopier.this.srcFs.makeQualified(this.srcLogFile), LogCopier.this.destFs.makeQualified(this.destLogFile));
            }
            catch (IOException ioe) {
                LOGGER.error(String.format("Failed while copying logs from %s to %s", this.srcLogFile, this.destLogFile), (Throwable)ioe);
            }
            return null;
        }

        private void copyChangesOfLogFile(Path srcFile, Path destFile) throws IOException {
            LOGGER.info("Copying changes from {} to {}", (Object)srcFile.toString(), (Object)destFile.toString());
            if (!LogCopier.this.srcFs.exists(srcFile)) {
                LOGGER.warn("Source log file not found: " + srcFile);
                return;
            }
            FSDataInputStream fsDataInputStream = null;
            try (Closer closer = Closer.create();){
                String line;
                fsDataInputStream = (FSDataInputStream)closer.register((Closeable)LogCopier.this.srcFs.open(srcFile));
                BufferedReader srcLogFileReader = (BufferedReader)closer.register((Closeable)new BufferedReader(new InputStreamReader((InputStream)fsDataInputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
                FSDataOutputStream outputStream = LogCopier.this.destFs.create(destFile);
                BufferedWriter destLogFileWriter = (BufferedWriter)closer.register((Closeable)new BufferedWriter(new OutputStreamWriter((OutputStream)outputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
                int linesProcessed = 0;
                while (!Thread.currentThread().isInterrupted() && (line = srcLogFileReader.readLine()) != null) {
                    if (!this.shouldCopyLine(line)) continue;
                    destLogFileWriter.write(line);
                    destLogFileWriter.newLine();
                    if (++linesProcessed % LogCopier.this.linesWrittenBeforeFlush != 0) continue;
                    destLogFileWriter.flush();
                }
                LogCopier.this.copiedFileNames.add(srcFile.getName());
            }
        }

        private boolean shouldCopyLine(String line) {
            boolean including = !LogCopier.this.includingRegexPatterns.isPresent() || DatasetFilterUtils.stringInPatterns(line, (List)LogCopier.this.includingRegexPatterns.get());
            boolean excluding = LogCopier.this.excludingRegexPatterns.isPresent() && DatasetFilterUtils.stringInPatterns(line, (List)LogCopier.this.excludingRegexPatterns.get());
            return !excluding && including;
        }
    }

    public static class Builder {
        private static final Splitter COMMA_SPLITTER = Splitter.on((char)',').omitEmptyStrings().trimResults();
        private FileSystem srcFs = null;
        private List<Path> srcLogDirs;
        private FileSystem destFs = null;
        private Path destLogDir;
        private FileSystemSupplier destFsSupplier = null;
        private FileSystemSupplier srcFsSupplier = null;
        private long sourceLogFileMonitorInterval = 120L;
        private int numCopyThreads = 10;
        private TimeUnit timeUnit = LogCopier.access$1500();
        private Set<String> logFileExtensions;
        private String currentLogFileName;
        private List<Pattern> includingRegexPatterns;
        private List<Pattern> excludingRegexPatterns;
        private String logFileNamePrefix;
        private int linesWrittenBeforeFlush = 100;

        public Builder useSourceLogFileMonitorInterval(long sourceLogFileMonitorInterval) {
            Preconditions.checkArgument((sourceLogFileMonitorInterval > 0L ? 1 : 0) != 0, (Object)"Source log file monitor interval must be positive");
            this.sourceLogFileMonitorInterval = sourceLogFileMonitorInterval;
            return this;
        }

        public Builder useTimeUnit(TimeUnit timeUnit) {
            Preconditions.checkNotNull((Object)((Object)timeUnit));
            this.timeUnit = timeUnit;
            return this;
        }

        public Builder useDestFsSupplier(FileSystemSupplier supplier) {
            Preconditions.checkNotNull((Object)supplier);
            this.destFsSupplier = supplier;
            return this;
        }

        public Builder useSrcFsSupplier(FileSystemSupplier supplier) {
            Preconditions.checkNotNull((Object)supplier);
            this.srcFsSupplier = supplier;
            return this;
        }

        public Builder acceptsLogFileExtensions(Set<String> logFileExtensions) {
            Preconditions.checkNotNull(logFileExtensions);
            this.logFileExtensions = ImmutableSet.copyOf(logFileExtensions);
            return this;
        }

        public Builder useIncludingRegexPatterns(String regexList) {
            Preconditions.checkNotNull((Object)regexList);
            this.includingRegexPatterns = DatasetFilterUtils.getPatternsFromStrings(COMMA_SPLITTER.splitToList((CharSequence)regexList));
            return this;
        }

        public Builder useExcludingRegexPatterns(String regexList) {
            Preconditions.checkNotNull((Object)regexList);
            this.excludingRegexPatterns = DatasetFilterUtils.getPatternsFromStrings(COMMA_SPLITTER.splitToList((CharSequence)regexList));
            return this;
        }

        public Builder useSrcFileSystem(FileSystem srcFs) {
            Preconditions.checkNotNull((Object)srcFs);
            this.srcFs = srcFs;
            return this;
        }

        public Builder useDestFileSystem(FileSystem destFs) {
            Preconditions.checkNotNull((Object)destFs);
            this.destFs = destFs;
            return this;
        }

        public Builder readFrom(Path srcLogDir) {
            Preconditions.checkNotNull((Object)srcLogDir);
            this.srcLogDirs = ImmutableList.of((Object)srcLogDir);
            return this;
        }

        public Builder readFrom(List<Path> srcLogDirs) {
            Preconditions.checkNotNull(srcLogDirs);
            this.srcLogDirs = srcLogDirs;
            return this;
        }

        public Builder writeTo(Path destLogDir) {
            Preconditions.checkNotNull((Object)destLogDir);
            this.destLogDir = destLogDir;
            return this;
        }

        public Builder useLogFileNamePrefix(String logFileNamePrefix) {
            Preconditions.checkArgument((!Strings.isNullOrEmpty((String)logFileNamePrefix) ? 1 : 0) != 0, (Object)("Invalid log file name prefix: " + logFileNamePrefix));
            this.logFileNamePrefix = logFileNamePrefix;
            return this;
        }

        public Builder useLinesWrittenBeforeFlush(int linesWrittenBeforeFlush) {
            Preconditions.checkArgument((linesWrittenBeforeFlush > 0 ? 1 : 0) != 0, (Object)"The value specifying the lines to write before flush must be positive");
            this.linesWrittenBeforeFlush = linesWrittenBeforeFlush;
            return this;
        }

        public Builder useCurrentLogFileName(String currentLogFileName) {
            this.currentLogFileName = currentLogFileName;
            return this;
        }

        public Builder useNumCopyThreads(int numCopyThreads) {
            this.numCopyThreads = numCopyThreads;
            return this;
        }

        public LogCopier build() throws IOException {
            return new LogCopier(this);
        }
    }
}

