package org.opennms.nephron;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Instant;
import org.opennms.nephron.Aggregate;
import org.opennms.nephron.CompoundKey;
import org.opennms.nephron.FlowSummaryData;
import org.opennms.nephron.coders.FlowDocumentProtobufCoder;
import org.opennms.nephron.coders.KafkaInputFlowDeserializer;
import org.opennms.nephron.elastic.AggregationType;
import org.opennms.nephron.elastic.FlowSummary;
import org.opennms.nephron.elastic.IndexStrategy;
import org.opennms.netmgt.flows.persistence.model.Direction;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/nephron/Pipeline.class */
public class Pipeline {
    private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit(LOG).maxRate(5).every(Duration.ofSeconds(10)).build();

    /* loaded from: input_file:org/opennms/nephron/Pipeline$CalculateFlowStatistics.class */
    public static class CalculateFlowStatistics extends PTransform<PCollection<FlowDocument>, PCollection<FlowSummaryData>> {
        private final int topK;
        private final org.joda.time.Duration fixedWindowSize;
        private final org.joda.time.Duration maxFlowDuration;
        private final org.joda.time.Duration earlyProcessingDelay;
        private final org.joda.time.Duration lateProcessingDelay;
        private final org.joda.time.Duration allowedLateness;

        public CalculateFlowStatistics(int i, org.joda.time.Duration duration, org.joda.time.Duration duration2, org.joda.time.Duration duration3, org.joda.time.Duration duration4, org.joda.time.Duration duration5) {
            this.topK = i;
            this.fixedWindowSize = (org.joda.time.Duration) Objects.requireNonNull(duration);
            this.maxFlowDuration = (org.joda.time.Duration) Objects.requireNonNull(duration2);
            this.earlyProcessingDelay = (org.joda.time.Duration) Objects.requireNonNull(duration3);
            this.lateProcessingDelay = (org.joda.time.Duration) Objects.requireNonNull(duration4);
            this.allowedLateness = (org.joda.time.Duration) Objects.requireNonNull(duration5);
        }

        public CalculateFlowStatistics(NephronOptions nephronOptions) {
            this(nephronOptions.getTopK(), org.joda.time.Duration.millis(nephronOptions.getFixedWindowSizeMs()), org.joda.time.Duration.millis(nephronOptions.getMaxFlowDurationMs()), org.joda.time.Duration.millis(nephronOptions.getEarlyProcessingDelayMs()), org.joda.time.Duration.millis(nephronOptions.getLateProcessingDelayMs()), org.joda.time.Duration.millis(nephronOptions.getAllowedLatenessMs()));
        }

        public PCollection<FlowSummaryData> expand(PCollection<FlowDocument> pCollection) {
            PCollection apply = pCollection.apply("WindowedFlows", new WindowedFlows(this.fixedWindowSize, this.maxFlowDuration, this.earlyProcessingDelay, this.lateProcessingDelay, this.allowedLateness));
            SumsAndTopKs aggregateSumsAndTopKs = Pipeline.aggregateSumsAndTopKs("app_", apply, CompoundKeyType.EXPORTER_INTERFACE_TOS_APPLICATION, CompoundKeyType.EXPORTER_INTERFACE_APPLICATION, this.topK);
            SumsAndTopKs aggregateSumsAndTopKs2 = Pipeline.aggregateSumsAndTopKs("host_", apply, CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST, CompoundKeyType.EXPORTER_INTERFACE_HOST, this.topK);
            SumsAndTopKs aggregateSumsAndTopKs3 = Pipeline.aggregateSumsAndTopKs("conv_", apply, CompoundKeyType.EXPORTER_INTERFACE_TOS_CONVERSATION, CompoundKeyType.EXPORTER_INTERFACE_CONVERSATION, this.topK);
            TotalAndSummary aggregateParentTotal = Pipeline.aggregateParentTotal("tos_", aggregateSumsAndTopKs.withTos.sum);
            return PCollectionList.of(Pipeline.aggregateParentTotal("itf_", aggregateParentTotal.total).summary).and(aggregateParentTotal.summary).and(aggregateSumsAndTopKs.withTos.topK).and(aggregateSumsAndTopKs.withoutTos.topK).and(aggregateSumsAndTopKs2.withTos.topK).and(aggregateSumsAndTopKs2.withoutTos.topK).and(aggregateSumsAndTopKs3.withTos.topK).and(aggregateSumsAndTopKs3.withoutTos.topK).apply(Flatten.pCollections());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opennms/nephron/Pipeline$FlowBytesValueComparator.class */
    public static class FlowBytesValueComparator implements Comparator<KV<CompoundKey, Aggregate>>, Serializable {
        FlowBytesValueComparator() {
        }

        @Override // java.util.Comparator
        public int compare(KV<CompoundKey, Aggregate> kv, KV<CompoundKey, Aggregate> kv2) {
            int compare = Long.compare(((Aggregate) kv.getValue()).getBytes(), ((Aggregate) kv2.getValue()).getBytes());
            return compare != 0 ? compare : ((CompoundKey) kv2.getKey()).groupedByKey().compareTo(((CompoundKey) kv.getKey()).groupedByKey());
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$KeyFlowBy.class */
    public static class KeyFlowBy extends DoFn<FlowDocument, KV<CompoundKey, Aggregate>> {
        private final CompoundKeyType type;
        private final Counter flowsWithMissingFields = Metrics.counter(Pipeline.class, "flowsWithMissingFields");
        private final Counter flowsInWindow = Metrics.counter("flows", "in_window");

        public KeyFlowBy(CompoundKeyType compoundKeyType) {
            this.type = compoundKeyType;
        }

        public static long bytesInWindow(long j, long j2, double d, long j3, long j4) {
            long j5 = (j2 - j) + 1;
            return ((long) ((((Math.min(j2, j4) - j) + 1) * d) / j5)) - ((long) (((((Math.max(j, j3) - 1) - j) + 1) * d) / j5));
        }

        private Aggregate aggregatize(IntervalWindow intervalWindow, FlowDocument flowDocument, String str) {
            Aggregate aggregate;
            double d = 1.0d;
            if (flowDocument.hasSamplingInterval()) {
                double value = flowDocument.getSamplingInterval().getValue();
                if (value > 0.0d) {
                    d = value;
                }
            }
            long bytesInWindow = bytesInWindow(flowDocument.getDeltaSwitched().getValue(), flowDocument.getLastSwitched().getValue(), flowDocument.getNumBytes().getValue() * d, intervalWindow.start().getMillis(), intervalWindow.maxTimestamp().getMillis());
            this.flowsInWindow.inc();
            if (Direction.INGRESS.equals(flowDocument.getDirection())) {
                aggregate = new Aggregate(bytesInWindow, 0L, str, flowDocument.hasEcn() ? Integer.valueOf(flowDocument.getEcn().getValue()) : null);
            } else {
                aggregate = new Aggregate(0L, bytesInWindow, str, flowDocument.hasEcn() ? Integer.valueOf(flowDocument.getEcn().getValue()) : null);
            }
            return aggregate;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<FlowDocument, KV<CompoundKey, Aggregate>>.ProcessContext processContext, IntervalWindow intervalWindow) {
            FlowDocument flowDocument = (FlowDocument) processContext.element();
            try {
                for (WithHostname<CompoundKey> withHostname : key(flowDocument)) {
                    processContext.output(KV.of(withHostname.value, aggregatize(intervalWindow, flowDocument, withHostname.hostname)));
                }
            } catch (MissingFieldsException e) {
                this.flowsWithMissingFields.inc();
            }
        }

        public Collection<WithHostname<CompoundKey>> key(FlowDocument flowDocument) throws MissingFieldsException {
            return this.type.create(flowDocument);
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$ReadFromKafka.class */
    public static class ReadFromKafka extends PTransform<PBegin, PCollection<FlowDocument>> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaConsumerConfig;
        private final Counter flowsFromKafka = Metrics.counter("flows", "from_kafka");
        private final Distribution flowsFromKafkaDrift = Metrics.distribution("flows", "from_kafka_drift");

        public ReadFromKafka(String str, String str2, Map<String, Object> map) {
            this.bootstrapServers = (String) Objects.requireNonNull(str);
            this.topic = (String) Objects.requireNonNull(str2);
            this.kafkaConsumerConfig = (Map) Objects.requireNonNull(map);
        }

        public PCollection<FlowDocument> expand(PBegin pBegin) {
            return pBegin.apply(KafkaIO.read().withTopic(this.topic).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(KafkaInputFlowDeserializer.class).withConsumerConfigUpdates(this.kafkaConsumerConfig).withBootstrapServers(this.bootstrapServers).withTimestampPolicyFactory(Pipeline.getKafkaInputTimestampPolicyFactory(org.joda.time.Duration.millis(((NephronOptions) pBegin.getPipeline().getOptions().as(NephronOptions.class)).getDefaultMaxInputDelayMs()))).withoutMetadata()).apply(Values.create()).apply("init", ParDo.of(new DoFn<FlowDocument, FlowDocument>() { // from class: org.opennms.nephron.Pipeline.ReadFromKafka.1
                @DoFn.ProcessElement
                public void processElement(DoFn<FlowDocument, FlowDocument>.ProcessContext processContext) {
                    FlowDocument flowDocument = (FlowDocument) processContext.element();
                    if (!flowDocument.hasDeltaSwitched()) {
                        flowDocument = FlowDocument.newBuilder((FlowDocument) processContext.element()).setDeltaSwitched(flowDocument.getFirstSwitched()).build();
                    }
                    processContext.output(flowDocument);
                    ReadFromKafka.this.flowsFromKafka.inc();
                    ReadFromKafka.this.flowsFromKafkaDrift.update(System.currentTimeMillis() - flowDocument.getTimestamp());
                }
            }));
        }

        public static long getTimestampMs(FlowDocument flowDocument) {
            return flowDocument.getLastSwitched().getValue();
        }

        public static Instant getTimestamp(FlowDocument flowDocument) {
            return Instant.ofEpochMilli(getTimestampMs(flowDocument));
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$SumAndTopK.class */
    public static class SumAndTopK {
        public final PCollection<KV<CompoundKey, Aggregate>> sum;
        public final PCollection<FlowSummaryData> topK;

        public SumAndTopK(PCollection<KV<CompoundKey, Aggregate>> pCollection, PCollection<FlowSummaryData> pCollection2) {
            this.sum = pCollection;
            this.topK = pCollection2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opennms/nephron/Pipeline$SumBytes.class */
    public static class SumBytes extends Combine.BinaryCombineFn<Aggregate> {
        SumBytes() {
        }

        public Aggregate apply(Aggregate aggregate, Aggregate aggregate2) {
            return Aggregate.merge(aggregate, aggregate2);
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$SumsAndTopKs.class */
    public static class SumsAndTopKs {
        public final SumAndTopK withTos;
        public final SumAndTopK withoutTos;

        public SumsAndTopKs(SumAndTopK sumAndTopK, SumAndTopK sumAndTopK2) {
            this.withTos = sumAndTopK;
            this.withoutTos = sumAndTopK2;
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$TotalAndSummary.class */
    public static class TotalAndSummary {
        public final PCollection<KV<CompoundKey, Aggregate>> total;
        public final PCollection<FlowSummaryData> summary;

        public TotalAndSummary(PCollection<KV<CompoundKey, Aggregate>> pCollection, PCollection<FlowSummaryData> pCollection2) {
            this.total = pCollection;
            this.summary = pCollection2;
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$WindowedFlows.class */
    public static class WindowedFlows extends PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> {
        private final org.joda.time.Duration fixedWindowSize;
        private final org.joda.time.Duration maxFlowDuration;
        private final org.joda.time.Duration earlyProcessingDelay;
        private final org.joda.time.Duration lateProcessingDelay;
        private final org.joda.time.Duration allowedLateness;

        public WindowedFlows(org.joda.time.Duration duration, org.joda.time.Duration duration2, org.joda.time.Duration duration3, org.joda.time.Duration duration4, org.joda.time.Duration duration5) {
            this.fixedWindowSize = (org.joda.time.Duration) Objects.requireNonNull(duration);
            this.maxFlowDuration = (org.joda.time.Duration) Objects.requireNonNull(duration2);
            this.earlyProcessingDelay = (org.joda.time.Duration) Objects.requireNonNull(duration3);
            this.lateProcessingDelay = (org.joda.time.Duration) Objects.requireNonNull(duration4);
            this.allowedLateness = (org.joda.time.Duration) Objects.requireNonNull(duration5);
        }

        public PCollection<FlowDocument> expand(PCollection<FlowDocument> pCollection) {
            return pCollection.apply("attach_timestamp", Pipeline.attachTimestamps(this.fixedWindowSize, this.maxFlowDuration)).apply("to_windows", Pipeline.toWindow(this.fixedWindowSize, this.earlyProcessingDelay, this.lateProcessingDelay, this.allowedLateness));
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$WriteToElasticsearch.class */
    public static class WriteToElasticsearch extends PTransform<PCollection<FlowSummaryData>, PDone> {
        private final String elasticIndex;
        private final IndexStrategy indexStrategy;
        private final ElasticsearchIO.ConnectionConfiguration esConfig;
        private final Counter flowsToEs;
        private final Distribution flowsToEsDrift;
        private int elasticRetryCount;
        private long elasticRetryDuration;

        public WriteToElasticsearch(String str, String str2, String str3, String str4, IndexStrategy indexStrategy, int i, int i2, int i3, long j) {
            this.flowsToEs = Metrics.counter("flows", "to_es");
            this.flowsToEsDrift = Metrics.distribution("flows", "to_es_drift");
            Objects.requireNonNull(str);
            this.elasticIndex = (String) Objects.requireNonNull(str4);
            this.indexStrategy = (IndexStrategy) Objects.requireNonNull(indexStrategy);
            ElasticsearchIO.ConnectionConfiguration create = ElasticsearchIO.ConnectionConfiguration.create(new String[]{str}, str4, "_doc");
            if (!Strings.isNullOrEmpty(str2) && !Strings.isNullOrEmpty(str3)) {
                create = create.withUsername(str2).withPassword(str3);
            }
            this.esConfig = create.withConnectTimeout(Integer.valueOf(i)).withSocketTimeout(Integer.valueOf(i2));
            this.elasticRetryCount = i3;
            this.elasticRetryDuration = j;
        }

        public WriteToElasticsearch(NephronOptions nephronOptions) {
            this(nephronOptions.getElasticUrl(), nephronOptions.getElasticUser(), nephronOptions.getElasticPassword(), nephronOptions.getElasticFlowIndex(), nephronOptions.getElasticIndexStrategy(), nephronOptions.getElasticConnectTimeout(), nephronOptions.getElasticSocketTimeout(), nephronOptions.getElasticRetryCount(), nephronOptions.getElasticRetryDuration());
        }

        public PDone expand(PCollection<FlowSummaryData> pCollection) {
            return pCollection.apply("SerializeToJson", Pipeline.access$400()).apply("WriteToElasticsearch", ElasticsearchIO.write().withConnectionConfiguration(this.esConfig).withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(this.elasticRetryCount, org.joda.time.Duration.millis(this.elasticRetryDuration))).withIndexFn(new ElasticsearchIO.Write.FieldValueExtractFn() { // from class: org.opennms.nephron.Pipeline.WriteToElasticsearch.1
                public String apply(JsonNode jsonNode) {
                    java.time.Instant ofEpochMilli = java.time.Instant.ofEpochMilli(jsonNode.get("@timestamp").asLong());
                    String index = WriteToElasticsearch.this.indexStrategy.getIndex(WriteToElasticsearch.this.elasticIndex, ofEpochMilli);
                    WriteToElasticsearch.this.flowsToEs.inc();
                    WriteToElasticsearch.this.flowsToEsDrift.update(System.currentTimeMillis() - ofEpochMilli.toEpochMilli());
                    return index;
                }
            }));
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$WriteToKafka.class */
    public static class WriteToKafka extends PTransform<PCollection<FlowSummaryData>, PDone> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaProducerConfig;

        public WriteToKafka(String str, String str2, Map<String, Object> map) {
            this.bootstrapServers = (String) Objects.requireNonNull(str);
            this.topic = (String) Objects.requireNonNull(str2);
            this.kafkaProducerConfig = map;
        }

        public PDone expand(PCollection<FlowSummaryData> pCollection) {
            return pCollection.apply(Pipeline.access$400()).apply(KafkaIO.write().withProducerConfigUpdates(this.kafkaProducerConfig).withBootstrapServers(this.bootstrapServers).withTopic(this.topic).withValueSerializer(StringSerializer.class).values());
        }
    }

    public static org.apache.beam.sdk.Pipeline create(NephronOptions nephronOptions) {
        Objects.requireNonNull(nephronOptions);
        org.apache.beam.sdk.Pipeline create = org.apache.beam.sdk.Pipeline.create(nephronOptions);
        registerCoders(create);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        if (!Strings.isNullOrEmpty(nephronOptions.getKafkaClientProperties())) {
            Properties properties = new Properties();
            try {
                properties.load(new FileReader(nephronOptions.getKafkaClientProperties()));
                for (Map.Entry entry : properties.entrySet()) {
                    hashMap.put(entry.getKey().toString(), entry.getValue());
                    hashMap2.put(entry.getKey().toString(), entry.getValue());
                }
            } catch (IOException e) {
                LOG.error("Error loading properties file", e);
                throw new RuntimeException("Error reading properties file", e);
            }
        }
        hashMap.put("group.id", nephronOptions.getGroupId());
        hashMap.put("enable.auto.commit", Boolean.valueOf(nephronOptions.getAutoCommit()));
        PCollection apply = create.apply(new ReadFromKafka(nephronOptions.getBootstrapServers(), nephronOptions.getFlowSourceTopic(), hashMap)).apply(new CalculateFlowStatistics(nephronOptions));
        apply.apply(new WriteToElasticsearch(nephronOptions));
        if (nephronOptions.getFlowDestTopic() != null) {
            apply.apply(new WriteToKafka(nephronOptions.getBootstrapServers(), nephronOptions.getFlowDestTopic(), hashMap2));
        }
        return create;
    }

    public static void registerCoders(org.apache.beam.sdk.Pipeline pipeline) {
        CoderRegistry coderRegistry = pipeline.getCoderRegistry();
        coderRegistry.registerCoderForClass(FlowDocument.class, new FlowDocumentProtobufCoder());
        coderRegistry.registerCoderForClass(FlowSummaryData.class, new FlowSummaryData.FlowSummaryDataCoder());
        coderRegistry.registerCoderForClass(CompoundKey.class, new CompoundKey.CompoundKeyCoder());
        coderRegistry.registerCoderForClass(Aggregate.class, new Aggregate.AggregateCoder());
    }

    public static TimestampPolicyFactory<String, FlowDocument> getKafkaInputTimestampPolicyFactory(org.joda.time.Duration duration) {
        return (topicPartition, optional) -> {
            return new FlowTimestampPolicy(duration, optional);
        };
    }

    private static ParDo.SingleOutput<FlowSummaryData, String> toJson() {
        return ParDo.of(new DoFn<FlowSummaryData, String>() { // from class: org.opennms.nephron.Pipeline.1
            @DoFn.ProcessElement
            public void processElement(DoFn<FlowSummaryData, String>.ProcessContext processContext) throws JsonProcessingException {
                processContext.output(Pipeline.MAPPER.writeValueAsString(Pipeline.toFlowSummary((FlowSummaryData) processContext.element())));
            }
        });
    }

    public static ParDo.SingleOutput<FlowDocument, FlowDocument> attachTimestamps(final org.joda.time.Duration duration, final org.joda.time.Duration duration2) {
        return ParDo.of(new DoFn<FlowDocument, FlowDocument>() { // from class: org.opennms.nephron.Pipeline.2
            final long windowSizeMs;
            final long maxFlowDurationMs;

            {
                this.windowSizeMs = duration.getMillis();
                this.maxFlowDurationMs = duration2.getMillis();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<FlowDocument, FlowDocument>.ProcessContext processContext) {
                FlowDocument flowDocument = (FlowDocument) processContext.element();
                long value = flowDocument.getDeltaSwitched().getValue();
                long value2 = flowDocument.getLastSwitched().getValue();
                int nodeId = flowDocument.getExporterNode().getNodeId();
                long windowNumber = (UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, value2) - UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, value)) + 1;
                long j = value;
                long j2 = 0;
                while (true) {
                    long j3 = j2;
                    if (j3 >= windowNumber) {
                        return;
                    }
                    if (j <= processContext.timestamp().getMillis() - this.maxFlowDurationMs) {
                        Pipeline.RATE_LIMITED_LOG.warn("Skipping output for flow w/ start: {}, end: {}, target timestamp: {}, current input timestamp: {}. Full flow: {}", new Object[]{Instant.ofEpochMilli(value), Instant.ofEpochMilli(value2), Instant.ofEpochMilli(j), processContext.timestamp(), flowDocument});
                    } else {
                        processContext.outputWithTimestamp(flowDocument, Instant.ofEpochMilli(j));
                    }
                    j = j + this.windowSizeMs < value2 ? j + this.windowSizeMs : value2;
                    j2 = j3 + 1;
                }
            }

            public org.joda.time.Duration getAllowedTimestampSkew() {
                return duration2;
            }
        });
    }

    public static FlowSummaryData toFlowSummaryData(AggregationType aggregationType, IntervalWindow intervalWindow, KV<CompoundKey, Aggregate> kv, int i) {
        return new FlowSummaryData(aggregationType, (CompoundKey) kv.getKey(), (Aggregate) kv.getValue(), intervalWindow.start().getMillis(), intervalWindow.end().getMillis(), i);
    }

    public static FlowSummary toFlowSummary(FlowSummaryData flowSummaryData) {
        FlowSummary flowSummary = new FlowSummary();
        flowSummaryData.key.populate(flowSummary);
        flowSummary.setAggregationType(flowSummaryData.aggregationType);
        flowSummary.setRangeStartMs(flowSummaryData.windowStart);
        flowSummary.setRangeEndMs(flowSummaryData.windowEnd);
        flowSummary.setTimestamp(flowSummary.getRangeEndMs());
        flowSummary.setBytesEgress(Long.valueOf(flowSummaryData.aggregate.getBytesOut()));
        flowSummary.setBytesIngress(Long.valueOf(flowSummaryData.aggregate.getBytesIn()));
        flowSummary.setBytesTotal(Long.valueOf(flowSummary.getBytesIngress().longValue() + flowSummary.getBytesEgress().longValue()));
        flowSummary.setCongestionEncountered(Boolean.valueOf(flowSummaryData.aggregate.isCongestionEncountered()));
        flowSummary.setNonEcnCapableTransport(Boolean.valueOf(flowSummaryData.aggregate.isNonEcnCapableTransport()));
        flowSummary.setHostName(flowSummaryData.aggregate.getHostname());
        flowSummary.setRanking(flowSummaryData.ranking);
        return flowSummary;
    }

    public static Window<FlowDocument> toWindow(org.joda.time.Duration duration, org.joda.time.Duration duration2, org.joda.time.Duration duration3, org.joda.time.Duration duration4) {
        AfterWatermark.AfterWatermarkEarlyAndLate withLateFirings = AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(duration3));
        if (duration2 != null && !duration2.isEqual(org.joda.time.Duration.ZERO)) {
            withLateFirings = withLateFirings.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(duration2));
        }
        return Window.into(UnalignedFixedWindows.of(duration)).withTimestampCombiner(TimestampCombiner.END_OF_WINDOW).triggering(withLateFirings).withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY).withAllowedLateness(duration4).discardingFiredPanes();
    }

    public static TotalAndSummary aggregateParentTotal(String str, PCollection<KV<CompoundKey, Aggregate>> pCollection) {
        PCollection apply = pCollection.apply(str + "group_by_outer_key", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>() { // from class: org.opennms.nephron.Pipeline.3
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>.ProcessContext processContext) {
                KV kv = (KV) processContext.element();
                processContext.output(KV.of(((CompoundKey) kv.getKey()).getOuterKey(), kv.getValue()));
            }
        })).apply(str + "sum_bytes_by_key", Combine.perKey(new SumBytes()));
        return new TotalAndSummary(apply, apply.apply(str + "total_summary", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, FlowSummaryData>() { // from class: org.opennms.nephron.Pipeline.4
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<CompoundKey, Aggregate>, FlowSummaryData>.ProcessContext processContext, IntervalWindow intervalWindow) {
                processContext.output(Pipeline.toFlowSummaryData(AggregationType.TOTAL, intervalWindow, (KV) processContext.element(), 0));
            }
        })));
    }

    public static SumsAndTopKs aggregateSumsAndTopKs(String str, PCollection<FlowDocument> pCollection, CompoundKeyType compoundKeyType, final CompoundKeyType compoundKeyType2, int i) {
        SumAndTopK aggregateSumAndTopK = aggregateSumAndTopK(str + "with_tos_", pCollection.apply(str + "group_with_tos", ParDo.of(new KeyFlowBy(compoundKeyType))), i);
        return new SumsAndTopKs(aggregateSumAndTopK, aggregateSumAndTopK(str + "without_tos_", aggregateSumAndTopK.sum.apply(str + "group_without_tos_", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>() { // from class: org.opennms.nephron.Pipeline.5
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>.ProcessContext processContext) {
                KV kv = (KV) processContext.element();
                processContext.output(KV.of(((CompoundKey) kv.getKey()).project(CompoundKeyType.this), kv.getValue()));
            }
        })), i));
    }

    public static SumAndTopK aggregateSumAndTopK(String str, PCollection<KV<CompoundKey, Aggregate>> pCollection, int i) {
        PCollection apply = pCollection.apply(str + "sum_bytes_by_key", Combine.perKey(new SumBytes()));
        return new SumAndTopK(apply, apply.apply(str + "group_by_outer_key", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, KV<CompoundKey, Aggregate>>>() { // from class: org.opennms.nephron.Pipeline.6
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, KV<CompoundKey, Aggregate>>>.ProcessContext processContext) {
                KV kv = (KV) processContext.element();
                processContext.output(KV.of(((CompoundKey) kv.getKey()).getOuterKey(), kv));
            }
        })).apply(str + "top_k_per_key", Top.perKey(i, new FlowBytesValueComparator())).apply(str + "flatten", Values.create()).apply(str + "top_k_summary", ParDo.of(new DoFn<List<KV<CompoundKey, Aggregate>>, FlowSummaryData>() { // from class: org.opennms.nephron.Pipeline.7
            @DoFn.ProcessElement
            public void processElement(DoFn<List<KV<CompoundKey, Aggregate>>, FlowSummaryData>.ProcessContext processContext, IntervalWindow intervalWindow) {
                int i2 = 1;
                Iterator it = ((List) processContext.element()).iterator();
                while (it.hasNext()) {
                    int i3 = i2;
                    i2++;
                    processContext.output(Pipeline.toFlowSummaryData(AggregationType.TOPK, intervalWindow, (KV) it.next(), i3));
                }
            }
        })));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1390575061:
                if (implMethodName.equals("lambda$getKafkaInputTimestampPolicyFactory$6934a24b$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/kafka/TimestampPolicyFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("createTimestampPolicy") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/kafka/common/TopicPartition;Ljava/util/Optional;)Lorg/apache/beam/sdk/io/kafka/TimestampPolicy;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Duration;Lorg/apache/kafka/common/TopicPartition;Ljava/util/Optional;)Lorg/apache/beam/sdk/io/kafka/TimestampPolicy;")) {
                    org.joda.time.Duration duration = (org.joda.time.Duration) serializedLambda.getCapturedArg(0);
                    return (topicPartition, optional) -> {
                        return new FlowTimestampPolicy(duration, optional);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static /* synthetic */ ParDo.SingleOutput access$400() {
        return toJson();
    }
}
