/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.execution.librarycache;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BlobLibraryCacheRecoveryITCase
extends TestLogger {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Parameterized.Parameter
    public boolean wrapsSystemClassLoader;

    @Parameterized.Parameters(name="Use system class loader: {0}")
    public static List<Boolean> useSystemClassLoader() {
        return Arrays.asList(true, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoveryRegisterAndDownload() throws Exception {
        Random rand = new Random();
        BlobServer[] server = new BlobServer[2];
        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
        BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
        PermanentBlobCache cache = null;
        BlobStoreService blobStoreService = null;
        Configuration config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.newFolder().getAbsolutePath());
        config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3600L);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            blobStoreService = BlobUtils.createBlobStoreFromConfig((Configuration)config);
            BlobLibraryCacheManager.ClassLoaderFactory classLoaderFactory = BlobLibraryCacheManager.defaultClassLoaderFactory((FlinkUserCodeClassLoaders.ResolveOrder)FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, (String[])new String[0], null, (boolean)true);
            for (int i2 = 0; i2 < server.length; ++i2) {
                server[i2] = new BlobServer(config, this.temporaryFolder.newFolder(), (BlobStore)blobStoreService);
                server[i2].start();
                serverAddress[i2] = new InetSocketAddress("localhost", server[i2].getPort());
                libServer[i2] = new BlobLibraryCacheManager((PermanentBlobService)server[i2], classLoaderFactory, this.wrapsSystemClassLoader);
            }
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);
            ArrayList<PermanentBlobKey> keys = new ArrayList<PermanentBlobKey>(2);
            JobID jobID = new JobID();
            keys.add(server[0].putPermanent(jobID, expected));
            byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
            keys.add(server[0].putPermanent(jobID, expected2));
            cache = new PermanentBlobCache(config, this.temporaryFolder.newFolder(), (BlobView)blobStoreService, serverAddress[0]);
            LibraryCacheManager.ClassLoaderLease classLoaderLease = libServer[0].registerClassLoaderLease(jobID);
            classLoaderLease.getOrResolveClassLoader(keys, Collections.emptyList());
            File f = cache.getFile(jobID, (PermanentBlobKey)keys.get(0));
            Assert.assertEquals((long)expected.length, (long)f.length());
            try (FileInputStream fis = new FileInputStream(f);){
                for (int i = 0; i < expected.length && fis.available() > 0; ++i) {
                    Assert.assertEquals((long)expected[i], (long)((byte)fis.read()));
                }
                Assert.assertEquals((long)0L, (long)fis.available());
            }
            cache.close();
            cache = new PermanentBlobCache(config, this.temporaryFolder.newFolder(), (BlobView)blobStoreService, serverAddress[1]);
            f = cache.getFile(jobID, (PermanentBlobKey)keys.get(0));
            Assert.assertEquals((long)expected.length, (long)f.length());
            fis = new FileInputStream(f);
            var17_24 = null;
            try {
                for (int i3 = 0; i3 < expected.length && fis.available() > 0; ++i3) {
                    Assert.assertEquals((long)expected[i3], (long)((byte)fis.read()));
                }
                Assert.assertEquals((long)0L, (long)fis.available());
            }
            catch (Throwable i3) {
                var17_24 = i3;
                throw i3;
            }
            finally {
                if (fis != null) {
                    if (var17_24 != null) {
                        try {
                            fis.close();
                        }
                        catch (Throwable i3) {
                            var17_24.addSuppressed(i3);
                        }
                    } else {
                        fis.close();
                    }
                }
            }
            f = cache.getFile(jobID, (PermanentBlobKey)keys.get(1));
            Assert.assertEquals((long)expected2.length, (long)f.length());
            fis = new FileInputStream(f);
            var17_24 = null;
            try {
                for (int i4 = 0; i4 < 256 && fis.available() > 0; ++i4) {
                    Assert.assertEquals((long)expected2[i4], (long)((byte)fis.read()));
                }
                Assert.assertEquals((long)0L, (long)fis.available());
            }
            catch (Throwable i4) {
                var17_24 = i4;
                throw i4;
            }
            finally {
                if (fis != null) {
                    if (var17_24 != null) {
                        try {
                            fis.close();
                        }
                        catch (Throwable i4) {
                            var17_24.addSuppressed(i4);
                        }
                    } else {
                        fis.close();
                    }
                }
            }
            server[1].globalCleanupAsync(jobID, (Executor)executorService).join();
            String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
            String haBlobStorePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
            File haBlobStoreDir = new File(haBlobStorePath, clusterId);
            Object[] recoveryFiles = haBlobStoreDir.listFiles();
            Assert.assertNotNull((String)"HA storage directory does not exist", (Object)recoveryFiles);
            Assert.assertEquals((String)("Unclean state backend: " + Arrays.toString(recoveryFiles)), (long)0L, (long)recoveryFiles.length);
        }
        catch (Throwable throwable) {
            MatcherAssert.assertThat(executorService.shutdownNow(), (Matcher)IsEmptyCollection.empty());
            for (BlobLibraryCacheManager s : libServer) {
                if (s == null) continue;
                s.shutdown();
            }
            for (BlobLibraryCacheManager s : server) {
                if (s == null) continue;
                s.close();
            }
            if (cache != null) {
                cache.close();
            }
            if (blobStoreService != null) {
                blobStoreService.closeAndCleanupAllData();
            }
            throw throwable;
        }
        MatcherAssert.assertThat(executorService.shutdownNow(), (Matcher)IsEmptyCollection.empty());
        for (BlobLibraryCacheManager blobLibraryCacheManager : libServer) {
            if (blobLibraryCacheManager == null) continue;
            blobLibraryCacheManager.shutdown();
        }
        for (BlobLibraryCacheManager blobLibraryCacheManager : server) {
            if (blobLibraryCacheManager == null) continue;
            blobLibraryCacheManager.close();
        }
        if (cache != null) {
            cache.close();
        }
        if (blobStoreService != null) {
            blobStoreService.closeAndCleanupAllData();
        }
    }
}

