package org.apache.ignite.internal.processors.cache.query.continuous;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.EventType;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.class */
public class CacheContinuousWithTransformerReplicatedSelfTest extends GridCommonAbstractTest {
    private static final int DFLT_ENTRY_CNT = 10;
    private static final int DFLT_LATCH_TIMEOUT = 30000;
    private static final int DFLT_SERVER_NODE_CNT = 1;
    private static final String SARAH_CONNOR = "Sarah Connor";
    private static final String JOHN_CONNOR = "John Connor";
    private static final boolean ADD_EVT_FILTER = true;
    private static final boolean SKIP_EVT_FILTER = false;
    private static final boolean KEEP_BINARY = true;
    private static final boolean SKIP_KEEP_BINARY = false;
    private static final long LATCH_TIMEOUT = 10000;
    protected boolean client = false;

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest$Employee.class */
    public class Employee {
        public String name;
        public Integer salary;

        Employee(String str, Integer num) {
            this.name = str;
            this.salary = num;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest$LocalEventListener.class */
    public static class LocalEventListener implements ContinuousQueryWithTransformer.EventListener<String> {
        private final AtomicInteger cnt;
        private final CountDownLatch cntLatch;

        LocalEventListener(AtomicInteger atomicInteger, CountDownLatch countDownLatch) {
            this.cnt = atomicInteger;
            this.cntLatch = countDownLatch;
        }

        public void onUpdated(Iterable<? extends String> iterable) throws CacheEntryListenerException {
            for (String str : iterable) {
                this.cnt.incrementAndGet();
                if (str.startsWith(CacheContinuousWithTransformerReplicatedSelfTest.SARAH_CONNOR)) {
                    this.cntLatch.countDown();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @IgniteAsyncCallback
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest$LocalEventListenerAsync.class */
    public static class LocalEventListenerAsync extends LocalEventListener {
        LocalEventListenerAsync(AtomicInteger atomicInteger, CountDownLatch countDownLatch) {
            super(atomicInteger, countDownLatch);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest$RemoteCacheEntryEventFilter.class */
    public static class RemoteCacheEntryEventFilter implements CacheEntryEventSerializableFilter<Integer, Object> {
        private RemoteCacheEntryEventFilter() {
        }

        public boolean evaluate(CacheEntryEvent<? extends Integer, ?> cacheEntryEvent) throws CacheEntryListenerException {
            return ((Integer) cacheEntryEvent.getKey()).intValue() % 2 == 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest$RemoteTransformer.class */
    public static class RemoteTransformer implements IgniteClosure<CacheEntryEvent<?, ?>, String> {
        private boolean keepBinary;

        RemoteTransformer(boolean z) {
            this.keepBinary = z;
        }

        public String apply(CacheEntryEvent<?, ?> cacheEntryEvent) {
            return this.keepBinary ? (String) ((BinaryObject) cacheEntryEvent.getValue()).field("name") : ((Employee) cacheEntryEvent.getValue()).name;
        }
    }

    protected CacheMode cacheMode() {
        return CacheMode.REPLICATED;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        if (this.client) {
            configuration.setClientMode(true);
        } else {
            CacheConfiguration cacheConfiguration = new CacheConfiguration("default");
            cacheConfiguration.setCacheMode(cacheMode());
            cacheConfiguration.setAtomicityMode(atomicityMode());
            configuration.setCacheConfiguration(new CacheConfiguration[]{cacheConfiguration});
        }
        return configuration;
    }

    protected CacheAtomicityMode atomicityMode() {
        return CacheAtomicityMode.TRANSACTIONAL;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        gridToRunQuery().cache("default").removeAll();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void beforeTestsStarted() throws Exception {
        startGrids(1);
        super.beforeTestsStarted();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTestsStopped() throws Exception {
        stopAllGrids(true);
    }

    protected Ignite gridToRunQuery() throws Exception {
        return grid(0);
    }

    @Test
    public void testContinuousWithTransformer() throws Exception {
        runContinuousQueryWithTransformer(false, 10, false, false);
    }

    @Test
    public void testContinuousWithTransformerAsync() throws Exception {
        runContinuousQueryWithTransformer(false, 10, false, true);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListener() throws Exception {
        runContinuousQueryWithTransformer(false, 10, false, false);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerAsync() throws Exception {
        runContinuousQueryWithTransformer(false, 10, false, true);
    }

    @Test
    public void testContinuousWithTransformerWithFilter() throws Exception {
        runContinuousQueryWithTransformer(true, 5, false, false);
    }

    @Test
    public void testContinuousWithTransformerWithFilterAsync() throws Exception {
        runContinuousQueryWithTransformer(true, 5, false, true);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerWithFilter() throws Exception {
        runContinuousQueryWithTransformer(true, 5, true, false);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerWithFilterAsync() throws Exception {
        runContinuousQueryWithTransformer(true, 5, true, true);
    }

    @Test
    public void testContinuousWithTransformerKeepBinary() throws Exception {
        runContinuousQueryWithTransformer(false, 10, true, false);
    }

    @Test
    public void testContinuousWithTransformerKeepBinaryAsync() throws Exception {
        runContinuousQueryWithTransformer(false, 10, true, true);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerKeepBinary() throws Exception {
        runContinuousQueryWithTransformer(false, 10, true, false);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerKeepBinaryAsync() throws Exception {
        runContinuousQueryWithTransformer(false, 10, true, true);
    }

    @Test
    public void testContinuousWithTransformerWithFilterKeepBinary() throws Exception {
        runContinuousQueryWithTransformer(true, 5, true, false);
    }

    @Test
    public void testContinuousWithTransformerWithFilterKeepBinaryAsync() throws Exception {
        runContinuousQueryWithTransformer(true, 5, true, true);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerWithFilterKeepBinary() throws Exception {
        runContinuousQueryWithTransformer(true, 5, true, false);
    }

    @Test
    public void testContinuousWithTransformerAndRegularListenerWithFilterKeepBinaryAsync() throws Exception {
        runContinuousQueryWithTransformer(true, 5, true, true);
    }

    @Test
    public void testTransformerReturnNull() throws Exception {
        IgniteCache cache = gridToRunQuery().cache("default");
        ContinuousQueryWithTransformer continuousQueryWithTransformer = new ContinuousQueryWithTransformer();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        continuousQueryWithTransformer.setLocalListener(new ContinuousQueryWithTransformer.EventListener() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.1
            public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
                Iterator it = iterable.iterator();
                while (it.hasNext()) {
                    CacheContinuousWithTransformerReplicatedSelfTest.assertNull(it.next());
                    atomicInteger.incrementAndGet();
                }
            }
        });
        continuousQueryWithTransformer.setRemoteTransformerFactory(FactoryBuilder.factoryOf(new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, String>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.2
            public String apply(CacheEntryEvent<? extends Integer, ? extends Employee> cacheEntryEvent) {
                return null;
            }
        }));
        continuousQueryWithTransformer.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Employee>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.3
            public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Employee> cacheEntryEvent) {
                return true;
            }
        }));
        QueryCursor query = cache.query(continuousQueryWithTransformer);
        Throwable th = null;
        for (int i = 0; i < 10; i++) {
            try {
                try {
                    cache.put(Integer.valueOf(i), new Employee("John Connor", Integer.valueOf(i)));
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (query != null) {
                    if (th != null) {
                        try {
                            query.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        query.close();
                    }
                }
                throw th3;
            }
        }
        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.4
            public boolean apply() {
                return atomicInteger.get() == 10;
            }
        }, 20000L));
        if (query != null) {
            if (0 == 0) {
                query.close();
                return;
            }
            try {
                query.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    @Test
    public void testExpired() throws Exception {
        IgniteCache withExpiryPolicy = gridToRunQuery().cache("default").withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 100L)));
        final GridConcurrentHashSet gridConcurrentHashSet = new GridConcurrentHashSet();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        ContinuousQueryWithTransformer continuousQueryWithTransformer = new ContinuousQueryWithTransformer();
        continuousQueryWithTransformer.setIncludeExpired(true);
        continuousQueryWithTransformer.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Employee>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.5
            public boolean evaluate(CacheEntryEvent cacheEntryEvent) throws CacheEntryListenerException {
                return cacheEntryEvent.getEventType() == EventType.EXPIRED;
            }
        }));
        continuousQueryWithTransformer.setRemoteTransformerFactory(FactoryBuilder.factoryOf(new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, Integer>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.6
            public Integer apply(CacheEntryEvent<? extends Integer, ? extends Employee> cacheEntryEvent) {
                CacheContinuousWithTransformerReplicatedSelfTest.assertNotNull(cacheEntryEvent.getValue());
                CacheContinuousWithTransformerReplicatedSelfTest.assertNotNull(cacheEntryEvent.getOldValue());
                return (Integer) cacheEntryEvent.getKey();
            }
        }));
        continuousQueryWithTransformer.setLocalListener(new ContinuousQueryWithTransformer.EventListener<Integer>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest.7
            public void onUpdated(Iterable<? extends Integer> iterable) {
                Iterator<? extends Integer> it = iterable.iterator();
                while (it.hasNext()) {
                    gridConcurrentHashSet.add(it.next());
                    countDownLatch.countDown();
                }
            }
        });
        QueryCursor query = withExpiryPolicy.query(continuousQueryWithTransformer);
        Throwable th = null;
        try {
            withExpiryPolicy.put(1, new Employee(SARAH_CONNOR, 42));
            withExpiryPolicy.put(2, new Employee("John Connor", 42));
            countDownLatch.await(10000L, TimeUnit.MILLISECONDS);
            assertEquals(2, gridConcurrentHashSet.size());
            assertTrue(gridConcurrentHashSet.contains(1));
            assertTrue(gridConcurrentHashSet.contains(2));
            if (query != null) {
                if (0 == 0) {
                    query.close();
                    return;
                }
                try {
                    query.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (query != null) {
                if (0 != 0) {
                    try {
                        query.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    query.close();
                }
            }
            throw th3;
        }
    }

    private void runContinuousQueryWithTransformer(boolean z, int i, boolean z2, boolean z3) throws Exception {
        IgniteCache<Integer, Employee> cache = gridToRunQuery().cache("default");
        if (z2) {
            cache = cache.withKeepBinary();
        }
        populateData(cache, "John Connor");
        CountDownLatch countDownLatch = new CountDownLatch(i);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ContinuousQueryWithTransformer.EventListener localEventListenerAsync = z3 ? new LocalEventListenerAsync(atomicInteger, countDownLatch) : new LocalEventListener(atomicInteger, countDownLatch);
        Factory factoryOf = z ? FactoryBuilder.factoryOf(new RemoteCacheEntryEventFilter()) : null;
        Factory factoryOf2 = FactoryBuilder.factoryOf(new RemoteTransformer(z2));
        ContinuousQueryWithTransformer continuousQueryWithTransformer = new ContinuousQueryWithTransformer();
        continuousQueryWithTransformer.setInitialQuery(new ScanQuery());
        continuousQueryWithTransformer.setRemoteFilterFactory(factoryOf);
        continuousQueryWithTransformer.setRemoteTransformerFactory(factoryOf2);
        continuousQueryWithTransformer.setLocalListener(localEventListenerAsync);
        QueryCursor<Cache.Entry> query = cache.query(continuousQueryWithTransformer);
        Throwable th = null;
        try {
            try {
                for (Cache.Entry entry : query) {
                    assertNotNull(entry);
                    if (z2) {
                        assertTrue(((BinaryObject) entry.getValue()).field("name").toString().startsWith("John Connor"));
                    } else {
                        assertTrue(((Employee) entry.getValue()).name.startsWith("John Connor"));
                    }
                }
                populateData(cache, SARAH_CONNOR);
                assertTrue("Receive all expected events", countDownLatch.await(AbstractPerformanceStatisticsTest.TIMEOUT, TimeUnit.MILLISECONDS));
                assertEquals("Count of updated records equal to expected", i, atomicInteger.get());
                if (query != null) {
                    if (0 == 0) {
                        query.close();
                        return;
                    }
                    try {
                        query.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (query != null) {
                if (th != null) {
                    try {
                        query.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    query.close();
                }
            }
            throw th4;
        }
    }

    private void populateData(IgniteCache<Integer, Employee> igniteCache, String str) {
        for (int i = 0; i < 10; i++) {
            igniteCache.put(Integer.valueOf(i), new Employee(str + i, Integer.valueOf(42 * i)));
        }
    }
}
