/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.util.logs;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.util.concurrent.AbstractScheduledService;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.concurrent.ScheduledTask;
import org.apache.gobblin.util.concurrent.TaskScheduler;
import org.apache.gobblin.util.concurrent.TaskSchedulerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogCopier
extends AbstractScheduledService {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogCopier.class);
    private static final long DEFAULT_SOURCE_LOG_FILE_MONITOR_INTERVAL = 120L;
    private static final long DEFAULT_LOG_COPY_INTERVAL_SECONDS = 60L;
    private static final long DEFAULT_MAX_MINUTES_PER_LOG_FILE = Long.MAX_VALUE;
    private static final long DEFAULT_MAX_BYTES_PER_LOG_FILE = Long.MAX_VALUE;
    private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
    private static final int DEFAULT_LINES_WRITTEN_BEFORE_FLUSH = 100;
    private final FileSystem srcFs;
    private final FileSystem destFs;
    private final Path srcLogDir;
    private final Path destLogDir;
    private final long sourceLogFileMonitorInterval;
    private final long copyInterval;
    private final long maxMinutesPerLogFile;
    private final long maxBytesPerLogFile;
    private final TimeUnit timeUnit;
    private final Set<String> logFileExtensions;
    private final Optional<List<Pattern>> includingRegexPatterns;
    private final Optional<List<Pattern>> excludingRegexPatterns;
    private final Optional<String> logFileNamePrefix;
    private final int linesWrittenBeforeFlush;
    private final TaskScheduler<Path, LogCopyTask> scheduler;

    private LogCopier(Builder builder) {
        this.srcFs = builder.srcFs;
        this.destFs = builder.destFs;
        this.srcLogDir = this.srcFs.makeQualified(builder.srcLogDir);
        this.destLogDir = this.destFs.makeQualified(builder.destLogDir);
        this.sourceLogFileMonitorInterval = builder.sourceLogFileMonitorInterval;
        this.copyInterval = builder.copyInterval;
        this.maxMinutesPerLogFile = builder.maxMinutesPerLogFile;
        this.maxBytesPerLogFile = builder.maxBytesPerLogFile;
        this.timeUnit = builder.timeUnit;
        this.logFileExtensions = builder.logFileExtensions;
        this.includingRegexPatterns = Optional.fromNullable((Object)builder.includingRegexPatterns);
        this.excludingRegexPatterns = Optional.fromNullable((Object)builder.excludingRegexPatterns);
        this.logFileNamePrefix = Optional.fromNullable((Object)builder.logFileNamePrefix);
        this.linesWrittenBeforeFlush = builder.linesWrittenBeforeFlush;
        this.scheduler = TaskSchedulerFactory.get(builder.schedulerName, (Optional<String>)Optional.absent());
    }

    protected void shutDown() throws Exception {
        this.scheduler.close();
    }

    protected void runOneIteration() throws IOException {
        this.checkSrcLogFiles();
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule((long)0L, (long)this.sourceLogFileMonitorInterval, (TimeUnit)this.timeUnit);
    }

    private void checkSrcLogFiles() throws IOException {
        List<FileStatus> srcLogFiles = FileListUtils.listFilesRecursively(this.srcFs, this.srcLogDir, new PathFilter(){

            public boolean accept(Path path) {
                return LogCopier.this.logFileExtensions.contains(Files.getFileExtension((String)path.getName()));
            }
        });
        if (srcLogFiles.isEmpty()) {
            LOGGER.warn("No log file found under directory " + this.srcLogDir);
            return;
        }
        HashSet newLogFiles = Sets.newHashSet();
        for (FileStatus srcLogFile : srcLogFiles) {
            newLogFiles.add(srcLogFile.getPath());
        }
        HashSet deletedLogFiles = Sets.newHashSet(this.getSourceFiles());
        deletedLogFiles.removeAll(newLogFiles);
        newLogFiles.removeAll((Collection<?>)this.getSourceFiles());
        for (Path srcLogFile : newLogFiles) {
            String destLogFileName = this.logFileNamePrefix.isPresent() ? (String)this.logFileNamePrefix.get() + "." + srcLogFile.getName() : srcLogFile.getName();
            Path destLogFile = new Path(this.destLogDir, destLogFileName);
            this.scheduler.schedule(new LogCopyTask(srcLogFile, destLogFile), this.copyInterval, this.timeUnit);
        }
        for (Path deletedLogFile : deletedLogFiles) {
            Optional<LogCopyTask> logCopyTask = this.scheduler.getScheduledTask(deletedLogFile);
            if (!logCopyTask.isPresent()) continue;
            this.scheduler.cancel((LogCopyTask)((ScheduledTask)logCopyTask.get()));
        }
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private ImmutableList<Path> getSourceFiles() {
        return ImmutableList.copyOf((Iterable)Iterables.transform(this.scheduler.getScheduledTasks(), (Function)new Function<LogCopyTask, Path>(){

            public Path apply(LogCopyTask input) {
                return input.getKey();
            }
        }));
    }

    static /* synthetic */ TimeUnit access$1600() {
        return DEFAULT_TIME_UNIT;
    }

    private class LogCopyTask
    implements ScheduledTask<Path> {
        private final Path srcLogFile;
        private final Path destLogFile;
        private final Stopwatch watch;
        private long currentPos = 0L;

        public LogCopyTask(Path srcLogFile, Path destLogFile) {
            this.srcLogFile = srcLogFile;
            this.destLogFile = destLogFile;
            this.watch = Stopwatch.createStarted();
        }

        @Override
        public Path getKey() {
            return this.srcLogFile;
        }

        @Override
        public void runOneIteration() {
            try {
                this.createNewLogFileIfNeeded();
                LOGGER.debug(String.format("Copying changes from %s to %s", this.srcLogFile, this.destLogFile));
                this.copyChangesOfLogFile(LogCopier.this.srcFs.makeQualified(this.srcLogFile), LogCopier.this.destFs.makeQualified(this.destLogFile));
            }
            catch (IOException ioe) {
                LOGGER.error(String.format("Failed while copying logs from %s to %s", this.srcLogFile, this.destLogFile), (Throwable)ioe);
            }
        }

        private void createNewLogFileIfNeeded() throws IOException {
            if (LogCopier.this.destFs.exists(this.destLogFile) && (this.watch.elapsed(TimeUnit.MINUTES) > LogCopier.this.maxMinutesPerLogFile || LogCopier.this.destFs.getFileStatus(this.destLogFile).getLen() > LogCopier.this.maxBytesPerLogFile)) {
                HadoopUtils.renamePath(LogCopier.this.destFs, this.destLogFile, new Path(this.destLogFile.toString() + "." + System.currentTimeMillis()));
                this.watch.reset();
                this.watch.start();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void copyChangesOfLogFile(Path srcFile, Path destFile) throws IOException {
            if (!LogCopier.this.srcFs.exists(srcFile)) {
                LOGGER.warn("Source log file not found: " + srcFile);
                return;
            }
            FSDataInputStream fsDataInputStream = null;
            try (Closer closer = Closer.create();){
                String line;
                fsDataInputStream = (FSDataInputStream)closer.register((Closeable)LogCopier.this.srcFs.open(srcFile));
                LOGGER.debug(String.format("Reading log file %s from position %d", srcFile, this.currentPos));
                fsDataInputStream.seek(this.currentPos);
                BufferedReader srcLogFileReader = (BufferedReader)closer.register((Closeable)new BufferedReader(new InputStreamReader((InputStream)fsDataInputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
                FSDataOutputStream outputStream = LogCopier.this.destFs.exists(destFile) ? LogCopier.this.destFs.append(destFile) : LogCopier.this.destFs.create(destFile);
                BufferedWriter destLogFileWriter = (BufferedWriter)closer.register((Closeable)new BufferedWriter(new OutputStreamWriter((OutputStream)outputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
                int linesProcessed = 0;
                while (!Thread.currentThread().isInterrupted() && (line = srcLogFileReader.readLine()) != null) {
                    if (!this.shouldCopyLine(line)) continue;
                    destLogFileWriter.write(line);
                    destLogFileWriter.newLine();
                    if (++linesProcessed % LogCopier.this.linesWrittenBeforeFlush != 0) continue;
                    destLogFileWriter.flush();
                }
            }
            finally {
                if (fsDataInputStream != null) {
                    this.currentPos = fsDataInputStream.getPos();
                }
            }
        }

        private boolean shouldCopyLine(String line) {
            boolean including = !LogCopier.this.includingRegexPatterns.isPresent() || DatasetFilterUtils.stringInPatterns(line, (List)LogCopier.this.includingRegexPatterns.get());
            boolean excluding = LogCopier.this.excludingRegexPatterns.isPresent() && DatasetFilterUtils.stringInPatterns(line, (List)LogCopier.this.excludingRegexPatterns.get());
            return !excluding && including;
        }
    }

    public static class Builder {
        private static final Splitter COMMA_SPLITTER = Splitter.on((char)',').omitEmptyStrings().trimResults();
        private FileSystem srcFs;
        private Path srcLogDir;
        private FileSystem destFs;
        private Path destLogDir;
        private long sourceLogFileMonitorInterval = 120L;
        private long copyInterval = 60L;
        private long maxMinutesPerLogFile = Long.MAX_VALUE;
        private long maxBytesPerLogFile = Long.MAX_VALUE;
        private TimeUnit timeUnit = LogCopier.access$1600();
        private Set<String> logFileExtensions;
        private List<Pattern> includingRegexPatterns;
        private List<Pattern> excludingRegexPatterns;
        private String logFileNamePrefix;
        private int linesWrittenBeforeFlush = 100;
        private String schedulerName = null;

        public Builder useSourceLogFileMonitorInterval(long sourceLogFileMonitorInterval) {
            Preconditions.checkArgument((sourceLogFileMonitorInterval > 0L ? 1 : 0) != 0, (Object)"Source log file monitor interval must be positive");
            this.sourceLogFileMonitorInterval = sourceLogFileMonitorInterval;
            return this;
        }

        public Builder useCopyInterval(long copyInterval) {
            Preconditions.checkArgument((copyInterval > 0L ? 1 : 0) != 0, (Object)"Copy interval must be positive");
            this.copyInterval = copyInterval;
            return this;
        }

        public Builder useMaxMinutesPerLogFile(long maxMinutesPerLogFile) {
            Preconditions.checkArgument((maxMinutesPerLogFile > 0L ? 1 : 0) != 0, (Object)"Max minutes per log file must be positive");
            this.maxMinutesPerLogFile = maxMinutesPerLogFile;
            return this;
        }

        public Builder useMaxBytesPerLogFile(long maxBytesPerLogFile) {
            Preconditions.checkArgument((maxBytesPerLogFile > 0L ? 1 : 0) != 0, (Object)"Max bytes per log file must be positive");
            this.maxBytesPerLogFile = maxBytesPerLogFile;
            return this;
        }

        public Builder useTimeUnit(TimeUnit timeUnit) {
            Preconditions.checkNotNull((Object)((Object)timeUnit));
            this.timeUnit = timeUnit;
            return this;
        }

        public Builder acceptsLogFileExtensions(Set<String> logFileExtensions) {
            Preconditions.checkNotNull(logFileExtensions);
            this.logFileExtensions = ImmutableSet.copyOf(logFileExtensions);
            return this;
        }

        public Builder useIncludingRegexPatterns(String regexList) {
            Preconditions.checkNotNull((Object)regexList);
            this.includingRegexPatterns = DatasetFilterUtils.getPatternsFromStrings(COMMA_SPLITTER.splitToList((CharSequence)regexList));
            return this;
        }

        public Builder useExcludingRegexPatterns(String regexList) {
            Preconditions.checkNotNull((Object)regexList);
            this.excludingRegexPatterns = DatasetFilterUtils.getPatternsFromStrings(COMMA_SPLITTER.splitToList((CharSequence)regexList));
            return this;
        }

        public Builder useSrcFileSystem(FileSystem srcFs) {
            Preconditions.checkNotNull((Object)srcFs);
            this.srcFs = srcFs;
            return this;
        }

        public Builder useDestFileSystem(FileSystem destFs) {
            Preconditions.checkNotNull((Object)destFs);
            this.destFs = destFs;
            return this;
        }

        public Builder readFrom(Path srcLogDir) {
            Preconditions.checkNotNull((Object)srcLogDir);
            this.srcLogDir = srcLogDir;
            return this;
        }

        public Builder writeTo(Path destLogDir) {
            Preconditions.checkNotNull((Object)destLogDir);
            this.destLogDir = destLogDir;
            return this;
        }

        public Builder useLogFileNamePrefix(String logFileNamePrefix) {
            Preconditions.checkArgument((!Strings.isNullOrEmpty((String)logFileNamePrefix) ? 1 : 0) != 0, (Object)("Invalid log file name prefix: " + logFileNamePrefix));
            this.logFileNamePrefix = logFileNamePrefix;
            return this;
        }

        public Builder useLinesWrittenBeforeFlush(int linesWrittenBeforeFlush) {
            Preconditions.checkArgument((linesWrittenBeforeFlush > 0 ? 1 : 0) != 0, (Object)"The value specifying the lines to write before flush must be positive");
            this.linesWrittenBeforeFlush = linesWrittenBeforeFlush;
            return this;
        }

        public Builder useScheduler(String schedulerName) {
            Preconditions.checkArgument((!Strings.isNullOrEmpty((String)schedulerName) ? 1 : 0) != 0, (Object)("Invalid scheduler name: " + schedulerName));
            this.schedulerName = schedulerName;
            return this;
        }

        public LogCopier build() {
            return new LogCopier(this);
        }
    }
}

