/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobKeyTest;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobServerGetTest;
import org.apache.flink.runtime.blob.BlobServerPutTest;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BlobServerRecoveryTest
extends TestLogger {
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    @Test
    public void testBlobServerRecovery() throws Exception {
        Configuration config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getPath());
        BlobStoreService blobStoreService = null;
        try {
            blobStoreService = BlobUtils.createBlobStoreFromConfig((Configuration)config);
            BlobServerRecoveryTest.testBlobServerRecovery(config, (BlobStore)blobStoreService, TEMPORARY_FOLDER.newFolder());
        }
        finally {
            if (blobStoreService != null) {
                blobStoreService.closeAndCleanupAllData();
            }
        }
    }

    public static void testBlobServerRecovery(Configuration config, BlobStore blobStore, File blobStorage) throws Exception {
        String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
        String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
        Random rand = new Random();
        try (BlobServer server0 = new BlobServer(config, new File(blobStorage, "server0"), blobStore);
             BlobServer server1 = new BlobServer(config, new File(blobStorage, "server1"), blobStore);
             BlobCacheService cache1 = new BlobCacheService(config, new File(blobStorage, "cache1"), (BlobView)new VoidBlobStore(), new InetSocketAddress("localhost", server1.getPort()));){
            server0.start();
            server1.start();
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);
            byte[] expected2 = Arrays.copyOfRange(expected, 32, 288);
            BlobKey[] keys = new BlobKey[2];
            JobID[] jobId = new JobID[]{new JobID(), new JobID()};
            keys[0] = BlobServerPutTest.put((BlobService)server0, jobId[0], expected, BlobKey.BlobType.PERMANENT_BLOB);
            keys[1] = BlobServerPutTest.put((BlobService)server0, jobId[1], expected2, BlobKey.BlobType.PERMANENT_BLOB);
            BlobKey nonHAKey = BlobServerPutTest.put((BlobService)server0, jobId[0], expected2, BlobKey.BlobType.TRANSIENT_BLOB);
            BlobKeyTest.verifyKeyDifferentHashEquals(keys[1], nonHAKey);
            Path blobServerPath = new Path(storagePath, "blob");
            FileSystem fs = blobServerPath.getFileSystem();
            Assert.assertTrue((String)("Unknown storage dir: " + blobServerPath), (boolean)fs.exists(blobServerPath));
            BlobServerPutTest.verifyContents((BlobService)cache1, jobId[0], keys[0], expected);
            BlobServerPutTest.verifyContents((BlobService)cache1, jobId[1], keys[1], expected2);
            BlobServerGetTest.verifyDeleted((BlobService)cache1, jobId[0], nonHAKey);
            server1.globalCleanupAsync(jobId[0], Executors.directExecutor()).join();
            server1.globalCleanupAsync(jobId[1], Executors.directExecutor()).join();
            Assert.assertTrue((String)"HA storage directory does not exist", (boolean)fs.exists(new Path(storagePath)));
            if (fs.exists(blobServerPath)) {
                FileStatus[] recoveryFiles = fs.listStatus(blobServerPath);
                ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
                for (FileStatus file : recoveryFiles) {
                    filenames.add(file.toString());
                }
                Assert.fail((String)("Unclean state backend: " + filenames));
            }
        }
    }
}

