package org.apache.ignite.internal.processors.cache.persistence.snapshot.incremental;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.IncrementalSnapshotFinishRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotSelfTest;
import org.apache.ignite.internal.util.distributed.InitMessage;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;

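/**
 * Tests how a client node that joins the cluster around the moment an incremental snapshot is started
 * is handled: depending on how the join and the snapshot request are ordered, the client is expected
 * either to have or not to have a local incremental snapshot id.
 */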
public class IncrementalSnapshotJoiningClientTest extends AbstractIncrementalSnapshotTest {
    /** Counted down by {@code ClientBlockingDiscoverySpi} when it is about to write a join request to the socket. */
    private static volatile CountDownLatch blockClientJoinReq;
    /** Awaited by {@code ClientBlockingDiscoverySpi} before the join request is actually written. */
    private static volatile CountDownLatch unblockClientJoinReq;
    /** Counted down by {@code CoordinatorBlockingDiscoverySpi} when it starts processing a node-added message. */
    private static volatile CountDownLatch acceptClientReq;
    /** Awaited by {@code CoordinatorBlockingDiscoverySpi} while it is processing a node-added message. */
    private static volatile CountDownLatch addClient;
    /** Counted down by {@code CoordinatorBlockingDiscoverySpi} when it sees a custom event whose delegate is an {@link InitMessage}. */
    private static volatile CountDownLatch rcvStartSnpReq;

    /**
     * Discovery SPI that holds back an outgoing {@link TcpDiscoveryJoinRequestMessage} until the test
     * releases it. In this test it is installed on the node with index {@code nodes() + 1}.
     */
    private static class ClientBlockingDiscoverySpi extends TcpDiscoverySpi {
        /**
         * Signals {@code blockClientJoinReq} and then waits on {@code unblockClientJoinReq} when the
         * message is a join request and blocking is armed (the latch is non-null); every message is
         * then forwarded to the parent implementation.
         *
         * @param sock Socket the message is written to.
         * @param out Output stream of the socket.
         * @param msg Discovery message being sent.
         * @param timeout Timeout value passed through to the parent implementation.
         * @throws IOException Propagated from the parent implementation.
         * @throws IgniteCheckedException Propagated from the parent implementation.
         */
        protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout)
            throws IOException, IgniteCheckedException {
            boolean joinReq = msg instanceof TcpDiscoveryJoinRequestMessage;

            if (joinReq && blockClientJoinReq != null) {
                // Tell the test the join request has been reached, then park until the test lets it through.
                blockClientJoinReq.countDown();

                U.awaitQuiet(unblockClientJoinReq);
            }

            super.writeToSocket(sock, out, msg, timeout);
        }
    }

    /**
     * Discovery SPI that pauses processing of a {@link TcpDiscoveryNodeAddedMessage} and reports when a
     * custom event carrying an {@link InitMessage} starts being processed. In this test it is installed
     * on the node with index {@code 0}.
     */
    private static class CoordinatorBlockingDiscoverySpi extends TcpDiscoverySpi {
        /**
         * Hook invoked with a discovery message. For a node-added message (when {@code acceptClientReq}
         * is non-null) it signals {@code acceptClientReq} and waits on {@code addClient}. For a custom
         * event (when {@code rcvStartSnpReq} is non-null) it unwraps the message and counts
         * {@code rcvStartSnpReq} down if the delegate is an {@link InitMessage}.
         *
         * @param msg Discovery message.
         */
        protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
            boolean nodeAdded = msg instanceof TcpDiscoveryNodeAddedMessage;

            if (nodeAdded && acceptClientReq != null) {
                acceptClientReq.countDown();

                U.awaitQuiet(addClient);
            }

            if (msg instanceof TcpDiscoveryCustomEventMessage && rcvStartSnpReq != null) {
                TcpDiscoveryCustomEventMessage customEvt = (TcpDiscoveryCustomEventMessage) msg;

                try {
                    Object delegate = customEvt
                        .message(marshaller(), U.resolveClassLoader(ignite().configuration()))
                        .delegate();

                    if (delegate instanceof InitMessage) {
                        rcvStartSnpReq.countDown();
                    }
                } catch (Throwable ignored) {
                    // Deliberately ignored: a message that cannot be unwrapped is simply not counted.
                }
            }
        }
    }

    /**
     * Builds the node configuration: every node gets a {@link TestRecordingCommunicationSpi} and a
     * snapshot thread pool of size one; the node with index {@code 0} gets a
     * {@code CoordinatorBlockingDiscoverySpi} and the node with index {@code nodes() + 1} gets a
     * {@code ClientBlockingDiscoverySpi}, both reusing the IP finder of the discovery SPI configured
     * by the parent class.
     *
     * @param str Ignite instance name.
     * @return Configuration for the given instance.
     * @throws Exception If the parent class fails to build the base configuration.
     */
    @Override
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(str);

        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
        cfg.setSnapshotThreadPoolSize(1);

        int idx = getTestIgniteInstanceIndex(str);

        if (idx == 0) {
            // getDiscoverySpi() is typed as the generic discovery SPI, which has no IP finder accessor;
            // the cast assumes the parent configures a TcpDiscoverySpi.
            TcpDiscoverySpi parentDisco = (TcpDiscoverySpi) cfg.getDiscoverySpi();

            cfg.setDiscoverySpi(new CoordinatorBlockingDiscoverySpi().setIpFinder(parentDisco.getIpFinder()));
        }

        if (idx == nodes() + 1) {
            TcpDiscoverySpi parentDisco = (TcpDiscoverySpi) cfg.getDiscoverySpi();

            cfg.setDiscoverySpi(new ClientBlockingDiscoverySpi().setIpFinder(parentDisco.getIpFinder()));
        }

        return cfg;
    }

    /**
     * Scenario in which the client is expected to end up with a local incremental snapshot id: the
     * join request is released first, the snapshot is requested once node 0 has started processing the
     * node-added message, and only then is that processing allowed to continue.
     *
     * @throws Exception If the check fails.
     */
    @Test
    public void testJoiningClientShouldInitLocalSnapshot() throws Exception {
        Supplier<IgniteFuture<Void>> startSnp = () -> {
            // Let the client send its join request and wait until node 0 begins handling the node-added message.
            unblockClientJoinReq.countDown();
            U.awaitQuiet(acceptClientReq);

            IgniteFuture<Void> snpFut = AbstractSnapshotSelfTest.snp(grid(0)).createIncrementalSnapshot("base");

            // Resume node-added processing only after the snapshot has been requested.
            addClient.countDown();

            return snpFut;
        };

        checkClientAwarenessOfSnapshot(true, startSnp);
    }

    /**
     * Scenario in which the client is expected to have no local incremental snapshot id: the snapshot
     * is requested and its init message is observed on node 0 before the client's join request is
     * released.
     *
     * @throws Exception If the check fails.
     */
    @Test
    public void testJoiningClientShouldNotInitLocalSnapshot() throws Exception {
        Supplier<IgniteFuture<Void>> startSnp = () -> {
            IgniteFuture<Void> snpFut = AbstractSnapshotSelfTest.snp(grid(0)).createIncrementalSnapshot("base");

            // Wait until node 0 has seen the snapshot init message before letting the client join.
            U.awaitQuiet(rcvStartSnpReq);

            unblockClientJoinReq.countDown();
            addClient.countDown();

            return snpFut;
        };

        checkClientAwarenessOfSnapshot(false, startSnp);
    }

    /**
     * Common scenario driver. It arms all discovery latches, starts the client asynchronously, waits
     * until the client reaches its join request, parks a blocking task on node 0's snapshot executor,
     * lets {@code supplier} start the snapshot and release the join in the desired order, runs a
     * transaction from the client, checks whether the client has an incremental snapshot id, then
     * unblocks the executor, waits for the snapshot, and verifies the WAL and the restored data.
     *
     * @param z {@code true} if the client is expected to have a non-null incremental snapshot id,
     *      {@code false} if it is expected to be {@code null}.
     * @param supplier Starts the incremental snapshot, releases the blocked discovery messages and
     *      returns the snapshot future.
     * @throws Exception If any step fails or times out.
     */
    private void checkClientAwarenessOfSnapshot(boolean z, Supplier<IgniteFuture<Void>> supplier) throws Exception {
        rcvStartSnpReq = new CountDownLatch(1);
        acceptClientReq = new CountDownLatch(1);
        unblockClientJoinReq = new CountDownLatch(1);
        blockClientJoinReq = new CountDownLatch(1);
        addClient = new CountDownLatch(1);

        int clientIdx = nodes() + 1;

        IgniteInternalFuture<?> clientStartFut = multithreadedAsync(() -> startClientGrid(clientIdx), 1);

        // Proceed only once the client is parked right before sending its join request.
        U.awaitQuiet(blockClientJoinReq);

        CountDownLatch blockSnpExecutor = new CountDownLatch(1);

        // Occupy node 0's snapshot executor with a task that waits until the client-side checks are done.
        grid(0).context().pools().getSnapshotExecutorService().submit(() -> U.awaitQuiet(blockSnpExecutor));

        IgniteFuture<Void> snpFut;
        IgniteUuid txId;

        try {
            snpFut = supplier.get();

            clientStartFut.get(getTestTimeout());

            txId = runTx(grid(clientIdx), 0, 100);

            if (z) {
                assertNotNull(AbstractSnapshotSelfTest.snp(grid(clientIdx)).incrementalSnapshotId());
            } else {
                assertNull(AbstractSnapshotSelfTest.snp(grid(clientIdx)).incrementalSnapshotId());
            }
        } finally {
            // Always release the executor: the parked task waits on this latch and would otherwise
            // stay blocked after a failed assertion or timeout.
            blockSnpExecutor.countDown();
        }

        snpFut.get(getTestTimeout());

        assertTrue(transactionExcluded(0, txId));

        checkRestoredSnapshotIsEmpty();
    }

    /**
     * Runs one transaction on the given node that performs ten puts of value {@code 0} into cache
     * {@code "CACHE"} under random keys from {@code [i, i2)}, commits it and returns its id.
     * The transaction is closed on every exit path, including a failed put or commit.
     *
     * @param igniteEx Node that starts the transaction.
     * @param i Lower key bound, inclusive.
     * @param i2 Upper key bound, exclusive; must be greater than {@code i}.
     * @return Id ({@code xid}) of the committed transaction.
     */
    private IgniteUuid runTx(IgniteEx igniteEx, int i, int i2) {
        try (Transaction tx = igniteEx.transactions().txStart()) {
            for (int n = 0; n < 10; n++) {
                int key = i + ThreadLocalRandom.current().nextInt(i2 - i);

                igniteEx.cache("CACHE").put(Integer.valueOf(key), 0);
            }

            tx.commit();

            return tx.xid();
        }
    }

    /**
     * Scans the WAL of the given node for the first record of type
     * {@code INCREMENTAL_SNAPSHOT_FINISH_RECORD} and asserts that its excluded set contains the given
     * transaction id. The WAL iterator is closed on every exit path, including a failed assertion.
     *
     * @param i Index of the node whose WAL is read.
     * @param igniteUuid Transaction id expected among the excluded versions.
     * @return {@code true} if a finish record was found (and the assertion on it passed),
     *      {@code false} if the WAL contains no such record.
     * @throws Exception If the WAL cannot be read or closed.
     */
    private boolean transactionExcluded(int i, IgniteUuid igniteUuid) throws Exception {
        try (WALIterator it = walIter(i)) {
            while (it.hasNext()) {
                WALRecord rec = it.next().getValue();

                if (rec.type() != WALRecord.RecordType.INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
                    continue;
                }

                // The type tag was checked above, so the downcast is safe.
                IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord) rec;

                assertTrue(finRec.excluded().stream().anyMatch(ver -> ver.asIgniteUuid().equals(igniteUuid)));

                return true;
            }

            return false;
        }
    }

    /**
     * Restarts the cluster from a clean persistence directory, restores increment {@code 1} of
     * snapshot {@code "base"} and checks that partitions are consistent and cache {@code "CACHE"}
     * is empty.
     *
     * @throws Exception If restart, restore or verification fails.
     */
    private void checkRestoredSnapshotIsEmpty() throws Exception {
        // Start over from a clean persistence directory.
        stopAllGrids();
        cleanPersistenceDir(true);

        IgniteEx ignite = startGrids(nodes());

        ignite.cluster().state(ClusterState.ACTIVE);

        // Drop the cache before restoring it from the snapshot.
        ignite.destroyCache("CACHE");
        awaitPartitionMapExchange();

        IgniteFuture<?> restoreFut = ignite.snapshot().restoreSnapshot("base", (Collection) null, 1);

        restoreFut.get(getTestTimeout());

        assertPartitionsSame(idleVerify(grid(0), new String[0]));

        int cacheSize = grid(0).cache("CACHE").size(new CachePeekMode[0]);

        assertEquals(0, cacheSize);
    }

    /**
     * @return {@code 3}. Within this class the value is passed to {@code startGrids} and
     *      {@code nodes() + 1} is used as the index of the client node.
     */
    @Override
    public int nodes() {
        return 3;
    }

    /**
     * @return {@code 2}. Presumably the number of cache backups configured by the parent test class
     *      (not visible here; confirm against {@code AbstractIncrementalSnapshotTest}).
     */
    @Override
    protected int backups() {
        return 2;
    }
}
