package com.hazelcast.topic;

import com.hazelcast.config.Config;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/topic/TopicStressTest.class */
public class TopicStressTest extends HazelcastTestSupport {
    public static final int PUBLISH_THREAD_COUNT = 10;
    public static final int NODE_COUNT = 10;
    public static final int TOPIC_COUNT = 10;
    public static final int RUNNING_TIME_SECONDS = 600;
    public static final int MAX_PUBLISH_DELAY_MILLIS = 25;
    private HazelcastInstance[] instances;
    private CountDownLatch startLatch;
    private PublishThread[] publishThreads;
    private Map<String, List<MessageListenerImpl>> listenerMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/topic/TopicStressTest$MessageListenerImpl.class */
    public class MessageListenerImpl implements MessageListener<Integer> {
        private final AtomicLong counter;

        private MessageListenerImpl() {
            this.counter = new AtomicLong();
        }

        public void onMessage(Message<Integer> message) {
            this.counter.addAndGet(((Integer) message.getMessageObject()).intValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/topic/TopicStressTest$PublishThread.class */
    public class PublishThread extends Thread {
        private final Random random;
        private final Map<String, Long> messageCount;
        private final CountDownLatch startLatch;

        private PublishThread(CountDownLatch countDownLatch) {
            this.random = new Random();
            this.messageCount = new HashMap();
            this.startLatch = countDownLatch;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.startLatch.await();
                long endTimeMillis = getEndTimeMillis();
                while (System.currentTimeMillis() < endTimeMillis) {
                    ITopic<Integer> randomTopic = randomTopic(randomInstance());
                    int nextInt = this.random.nextInt(100);
                    randomTopic.publish(Integer.valueOf(nextInt));
                    updateCount(randomTopic, nextInt);
                    randomSleep();
                }
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }

        private long getEndTimeMillis() {
            return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(600L);
        }

        private void updateCount(ITopic<Integer> iTopic, int i) {
            String name = iTopic.getName();
            Long l = this.messageCount.get(name);
            if (l == null) {
                l = 0L;
            }
            this.messageCount.put(name, Long.valueOf(l.longValue() + i));
        }

        private void randomSleep() {
            try {
                Thread.sleep(this.random.nextInt(25));
            } catch (InterruptedException e) {
            }
        }

        private ITopic<Integer> randomTopic(HazelcastInstance hazelcastInstance) {
            return hazelcastInstance.getTopic(TopicStressTest.this.getTopicName(this.random.nextInt(10)));
        }

        private HazelcastInstance randomInstance() {
            return TopicStressTest.this.instances[this.random.nextInt(TopicStressTest.this.instances.length)];
        }
    }

    @Before
    public void setUp() {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setName("topic*");
        Config config = new Config();
        config.addTopicConfig(topicConfig);
        this.instances = createHazelcastInstanceFactory(10).newInstances(config);
        this.startLatch = new CountDownLatch(1);
        this.publishThreads = new PublishThread[10];
        for (int i = 0; i < this.publishThreads.length; i++) {
            PublishThread publishThread = new PublishThread(this.startLatch);
            publishThread.start();
            this.publishThreads[i] = publishThread;
        }
        this.listenerMap = new HashMap();
        for (int i2 = 0; i2 < 10; i2++) {
            String topicName = getTopicName(i2);
            this.listenerMap.put(topicName, registerTopicListeners(topicName, this.instances));
        }
    }

    @Test(timeout = 1200000)
    public void test() throws Exception {
        this.startLatch.countDown();
        System.out.printf("Test is going to run for %s seconds\n", Integer.valueOf(RUNNING_TIME_SECONDS));
        for (PublishThread publishThread : this.publishThreads) {
            publishThread.join();
        }
        System.out.println("All publish threads have completed");
        assertTrueEventually(new AssertTask() { // from class: com.hazelcast.topic.TopicStressTest.1
            @Override // com.hazelcast.test.AssertTask
            public void run() {
                for (int i = 0; i < 10; i++) {
                    String topicName = TopicStressTest.this.getTopicName(i);
                    Assert.assertEquals("Count for topic " + topicName + " is not the same", TopicStressTest.this.getExpectedCount(topicName), TopicStressTest.this.getActualCount(topicName));
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getExpectedCount(String str) {
        long j = 0;
        for (PublishThread publishThread : this.publishThreads) {
            Long l = (Long) publishThread.messageCount.get(str);
            if (l != null) {
                j += l.longValue();
            }
        }
        return j * 10;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getActualCount(String str) {
        long j = 0;
        List<MessageListenerImpl> list = this.listenerMap.get(str);
        if (list == null) {
            return 0L;
        }
        Iterator<MessageListenerImpl> it = list.iterator();
        while (it.hasNext()) {
            j += it.next().counter.get();
        }
        return j;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getTopicName(int i) {
        return "topic" + i;
    }

    private List<MessageListenerImpl> registerTopicListeners(String str, HazelcastInstance[] hazelcastInstanceArr) {
        LinkedList linkedList = new LinkedList();
        for (HazelcastInstance hazelcastInstance : hazelcastInstanceArr) {
            MessageListenerImpl messageListenerImpl = new MessageListenerImpl();
            hazelcastInstance.getTopic(str).addMessageListener(messageListenerImpl);
            linkedList.add(messageListenerImpl);
        }
        return linkedList;
    }
}
