/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.perf;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.perf.SimpleBenchmark;

public class YahooBenchmark {
    private final SimpleBenchmark parent;
    private final String campaignsTopic;
    private final String eventsTopic;

    public YahooBenchmark(SimpleBenchmark parent, String campaignsTopic, String eventsTopic) {
        this.parent = parent;
        this.campaignsTopic = campaignsTopic;
        this.eventsTopic = eventsTopic;
    }

    private boolean maybeSetupPhaseCampaigns(String topic, String clientId, boolean skipIfAllTests, int numCampaigns, int adsPerCampaign, List<String> ads) {
        this.parent.resetStats();
        System.out.println("Initializing topic " + topic);
        Properties props = new Properties();
        props.put("bootstrap.servers", this.parent.props.get("bootstrap.servers"));
        props.put("client.id", clientId);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        try (KafkaProducer producer = new KafkaProducer(props);){
            for (int c = 0; c < numCampaigns; ++c) {
                String campaignID = UUID.randomUUID().toString();
                for (int a = 0; a < adsPerCampaign; ++a) {
                    String adId = UUID.randomUUID().toString();
                    String concat = adId + ":" + campaignID;
                    producer.send(new ProducerRecord(topic, (Object)adId, (Object)concat));
                    ads.add(adId);
                    ++this.parent.processedRecords;
                    this.parent.processedBytes += (long)(concat.length() + adId.length());
                }
            }
        }
        return true;
    }

    private void maybeSetupPhaseEvents(String topic, String clientId, int numRecords, List<String> ads) {
        this.parent.resetStats();
        String[] eventTypes = new String[]{"view", "click", "purchase"};
        Random rand = new Random(System.currentTimeMillis());
        System.out.println("Initializing topic " + topic);
        Properties props = new Properties();
        props.put("bootstrap.servers", this.parent.props.get("bootstrap.servers"));
        props.put("client.id", clientId);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        long startTime = System.currentTimeMillis();
        try (KafkaProducer producer = new KafkaProducer(props);){
            ProjectedEvent event = new ProjectedEvent();
            HashMap<String, Class<ProjectedEvent>> serdeProps = new HashMap<String, Class<ProjectedEvent>>();
            JsonPOJOSerializer projectedEventSerializer = new JsonPOJOSerializer();
            serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
            projectedEventSerializer.configure(serdeProps, false);
            for (int i = 0; i < numRecords; ++i) {
                event.eventType = eventTypes[rand.nextInt(eventTypes.length - 1)];
                event.adID = ads.get(rand.nextInt(ads.size() - 1));
                event.eventTime = System.currentTimeMillis();
                byte[] value = projectedEventSerializer.serialize(topic, event);
                producer.send(new ProducerRecord(topic, (Object)event.adID, (Object)value));
                ++this.parent.processedRecords;
                this.parent.processedBytes += (long)(value.length + event.adID.length());
            }
        }
        long endTime = System.currentTimeMillis();
        this.parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
    }

    public void run() {
        int numCampaigns = 100;
        int adsPerCampaign = 10;
        ArrayList<String> ads = new ArrayList<String>(1000);
        this.maybeSetupPhaseCampaigns(this.campaignsTopic, "simple-benchmark-produce-campaigns", false, 100, 10, ads);
        this.maybeSetupPhaseEvents(this.eventsTopic, "simple-benchmark-produce-events", this.parent.numRecords, ads);
        CountDownLatch latch = new CountDownLatch(1);
        this.parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
        this.parent.props.put(StreamsConfig.producerPrefix((String)"request.timeout.ms"), (Object)60000);
        KafkaStreams streams = this.createYahooBenchmarkStreams(this.parent.props, this.campaignsTopic, this.eventsTopic, latch, this.parent.numRecords);
        this.parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
    }

    private KafkaStreams createYahooBenchmarkStreams(Properties streamConfig, String campaignsTopic, String eventsTopic, final CountDownLatch latch, final int numRecords) {
        HashMap<String, Class<ProjectedEvent>> serdeProps = new HashMap<String, Class<ProjectedEvent>>();
        JsonPOJOSerializer projectedEventSerializer = new JsonPOJOSerializer();
        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
        projectedEventSerializer.configure(serdeProps, false);
        JsonPOJODeserializer projectedEventDeserializer = new JsonPOJODeserializer();
        serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
        projectedEventDeserializer.configure(serdeProps, false);
        StreamsBuilder builder = new StreamsBuilder();
        KStream kEvents = builder.stream(eventsTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
        KTable kCampaigns = builder.table(campaignsTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        KStream filteredEvents = kEvents.peek((ForeachAction)new ForeachAction<String, ProjectedEvent>(){

            public void apply(String key, ProjectedEvent value) {
                ++((YahooBenchmark)YahooBenchmark.this).parent.processedRecords;
                if (((YahooBenchmark)YahooBenchmark.this).parent.processedRecords % 1000000 == 0) {
                    System.out.println("Processed " + ((YahooBenchmark)YahooBenchmark.this).parent.processedRecords);
                }
                if (((YahooBenchmark)YahooBenchmark.this).parent.processedRecords >= numRecords) {
                    latch.countDown();
                }
            }
        }).filter((Predicate)new Predicate<String, ProjectedEvent>(){

            public boolean test(String key, ProjectedEvent value) {
                return value.eventType.equals("view");
            }
        }).mapValues((ValueMapper)new ValueMapper<ProjectedEvent, ProjectedEvent>(){

            public ProjectedEvent apply(ProjectedEvent value) {
                ProjectedEvent event = new ProjectedEvent();
                event.adID = value.adID;
                event.eventTime = value.eventTime;
                event.eventType = value.eventType;
                return event;
            }
        });
        KTable deserCampaigns = kCampaigns.mapValues((ValueMapper)new ValueMapper<String, CampaignAd>(){

            public CampaignAd apply(String value) {
                String[] parts = value.split(":");
                CampaignAd cAdd = new CampaignAd();
                cAdd.adID = parts[0];
                cAdd.campaignID = parts[1];
                return cAdd;
            }
        });
        KStream joined = filteredEvents.join(deserCampaigns, (ValueJoiner)new ValueJoiner<ProjectedEvent, CampaignAd, String>(){

            public String apply(ProjectedEvent value1, CampaignAd value2) {
                return value2.campaignID;
            }
        }, Joined.with((Serde)Serdes.String(), (Serde)Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), null));
        KStream keyedByCampaign = joined.selectKey((KeyValueMapper)new KeyValueMapper<String, String, String>(){

            public String apply(String key, String value) {
                return value;
            }
        });
        keyedByCampaign.groupByKey(Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((long)10000L)).count(Materialized.as((String)"time-windows"));
        return new KafkaStreams(builder.build(), streamConfig);
    }

    private class JsonPOJODeserializer<T>
    implements Deserializer<T> {
        private ObjectMapper objectMapper = new ObjectMapper();
        private Class<T> tClass;

        public void configure(Map<String, ?> props, boolean isKey) {
            this.tClass = (Class)props.get("JsonPOJOClass");
        }

        public T deserialize(String topic, byte[] bytes) {
            Object data;
            if (bytes == null) {
                return null;
            }
            try {
                data = this.objectMapper.readValue(bytes, this.tClass);
            }
            catch (Exception e) {
                throw new SerializationException((Throwable)e);
            }
            return (T)data;
        }

        public void close() {
        }
    }

    private class JsonPOJOSerializer<T>
    implements Serializer<T> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        public void configure(Map<String, ?> props, boolean isKey) {
        }

        public byte[] serialize(String topic, T data) {
            if (data == null) {
                return null;
            }
            try {
                return this.objectMapper.writeValueAsBytes(data);
            }
            catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", (Throwable)e);
            }
        }

        public void close() {
        }
    }

    static class CampaignAd {
        public String adID;
        public String campaignID;

        CampaignAd() {
        }
    }

    static class ProjectedEvent {
        public String eventType;
        public String adID;
        public long eventTime;
        public String userID = UUID.randomUUID().toString();
        public String pageID = UUID.randomUUID().toString();
        public String addType = "banner78";
        public String ipAddress = "1.2.3.4";

        ProjectedEvent() {
        }
    }
}

