/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Verticle;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.test.core.VertxTestBase;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;

public class MessageConsumerTest
extends VertxTestBase {
    @Test
    public void testMessageConsumptionStayOnWorkerThreadAfterResume() throws Exception {
        TestVerticle verticle = new TestVerticle(2);
        Future deployVerticle = this.vertx.deployVerticle((Verticle)verticle, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER));
        CountDownLatch startLatch = new CountDownLatch(1);
        deployVerticle.onComplete(this.onSuccess(cf -> startLatch.countDown()));
        this.awaitLatch(startLatch);
        this.vertx.eventBus().send("testAddress", (Object)"message1");
        this.vertx.eventBus().send("testAddress", (Object)"message2");
        this.awaitLatch(verticle.msgLatch);
        this.assertEquals(2L, verticle.messageArrivedOnWorkerThread.size());
        this.assertTrue("message1 should be processed on worker thread", (Boolean)verticle.messageArrivedOnWorkerThread.get("message1"));
        this.assertTrue("message2 should be processed on worker thread", (Boolean)verticle.messageArrivedOnWorkerThread.get("message2"));
    }

    private static class TestVerticle
    extends AbstractVerticle {
        private final CountDownLatch msgLatch;
        private final Map<String, Boolean> messageArrivedOnWorkerThread = new HashMap<String, Boolean>();

        private TestVerticle(Integer numberOfExpectedMessages) {
            this.msgLatch = new CountDownLatch(numberOfExpectedMessages);
        }

        public void start() {
            MessageConsumer consumer = this.vertx.eventBus().localConsumer("testAddress");
            this.handleMessages((MessageConsumer<String>)consumer);
        }

        private void handleMessages(MessageConsumer<String> consumer) {
            consumer.handler(msg -> {
                consumer.pause();
                this.messageArrivedOnWorkerThread.putIfAbsent((String)msg.body(), Context.isOnWorkerThread());
                this.msgLatch.countDown();
                this.vertx.setTimer(20L, id -> consumer.resume());
            });
        }
    }
}

