/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.net;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.neo4j.driver.internal.handlers.InitResponseHandler;
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
import org.neo4j.driver.internal.handlers.ResetAsyncResponseHandler;
import org.neo4j.driver.internal.logging.DelegatingLogger;
import org.neo4j.driver.internal.messaging.AckFailureMessage;
import org.neo4j.driver.internal.messaging.DiscardAllMessage;
import org.neo4j.driver.internal.messaging.InitMessage;
import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.PullAllMessage;
import org.neo4j.driver.internal.messaging.ResetMessage;
import org.neo4j.driver.internal.messaging.RunMessage;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.LoggingResponseHandler;
import org.neo4j.driver.internal.net.SocketClient;
import org.neo4j.driver.internal.net.SocketResponseHandler;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.summary.InternalServerInfo;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.summary.ServerInfo;

public class SocketConnection
implements Connection {
    private static final String LOG_NAME = "Connection";
    private final Queue<Message> pendingMessages = new LinkedList<Message>();
    private final SocketResponseHandler responseHandler;
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
    private final AtomicBoolean isAckFailureMuted = new AtomicBoolean(false);
    private InternalServerInfo serverInfo;
    private final SocketClient socket;

    SocketConnection(BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging) {
        DelegatingLogger logger = new DelegatingLogger(logging.getLog(LOG_NAME), String.valueOf(this.hashCode()));
        this.socket = new SocketClient(address, securityPlan, timeoutMillis, logger);
        this.responseHandler = this.createResponseHandler(logger);
        this.startSocketClient();
    }

    public SocketConnection(SocketClient socket, InternalServerInfo serverInfo, Logger logger) {
        this.socket = socket;
        this.serverInfo = serverInfo;
        this.responseHandler = this.createResponseHandler(logger);
        this.startSocketClient();
    }

    private void startSocketClient() {
        try {
            this.socket.start();
        }
        catch (Throwable e) {
            this.close();
            throw e;
        }
    }

    private SocketResponseHandler createResponseHandler(Logger logger) {
        if (logger.isDebugEnabled()) {
            return new LoggingResponseHandler(logger);
        }
        return new SocketResponseHandler();
    }

    @Override
    public void init(String clientName, Map<String, Value> authToken) {
        InitResponseHandler initHandler = new InitResponseHandler();
        this.queueMessage(new InitMessage(clientName, authToken), initHandler);
        this.sync();
        this.serverInfo = new InternalServerInfo(this.socket.address(), initHandler.serverVersion());
        this.socket.updateProtocol(this.serverInfo.version());
    }

    @Override
    public void run(String statement, Map<String, Value> parameters, ResponseHandler handler) {
        this.queueMessage(new RunMessage(statement, parameters), handler);
    }

    @Override
    public void discardAll(ResponseHandler handler) {
        this.queueMessage(DiscardAllMessage.DISCARD_ALL, handler);
    }

    @Override
    public void pullAll(ResponseHandler handler) {
        this.queueMessage(PullAllMessage.PULL_ALL, handler);
    }

    @Override
    public void reset() {
        this.queueMessage(ResetMessage.RESET, NoOpResponseHandler.INSTANCE);
    }

    @Override
    public void ackFailure() {
        this.queueMessage(AckFailureMessage.ACK_FAILURE, new ResponseHandler(){

            @Override
            public void onSuccess(Map<String, Value> metadata) {
                SocketConnection.this.responseHandler.clearError();
            }

            @Override
            public void onFailure(Throwable error) {
            }

            @Override
            public void onRecord(Value[] fields) {
            }
        });
    }

    @Override
    public void sync() {
        this.flush();
        this.receiveAll();
    }

    @Override
    public synchronized void flush() {
        this.ensureNotInterrupted();
        try {
            this.socket.send(this.pendingMessages);
        }
        catch (IOException e) {
            this.close();
            throw new ServiceUnavailableException("Unable to send messages to server: " + e.getMessage(), e);
        }
    }

    private void ensureNotInterrupted() {
        try {
            if (this.isInterrupted.get()) {
                while (this.responseHandler.handlersWaiting() > 0) {
                    this.receiveOne();
                }
            }
        }
        catch (Neo4jException e) {
            throw new ClientException("An error has occurred due to the cancellation of executing a previous statement. You received this error probably because you did not consume the result immediately after running the statement which get reset in this session.", e);
        }
    }

    private void receiveAll() {
        try {
            this.socket.receiveAll(this.responseHandler);
            this.assertNoServerFailure();
        }
        catch (IOException e) {
            throw this.mapRecieveError(e);
        }
    }

    @Override
    public void receiveOne() {
        try {
            this.socket.receiveOne(this.responseHandler);
            this.assertNoServerFailure();
        }
        catch (IOException e) {
            throw this.mapRecieveError(e);
        }
    }

    private void assertNoServerFailure() {
        if (this.responseHandler.serverFailureOccurred()) {
            Neo4jException exception = this.responseHandler.serverFailure();
            this.responseHandler.clearError();
            this.isInterrupted.set(false);
            throw exception;
        }
    }

    private ClientException mapRecieveError(IOException e) {
        String message = e.getMessage();
        if (message == null) {
            return new ClientException("Unable to read response from server: " + e.getClass().getSimpleName(), e);
        }
        if (e instanceof SocketTimeoutException) {
            return new ClientException("Server did not reply within the network timeout limit.", e);
        }
        return new ClientException("Unable to read response from server: " + message, e);
    }

    private synchronized void queueMessage(Message msg, ResponseHandler handler) {
        this.ensureNotInterrupted();
        this.pendingMessages.add(msg);
        this.responseHandler.appendResponseHandler(handler);
    }

    @Override
    public void close() {
        this.socket.stop();
    }

    @Override
    public boolean isOpen() {
        return this.socket.isOpen();
    }

    @Override
    public synchronized void resetAsync() {
        this.queueMessage(ResetMessage.RESET, new ResetAsyncResponseHandler(new Runnable(){

            @Override
            public void run() {
                SocketConnection.this.isInterrupted.set(false);
                SocketConnection.this.isAckFailureMuted.set(false);
            }
        }));
        this.flush();
        this.isInterrupted.set(true);
        this.isAckFailureMuted.set(true);
    }

    @Override
    public boolean isAckFailureMuted() {
        return this.isAckFailureMuted.get();
    }

    @Override
    public ServerInfo server() {
        return this.serverInfo;
    }

    @Override
    public BoltServerAddress boltServerAddress() {
        return this.serverInfo.boltServerAddress();
    }
}

