package org.apache.ignite.cdc;

import java.io.File;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cdc.AbstractCdcTest;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIOTest;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.internal.visor.cdc.VisorCdcDeleteLostSegmentsTask;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/ignite/cdc/CdcSelfTest.class */
public class CdcSelfTest extends AbstractCdcTest {
    public static final String TX_CACHE_NAME = "tx-cache";
    public static final long CREATE_TTL = 500000;
    public static final long UPDATE_TTL = 60000;

    @Parameterized.Parameter
    public boolean specificConsistentId;

    @Parameterized.Parameter(1)
    public WALMode walMode;

    @Parameterized.Parameter(2)
    public boolean persistenceEnabled;
    private long cdcWalDirMaxSize = 0;

    @Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}")
    public static Collection<?> parameters() {
        ArrayList arrayList = new ArrayList();
        Iterator it = EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND).iterator();
        while (it.hasNext()) {
            WALMode wALMode = (WALMode) it.next();
            for (boolean z : new boolean[]{false, true}) {
                for (boolean z2 : new boolean[]{true, false}) {
                    arrayList.add(new Object[]{Boolean.valueOf(z), wALMode, Boolean.valueOf(z2)});
                }
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        if (this.specificConsistentId) {
            configuration.setConsistentId(str);
        }
        configuration.setDataStorageConfiguration(new DataStorageConfiguration().setWalMode(this.walMode).setWalForceArchiveTimeout(5000L).setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(this.persistenceEnabled).setCdcEnabled(true)).setWalArchivePath("db/wal/archive/" + U.maskForFileName(str)).setCdcWalDirectoryMaxSize(this.cdcWalDirMaxSize));
        configuration.setCacheConfiguration(new CacheConfiguration[]{new CacheConfiguration(TX_CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL).setBackups(1)});
        return configuration;
    }

    @Test
    public void testReadAllKeysCommitAll() throws Exception {
        readAll(new AbstractCdcTest.UserCdcConsumer(), true);
    }

    @Test
    public void testReadAllKeysWithoutCommit() throws Exception {
        readAll(new AbstractCdcTest.UserCdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.1
            @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
            public boolean onEvents(Iterator<CdcEvent> it) {
                if (!it.hasNext()) {
                    return false;
                }
                super.onEvents(Collections.singleton(it.next()).iterator());
                return false;
            }
        }, false);
    }

    @Test
    public void testReadAllKeysCommitEachEvent() throws Exception {
        readAll(new AbstractCdcTest.UserCdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.2
            @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
            public boolean onEvents(Iterator<CdcEvent> it) {
                if (!it.hasNext()) {
                    return true;
                }
                super.onEvents(Collections.singleton(it.next()).iterator());
                return true;
            }
        }, true);
    }

    @Test
    public void testReadExpireTime() throws Exception {
        IgniteConfiguration configuration = getConfiguration("ignite-0");
        IgniteEx startGrid = startGrid(configuration);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache<Integer, ?> orCreateCache = startGrid.getOrCreateCache("default");
        IgniteCache withExpiryPolicy = orCreateCache.withExpiryPolicy(new PlatformExpiryPolicy(CREATE_TTL, UPDATE_TTL, 0L));
        for (int i = 0; i < 50; i++) {
            if (i % 2 == 0) {
                withExpiryPolicy.put(Integer.valueOf(i), createUser(i));
                withExpiryPolicy.put(Integer.valueOf(i), createUser(i));
            } else {
                orCreateCache.put(Integer.valueOf(i), createUser(i));
                orCreateCache.put(Integer.valueOf(i), createUser(i));
            }
        }
        removeData(orCreateCache, 0, 50);
        final HashSet hashSet = new HashSet();
        AbstractCdcTest.TestCdcConsumer<?> testCdcConsumer = new AbstractCdcTest.UserCdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.3
            @Override // org.apache.ignite.cdc.AbstractCdcTest.UserCdcConsumer, org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
            public void checkEvent(CdcEvent cdcEvent) {
                super.checkEvent(cdcEvent);
                Integer num = (Integer) cdcEvent.key();
                if (cdcEvent.value() == null || num.intValue() % 2 != 0) {
                    CdcSelfTest.assertEquals("Expire time must not be set [key=" + num + ']', 0L, cdcEvent.expireTime());
                    return;
                }
                CdcSelfTest.assertTrue("Expire must be set [key=" + num + ']', cdcEvent.expireTime() != 0);
                CdcSelfTest.assertTrue("Expire for operation", cdcEvent.expireTime() - System.currentTimeMillis() <= (hashSet.contains(num) ? CdcSelfTest.UPDATE_TTL : CdcSelfTest.CREATE_TTL));
                hashSet.add(num);
            }
        };
        IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc(testCdcConsumer, configuration));
        waitForSize(100, "default", AbstractCdcTest.ChangeEventType.UPDATE, testCdcConsumer);
        waitForSize(50, "default", AbstractCdcTest.ChangeEventType.DELETE, testCdcConsumer);
        runAsync.cancel();
        assertTrue(testCdcConsumer.stopped());
        assertEquals(25, hashSet.size());
        stopAllGrids();
        cleanPersistenceDir();
    }

    private void readAll(AbstractCdcTest.UserCdcConsumer userCdcConsumer, boolean z) throws Exception {
        IgniteConfiguration configuration = getConfiguration("ignite-0");
        IgniteEx startGrid = startGrid(configuration);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache<Integer, AbstractCdcTest.User> orCreateCache = startGrid.getOrCreateCache("default");
        addAndWaitForConsumption(userCdcConsumer, configuration, orCreateCache, startGrid.getOrCreateCache(TX_CACHE_NAME), (v0, v1, v2) -> {
            addData(v0, v1, v2);
        }, 0, 53, z);
        removeData(orCreateCache, 0, 50);
        CdcMain createCdc = createCdc(userCdcConsumer, configuration);
        IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc);
        waitForSize(50, "default", AbstractCdcTest.ChangeEventType.DELETE, userCdcConsumer);
        checkMetrics(createCdc, z ? 50 : 156);
        runAsync.cancel();
        assertTrue(userCdcConsumer.stopped());
        stopAllGrids();
        cleanPersistenceDir();
    }

    @Test
    public void testReadOneByOneForBackup() throws Exception {
        Assume.assumeTrue("CDC with 2 local nodes can't determine correct PDS directory without specificConsistentId.", this.specificConsistentId);
        IgniteEx startGrids = startGrids(2);
        startGrids.cluster().state(ClusterState.ACTIVE);
        IgniteCache<?, ?> cache = startGrids.cache(TX_CACHE_NAME);
        awaitPartitionMapExchange();
        final int i = 3;
        Map map = (Map) primaryKeys(cache, 3).stream().collect(Collectors.toMap(num -> {
            return num;
        }, num2 -> {
            return num2;
        }, (num3, num4) -> {
            return num3;
        }, TreeMap::new));
        cache.putAll(map);
        IgniteWalIteratorFactory.IteratorParametersBuilder filter = new IgniteWalIteratorFactory.IteratorParametersBuilder().filesOrDirs(new File[]{U.resolveWorkDirectory(U.defaultWorkDirectory(), grid(1).configuration().getDataStorageConfiguration().getWalArchivePath(), false)}).filter((recordType, wALPointer) -> {
            return recordType == WALRecord.RecordType.DATA_RECORD_V2;
        });
        assertTrue("DataRecord(List<DataEntry>) should be logged.", GridTestUtils.waitForCondition(() -> {
            WALIterator it;
            try {
                it = new IgniteWalIteratorFactory(log).iterator(filter);
                Throwable th = null;
            } catch (IgniteCheckedException e) {
                throw U.convertException(e);
            }
            while (true) {
                try {
                    try {
                        if (!it.hasNext()) {
                            break;
                        }
                        if (((DataRecord) ((IgniteBiTuple) it.next()).get2()).entryCount() > 1) {
                            break;
                        }
                    } finally {
                    }
                } finally {
                }
                throw U.convertException(e);
            }
            return true;
            return false;
        }, getTestTimeout()));
        for (int i2 = 0; i2 < 2; i2++) {
            IgniteEx grid = grid(i2);
            final HashSet hashSet = new HashSet();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            CdcConsumer cdcConsumer = new CdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.4
                public boolean onEvents(Iterator<CdcEvent> it) {
                    if (!it.hasNext()) {
                        return true;
                    }
                    if (!atomicBoolean.get()) {
                        throw new RuntimeException("Expected fail.");
                    }
                    hashSet.add((Integer) it.next().key());
                    atomicBoolean.set(false);
                    if (hashSet.size() == i) {
                        throw new RuntimeException("Expected fail.");
                    }
                    return true;
                }

                public void onTypes(Iterator<BinaryType> it) {
                    it.forEachRemaining(binaryType -> {
                        CdcSelfTest.assertNotNull(binaryType);
                    });
                }

                public void onMappings(Iterator<TypeMapping> it) {
                    it.forEachRemaining(typeMapping -> {
                        CdcSelfTest.assertNotNull(typeMapping);
                    });
                }

                public void onCacheChange(Iterator<CdcCacheEvent> it) {
                    it.forEachRemaining(cdcCacheEvent -> {
                        CdcSelfTest.assertNotNull(cdcCacheEvent);
                    });
                }

                public void onCacheDestroy(Iterator<Integer> it) {
                    it.forEachRemaining(num5 -> {
                        CdcSelfTest.assertNotNull(num5);
                    });
                }

                public void stop() {
                }

                public void start(MetricRegistry metricRegistry) {
                }
            };
            for (int i3 = 0; i3 < 3; i3++) {
                IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc(cdcConsumer, getConfiguration(grid.name())));
                runAsync.getClass();
                assertTrue(GridTestUtils.waitForCondition(runAsync::isDone, getTestTimeout()));
                assertEquals(i3 + 1, hashSet.size());
                atomicBoolean.set(true);
            }
            assertTrue(F.eqNotOrdered(map.keySet(), hashSet));
        }
    }

    @Test
    public void testReadFromNextEntry() throws Exception {
        IgniteConfiguration configuration = getConfiguration("ignite-0");
        IgniteEx startGrid = startGrid(configuration);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache orCreateCache = startGrid.getOrCreateCache("default");
        final int i = 10;
        addData(orCreateCache, 0, 10 / 2);
        long lastArchivedSegment = startGrid.context().cache().context().wal(true).lastArchivedSegment();
        GridTestUtils.waitForCondition(() -> {
            return startGrid.context().cache().context().wal(true).lastArchivedSegment() > lastArchivedSegment;
        }, getTestTimeout());
        addData(orCreateCache, 10 / 2, 10);
        final AtomicInteger atomicInteger = new AtomicInteger();
        int i2 = 0;
        while (true) {
            int i3 = i2;
            if (atomicInteger.get() == 10) {
                return;
            }
            final String str = "Expected fail";
            IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc(new CdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.5
                boolean oneConsumed;

                public boolean onEvents(Iterator<CdcEvent> it) {
                    if (!it.hasNext()) {
                        return true;
                    }
                    if (this.oneConsumed) {
                        throw new RuntimeException(str);
                    }
                    CdcSelfTest.assertEquals(Integer.valueOf(atomicInteger.get()), it.next().key());
                    atomicInteger.incrementAndGet();
                    if (atomicInteger.get() == i) {
                        throw new RuntimeException(str);
                    }
                    this.oneConsumed = true;
                    return true;
                }

                public void onTypes(Iterator<BinaryType> it) {
                    it.forEachRemaining(binaryType -> {
                        CdcSelfTest.assertNotNull(binaryType);
                    });
                }

                public void onMappings(Iterator<TypeMapping> it) {
                    it.forEachRemaining(typeMapping -> {
                        CdcSelfTest.assertNotNull(typeMapping);
                    });
                }

                public void onCacheChange(Iterator<CdcCacheEvent> it) {
                    it.forEachRemaining(cdcCacheEvent -> {
                        CdcSelfTest.assertNotNull(cdcCacheEvent);
                    });
                }

                public void onCacheDestroy(Iterator<Integer> it) {
                    it.forEachRemaining(num -> {
                        CdcSelfTest.assertNotNull(num);
                    });
                }

                public void stop() {
                }

                public void start(MetricRegistry metricRegistry) {
                }
            }, configuration));
            runAsync.getClass();
            assertTrue(GridTestUtils.waitForCondition(runAsync::isDone, getTestTimeout()));
            if (!"Expected fail".equals(runAsync.error().getMessage())) {
                throw new RuntimeException(runAsync.error());
            }
            assertEquals(1, atomicInteger.get() - i3);
            i2 = atomicInteger.get();
        }
    }

    @Test
    public void testReadBeforeGracefulShutdown() throws Exception {
        IgniteEx startGrid = startGrid(getConfiguration("ignite-0"));
        startGrid.cluster().state(ClusterState.ACTIVE);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AbstractCdcTest.UserCdcConsumer userCdcConsumer = new AbstractCdcTest.UserCdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.6
            @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
            public boolean onEvents(Iterator<CdcEvent> it) {
                countDownLatch.countDown();
                try {
                    countDownLatch2.await(CdcSelfTest.this.getTestTimeout(), TimeUnit.MILLISECONDS);
                    return super.onEvents(it);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        CdcMain createCdc = createCdc(userCdcConsumer, getConfiguration(startGrid.name()));
        GridTestUtils.runAsync((Runnable) createCdc);
        addData(startGrid.getOrCreateCache("default"), 0, 50);
        Thread.sleep(10000L);
        countDownLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
        createCdc.stop();
        countDownLatch2.countDown();
        waitForSize(50, "default", AbstractCdcTest.ChangeEventType.UPDATE, userCdcConsumer);
        userCdcConsumer.getClass();
        assertTrue(GridTestUtils.waitForCondition(userCdcConsumer::stopped, getTestTimeout()));
        List<Integer> data = userCdcConsumer.data(AbstractCdcTest.ChangeEventType.UPDATE, GridCacheUtils.cacheId("default"));
        assertEquals(50, data.size());
        for (int i = 0; i < 50; i++) {
            assertTrue(data.contains(Integer.valueOf(i)));
        }
    }

    @Test
    @WithSystemProperty(key = "IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID", value = "true")
    public void testMultiNodeConsumption() throws Exception {
        Ignite startGrid = startGrid(0);
        Ignite startGrid2 = startGrid(1);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache<Integer, ?> orCreateCache = startGrid.getOrCreateCache("default");
        int[] iArr = new int[2];
        for (int i = 0; i < 100; i++) {
            Ignite primaryNode = primaryNode(Integer.valueOf(i), "default");
            assertTrue(primaryNode == startGrid || primaryNode == startGrid2);
            char c = primaryNode == startGrid ? (char) 0 : (char) 1;
            iArr[c] = iArr[c] + 1;
        }
        IgniteInternalFuture runAsync = GridTestUtils.runAsync(() -> {
            addData(orCreateCache, 0, 50);
        });
        AbstractCdcTest.UserCdcConsumer userCdcConsumer = new AbstractCdcTest.UserCdcConsumer();
        AbstractCdcTest.UserCdcConsumer userCdcConsumer2 = new AbstractCdcTest.UserCdcConsumer();
        IgniteConfiguration configuration = getConfiguration(startGrid.name());
        IgniteConfiguration configuration2 = getConfiguration(startGrid2.name());
        if (!this.specificConsistentId) {
            configuration.setConsistentId((Serializable) startGrid.localNode().consistentId());
            configuration2.setConsistentId((Serializable) startGrid2.localNode().consistentId());
        }
        CountDownLatch countDownLatch = new CountDownLatch(2);
        GridAbsPredicate sizePredicate = sizePredicate(iArr[0], "default", AbstractCdcTest.ChangeEventType.UPDATE, userCdcConsumer);
        GridAbsPredicate sizePredicate2 = sizePredicate(iArr[1], "default", AbstractCdcTest.ChangeEventType.UPDATE, userCdcConsumer2);
        CdcMain createCdc = createCdc(userCdcConsumer, configuration, countDownLatch, sizePredicate);
        CdcMain createCdc2 = createCdc(userCdcConsumer2, configuration2, countDownLatch, sizePredicate2);
        IgniteInternalFuture runAsync2 = GridTestUtils.runAsync((Runnable) createCdc);
        IgniteInternalFuture runAsync3 = GridTestUtils.runAsync((Runnable) createCdc2);
        runAsync.get(getTestTimeout());
        GridTestUtils.runAsync(() -> {
            addData(orCreateCache, 50, 100);
        }).get(getTestTimeout());
        assertTrue(countDownLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS));
        checkMetrics(createCdc, iArr[0]);
        checkMetrics(createCdc2, iArr[1]);
        assertFalse(userCdcConsumer.stopped());
        assertFalse(userCdcConsumer2.stopped());
        runAsync2.cancel();
        runAsync3.cancel();
        assertTrue(userCdcConsumer.stopped());
        assertTrue(userCdcConsumer2.stopped());
        removeData(orCreateCache, 0, 100);
        CdcMain createCdc3 = createCdc(userCdcConsumer, configuration);
        CdcMain createCdc4 = createCdc(userCdcConsumer2, configuration2);
        IgniteInternalFuture runAsync4 = GridTestUtils.runAsync((Runnable) createCdc3);
        IgniteInternalFuture runAsync5 = GridTestUtils.runAsync((Runnable) createCdc4);
        waitForSize(100, "default", AbstractCdcTest.ChangeEventType.DELETE, userCdcConsumer, userCdcConsumer2);
        checkMetrics(createCdc3, iArr[0]);
        checkMetrics(createCdc4, iArr[1]);
        runAsync4.cancel();
        runAsync5.cancel();
        assertTrue(userCdcConsumer.stopped());
        assertTrue(userCdcConsumer2.stopped());
    }

    @Test
    public void testCdcSingleton() throws Exception {
        IgniteEx startGrid = startGrid(0);
        AbstractCdcTest.UserCdcConsumer userCdcConsumer = new AbstractCdcTest.UserCdcConsumer();
        AbstractCdcTest.UserCdcConsumer userCdcConsumer2 = new AbstractCdcTest.UserCdcConsumer();
        IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc(userCdcConsumer, getConfiguration(startGrid.name())));
        IgniteInternalFuture runAsync2 = GridTestUtils.runAsync((Runnable) createCdc(userCdcConsumer2, getConfiguration(startGrid.name())));
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return runAsync.isDone() || runAsync2.isDone();
        }, getTestTimeout()));
        assertEquals(runAsync.error() == null, runAsync2.error() != null);
        if (runAsync.isDone()) {
            runAsync2.cancel();
            assertTrue(userCdcConsumer2.stopped());
        } else {
            runAsync.cancel();
            assertTrue(userCdcConsumer.stopped());
        }
    }

    @Test
    public void testReReadWhenStateWasNotStored() throws Exception {
        Supplier supplier = () -> {
            stopAllGrids(false);
            try {
                IgniteEx startGrid = startGrid(getConfiguration("ignite-0"));
                startGrid.cluster().state(ClusterState.ACTIVE);
                return startGrid;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        addData(((IgniteEx) supplier.get()).getOrCreateCache("default"), 0, 25);
        IgniteEx igniteEx = (IgniteEx) supplier.get();
        IgniteCache<Integer, ?> orCreateCache = igniteEx.getOrCreateCache("default");
        addData(orCreateCache, 25, 50);
        AbstractCdcTest.UserCdcConsumer userCdcConsumer = new AbstractCdcTest.UserCdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.7
            @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
            protected boolean commit() {
                return false;
            }
        };
        for (int i = 0; i < 3; i++) {
            CdcMain createCdc = createCdc(userCdcConsumer, getConfiguration(igniteEx.name()));
            IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc);
            waitForSize(50, "default", AbstractCdcTest.ChangeEventType.UPDATE, userCdcConsumer);
            checkMetrics(createCdc, 50);
            runAsync.cancel();
            assertTrue(userCdcConsumer.stopped());
            userCdcConsumer.data.clear();
        }
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        final int i2 = 25;
        AbstractCdcTest.TestCdcConsumer<?> testCdcConsumer = new AbstractCdcTest.UserCdcConsumer() { // from class: org.apache.ignite.cdc.CdcSelfTest.8
            @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
            public boolean onEvents(Iterator<CdcEvent> it) {
                if (atomicBoolean.get() && F.size(data(AbstractCdcTest.ChangeEventType.UPDATE, GridCacheUtils.cacheId("default")), new IgnitePredicate[0]) == i2) {
                    atomicBoolean2.set(true);
                    return false;
                }
                while (it.hasNext()) {
                    CdcEvent next = it.next();
                    if (next.primary()) {
                        ((List) this.data.computeIfAbsent(F.t(next.value() == null ? AbstractCdcTest.ChangeEventType.DELETE : AbstractCdcTest.ChangeEventType.UPDATE, Integer.valueOf(next.cacheId())), igniteBiTuple -> {
                            return new ArrayList();
                        })).add((Integer) next.key());
                        if (atomicBoolean.get()) {
                            return F.size(data(AbstractCdcTest.ChangeEventType.UPDATE, GridCacheUtils.cacheId("default")), new IgnitePredicate[0]) == i2;
                        }
                    }
                }
                return true;
            }
        };
        CdcMain createCdc2 = createCdc(testCdcConsumer, getConfiguration(igniteEx.name()));
        IgniteInternalFuture runAsync2 = GridTestUtils.runAsync((Runnable) createCdc2);
        waitForSize(25, "default", AbstractCdcTest.ChangeEventType.UPDATE, testCdcConsumer);
        checkMetrics(createCdc2, 25);
        atomicBoolean2.getClass();
        GridTestUtils.waitForCondition(atomicBoolean2::get, getTestTimeout());
        runAsync2.cancel();
        assertTrue(testCdcConsumer.stopped());
        removeData(orCreateCache, 0, 50);
        atomicBoolean.set(false);
        CdcMain createCdc3 = createCdc(testCdcConsumer, getConfiguration(igniteEx.name()));
        IgniteInternalFuture runAsync3 = GridTestUtils.runAsync((Runnable) createCdc3);
        waitForSize(50, "default", AbstractCdcTest.ChangeEventType.UPDATE, testCdcConsumer);
        waitForSize(50, "default", AbstractCdcTest.ChangeEventType.DELETE, testCdcConsumer);
        checkMetrics(createCdc3, 100 - 25);
        runAsync3.cancel();
        assertTrue(testCdcConsumer.stopped());
    }

    @Test
    public void testDisable() throws Exception {
        IgniteEx startGrid = startGrid(0);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache orCreateCache = startGrid.getOrCreateCache("default");
        addData(orCreateCache, 0, 1);
        File file = (File) U.field(startGrid.context().cache().context().wal(true), "walCdcDir");
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return 1 == file.list().length;
        }, 10000L));
        DistributedChangeableProperty property = startGrid.context().distributedConfiguration().property("cdc.disabled");
        property.propagate(true);
        addData(orCreateCache, 0, 1);
        Thread.sleep(10000L);
        assertEquals(1, file.list().length);
        property.propagate(false);
        addData(orCreateCache, 0, 1);
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return 2 == file.list().length;
        }, 10000L));
    }

    @Test
    public void testCdcDirectoryMaxSize() throws Exception {
        this.cdcWalDirMaxSize = 10485760L;
        int i = (int) (this.cdcWalDirMaxSize / 2);
        IgniteEx startGrid = startGrid(0);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache orCreateCache = startGrid.getOrCreateCache("default");
        IgniteWriteAheadLogManager wal = startGrid.context().cache().context().wal(true);
        File file = (File) U.field(startGrid.context().cache().context().wal(true), "walCdcDir");
        RunnableX runnableX = () -> {
            int walArchiveSegments = wal.walArchiveSegments();
            int i2 = (int) (i * 0.8d);
            for (int i3 = 0; i3 < i2 / TrackingPageIOTest.PAGE_SIZE; i3++) {
                wal.log(new PageSnapshot(new FullPageId(-1L, -1), new byte[TrackingPageIOTest.PAGE_SIZE], 1));
            }
            addData(orCreateCache, 0, 1);
            GridTestUtils.waitForCondition(() -> {
                return wal.walArchiveSegments() > walArchiveSegments;
            }, 10000L);
        };
        runnableX.run();
        runnableX.run();
        runnableX.run();
        assertTrue(this.cdcWalDirMaxSize >= Arrays.stream(file.listFiles()).mapToLong((v0) -> {
            return v0.length();
        }).sum());
        AbstractCdcTest.UserCdcConsumer userCdcConsumer = new AbstractCdcTest.UserCdcConsumer();
        IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc(userCdcConsumer, getConfiguration(startGrid.name())));
        waitForSize(2, "default", AbstractCdcTest.ChangeEventType.UPDATE, userCdcConsumer);
        assertFalse(runAsync.isDone());
        runnableX.run();
        GridTestUtils.assertThrows(log, (Callable<?>) () -> {
            return runAsync.get(getTestTimeout());
        }, (Class<? extends Throwable>) IgniteCheckedException.class, "Found missed segments. Some events are missed.");
        startGrid.compute().execute(VisorCdcDeleteLostSegmentsTask.class, new VisorTaskArgument(startGrid.localNode().id(), false));
        userCdcConsumer.data.clear();
        IgniteInternalFuture runAsync2 = GridTestUtils.runAsync((Runnable) createCdc(userCdcConsumer, getConfiguration(startGrid.name())));
        waitForSize(1, "default", AbstractCdcTest.ChangeEventType.UPDATE, userCdcConsumer);
        assertFalse(runAsync2.isDone());
        runAsync2.cancel();
    }

    public static void addData(IgniteCache<Integer, AbstractCdcTest.User> igniteCache, int i, int i2) {
        for (int i3 = i; i3 < i2; i3++) {
            igniteCache.put(Integer.valueOf(i3), createUser(i3));
        }
    }

    private void removeData(IgniteCache<Integer, ?> igniteCache, int i, int i2) {
        for (int i3 = i; i3 < i2; i3++) {
            igniteCache.remove(Integer.valueOf(i3));
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1483659570:
                if (implMethodName.equals("lambda$testReadOneByOneForBackup$9e94cab7$1")) {
                    z = 3;
                    break;
                }
                break;
            case 458088037:
                if (implMethodName.equals("lambda$testMultiNodeConsumption$f55b82e2$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1351564494:
                if (implMethodName.equals("lambda$testCdcDirectoryMaxSize$415a6fd3$1")) {
                    z = false;
                    break;
                }
                break;
            case 1607196284:
                if (implMethodName.equals("lambda$testMultiNodeConsumption$e6930ed$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/internal/util/lang/RunnableX") && serializedLambda.getFunctionalInterfaceMethodName().equals("runx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("org/apache/ignite/cdc/CdcSelfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager;ILorg/apache/ignite/IgniteCache;)V")) {
                    IgniteWriteAheadLogManager igniteWriteAheadLogManager = (IgniteWriteAheadLogManager) serializedLambda.getCapturedArg(0);
                    int intValue = ((Integer) serializedLambda.getCapturedArg(1)).intValue();
                    IgniteCache igniteCache = (IgniteCache) serializedLambda.getCapturedArg(2);
                    return () -> {
                        int walArchiveSegments = igniteWriteAheadLogManager.walArchiveSegments();
                        int i2 = (int) (intValue * 0.8d);
                        for (int i3 = 0; i3 < i2 / TrackingPageIOTest.PAGE_SIZE; i3++) {
                            igniteWriteAheadLogManager.log(new PageSnapshot(new FullPageId(-1L, -1), new byte[TrackingPageIOTest.PAGE_SIZE], 1));
                        }
                        addData(igniteCache, 0, 1);
                        GridTestUtils.waitForCondition(() -> {
                            return igniteWriteAheadLogManager.walArchiveSegments() > walArchiveSegments;
                        }, 10000L);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/internal/util/lang/RunnableX") && serializedLambda.getFunctionalInterfaceMethodName().equals("runx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("org/apache/ignite/cdc/CdcSelfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/IgniteCache;)V")) {
                    IgniteCache igniteCache2 = (IgniteCache) serializedLambda.getCapturedArg(0);
                    return () -> {
                        addData(igniteCache2, 0, 50);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/internal/util/lang/RunnableX") && serializedLambda.getFunctionalInterfaceMethodName().equals("runx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("org/apache/ignite/cdc/CdcSelfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/IgniteCache;)V")) {
                    IgniteCache igniteCache3 = (IgniteCache) serializedLambda.getCapturedArg(0);
                    return () -> {
                        addData(igniteCache3, 50, 100);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteBiPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/ignite/cdc/CdcSelfTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/ignite/internal/pagemem/wal/record/WALRecord$RecordType;Lorg/apache/ignite/internal/processors/cache/persistence/wal/WALPointer;)Z")) {
                    return (recordType, wALPointer) -> {
                        return recordType == WALRecord.RecordType.DATA_RECORD_V2;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
