package org.apache.ignite.internal.processors.cache.query.continuous;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.systemview.view.ContinuousQueryView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest.class */
public class CacheContinuousQueryBufferLimitTest extends GridCommonAbstractTest {
    private static final int PARTS = 1;
    private static final int TOTAL_KEYS = 1024;
    private static final long OVERFLOW_KEYS_COUNT = CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE * 3;
    private static final CacheEntryEventSerializableFilter<Integer, Integer> RMT_FILTER = cacheEntryEvent -> {
        return true;
    };
    private final AtomicInteger msgCntr = new AtomicInteger();

    @Parameterized.Parameter(0)
    public CacheMode cacheMode;

    @Parameterized.Parameter(1)
    public CacheAtomicityMode atomicityMode;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBufferLimitTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$ignite$cache$CacheAtomicityMode = new int[CacheAtomicityMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$ignite$cache$CacheAtomicityMode[CacheAtomicityMode.ATOMIC.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$ignite$cache$CacheAtomicityMode[CacheAtomicityMode.TRANSACTIONAL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    @Parameterized.Parameters(name = "cacheMode={0}, atomicityMode={1}")
    public static Collection<?> parameters() {
        return Arrays.asList(new Object[]{CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC}, new Object[]{CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC}, new Object[]{CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL}, new Object[]{CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL});
    }

    @Test
    public void testContinuousQueryBatchSwitchOnAck() throws Exception {
        doTestContinuousQueryPendingBufferLimit((clusterNode, message) -> {
            return cachePutOperationRequestMessage(message) && this.msgCntr.getAndIncrement() == 10;
        }, CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE / 10);
    }

    @Test
    public void testContinuousQueryPendingBufferLimit() throws Exception {
        doTestContinuousQueryPendingBufferLimit((clusterNode, message) -> {
            return (cachePutOperationRequestMessage(message) && this.msgCntr.getAndIncrement() == 10) || (message instanceof CacheContinuousQueryBatchAck);
        }, (int) (CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE * 1.2d));
    }

    @Test
    public void testPendingSendToClientOnLimitReached() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicReference atomicReference = new AtomicReference();
        IgniteEx startGrids = startGrids(2);
        IgniteCache cache = startClientGrid().cache("default");
        CacheEntryEventSerializableFilter cacheEntryEventSerializableFilter = cacheEntryEvent -> {
            return ((Integer) cacheEntryEvent.getKey()).intValue() % 2 == 0;
        };
        ContinuousQuery continuousQuery = new ContinuousQuery();
        continuousQuery.setRemoteFilterFactory(FactoryBuilder.factoryOf(cacheEntryEventSerializableFilter));
        continuousQuery.setLocalListener(iterable -> {
            iterable.forEach(cacheEntryEvent2 -> {
                if (cacheEntryEventSerializableFilter.evaluate(cacheEntryEvent2)) {
                    return;
                }
                atomicReference.compareAndSet(null, "Key must be filtered [e=" + cacheEntryEvent2 + ']');
            });
        });
        continuousQuery.setLocal(false);
        TestRecordingCommunicationSpi.spi(startGrids).blockMessages((clusterNode, message) -> {
            return (cachePutOperationRequestMessage(message) && this.msgCntr.getAndIncrement() == 7) || (message instanceof CacheContinuousQueryBatchAck);
        });
        IgniteInternalFuture<Long> igniteInternalFuture = null;
        try {
            QueryCursor query = cache.query(continuousQuery);
            Throwable th = null;
            try {
                try {
                    awaitPartitionMapExchange();
                    igniteInternalFuture = GridTestUtils.runMultiThreadedAsync(() -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            cache.put(Integer.valueOf(atomicInteger.incrementAndGet()), 0);
                        }
                    }, 6, "cq-put-");
                    assertTrue("Number of keys to put must reach the limit [keys=" + atomicInteger.get() + ", limit=" + OVERFLOW_KEYS_COUNT + ']', GridTestUtils.waitForCondition(() -> {
                        return ((long) atomicInteger.get()) > OVERFLOW_KEYS_COUNT;
                    }, AbstractPerformanceStatisticsTest.TIMEOUT));
                    if (query != null) {
                        if (0 != 0) {
                            try {
                                query.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            query.close();
                        }
                    }
                    TestRecordingCommunicationSpi.stopBlockAll();
                    if (igniteInternalFuture != null) {
                        igniteInternalFuture.cancel();
                    }
                    if (atomicReference.get() != null) {
                        throw new Exception((String) atomicReference.get());
                    }
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            TestRecordingCommunicationSpi.stopBlockAll();
            if (igniteInternalFuture != null) {
                igniteInternalFuture.cancel();
            }
            throw th3;
        }
    }

    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        return super.getConfiguration(str).setCommunicationSpi(new TestRecordingCommunicationSpi()).setCacheConfiguration(new CacheConfiguration[]{new CacheConfiguration("default").setAtomicityMode(this.atomicityMode).setCacheMode(this.cacheMode).setBackups(1).setAffinity(new RendezvousAffinityFunction(false, 1))});
    }

    @Before
    public void resetMessageCounter() {
        this.msgCntr.set(0);
    }

    @After
    public void stopAllInstances() {
        stopAllGrids();
    }

    private void doTestContinuousQueryPendingBufferLimit(IgniteBiPredicate<ClusterNode, Message> igniteBiPredicate, int i) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        IgniteEx startGrid = startGrid(0);
        IgniteEx startGrid2 = startGrid(1);
        IgniteCache cache = startGrid.cache("default");
        CacheConfiguration configuration = cache.getConfiguration(CacheConfiguration.class);
        for (int i2 = 0; i2 < 1024; i2++) {
            cache.put(Integer.valueOf(i2), Integer.valueOf(i2));
        }
        assertEquals(1, configuration.getAffinity().partitions());
        GridAtomicLong gridAtomicLong = new GridAtomicLong();
        ContinuousQuery continuousQuery = new ContinuousQuery();
        continuousQuery.setRemoteFilterFactory(FactoryBuilder.factoryOf(RMT_FILTER));
        continuousQuery.setLocalListener(iterable -> {
            iterable.forEach(cacheEntryEvent -> {
                gridAtomicLong.setIfGreater(((CacheQueryEntryEvent) cacheEntryEvent).getPartitionUpdateCounter());
            });
        });
        continuousQuery.setLocal(false);
        IgniteInternalFuture<Long> igniteInternalFuture = null;
        try {
            QueryCursor query = startGrid.cache("default").query(continuousQuery);
            Throwable th = null;
            try {
                try {
                    awaitPartitionMapExchange();
                    ConcurrentMap<Long, CacheContinuousQueryEntry> continuousQueryPendingBuffer = getContinuousQueryPendingBuffer(startGrid2, CU.cacheId("default"), 0);
                    TestRecordingCommunicationSpi.spi(startGrid).blockMessages(igniteBiPredicate);
                    igniteInternalFuture = GridTestUtils.runMultiThreadedAsync(() -> {
                        while (atomicInteger.get() <= OVERFLOW_KEYS_COUNT) {
                            cache.put(Integer.valueOf(atomicInteger.incrementAndGet()), 0);
                        }
                    }, 3, "cq-put-");
                    assertNotNull("Partition remote buffers must be inited", continuousQueryPendingBuffer);
                    log.warning("Waiting for pending buffer being overflowed within " + OVERFLOW_KEYS_COUNT + " number of keys.");
                    assertFalse("Pending buffer exceeded the limit despite entries have been acked [lastAcked=" + gridAtomicLong + ", pending=" + S.compact(continuousQueryPendingBuffer.keySet(), l -> {
                        return Long.valueOf(l.longValue() + 1);
                    }) + ']', GridTestUtils.waitForCondition(() -> {
                        return continuousQueryPendingBuffer.size() > i;
                    }, () -> {
                        return ((long) atomicInteger.get()) <= OVERFLOW_KEYS_COUNT;
                    }));
                    if (query != null) {
                        if (0 != 0) {
                            try {
                                query.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            query.close();
                        }
                    }
                    TestRecordingCommunicationSpi.spi(startGrid).stopBlock();
                    if (igniteInternalFuture != null) {
                        igniteInternalFuture.cancel();
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th4) {
            TestRecordingCommunicationSpi.spi(startGrid).stopBlock();
            if (igniteInternalFuture != null) {
                igniteInternalFuture.cancel();
            }
            throw th4;
        }
    }

    private boolean cachePutOperationRequestMessage(Message message) {
        switch (AnonymousClass1.$SwitchMap$org$apache$ignite$cache$CacheAtomicityMode[this.atomicityMode.ordinal()]) {
            case 1:
                return message instanceof GridDhtAtomicUpdateRequest;
            case 2:
                return message instanceof GridDhtTxPrepareRequest;
            default:
                throw new IgniteException("Unsupported atomicity mode: " + this.atomicityMode);
        }
    }

    private static <K, V> CacheContinuousQueryHandler<K, V> getRemoteContinuousQueryHandler(IgniteEx igniteEx, UUID uuid) {
        ConcurrentMap concurrentMap = (ConcurrentMap) GridTestUtils.getFieldValue(igniteEx.context().continuous(), GridContinuousProcessor.class, "rmtInfos");
        if (concurrentMap.get(uuid) == null) {
            return null;
        }
        return ((GridContinuousProcessor.RemoteRoutineInfo) concurrentMap.get(uuid)).handler();
    }

    private static ConcurrentMap<Long, CacheContinuousQueryEntry> getContinuousQueryPendingBuffer(IgniteEx igniteEx, int i, int i2) {
        SystemView view = igniteEx.context().systemView().view(GridContinuousProcessor.CQ_SYS_VIEW);
        assertEquals(1, view.size());
        return (ConcurrentMap) GridTestUtils.getFieldValue(getRemoteContinuousQueryHandler(igniteEx, ((ContinuousQueryView) view.iterator().next()).routineId()).partitionBuffer(igniteEx.context().cache().context().cacheContext(i), i2), CacheContinuousQueryEventBuffer.class, "pending");
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2065031899:
                if (implMethodName.equals("lambda$testPendingSendToClientOnLimitReached$3c60aaa7$1")) {
                    z = 4;
                    break;
                }
                break;
            case -223179573:
                if (implMethodName.equals("lambda$testContinuousQueryBatchSwitchOnAck$3c60aaa7$1")) {
                    z = true;
                    break;
                }
                break;
            case 70435580:
                if (implMethodName.equals("lambda$testPendingSendToClientOnLimitReached$d4667bea$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1111108802:
                if (implMethodName.equals("lambda$static$90ebc8a4$1")) {
                    z = false;
                    break;
                }
                break;
            case 1269431491:
                if (implMethodName.equals("lambda$testContinuousQueryPendingBufferLimit$3c60aaa7$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/cache/CacheEntryEventSerializableFilter") && serializedLambda.getFunctionalInterfaceMethodName().equals("evaluate") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljavax/cache/event/CacheEntryEvent;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest") && serializedLambda.getImplMethodSignature().equals("(Ljavax/cache/event/CacheEntryEvent;)Z")) {
                    return cacheEntryEvent -> {
                        return true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteBiPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/cluster/ClusterNode;Lorg/apache/ignite/plugin/extensions/communication/Message;)Z")) {
                    CacheContinuousQueryBufferLimitTest cacheContinuousQueryBufferLimitTest = (CacheContinuousQueryBufferLimitTest) serializedLambda.getCapturedArg(0);
                    return (clusterNode, message) -> {
                        return cachePutOperationRequestMessage(message) && this.msgCntr.getAndIncrement() == 10;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteBiPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/cluster/ClusterNode;Lorg/apache/ignite/plugin/extensions/communication/Message;)Z")) {
                    CacheContinuousQueryBufferLimitTest cacheContinuousQueryBufferLimitTest2 = (CacheContinuousQueryBufferLimitTest) serializedLambda.getCapturedArg(0);
                    return (clusterNode2, message2) -> {
                        return (cachePutOperationRequestMessage(message2) && this.msgCntr.getAndIncrement() == 10) || (message2 instanceof CacheContinuousQueryBatchAck);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/cache/CacheEntryEventSerializableFilter") && serializedLambda.getFunctionalInterfaceMethodName().equals("evaluate") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljavax/cache/event/CacheEntryEvent;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest") && serializedLambda.getImplMethodSignature().equals("(Ljavax/cache/event/CacheEntryEvent;)Z")) {
                    return cacheEntryEvent2 -> {
                        return ((Integer) cacheEntryEvent2.getKey()).intValue() % 2 == 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteBiPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBufferLimitTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/cluster/ClusterNode;Lorg/apache/ignite/plugin/extensions/communication/Message;)Z")) {
                    CacheContinuousQueryBufferLimitTest cacheContinuousQueryBufferLimitTest3 = (CacheContinuousQueryBufferLimitTest) serializedLambda.getCapturedArg(0);
                    return (clusterNode3, message3) -> {
                        return (cachePutOperationRequestMessage(message3) && this.msgCntr.getAndIncrement() == 7) || (message3 instanceof CacheContinuousQueryBatchAck);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
