/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobInputStream;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client for a BLOB server: uploads (PUT) and downloads (GET) BLOBs over a single socket,
 * which is SSL-wrapped when the supplied configuration enables it.
 *
 * <p>When a GET or PUT fails, the socket is closed before the failure is rethrown as an
 * {@link IOException}. Further GET/PUT calls on that instance throw
 * {@link IllegalStateException}.
 */
public final class BlobClient
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(BlobClient.class);

    /** Size of the transfer buffer and the largest chunk sent in one PUT frame. */
    private static final int BUFFER_SIZE = 65536;

    /** First header byte of a PUT request. */
    private static final int PUT_OPERATION = 0;

    /** First header byte of a GET request. */
    private static final int GET_OPERATION = 1;

    /** Second header byte when no job ID accompanies the request. */
    private static final int JOB_UNRELATED_CONTENT = 0;

    /** Second header byte when the job ID bytes follow. */
    private static final int JOB_RELATED_CONTENT = 2;

    /** Response byte for a successful request. */
    private static final int RETURN_OKAY = 0;

    /** Response byte announcing that a serialized exception follows. */
    private static final int RETURN_ERROR = 1;

    /** The connection to the BLOB server; closed by {@link #close()} or on any failed operation. */
    private final Socket socket;

    /**
     * Connects to the BLOB server at the given address.
     *
     * @param serverAddress address of the BLOB server
     * @param clientConfig configuration consulted for SSL settings; may be {@code null}, in
     *     which case a plain socket is used
     * @throws IOException if the connection (or its SSL setup) could not be established
     */
    public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) throws IOException {
        // Track the socket in a local so that every failure path below can close it.
        Socket connection = null;
        try {
            SSLContext clientSSLContext = null;
            if (clientConfig != null && clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
                clientSSLContext = SSLUtils.createSSLClientContext(clientConfig);
            }
            if (clientSSLContext != null) {
                LOG.info("Using ssl connection to the blob server");
                SSLSocket sslSocket = (SSLSocket) clientSSLContext.getSocketFactory()
                        .createSocket(serverAddress.getAddress(), serverAddress.getPort());
                // Remember the socket before configuring it: if the parameter setup below
                // throws, the catch block must still be able to close the open connection.
                connection = sslSocket;
                if (!serverAddress.getAddress().isLoopbackAddress()) {
                    SSLParameters sslParameters = sslSocket.getSSLParameters();
                    SSLUtils.setSSLVerifyHostname(clientConfig, sslParameters);
                    sslSocket.setSSLParameters(sslParameters);
                }
            } else {
                connection = new Socket();
                connection.connect(serverAddress);
            }
        }
        catch (Exception e) {
            BlobUtils.closeSilently(connection, LOG);
            throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
        }
        this.socket = connection;
    }

    /**
     * Downloads one BLOB from the server into a local file, opening a fresh connection for
     * every attempt.
     *
     * <p>NOTE(review): the decompiler could not recover the original body of this method; this
     * implementation is rebuilt from the signature and from {@link #getInternal}. Confirm the
     * retry and logging behaviour against the upstream source.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if job-unrelated
     * @param blobKey key of the BLOB to fetch
     * @param localJarFile file the BLOB content is written to (truncated on every attempt)
     * @param serverAddress address of the BLOB server
     * @param blobClientConfig configuration passed to the {@link BlobClient} constructor
     * @param numFetchRetries number of additional attempts after the first failure
     * @throws IOException if the download still fails after the last permitted attempt; a
     *     partially written {@code localJarFile} is not removed by this method
     */
    static void downloadFromBlobServer(@Nullable JobID jobId, BlobKey blobKey, File localJarFile, InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries) throws IOException {
        final byte[] buffer = new byte[BUFFER_SIZE];
        LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);

        int attempt = 0;
        while (true) {
            try (BlobClient client = new BlobClient(serverAddress, blobClientConfig);
                    InputStream in = client.getInternal(jobId, blobKey);
                    OutputStream out = new FileOutputStream(localJarFile)) {
                int read;
                while ((read = in.read(buffer)) >= 0) {
                    out.write(buffer, 0, read);
                }
                return;
            }
            catch (IOException e) {
                String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress
                        + " and store it under " + localJarFile.getAbsolutePath();
                if (attempt >= numFetchRetries) {
                    LOG.error(message + " No retries left.", e);
                    throw new IOException(message, e);
                }
                LOG.error(message + " Retrying...", e);
                ++attempt;
                LOG.info("Downloading {}/{} from {} (retry {})", jobId, blobKey, serverAddress, attempt);
            }
        }
    }

    /** Closes the underlying socket. */
    @Override
    public void close() throws IOException {
        this.socket.close();
    }

    /** Returns whether the underlying socket has been closed. */
    public boolean isClosed() {
        return this.socket.isClosed();
    }

    /**
     * Sends a GET request and returns a stream over the BLOB content.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if job-unrelated
     * @param blobKey key of the BLOB to fetch
     * @return stream from which the BLOB content can be read
     * @throws IllegalStateException if the socket is already closed
     * @throws IOException if the request fails for any reason; the socket is closed first
     */
    InputStream getInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
        if (this.socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("GET BLOB {}/{} from {}.", jobId, blobKey, this.socket.getLocalSocketAddress());
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            InputStream is = this.socket.getInputStream();
            sendGetHeader(os, jobId, blobKey);
            receiveAndCheckGetResponse(is);
            return new BlobInputStream(is, blobKey, os);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("GET operation failed: " + t.getMessage(), t);
        }
    }

    /**
     * Writes the GET header: operation byte, job-relation byte, optional job ID, BLOB key.
     *
     * @throws IllegalArgumentException if {@code jobId} is {@code null} and the key is not a
     *     {@link TransientBlobKey}
     */
    private static void sendGetHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey blobKey) throws IOException {
        Preconditions.checkNotNull(blobKey);
        Preconditions.checkArgument(jobId != null || blobKey instanceof TransientBlobKey, "permanent BLOBs must be job-related");
        outputStream.write(GET_OPERATION);
        writeJobHeader(outputStream, jobId);
        blobKey.writeToOutputStream(outputStream);
    }

    /** Writes the job-relation byte, followed by the job ID bytes when a job ID is present. */
    private static void writeJobHeader(OutputStream outputStream, @Nullable JobID jobId) throws IOException {
        if (jobId == null) {
            outputStream.write(JOB_UNRELATED_CONTENT);
        } else {
            outputStream.write(JOB_RELATED_CONTENT);
            outputStream.write(jobId.getBytes());
        }
    }

    /**
     * Reads the single response byte of a GET request and fails unless it signals success.
     *
     * @throws EOFException if the stream ends before the response byte
     * @throws IOException if the server reports an error or sends an unknown response byte
     */
    private static void receiveAndCheckGetResponse(InputStream is) throws IOException {
        int response = is.read();
        if (response < 0) {
            throw new EOFException("Premature end of response");
        }
        if (response == RETURN_ERROR) {
            Throwable cause = readExceptionFromStream(is);
            throw new IOException("Server side error: " + cause.getMessage(), cause);
        }
        if (response != RETURN_OKAY) {
            // Include the offending byte, matching the PUT response check.
            throw new IOException("Unrecognized response: " + response + '.');
        }
    }

    /**
     * Uploads {@code len} bytes of {@code value}, starting at {@code offset}, as one BLOB.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if job-unrelated
     * @param value buffer holding the data
     * @param offset index of the first byte to send
     * @param len number of bytes to send
     * @param blobType type of BLOB to create on the server
     * @return the key the server reported for the stored BLOB
     * @throws IllegalStateException if the socket is already closed
     * @throws IOException if the upload fails for any reason; the socket is closed first
     */
    BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int offset, int len, BlobKey.BlobType blobType) throws IOException {
        if (this.socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        Preconditions.checkNotNull(value);
        if (LOG.isDebugEnabled()) {
            LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, this.socket.getLocalSocketAddress());
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            MessageDigest md = BlobUtils.createMessageDigest();
            sendPutHeader(os, jobId, blobType);

            // Send the data as length-prefixed chunks of at most BUFFER_SIZE bytes, hashing
            // exactly what goes over the wire.
            int position = offset;
            int remainingBytes = len;
            while (remainingBytes > 0) {
                int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
                BlobUtils.writeLength(bytesToSend, os);
                os.write(value, position, bytesToSend);
                md.update(value, position, bytesToSend);
                remainingBytes -= bytesToSend;
                position += bytesToSend;
            }
            // A length of -1 is written after the last chunk.
            BlobUtils.writeLength(-1, os);

            return receiveAndCheckPutResponse(this.socket.getInputStream(), md, blobType);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("PUT operation failed: " + t.getMessage(), t);
        }
    }

    /**
     * Uploads everything readable from {@code inputStream} as one BLOB. The stream is read
     * until end-of-stream and is not closed by this method.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null} if job-unrelated
     * @param inputStream source of the BLOB content
     * @param blobType type of BLOB to create on the server
     * @return the key the server reported for the stored BLOB
     * @throws IllegalStateException if the socket is already closed
     * @throws IOException if the upload fails for any reason; the socket is closed first
     */
    BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType) throws IOException {
        if (this.socket.isClosed()) {
            throw new IllegalStateException("BLOB Client is not connected. Client has been shut down or encountered an error before.");
        }
        Preconditions.checkNotNull(inputStream);
        if (LOG.isDebugEnabled()) {
            LOG.debug("PUT BLOB stream to {}.", this.socket.getLocalSocketAddress());
        }
        try {
            OutputStream os = this.socket.getOutputStream();
            MessageDigest md = BlobUtils.createMessageDigest();
            byte[] xferBuf = new byte[BUFFER_SIZE];
            sendPutHeader(os, jobId, blobType);

            int read;
            while ((read = inputStream.read(xferBuf)) >= 0) {
                // A zero-byte read carries no data; skip it rather than emit an empty chunk.
                if (read > 0) {
                    BlobUtils.writeLength(read, os);
                    os.write(xferBuf, 0, read);
                    md.update(xferBuf, 0, read);
                }
            }
            // A length of -1 is written after the last chunk.
            BlobUtils.writeLength(-1, os);

            return receiveAndCheckPutResponse(this.socket.getInputStream(), md, blobType);
        }
        catch (Throwable t) {
            BlobUtils.closeSilently(this.socket, LOG);
            throw new IOException("PUT operation failed: " + t.getMessage(), t);
        }
    }

    /** Writes the PUT header: operation byte, job-relation byte, optional job ID, BLOB type ordinal. */
    private static void sendPutHeader(OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType) throws IOException {
        outputStream.write(PUT_OPERATION);
        writeJobHeader(outputStream, jobId);
        // The ordinal is what this client puts on the wire, so it must not be replaced here.
        outputStream.write(blobType.ordinal());
    }

    /**
     * Reads the server's answer to a PUT and verifies the returned key against the locally
     * computed digest and the requested BLOB type.
     *
     * @return the key sent back by the server
     * @throws EOFException if the stream ends before the response byte
     * @throws IOException if the type or hash does not match, the server reports an error,
     *     or the response byte is unknown
     */
    private static BlobKey receiveAndCheckPutResponse(InputStream is, MessageDigest md, BlobKey.BlobType blobType) throws IOException {
        int response = is.read();
        if (response < 0) {
            throw new EOFException("Premature end of response");
        }
        if (response == RETURN_OKAY) {
            BlobKey remoteKey = BlobKey.readFromInputStream(is);
            byte[] localHash = md.digest();
            if (blobType != remoteKey.getType() || !Arrays.equals(localHash, remoteKey.getHash())) {
                throw new IOException("Detected data corruption during transfer");
            }
            return remoteKey;
        }
        if (response == RETURN_ERROR) {
            Throwable cause = readExceptionFromStream(is);
            throw new IOException("Server side error: " + cause.getMessage(), cause);
        }
        throw new IOException("Unrecognized response: " + response + '.');
    }

    /**
     * Uploads the given files as permanent BLOBs of one job over a single connection.
     *
     * @param serverAddress address of the BLOB server
     * @param clientConfig configuration passed to the {@link BlobClient} constructor
     * @param jobId ID of the job the files belong to; must not be {@code null}
     * @param jars paths of the files to upload
     * @return the keys of the uploaded BLOBs, in the order of {@code jars}; an empty list if
     *     {@code jars} is empty (no connection is opened in that case)
     * @throws IOException if connecting, opening a file, or uploading fails
     */
    public static List<PermanentBlobKey> uploadJarFiles(InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars) throws IOException {
        Preconditions.checkNotNull(jobId);
        if (jars.isEmpty()) {
            return Collections.emptyList();
        }
        List<PermanentBlobKey> blobKeys = new ArrayList<>(jars.size());
        try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
            for (Path jar : jars) {
                FileSystem fs = jar.getFileSystem();
                // try-with-resources lets a failure to open or upload propagate; a jar must
                // never be skipped silently.
                try (FSDataInputStream is = fs.open(jar)) {
                    PermanentBlobKey key = (PermanentBlobKey) blobClient.putInputStream(jobId, is, BlobKey.BlobType.PERMANENT_BLOB);
                    blobKeys.add(key);
                }
            }
        }
        return blobKeys;
    }

    /**
     * Reads a length-prefixed, Java-serialized exception from the stream.
     *
     * @throws IOException if the announced length is negative, the bytes cannot be read, or
     *     the exception class cannot be resolved
     */
    private static Throwable readExceptionFromStream(InputStream in) throws IOException {
        int len = BlobUtils.readLength(in);
        if (len < 0) {
            // Guard the allocation: a negative length would raise NegativeArraySizeException.
            throw new IOException("Invalid length of error message: " + len);
        }
        byte[] bytes = new byte[len];
        BlobUtils.readFully(in, bytes, 0, len, "Error message");
        try {
            // NOTE(review): this is native Java deserialization of bytes received from the
            // network; it is only safe as long as the BLOB server is a trusted peer.
            return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new IOException("Could not transfer error message", e);
        }
    }
}

