/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.events.endpoint.EndpointConnectedEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointConnectionAbortedEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointConnectionFailedEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointConnectionIgnoredEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointDisconnectedEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointDisconnectionFailedEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointStateChangedEvent;
import com.couchbase.client.core.cnc.events.endpoint.EndpointWriteFailedEvent;
import com.couchbase.client.core.cnc.events.endpoint.UnexpectedEndpointConnectionFailedEvent;
import com.couchbase.client.core.cnc.events.endpoint.UnexpectedEndpointDisconnectedEvent;
import com.couchbase.client.core.deps.io.netty.bootstrap.Bootstrap;
import com.couchbase.client.core.deps.io.netty.buffer.ByteBufAllocator;
import com.couchbase.client.core.deps.io.netty.buffer.PooledByteBufAllocator;
import com.couchbase.client.core.deps.io.netty.channel.Channel;
import com.couchbase.client.core.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.core.deps.io.netty.channel.ChannelHandler;
import com.couchbase.client.core.deps.io.netty.channel.ChannelInitializer;
import com.couchbase.client.core.deps.io.netty.channel.ChannelOption;
import com.couchbase.client.core.deps.io.netty.channel.ChannelPipeline;
import com.couchbase.client.core.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.core.deps.io.netty.channel.WriteBufferWaterMark;
import com.couchbase.client.core.deps.io.netty.channel.epoll.EpollChannelOption;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.InternalEndpointDiagnostics;
import com.couchbase.client.core.endpoint.CircuitBreaker;
import com.couchbase.client.core.endpoint.CircuitBreakerConfig;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.endpoint.EndpointContext;
import com.couchbase.client.core.endpoint.EndpointState;
import com.couchbase.client.core.endpoint.LazyCircuitBreaker;
import com.couchbase.client.core.endpoint.NoopCircuitBreaker;
import com.couchbase.client.core.endpoint.PipelineInitializer;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.IoConfig;
import com.couchbase.client.core.env.SecurityConfig;
import com.couchbase.client.core.error.BucketNotFoundException;
import com.couchbase.client.core.error.SecurityException;
import com.couchbase.client.core.io.netty.EventLoopGroups;
import com.couchbase.client.core.io.netty.PipelineErrorHandler;
import com.couchbase.client.core.io.netty.SslHandlerFactory;
import com.couchbase.client.core.io.netty.SslSessionLoggingHandler;
import com.couchbase.client.core.io.netty.TrafficCaptureHandler;
import com.couchbase.client.core.io.netty.kv.ChannelAttributes;
import com.couchbase.client.core.io.netty.kv.ConnectTimings;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.retry.RetryOrchestrator;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.service.ServiceContext;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.HostAndPort;
import com.couchbase.client.core.util.SingleStateful;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/**
 * Shared {@link Endpoint} implementation that owns a single netty {@link Channel} to one remote
 * host and port.
 *
 * <p>The class drives the connect / reconnect / disconnect lifecycle, publishes the matching events
 * on the environment's event bus, writes requests onto the channel (optionally guarded by a circuit
 * breaker) and exposes diagnostics. Subclasses contribute the service specific part of the channel
 * pipeline through {@link #pipelineInitializer()}.</p>
 */
public abstract class BaseEndpoint
implements Endpoint {
    private static final Logger log = LoggerFactory.getLogger(BaseEndpoint.class);

    /** Java system property that, when set, changes which allocator the static initializer picks. */
    private static final String ALLOCATOR_SYSTEM_PROPERTY_NAME = "com.couchbase.client.core.deps.io.netty.allocator.type";

    /** Allocator applied to every bootstrapped channel; selected once in the static initializer. */
    private static final ByteBufAllocator allocator;

    /** Current endpoint state; every transition is published as an {@link EndpointStateChangedEvent}. */
    private final SingleStateful<EndpointState> state;

    /** Current context; replaced with an enriched copy each time a channel is connected. */
    final AtomicReference<EndpointContext> endpointContext;

    /** Flips to {@code true} exactly once, when {@link #disconnect()} is called. */
    private final AtomicBoolean disconnect = new AtomicBoolean(false);

    private final CircuitBreaker circuitBreaker;
    private final boolean circuitBreakerEnabled;

    /** In-flight request counter; only maintained when the endpoint is not pipelined. */
    private final AtomicInteger outstandingRequests;

    private final EventLoopGroup eventLoopGroup;

    /** Decides, per completed request, whether the circuit breaker records a success or a failure. */
    private final CircuitBreaker.CompletionCallback circuitBreakerCallback;

    private final ServiceType serviceType;
    private final boolean pipelined;
    private final String hostname;
    private final int port;

    /** The currently used channel; {@code null} until the first successful connect. */
    volatile Channel channel;

    /** {@link System#nanoTime()} of the last completed request, or {@code 0} if none completed yet. */
    private volatile long lastResponseTimestamp;

    /** {@link System#nanoTime()} taken when the channel was last connected. */
    private volatile long lastConnectedAt;

    /** Failure of the most recent connect attempt; cleared again on a successful connect. */
    private volatile Throwable lastConnectAttemptFailure;

    /**
     * Creates the endpoint in the {@link EndpointState#DISCONNECTED} state.
     *
     * @param hostname the remote hostname.
     * @param port the remote port.
     * @param eventLoopGroup the event loop group used to bootstrap the channel.
     * @param serviceContext the parent service context.
     * @param circuitBreakerConfig decides whether a real or a no-op circuit breaker is used.
     * @param serviceType the service type of this endpoint.
     * @param pipelined whether more than one request may be in flight at the same time.
     */
    BaseEndpoint(String hostname, int port, EventLoopGroup eventLoopGroup, ServiceContext serviceContext, CircuitBreakerConfig circuitBreakerConfig, ServiceType serviceType, boolean pipelined) {
        this.hostname = hostname;
        this.port = port;
        this.pipelined = pipelined;
        if (circuitBreakerConfig.enabled()) {
            this.circuitBreaker = new LazyCircuitBreaker(circuitBreakerConfig);
            this.circuitBreakerEnabled = true;
        } else {
            this.circuitBreaker = NoopCircuitBreaker.INSTANCE;
            this.circuitBreakerEnabled = false;
        }
        this.circuitBreakerCallback = circuitBreakerConfig.completionCallback();
        this.endpointContext = new AtomicReference<EndpointContext>(new EndpointContext(serviceContext, new HostAndPort(hostname, port), this.circuitBreaker, serviceType, Optional.empty(), serviceContext.bucket(), Optional.empty()));
        this.state = SingleStateful.fromInitial(
            EndpointState.DISCONNECTED,
            (from, to) -> serviceContext.environment().eventBus().publish(new EndpointStateChangedEvent(this.endpointContext.get(), from, to))
        );
        this.outstandingRequests = new AtomicInteger(0);
        this.lastResponseTimestamp = 0L;
        this.eventLoopGroup = eventLoopGroup;
        this.serviceType = serviceType;
    }

    /**
     * Returns the current endpoint context.
     */
    @Override
    public EndpointContext context() {
        return this.endpointContext.get();
    }

    /**
     * Returns the channel class that matches the given event loop group.
     *
     * @param eventLoopGroup the event loop group to look up.
     * @return the channel type as reported by {@link EventLoopGroups#channelType(EventLoopGroup)}.
     */
    @Stability.Internal
    public static Class<? extends Channel> channelFrom(EventLoopGroup eventLoopGroup) {
        return EventLoopGroups.channelType(eventLoopGroup);
    }

    /**
     * Returns the initializer which adds the service specific handlers to the channel pipeline.
     */
    protected abstract PipelineInitializer pipelineInitializer();

    /**
     * Starts connecting, but only if the endpoint is currently {@link EndpointState#DISCONNECTED}.
     */
    @Override
    public void connect() {
        if (this.state.compareAndTransition(EndpointState.DISCONNECTED, EndpointState.CONNECTING)) {
            this.reconnect();
        }
    }

    /**
     * Returns the (unresolved) remote address built from the host and port of the current context.
     */
    protected SocketAddress remoteAddress() {
        EndpointContext ctx = this.endpointContext.get();
        return InetSocketAddress.createUnresolved(ctx.remoteSocket().host(), ctx.remoteSocket().port());
    }

    /**
     * Formats the value as upper case hex, zero padded to 16 digits.
     */
    private static String paddedHex(long input) {
        return String.format("%016X", input);
    }

    /**
     * Converts a {@code long} to an {@code int}, clamping values outside the {@code int} range
     * instead of silently wrapping around like a plain narrowing cast would.
     */
    private static int saturatedToInt(long value) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
    }

    /**
     * Builds the channel id string in the form {@code <endpoint id>/<channel id>}, both sides
     * rendered through {@link #paddedHex(long)}.
     *
     * @param channel the channel whose short id is used.
     * @param endpointContext the context providing the endpoint id.
     * @return the combined id.
     */
    private static String getChannelId(Channel channel, EndpointContext endpointContext) {
        long convertedChannelId;
        String channelId = "0x" + channel.id().asShortText();
        try {
            // The literal "0xembedded" is not a number, so it is mapped to a fixed value.
            convertedChannelId = channelId.equals("0xembedded") ? 1L : Long.decode(channelId);
        }
        catch (NumberFormatException ex) {
            // Any other short text that is not valid hex falls back to a random value.
            convertedChannelId = new Random().nextInt();
        }
        String paddedChannelId = BaseEndpoint.paddedHex(convertedChannelId);
        return BaseEndpoint.paddedHex(endpointContext.id()) + "/" + paddedChannelId;
    }

    /**
     * Performs the connect attempt(s).
     *
     * <p>Each attempt bootstraps a new channel, is bounded by the configured connect timeout and is
     * retried with exponential backoff (32ms up to 4096ms) until it succeeds. Failed attempts are
     * published as {@link EndpointConnectionFailedEvent}. If {@link #disconnect()} was called in the
     * meantime the attempt is aborted (on failure) or the fresh channel is closed again (on
     * success).</p>
     */
    private void reconnect() {
        if (this.disconnect.get()) {
            return;
        }
        this.state.transition(EndpointState.CONNECTING);
        final EndpointContext endpointContext = this.endpointContext.get();
        final AtomicLong attemptStart = new AtomicLong();
        Mono.defer(() -> {
            final CoreEnvironment env = endpointContext.environment();
            long connectTimeoutMs = env.timeoutConfig().connectTimeout().toMillis();
            if (this.eventLoopGroup.isShutdown()) {
                throw new IllegalStateException("Event Loop is already shut down, not pursuing connect attempt!");
            }
            IoConfig io = env.ioConfig();
            WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(io.lowWaterMark().bytesAsInt(), io.highWaterMark().bytesAsInt());
            Bootstrap channelBootstrap = new Bootstrap()
                .remoteAddress(this.remoteAddress())
                .group(this.eventLoopGroup)
                .channel(BaseEndpoint.channelFrom(this.eventLoopGroup))
                // Clamp instead of cast: a very large timeout must not wrap into a negative value.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, BaseEndpoint.saturatedToInt(connectTimeoutMs))
                .option(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark)
                .option(ChannelOption.ALLOCATOR, allocator)
                .handler(new ChannelInitializer<Channel>(){

                    @Override
                    protected void initChannel(Channel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        ch.attr(ChannelAttributes.CHANNEL_ID_KEY).set(BaseEndpoint.getChannelId(ch, endpointContext));
                        SecurityConfig config = env.securityConfig();
                        if (config.tlsEnabled()) {
                            try {
                                pipeline.addFirst(SslHandlerFactory.get(ch.alloc(), config, endpointContext), SslSessionLoggingHandler.INSTANCE);
                            }
                            catch (Exception e) {
                                throw new SecurityException("Could not instantiate SSL Handler", e);
                            }
                        }
                        if (env.ioConfig().servicesToCapture().contains(BaseEndpoint.this.serviceType)) {
                            pipeline.addLast(TrafficCaptureHandler.class.getName(), (ChannelHandler)new TrafficCaptureHandler(endpointContext));
                        }
                        BaseEndpoint.this.pipelineInitializer().init(BaseEndpoint.this, pipeline);
                        // Added after the service specific handlers, so it ends up last in the pipeline.
                        pipeline.addLast(new PipelineErrorHandler(BaseEndpoint.this));
                    }
                });
            Optional.ofNullable(io.sendBuffer()).ifPresent(it -> channelBootstrap.option(ChannelOption.SO_SNDBUF, it.bytesAsInt()));
            Optional.ofNullable(io.receiveBuffer()).ifPresent(it -> channelBootstrap.option(ChannelOption.SO_RCVBUF, it.bytesAsInt()));
            if (env.ioConfig().tcpKeepAlivesEnabled() && !EventLoopGroups.isLocal(this.eventLoopGroup)) {
                channelBootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                if (EventLoopGroups.isEpoll(this.eventLoopGroup)) {
                    long keepAliveSeconds = TimeUnit.MILLISECONDS.toSeconds(env.ioConfig().tcpKeepAliveTime().toMillis());
                    channelBootstrap.option(EpollChannelOption.TCP_KEEPIDLE, BaseEndpoint.saturatedToInt(keepAliveSeconds));
                }
            }
            // Transition again here because this supplier is re-run for every retry, after the
            // error path below moved the state to DISCONNECTED.
            this.state.transition(EndpointState.CONNECTING);
            attemptStart.set(System.nanoTime());
            return this.channelFutureIntoMono(channelBootstrap.connect()).publishOn(endpointContext.environment().scheduler());
        }).timeout(endpointContext.environment().timeoutConfig().connectTimeout()).onErrorResume(throwable -> {
            this.state.transition(EndpointState.DISCONNECTED);
            if (this.disconnect.get()) {
                // Disconnect was requested while connecting: complete empty so that no retry happens.
                endpointContext.environment().eventBus().publish(new EndpointConnectionAbortedEvent(Duration.ofNanos(System.nanoTime() - attemptStart.get()), endpointContext, ConnectTimings.toMap(this.channel)));
                return Mono.empty();
            }
            return Mono.error((Throwable)throwable);
        }).retryWhen(Retry.any().exponentialBackoff(Duration.ofMillis(32L), Duration.ofMillis(4096L)).retryMax(Long.MAX_VALUE).doOnRetry(retryContext -> {
            Throwable ex = retryContext.exception();
            Event.Severity severity = ex instanceof BucketNotFoundException ? Event.Severity.DEBUG : Event.Severity.WARN;
            Duration duration = ex instanceof TimeoutException ? endpointContext.environment().timeoutConfig().connectTimeout() : Duration.ofNanos(System.nanoTime() - attemptStart.get());
            ex = BaseEndpoint.trimNettyFromStackTrace(BaseEndpoint.annotateConnectException(ex));
            this.lastConnectAttemptFailure = ex;
            endpointContext.environment().eventBus().publish(new EndpointConnectionFailedEvent(severity, duration, endpointContext, retryContext.iteration(), ex));
        }).toReactorRetry()).subscribe(connected -> {
            long now = System.nanoTime();
            if (this.disconnect.get()) {
                // Connected, but nobody wants the channel anymore: drop and close it right away.
                this.channel = null;
                endpointContext.environment().eventBus().publish(new EndpointConnectionIgnoredEvent(Duration.ofNanos(now - attemptStart.get()), endpointContext, ConnectTimings.toMap(connected)));
                this.closeChannel(connected);
            } else {
                this.lastConnectAttemptFailure = null;
                this.channel = connected;
                Optional<HostAndPort> localSocket = Optional.empty();
                if (connected.localAddress() instanceof InetSocketAddress) {
                    InetSocketAddress so = (InetSocketAddress)connected.localAddress();
                    localSocket = Optional.of(new HostAndPort(so.getHostString(), so.getPort()));
                }
                // Enrich the context with the local socket and the channel id of the new channel.
                EndpointContext newContext = new EndpointContext(endpointContext, endpointContext.remoteSocket(), endpointContext.circuitBreaker(), endpointContext.serviceType(), localSocket, endpointContext.bucket(), Optional.ofNullable(connected.attr(ChannelAttributes.CHANNEL_ID_KEY).get()));
                this.endpointContext.get().environment().eventBus().publish(new EndpointConnectedEvent(Duration.ofNanos(now - attemptStart.get()), newContext, ConnectTimings.toMap(connected)));
                this.endpointContext.set(newContext);
                this.circuitBreaker.reset();
                this.lastConnectedAt = now;
                this.state.transition(EndpointState.CONNECTED);
            }
        }, error -> endpointContext.environment().eventBus().publish(new UnexpectedEndpointConnectionFailedEvent(Duration.ofNanos(System.nanoTime() - attemptStart.get()), endpointContext, (Throwable)error)));
    }

    /**
     * Replaces a {@link ConnectException} with one that carries an additional hint in its message;
     * every other throwable is returned unchanged.
     *
     * <p>The replacement has no stack trace of its own, since {@code fillInStackTrace} is
     * overridden to do nothing.</p>
     */
    private static Throwable annotateConnectException(Throwable ex) {
        if (!(ex instanceof ConnectException)) {
            return ex;
        }
        return new ConnectException(ex.getMessage() + " - Check server ports and cluster encryption setting."){

            @Override
            public synchronized Throwable fillInStackTrace() {
                return this;
            }
        };
    }

    /**
     * Filters the stack trace of the given throwable, keeping only frames whose class name does not
     * start with the repackaged netty prefix, and returns the same instance.
     */
    private static Throwable trimNettyFromStackTrace(Throwable input) {
        CbThrowables.filterStackTrace(input, frame -> !frame.getClassName().startsWith("com.couchbase.client.core.deps.io.netty"));
        return input;
    }

    /**
     * Disconnects the endpoint. Only the first call has an effect; it moves the state to
     * {@link EndpointState#DISCONNECTING} and closes the current channel (if any).
     */
    @Override
    public void disconnect() {
        if (this.disconnect.compareAndSet(false, true)) {
            this.state.transition(EndpointState.DISCONNECTING);
            this.closeChannel(this.channel);
        }
    }

    /**
     * Returns {@code true} once {@link #disconnect()} has been called.
     */
    @Override
    public boolean receivedDisconnectSignal() {
        return this.disconnect.get();
    }

    @Override
    public String remoteHostname() {
        return this.hostname;
    }

    @Override
    public int remotePort() {
        return this.port;
    }

    /**
     * Signals that the underlying channel became inactive.
     *
     * <p>The outstanding request counter is always reset. If no disconnect was requested and the
     * endpoint is still {@link EndpointState#CONNECTED}, an
     * {@link UnexpectedEndpointDisconnectedEvent} is published and a reconnect is started.</p>
     */
    @Stability.Internal
    public void notifyChannelInactive() {
        int outstandingBeforeInactive = this.outstandingRequests.getAndSet(0);
        if (this.disconnect.get()) {
            return;
        }
        long connectedSince = System.nanoTime() - this.lastConnectedAt;
        if (this.state() == EndpointState.CONNECTED) {
            this.endpointContext.get().environment().eventBus().publish(new UnexpectedEndpointDisconnectedEvent(this.endpointContext.get(), outstandingBeforeInactive, connectedSince));
            this.state.transition(EndpointState.DISCONNECTED);
            this.connect();
        }
    }

    /**
     * Disconnects the given channel and moves the state to {@link EndpointState#DISCONNECTED}.
     *
     * <p>If the channel is {@code null} or its event loop is already shut down, only the state is
     * updated. In both cases the state holder is closed afterwards.</p>
     *
     * @param channel the channel to close, may be {@code null}.
     */
    void closeChannel(Channel channel) {
        if (channel != null && !channel.eventLoop().isShutdown()) {
            EndpointContext endpointContext = this.endpointContext.get();
            long start = System.nanoTime();
            channel.disconnect().addListener(future -> {
                Duration latency = Duration.ofNanos(System.nanoTime() - start);
                this.state.transition(EndpointState.DISCONNECTED);
                this.state.close();
                if (future.isSuccess()) {
                    endpointContext.environment().eventBus().publish(new EndpointDisconnectedEvent(latency, endpointContext));
                } else {
                    endpointContext.environment().eventBus().publish(new EndpointDisconnectionFailedEvent(latency, endpointContext, future.cause()));
                }
            });
        } else {
            this.state.transition(EndpointState.DISCONNECTED);
            this.state.close();
        }
    }

    /**
     * Sends the request over the channel, or hands it to the {@link RetryOrchestrator} if the
     * endpoint cannot be written to right now.
     *
     * <p>Requests whose timeout already elapsed are cancelled, and already completed requests are
     * ignored.</p>
     *
     * @param request the request to dispatch.
     */
    @Override
    public <R extends Request<? extends Response>> void send(R request) {
        if (request.timeoutElapsed()) {
            request.cancel(CancellationReason.TIMEOUT);
        }
        if (request.completed()) {
            return;
        }
        EndpointContext ctx = this.endpointContext.get();
        if (this.canWrite()) {
            request.context().lastDispatchedFrom(ctx.localSocket().orElse(null)).lastDispatchedTo(ctx.remoteSocket()).lastChannelId(ctx.channelId().orElse(null));
            if (!this.pipelined) {
                this.outstandingRequests.incrementAndGet();
            }
            if (this.circuitBreakerEnabled) {
                this.circuitBreaker.track();
                request.response().whenComplete((response, throwable) -> {
                    if ((Boolean)this.circuitBreakerCallback.apply(response, throwable)) {
                        this.circuitBreaker.markSuccess();
                    } else {
                        this.circuitBreaker.markFailure();
                    }
                });
            }
            this.channel.writeAndFlush(request).addListener(f -> {
                if (!f.isSuccess()) {
                    EndpointContext context = this.endpointContext.get();
                    // A failed write during an intentional disconnect is expected, so log it quietly.
                    Event.Severity severity = this.disconnect.get() ? Event.Severity.DEBUG : Event.Severity.WARN;
                    context.environment().eventBus().publish(new EndpointWriteFailedEvent(severity, context, f.cause()));
                    RetryOrchestrator.maybeRetry(context, request, RetryReason.ENDPOINT_NOT_WRITABLE);
                }
            });
        } else {
            RetryReason retryReason = this.circuitBreaker.allowsRequest() ? RetryReason.ENDPOINT_NOT_WRITABLE : RetryReason.ENDPOINT_CIRCUIT_OPEN;
            RetryOrchestrator.maybeRetry(this.endpointContext.get(), request, retryReason);
        }
    }

    /**
     * Returns {@code true} if the endpoint is pipelined or currently has no outstanding request.
     */
    @Override
    public boolean freeToWrite() {
        return this.pipelined || this.outstandingRequests.get() == 0;
    }

    @Override
    public long outstandingRequests() {
        return this.outstandingRequests.get();
    }

    /**
     * Returns the {@link System#nanoTime()} of the last completed request, or {@code 0} if there
     * was none so far.
     */
    @Override
    public long lastResponseReceived() {
        return this.lastResponseTimestamp;
    }

    /**
     * Marks a request as completed: decrements the outstanding counter and records the timestamp.
     */
    @Stability.Internal
    public void markRequestCompletion() {
        this.decrementOutstandingRequests();
        this.lastResponseTimestamp = System.nanoTime();
    }

    /**
     * Decrements the outstanding request counter (no-op for pipelined endpoints).
     *
     * <p>The counter never drops below zero. {@link #notifyChannelInactive()} resets it to zero, so
     * a decrement arriving after that reset would otherwise leave it negative, and
     * {@link #freeToWrite()} only reports {@code true} for exactly zero.</p>
     */
    @Stability.Internal
    public void decrementOutstandingRequests() {
        if (!this.pipelined) {
            this.outstandingRequests.updateAndGet(current -> current > 0 ? current - 1 : 0);
        }
    }

    /**
     * Returns the {@link System#nanoTime()} taken when the channel was last connected.
     */
    @Override
    @Stability.Internal
    public long lastConnectedAt() {
        return this.lastConnectedAt;
    }

    /**
     * Checks whether a request can be written right now: the endpoint is connected, the channel is
     * active and writable, the circuit breaker allows it and the endpoint is free to write.
     */
    private boolean canWrite() {
        return this.state.state() == EndpointState.CONNECTED && this.channel.isActive() && this.channel.isWritable() && this.circuitBreaker.allowsRequest() && this.freeToWrite();
    }

    @Override
    public EndpointState state() {
        return this.state.state();
    }

    @Override
    public Flux<EndpointState> states() {
        return this.state.states();
    }

    /**
     * Converts a netty {@link ChannelFuture} into a {@link Mono} that emits the connected channel.
     *
     * <p>If the subscriber cancels, the intermediate future is cancelled as well and a later
     * failure of the channel future is ignored.</p>
     *
     * @param channelFuture the connect future to wrap.
     * @return the mono emitting the channel or the connect failure.
     */
    protected Mono<Channel> channelFutureIntoMono(ChannelFuture channelFuture) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        channelFuture.addListener(f -> {
            if (f.isSuccess()) {
                // NOTE(review): if the subscriber cancelled before this point, complete() is a no-op
                // and the connected channel is not closed here - confirm it is cleaned up elsewhere.
                completableFuture.complete(channelFuture.channel());
            } else if (!completableFuture.isCancelled()) {
                completableFuture.completeExceptionally(f.cause());
            }
        });
        return Mono.fromFuture(completableFuture).doFinally(signalType -> {
            if (signalType == SignalType.CANCEL) {
                completableFuture.cancel(false);
            }
        });
    }

    /**
     * Returns whether this endpoint allows more than one request in flight at the same time.
     */
    public boolean pipelined() {
        return this.pipelined;
    }

    /**
     * Collects the current diagnostics of this endpoint.
     *
     * <p>Local and remote addresses are only reported if a channel exists and the address is an
     * {@link InetSocketAddress}. The last activity is reported in microseconds since the last
     * completed request, if there was one.</p>
     */
    @Override
    public EndpointDiagnostics diagnostics() {
        // Read the volatile field once: the connect path may set it to null concurrently, so a
        // null check followed by a second read could otherwise fail.
        final Channel ch = this.channel;
        String remote = null;
        String local = null;
        if (ch != null) {
            SocketAddress remoteAddr = ch.remoteAddress();
            SocketAddress localAddr = ch.localAddress();
            if (remoteAddr instanceof InetSocketAddress) {
                InetSocketAddress ra = (InetSocketAddress)remoteAddr;
                remote = RedactableArgument.redactMeta(ra.getHostString()) + ":" + ra.getPort();
            }
            if (localAddr instanceof InetSocketAddress) {
                InetSocketAddress la = (InetSocketAddress)localAddr;
                local = RedactableArgument.redactMeta(la.getHostString()) + ":" + la.getPort();
            }
        }
        final long lastResponse = this.lastResponseTimestamp;
        Optional<Long> lastActivity = lastResponse == 0L ? Optional.empty() : Optional.of(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - lastResponse));
        Optional<String> id = Optional.ofNullable(ch).map(c -> "0x" + c.id().asShortText());
        return new EndpointDiagnostics(this.context().serviceType(), this.state(), this.circuitBreaker.state(), local, remote, this.context().bucket(), lastActivity, id, Optional.ofNullable(this.lastConnectAttemptFailure()));
    }

    /**
     * Returns the regular diagnostics plus the authentication status and TLS handshake failure
     * taken from the current context.
     */
    @Override
    public InternalEndpointDiagnostics internalDiagnostics() {
        EndpointContext ctx = this.context();
        return new InternalEndpointDiagnostics(this.diagnostics(), ctx.authenticationStatus(), ctx.tlsHandshakeFailure());
    }

    /**
     * Returns the failure of the most recent connect attempt, or {@code null} if there is none.
     */
    @Override
    public Throwable lastConnectAttemptFailure() {
        return this.lastConnectAttemptFailure;
    }

    // Picks the allocator once per class load: the pooled default when the system property is
    // absent, otherwise ByteBufAllocator.DEFAULT.
    static {
        String propertyValue = System.getProperty(ALLOCATOR_SYSTEM_PROPERTY_NAME);
        if (propertyValue == null) {
            allocator = PooledByteBufAllocator.DEFAULT;
            log.debug("ByteBufAllocator: Defaulting to {}. A future version of the Couchbase SDK might default to AdaptiveByteBufAllocator instead. If you'd like to use AdaptiveByteBufAllocator now, set the Java system property '{}' to 'adaptive'. If you'd like to isolate yourself from future changes to the default, set the property to 'pooled'.", allocator, ALLOCATOR_SYSTEM_PROPERTY_NAME);
        } else {
            allocator = ByteBufAllocator.DEFAULT;
            log.debug("ByteBufAllocator: Using {} because the Java system property '{}' was set to '{}'.", allocator, ALLOCATOR_SYSTEM_PROPERTY_NAME, propertyValue);
        }
    }
}

