/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.files;

import com.hazelcast.function.PredicateEx;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.io.BufferedWriter;
import java.io.File;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogStreamAnalyzer {
    private static Pipeline buildPipeline(Path tempDir) {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.fileWatcher((String)tempDir.toString())).withoutTimestamps().map(LogLine::parse).filter((PredicateEx & Serializable)line -> line.getResponseCode() >= 200 && line.getResponseCode() < 400).addTimestamps(LogLine::getTimestamp, 1000L).window((WindowDefinition)WindowDefinition.sliding((long)10000L, (long)1000L)).groupingKey(LogLine::getEndpoint).aggregate(AggregateOperations.counting()).writeTo(Sinks.logger());
        return p;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        Path tempDir = Files.createTempDirectory(AccessLogStreamAnalyzer.class.getSimpleName(), new FileAttribute[0]);
        Pipeline p = AccessLogStreamAnalyzer.buildPipeline(tempDir);
        JetInstance instance = Jet.bootstrappedInstance();
        try {
            instance.newJob(p);
            AccessLogStreamAnalyzer.startGenerator(tempDir);
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
        }
        finally {
            Jet.shutdownAll();
            IOUtil.delete((File)tempDir.toFile());
        }
    }

    private static void startGenerator(Path tempDir) throws Exception {
        Random random = new Random();
        try (BufferedWriter w = Files.newBufferedWriter(tempDir.resolve("access_log"), StandardOpenOption.CREATE);){
            for (int i = 0; i < 60000; ++i) {
                int articleNum = Math.min(10, Math.max(0, (int)(random.nextGaussian() * 2.0 + 5.0)));
                w.append("129.21.37.3 - - [").append(LogLine.DATE_TIME_FORMATTER.format(ZonedDateTime.now())).append("] \"GET /article").append(String.valueOf(articleNum)).append(" HTTP/1.0\" 200 12345");
                w.newLine();
                w.flush();
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
            }
        }
    }

    private static class LogLine
    implements Serializable {
        public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
        private static final String LOG_ENTRY_PATTERN = "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)";
        private static final Pattern PATTERN = Pattern.compile("^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)");
        private final String ipAddress;
        private final String clientIdentd;
        private final String userID;
        private final long timestamp;
        private final String method;
        private final String endpoint;
        private final String protocol;
        private final int responseCode;
        private final long contentSize;

        LogLine(String ipAddress, String clientIdentd, String userID, long timestamp, String method, String endpoint, String protocol, int responseCode, long contentSize) {
            this.ipAddress = ipAddress;
            this.clientIdentd = clientIdentd;
            this.userID = userID;
            this.timestamp = timestamp;
            this.method = method;
            this.endpoint = endpoint;
            this.protocol = protocol;
            this.responseCode = responseCode;
            this.contentSize = contentSize;
        }

        public static LogLine parse(String line) {
            Matcher m = PATTERN.matcher(line);
            if (!m.find()) {
                throw new IllegalArgumentException("Cannot parse log line: " + line);
            }
            long time = ZonedDateTime.parse(m.group(4), DATE_TIME_FORMATTER).toInstant().toEpochMilli();
            return new LogLine(m.group(1), m.group(2), m.group(3), time, m.group(5), m.group(6), m.group(7), Integer.parseInt(m.group(8)), Long.parseLong(m.group(9)));
        }

        public String getIpAddress() {
            return this.ipAddress;
        }

        public String getClientIdentd() {
            return this.clientIdentd;
        }

        public String getUserID() {
            return this.userID;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String getMethod() {
            return this.method;
        }

        public String getEndpoint() {
            return this.endpoint;
        }

        public String getProtocol() {
            return this.protocol;
        }

        public int getResponseCode() {
            return this.responseCode;
        }

        public long getContentSize() {
            return this.contentSize;
        }
    }
}

