/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServerConnection;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.FileSystemBlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobServer
extends Thread
implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
    private final AtomicInteger tempFileCounter = new AtomicInteger(0);
    private final ServerSocket serverSocket;
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    private final File storageDir;
    private final BlobStore blobStore;
    private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
    private final int maxConnections;
    private final Thread shutdownHook;

    public BlobServer(Configuration config) throws IOException {
        int finalBacklog;
        Preconditions.checkNotNull(config, "Configuration");
        RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
        String storageDirectory = config.getString("blob.storage.directory", null);
        this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
        LOG.info("Created BLOB server storage directory {}", (Object)this.storageDir);
        if (recoveryMode == RecoveryMode.STANDALONE) {
            this.blobStore = new VoidBlobStore();
        } else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
            this.blobStore = new FileSystemBlobStore(config);
        } else {
            throw new IllegalConfigurationException("Unexpected recovery mode '" + (Object)((Object)recoveryMode) + ".");
        }
        int maxConnections = config.getInteger("blob.fetch.num-concurrent", 50);
        if (maxConnections >= 1) {
            this.maxConnections = maxConnections;
        } else {
            LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", (Object)maxConnections, (Object)50);
            this.maxConnections = 50;
        }
        int backlog = config.getInteger("blob.fetch.backlog", 1000);
        if (backlog < 1) {
            LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", (Object)backlog, (Object)1000);
            backlog = 1000;
        }
        this.shutdownHook = recoveryMode == RecoveryMode.STANDALONE ? BlobUtils.addShutdownHook(this, LOG) : null;
        String serverPortRange = config.getString("blob.server.port", "0");
        Iterator ports = NetUtils.getPortRangeFromString((String)serverPortRange);
        ServerSocket socketAttempt = NetUtils.createSocketFromPorts((Iterator)ports, (NetUtils.SocketFactory)new NetUtils.SocketFactory(finalBacklog = backlog){
            final /* synthetic */ int val$finalBacklog;
            {
                this.val$finalBacklog = n;
            }

            public ServerSocket createSocket(int port) throws IOException {
                return new ServerSocket(port, this.val$finalBacklog);
            }
        });
        if (socketAttempt == null) {
            throw new IOException("Unable to allocate socket for blob server in specified port range: " + serverPortRange);
        }
        this.serverSocket = socketAttempt;
        this.setName("BLOB Server listener at " + this.getPort());
        this.setDaemon(true);
        this.start();
        if (LOG.isInfoEnabled()) {
            LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}", new Object[]{this.serverSocket.getInetAddress().getHostAddress(), this.getPort(), maxConnections, backlog});
        }
    }

    File getStorageLocation(BlobKey key) {
        return BlobUtils.getStorageLocation(this.storageDir, key);
    }

    File getStorageLocation(JobID jobID, String key) {
        return BlobUtils.getStorageLocation(this.storageDir, jobID, key);
    }

    void deleteJobDirectory(JobID jobID) throws IOException {
        BlobUtils.deleteJobDirectory(this.storageDir, jobID);
    }

    File createTemporaryFilename() {
        return new File(BlobUtils.getIncomingDirectory(this.storageDir), String.format("temp-%08d", this.tempFileCounter.getAndIncrement()));
    }

    BlobStore getBlobStore() {
        return this.blobStore;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        try {
            while (!this.shutdownRequested.get()) {
                Set<BlobServerConnection> set;
                BlobServerConnection conn = new BlobServerConnection(this.serverSocket.accept(), this);
                try {
                    set = this.activeConnections;
                    synchronized (set) {
                        while (this.activeConnections.size() >= this.maxConnections) {
                            this.activeConnections.wait(2000L);
                        }
                        this.activeConnections.add(conn);
                    }
                    conn.start();
                    conn = null;
                }
                finally {
                    if (conn == null) continue;
                    conn.close();
                    set = this.activeConnections;
                    synchronized (set) {
                        this.activeConnections.remove(conn);
                    }
                }
            }
            return;
        }
        catch (Throwable t) {
            if (this.shutdownRequested.get()) return;
            LOG.error("BLOB server stopped working. Shutting down", t);
            this.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        if (this.shutdownRequested.compareAndSet(false, true)) {
            try {
                this.serverSocket.close();
            }
            catch (IOException ioe) {
                LOG.debug("Error while closing the server socket.", (Throwable)ioe);
            }
            this.interrupt();
            try {
                this.join();
            }
            catch (InterruptedException ie) {
                LOG.debug("Error while waiting for this thread to die.", (Throwable)ie);
            }
            Set<BlobServerConnection> ie = this.activeConnections;
            synchronized (ie) {
                if (!this.activeConnections.isEmpty()) {
                    for (BlobServerConnection conn : this.activeConnections) {
                        LOG.debug("Shutting down connection " + conn.getName());
                        conn.close();
                    }
                    this.activeConnections.clear();
                }
            }
            try {
                FileUtils.deleteDirectory((File)this.storageDir);
            }
            catch (IOException e) {
                LOG.error("BLOB server failed to properly clean up its storage directory.");
            }
            this.blobStore.cleanUp();
            if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                }
                catch (IllegalStateException e) {
                }
                catch (Throwable t) {
                    LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.");
                }
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("Stopped BLOB server at {}:{}", (Object)this.serverSocket.getInetAddress().getHostAddress(), (Object)this.getPort());
            }
        }
    }

    @Override
    public URL getURL(BlobKey requiredBlob) throws IOException {
        if (requiredBlob == null) {
            throw new IllegalArgumentException("Required BLOB cannot be null.");
        }
        File localFile = BlobUtils.getStorageLocation(this.storageDir, requiredBlob);
        if (localFile.exists()) {
            return localFile.toURI().toURL();
        }
        try {
            this.blobStore.get(requiredBlob, localFile);
        }
        catch (Exception e) {
            throw new IOException("Failed to copy from blob store.", e);
        }
        if (localFile.exists()) {
            return localFile.toURI().toURL();
        }
        throw new FileNotFoundException("Local file " + localFile + " does not exist " + "and failed to copy from blob store.");
    }

    @Override
    public void delete(BlobKey key) throws IOException {
        File localFile = BlobUtils.getStorageLocation(this.storageDir, key);
        if (localFile.exists() && !localFile.delete()) {
            LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath());
        }
        this.blobStore.delete(key);
    }

    @Override
    public int getPort() {
        return this.serverSocket.getLocalPort();
    }

    public boolean isShutdown() {
        return this.shutdownRequested.get();
    }

    ServerSocket getServerSocket() {
        return this.serverSocket;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void unregisterConnection(BlobServerConnection conn) {
        Set<BlobServerConnection> set = this.activeConnections;
        synchronized (set) {
            this.activeConnections.remove(conn);
            this.activeConnections.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<BlobServerConnection> getCurrentActiveConnections() {
        Set<BlobServerConnection> set = this.activeConnections;
        synchronized (set) {
            return new ArrayList<BlobServerConnection>(this.activeConnections);
        }
    }
}

