package org.apache.atlas.pc;

import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.RandomUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/atlas/pc/StatusReporterTest.class */
public class StatusReporterTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/atlas/pc/StatusReporterTest$IntegerConsumer.class */
    /**
     * Consumer used by the tests in this class: it remembers the item it was handed,
     * pauses briefly to imitate work, and then commits so the item is reported as a result.
     */
    public static class IntegerConsumer extends WorkItemConsumer<Integer> {
        /** Most recent item passed to {@link #processItem(Integer)}; reported by {@link #doCommit()}. */
        private Integer current;

        /**
         * @param blockingQueue queue forwarded unchanged to the {@code WorkItemConsumer} constructor
         */
        public IntegerConsumer(BlockingQueue<Integer> blockingQueue) {
            super(blockingQueue);
        }

        /** Passes the most recently processed item to the inherited {@code addResult}. */
        protected void doCommit() {
            addResult(this.current);
        }

        /**
         * Records {@code num} as the current item, sleeps for
         * {@code 20 + RandomUtils.nextInt(5, 7)} milliseconds, then calls the inherited {@code commit()}.
         *
         * <p>If the sleep is interrupted the item is not committed and the thread's interrupt
         * status is restored so the caller can observe the interruption.
         *
         * @param num item to process
         */
        public void processItem(Integer num) {
            this.current = num;
            try {
                Thread.sleep(20 + RandomUtils.nextInt(5, 7));
                super.commit();
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; set it again instead of
                // swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:org/apache/atlas/pc/StatusReporterTest$IntegerConsumerBuilder.class */
    private class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> {
        private IntegerConsumerBuilder() {
        }

        public IntegerConsumer build(BlockingQueue<Integer> blockingQueue) {
            return new IntegerConsumer(blockingQueue);
        }

        /* renamed from: build, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Runnable m8build(BlockingQueue blockingQueue) {
            return build((BlockingQueue<Integer>) blockingQueue);
        }
    }

    /**
     * Creates the {@code WorkItemManager} exercised by the tests, named {@code "IntegerConsumer"}.
     *
     * @param integerConsumerBuilder builder passed as the first constructor argument
     * @param i value passed as the fourth constructor argument
     *          (presumably the number of workers; confirm against {@code WorkItemManager})
     * @return the newly constructed manager
     */
    private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder integerConsumerBuilder, int i) {
        WorkItemManager<Integer, WorkItemConsumer> manager =
                new WorkItemManager<>(integerConsumerBuilder, "IntegerConsumer", 5, i, true);
        return manager;
    }

    /**
     * Produces the integers {@code 0..49} into a work-item manager while registering each one
     * with a {@code StatusReporter}, feeding results back to the reporter as they become available.
     *
     * <p>Expects {@code ack()} to return the last produced value once the manager has drained,
     * and both reporter counters to be zero afterwards.
     *
     * @throws InterruptedException if one of the manager calls is interrupted
     */
    @Test
    public void statusReporting() throws InterruptedException {
        final int itemCount = 50;
        WorkItemManager<Integer, WorkItemConsumer> workItemManger = getWorkItemManger(new IntegerConsumerBuilder(), 5);
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>();
        try {
            for (int i = 0; i < itemCount; i++) {
                workItemManger.produce(i);
                statusReporter.produced(i, i);
                extractResults(workItemManger, statusReporter);
            }
            workItemManger.drain();
            extractResults(workItemManger, statusReporter);
            Assert.assertEquals(((Integer) statusReporter.ack()).intValue(), itemCount - 1);
        } finally {
            // Shut the manager down even when a call or assertion above fails, so a failing
            // test does not leave it running.
            workItemManger.shutdown();
        }
        Assert.assertEquals(statusReporter.getProducedCount(), 0);
        Assert.assertEquals(statusReporter.getProcessedCount(), 0);
    }

    /**
     * Polls the manager's result queue until it yields {@code null}, reporting every
     * {@code Integer} result to the status reporter as processed. Results of any other
     * type are discarded.
     *
     * @param workItemManager manager whose {@code getResults()} queue is polled
     * @param statusReporter reporter notified through {@code processed(...)}
     */
    private void extractResults(WorkItemManager<Integer, WorkItemConsumer> workItemManager, StatusReporter<Integer, Integer> statusReporter) {
        Object result;
        while ((result = workItemManager.getResults().poll()) != null) {
            if (result instanceof Integer) {
                statusReporter.processed((Integer) result);
            }
        }
    }

    /**
     * Registers two items (1 with index 100, 2 with index 200) with a reporter constructed with
     * {@code 2000L} (presumably a timeout in milliseconds, per the test name; confirm against
     * {@code StatusReporter}) and marks only item 2 as processed.
     *
     * <p>Expects {@code ack()} to return {@code null} immediately, and to return {@code 200}
     * after sleeping one second longer than the constructor argument.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    @Test
    public void reportWithTimeout() throws InterruptedException {
        final long timeoutMs = 2000L;
        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(timeoutMs);
        statusReporter.produced(1, 100);
        statusReporter.produced(2, 200);
        statusReporter.processed(2);
        Assert.assertNull(statusReporter.ack());
        Thread.sleep(timeoutMs + 1000L);
        Integer num = (Integer) statusReporter.ack();
        Assert.assertNotNull(num);
        // Box the expected value so the call binds to assertEquals(Object, Object) rather than
        // depending on how an Integer/int argument pair resolves among the overloads.
        Assert.assertEquals(num, Integer.valueOf(200));
    }
}
