/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.simulacron.server;

import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.request.Batch;
import com.datastax.oss.protocol.internal.request.Execute;
import com.datastax.oss.protocol.internal.request.Options;
import com.datastax.oss.protocol.internal.request.Prepare;
import com.datastax.oss.protocol.internal.request.Register;
import com.datastax.oss.protocol.internal.request.Startup;
import com.datastax.oss.protocol.internal.response.Ready;
import com.datastax.oss.protocol.internal.response.Supported;
import com.datastax.oss.protocol.internal.response.error.Unprepared;
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
import com.datastax.oss.simulacron.common.cluster.AbstractDataCenter;
import com.datastax.oss.simulacron.common.cluster.AbstractNode;
import com.datastax.oss.simulacron.common.cluster.ActivityLog;
import com.datastax.oss.simulacron.common.cluster.ClusterConnectionReport;
import com.datastax.oss.simulacron.common.cluster.ClusterQueryLogReport;
import com.datastax.oss.simulacron.common.cluster.NodeConnectionReport;
import com.datastax.oss.simulacron.common.cluster.NodeProperties;
import com.datastax.oss.simulacron.common.cluster.NodeQueryLogReport;
import com.datastax.oss.simulacron.common.cluster.NodeSpec;
import com.datastax.oss.simulacron.common.cluster.QueryLog;
import com.datastax.oss.simulacron.common.request.Query;
import com.datastax.oss.simulacron.common.request.Request;
import com.datastax.oss.simulacron.common.result.Result;
import com.datastax.oss.simulacron.common.stubbing.Action;
import com.datastax.oss.simulacron.common.stubbing.CloseType;
import com.datastax.oss.simulacron.common.stubbing.DisconnectAction;
import com.datastax.oss.simulacron.common.stubbing.MessageResponseAction;
import com.datastax.oss.simulacron.common.stubbing.NoResponseAction;
import com.datastax.oss.simulacron.common.stubbing.Prime;
import com.datastax.oss.simulacron.common.stubbing.PrimeDsl;
import com.datastax.oss.simulacron.common.stubbing.StubMapping;
import com.datastax.oss.simulacron.common.utils.FrameUtils;
import com.datastax.oss.simulacron.server.BindNodeException;
import com.datastax.oss.simulacron.server.BoundCluster;
import com.datastax.oss.simulacron.server.BoundDataCenter;
import com.datastax.oss.simulacron.server.BoundTopic;
import com.datastax.oss.simulacron.server.ChannelUtils;
import com.datastax.oss.simulacron.server.FrameCodecUtils;
import com.datastax.oss.simulacron.server.FrameCodecWrapper;
import com.datastax.oss.simulacron.server.QueryListenerWrapper;
import com.datastax.oss.simulacron.server.RejectScope;
import com.datastax.oss.simulacron.server.Server;
import com.datastax.oss.simulacron.server.StubStore;
import com.datastax.oss.simulacron.server.UnsupportedProtocolVersionMessage;
import com.datastax.oss.simulacron.server.listener.QueryListener;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.math.BigInteger;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BoundNode
extends AbstractNode<BoundCluster, BoundDataCenter>
implements BoundTopic<NodeConnectionReport, NodeQueryLogReport> {
    static final Predicate<QueryLog> ALWAYS_TRUE = x -> true;
    private static Logger logger = LoggerFactory.getLogger(BoundNode.class);
    private static final Pattern useKeyspacePattern = Pattern.compile("\\s*use\\s+(.*)$", 2);
    private final transient ServerBootstrap bootstrap;
    final transient AtomicReference<Channel> channel;
    final transient ChannelGroup clientChannelGroup = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    private final transient AtomicReference<RejectState> rejectState = new AtomicReference<RejectState>(new RejectState());
    private final transient Timer timer;
    private final transient StubStore stubStore;
    private final boolean activityLogging;
    private final Server server;
    private final BoundCluster cluster;
    private final transient List<QueryListenerWrapper> queryListeners = new ArrayList<QueryListenerWrapper>();
    final transient ActivityLog activityLog = new ActivityLog();
    private final transient FrameCodecWrapper frameCodec;

    BoundNode(SocketAddress address, NodeSpec delegate, Map<String, Object> peerInfo, BoundCluster cluster, BoundDataCenter parent, Server server, Timer timer, Channel channel, boolean activityLogging) {
        super(address, delegate.getName(), Long.valueOf(delegate.getId() != null ? delegate.getId() : 0L), delegate.getHostId() != null ? delegate.getHostId() : UUID.randomUUID(), delegate.getCassandraVersion(), delegate.getDSEVersion(), peerInfo, (AbstractDataCenter)parent);
        this.cluster = cluster;
        this.server = server;
        this.bootstrap = server != null ? server.serverBootstrap : null;
        this.timer = timer;
        this.channel = new AtomicReference<Channel>(channel);
        this.stubStore = new StubStore();
        this.activityLogging = activityLogging;
        this.frameCodec = FrameCodecUtils.buildFrameCodec((NodeProperties)delegate).orElse(parent.getFrameCodec());
    }

    public Long getActiveConnections() {
        return this.clientChannelGroup.stream().filter(Channel::isActive).count();
    }

    private CompletableFuture<Void> unbind() {
        logger.debug("Unbinding listener on {}", this.channel);
        return ChannelUtils.completable(this.channel.get().close()).thenApply(v -> null);
    }

    private CompletableFuture<Void> rebind() {
        if (this.channel.get().isOpen()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ChannelFuture bindFuture = this.bootstrap.bind(this.getAddress());
        bindFuture.addListener((GenericFutureListener)((ChannelFutureListener)channelFuture -> {
            if (channelFuture.isSuccess()) {
                channelFuture.channel().attr(Server.HANDLER).set((Object)this);
                logger.debug("Bound {} to {}", (Object)this, (Object)channelFuture.channel());
                future.complete(null);
                this.channel.set(channelFuture.channel());
            } else {
                future.completeExceptionally(new BindNodeException(this, this.getAddress(), channelFuture.cause()));
            }
        }));
        return future;
    }

    private CompletionStage<Void> disconnectConnections() {
        return ChannelUtils.completable(this.clientChannelGroup.disconnect()).thenApply(v -> null);
    }

    @Override
    public CompletionStage<Void> acceptConnectionsAsync() {
        logger.debug("Accepting New Connections");
        this.rejectState.set(new RejectState());
        if (!this.channel.get().isOpen()) {
            return this.rebind();
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    @JsonIgnore
    public NodeQueryLogReport getLogs() {
        ClusterQueryLogReport clusterQueryLogReportReport = new ClusterQueryLogReport(this.cluster.getId());
        return clusterQueryLogReportReport.addNode((AbstractNode)this, this.activityLog.getLogs());
    }

    @Override
    @JsonIgnore
    public NodeQueryLogReport getLogs(boolean primed) {
        ClusterQueryLogReport clusterQueryLogReportReport = new ClusterQueryLogReport(this.cluster.getId());
        return clusterQueryLogReportReport.addNode((AbstractNode)this, this.activityLog.getLogs(primed));
    }

    @Override
    public void clearLogs() {
        this.activityLog.clear();
    }

    @Override
    public void registerQueryListener(QueryListener queryListener, boolean after, Predicate<QueryLog> filter) {
        this.queryListeners.add(new QueryListenerWrapper(queryListener, after, filter));
    }

    @Override
    public CompletionStage<Void> rejectConnectionsAsync(int after, RejectScope scope) {
        RejectState state;
        if (after <= 0) {
            logger.debug("Rejecting new connections with scope {}", (Object)scope);
            state = new RejectState(false, Integer.MIN_VALUE, scope);
        } else {
            logger.debug("Rejecting new connections after {} attempts with scope {}", (Object)after, (Object)scope);
            state = new RejectState(true, after, scope);
        }
        this.rejectState.set(state);
        if (after <= 0 && scope != RejectScope.REJECT_STARTUP) {
            CompletableFuture<Void> unbindFuture = this.unbind();
            if (scope == RejectScope.STOP) {
                return unbindFuture.thenCompose(n -> this.disconnectConnections());
            }
            return unbindFuture;
        }
        return CompletableFuture.completedFuture(null);
    }

    private Optional<StubMapping> find(Frame frame) {
        Optional<StubMapping> stub = this.stubStore.find(this, frame);
        if (!stub.isPresent()) {
            return ((BoundDataCenter)this.getDataCenter()).find(this, frame);
        }
        return stub;
    }

    void handle(ChannelHandlerContext ctx, UnsupportedProtocolVersionMessage message) {
        if (this.activityLogging) {
            QueryLog queryLog = this.activityLog.addLog(message.getFrame(), ctx.channel().remoteAddress(), System.currentTimeMillis(), Optional.empty());
            this.notifyQueryListeners(queryLog, false);
        }
    }

    void handle(ChannelHandlerContext ctx, Frame frame) {
        logger.debug("Got request streamId: {} msg: {}", (Object)frame.streamId, (Object)frame.message);
        Optional<StubMapping> stubOption = this.find(frame);
        List actions = null;
        if (stubOption.isPresent()) {
            StubMapping stub = stubOption.get();
            actions = stub.getActions((AbstractNode)this, frame);
        }
        QueryLog queryLog = null;
        if (this.activityLogging) {
            queryLog = this.activityLog.addLog(frame, ctx.channel().remoteAddress(), System.currentTimeMillis(), stubOption);
            this.notifyQueryListeners(queryLog, false);
        }
        if (actions != null && !actions.isEmpty()) {
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            this.handleActions(actions.iterator(), ctx, frame, future, queryLog);
        } else {
            CompletableFuture<Void> deferFuture = null;
            Ready response = null;
            if (frame.message instanceof Startup || frame.message instanceof Register) {
                RejectState state = this.rejectState.get();
                if (!state.listeningForNewConnections) {
                    return;
                }
                if (state.rejectAfter > 0) {
                    state.rejectAfter--;
                    if (state.rejectAfter == 0) {
                        state.rejectAfter = -1;
                        state.listeningForNewConnections = false;
                        deferFuture = this.rejectConnectionsAsync(-1, state.scope).toCompletableFuture();
                    }
                }
                response = new Ready();
            } else if (frame.message instanceof Options) {
                HashMap<String, List<String>> options = new HashMap<String, List<String>>();
                options.put("PROTOCOL_VERSIONS", Arrays.asList("3/v3", "4/v4", "5/v5-beta"));
                options.put("CQL_VERSION", Collections.singletonList("3.4.4"));
                options.put("COMPRESSION", Arrays.asList("snappy", "lz4"));
                response = new Supported(options);
            } else if (frame.message instanceof com.datastax.oss.protocol.internal.request.Query) {
                com.datastax.oss.protocol.internal.request.Query query = (com.datastax.oss.protocol.internal.request.Query)frame.message;
                String queryStr = query.query;
                if (queryStr.startsWith("USE") || queryStr.startsWith("use")) {
                    Matcher matcher = useKeyspacePattern.matcher(queryStr);
                    assert (matcher.matches());
                    if (matcher.matches()) {
                        String keyspace = matcher.group(1).replaceAll("^\"|\"$", "");
                        response = new SetKeyspace(keyspace);
                    }
                } else {
                    response = com.datastax.oss.protocol.internal.response.result.Void.INSTANCE;
                }
            } else if (frame.message instanceof Batch) {
                response = com.datastax.oss.protocol.internal.response.result.Void.INSTANCE;
            } else if (frame.message instanceof Execute) {
                Execute execute = (Execute)frame.message;
                String hex = new BigInteger(1, execute.queryId).toString(16);
                response = new Unprepared("No prepared statement with id: " + hex, execute.queryId);
            } else if (frame.message instanceof Prepare) {
                Prepare prepare = (Prepare)frame.message;
                String query = prepare.cqlQuery;
                Prime prime = BoundNode.whenWithInferredParams(query).then((Result)PrimeDsl.noRows()).build();
                ((BoundCluster)this.getCluster()).getStubStore().registerInternal(prime);
                response = prime.toPrepared();
            }
            if (response != null) {
                QueryLog fQueryLog = queryLog;
                if (deferFuture != null) {
                    Ready fResponse = response;
                    deferFuture.thenRun(() -> this.lambda$handle$6(ctx, frame, (Message)fResponse, fQueryLog));
                } else {
                    this.sendMessage(ctx, frame, (Message)response).addListener(x -> this.notifyQueryListeners(fQueryLog, true));
                }
            } else {
                this.notifyQueryListeners(queryLog, true);
            }
        }
    }

    private void notifyQueryListeners(QueryLog queryLog, boolean after) {
        if (queryLog != null && !this.queryListeners.isEmpty()) {
            for (QueryListenerWrapper wrapper : this.queryListeners) {
                if (after != wrapper.after) continue;
                wrapper.apply(this, queryLog);
            }
        }
        ((BoundDataCenter)this.getDataCenter()).notifyQueryListeners(this, queryLog, after);
    }

    private void handleActions(Iterator<Action> nextActions, ChannelHandlerContext ctx, Frame frame, CompletableFuture<Void> doneFuture, QueryLog queryLog) {
        if (!nextActions.hasNext()) {
            doneFuture.complete(null);
            this.notifyQueryListeners(queryLog, true);
            return;
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        Action action = nextActions.next();
        ActionHandler handler = new ActionHandler(action, ctx, frame, future);
        if (action.delayInMs() > 0L) {
            this.timer.newTimeout((TimerTask)handler, action.delayInMs().longValue(), TimeUnit.MILLISECONDS);
        } else {
            handler.run(null);
        }
        future.whenComplete((v, ex) -> {
            if (ex != null) {
                doneFuture.completeExceptionally((Throwable)ex);
            } else {
                this.handleActions(nextActions, ctx, frame, doneFuture, queryLog);
            }
        });
    }

    private static CompletableFuture<Void> closeNodes(Stream<BoundNode> nodes, CloseType closeType) {
        return CompletableFuture.allOf(nodes.map(n -> n.closeConnectionsAsync(closeType).toCompletableFuture()).collect(Collectors.toList()).toArray(new CompletableFuture[0]));
    }

    private ChannelFuture sendMessage(ChannelHandlerContext ctx, Frame requestFrame, Message responseMessage) {
        Frame responseFrame = FrameUtils.wrapResponse((Frame)requestFrame, (Message)responseMessage);
        logger.debug("Sending response for streamId: {} with msg {}", (Object)responseFrame.streamId, (Object)responseFrame.message);
        return ctx.writeAndFlush((Object)responseFrame);
    }

    @Override
    public StubStore getStubStore() {
        return this.stubStore;
    }

    @Override
    public int clearPrimes(boolean nested) {
        return this.stubStore.clear();
    }

    @Override
    public CompletionStage<BoundCluster> unregisterAsync() {
        return this.getServer().unregisterAsync(this);
    }

    public int clearPrimes() {
        return this.stubStore.clear();
    }

    @Override
    public NodeConnectionReport getConnections() {
        ClusterConnectionReport clusterConnectionReport = new ClusterConnectionReport(this.cluster.getId());
        return clusterConnectionReport.addNode((AbstractNode)this, this.clientChannelGroup.stream().map(Channel::remoteAddress).collect(Collectors.toList()), this.getAddress());
    }

    @Override
    public CompletionStage<NodeConnectionReport> closeConnectionsAsync(CloseType closeType) {
        NodeConnectionReport report = this.getConnections();
        return BoundNode.closeChannelGroup(this.clientChannelGroup, closeType).thenApply(v -> report);
    }

    private static CompletableFuture<Void> closeChannelGroup(ChannelGroup channelGroup, CloseType closeType) {
        switch (closeType) {
            case DISCONNECT: {
                return ChannelUtils.completable(channelGroup.disconnect());
            }
        }
        return CompletableFuture.allOf(channelGroup.stream().map(c -> {
            CompletableFuture<Void> f;
            Function<SocketChannel, ChannelFuture> shutdownMethod;
            Function<SocketChannel, ChannelFuture> function = shutdownMethod = closeType == CloseType.SHUTDOWN_READ ? DuplexChannel::shutdownInput : DuplexChannel::shutdownOutput;
            if (c instanceof SocketChannel) {
                f = ChannelUtils.completable(shutdownMethod.apply((SocketChannel)c));
            } else {
                logger.warn("Got {} request for non-SocketChannel {}, disconnecting instead.", (Object)closeType, c);
                f = ChannelUtils.completable(c.disconnect());
            }
            return f;
        }).collect(Collectors.toList()).toArray(new CompletableFuture[0]));
    }

    @Override
    public CompletionStage<NodeConnectionReport> closeConnectionAsync(SocketAddress connection, CloseType type) {
        Optional<Channel> channel = this.clientChannelGroup.stream().filter(c -> c.remoteAddress().equals(connection)).findFirst();
        if (channel.isPresent()) {
            DefaultChannelGroup channelGroup = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
            channelGroup.add((Object)channel.get());
            ClusterConnectionReport clusterReport = new ClusterConnectionReport(((BoundCluster)this.getCluster()).getId());
            NodeConnectionReport report = clusterReport.addNode((AbstractNode)this, Collections.singletonList(connection), this.getAddress());
            return BoundNode.closeChannelGroup((ChannelGroup)channelGroup, type).thenApply(f -> report);
        }
        CompletableFuture<NodeConnectionReport> failedFuture = new CompletableFuture<NodeConnectionReport>();
        failedFuture.completeExceptionally(new IllegalArgumentException("Not found"));
        return failedFuture;
    }

    @Override
    public Collection<BoundNode> getNodes() {
        return Collections.singleton(this);
    }

    @Override
    public Server getServer() {
        return this.server;
    }

    @Override
    @JsonIgnore
    public FrameCodecWrapper getFrameCodec() {
        return this.frameCodec;
    }

    private static PrimeDsl.PrimeBuilder whenWithInferredParams(String query) {
        long posParamCount = query.chars().filter(num -> num == 63).count();
        HashMap<String, String> paramTypes = new HashMap<String, String>();
        HashMap<String, String> params = new HashMap<String, String>();
        if (posParamCount > 0L) {
            int i = 0;
            while ((long)i < posParamCount) {
                params.put(Integer.toString(i), "*");
                paramTypes.put(Integer.toString(i), "varchar");
                ++i;
            }
        } else {
            ArrayList<String> allMatches = new ArrayList<String>();
            Pattern p = Pattern.compile("([\\w']+)\\s=\\s:[\\w]+");
            Matcher m = p.matcher(query);
            while (m.find()) {
                allMatches.add(m.group(1));
            }
            for (String match : allMatches) {
                params.put(match, "*");
                paramTypes.put(match, "varchar");
            }
        }
        return PrimeDsl.when((Request)new Query(query, Collections.emptyList(), params, paramTypes));
    }

    private /* synthetic */ void lambda$handle$6(ChannelHandlerContext ctx, Frame frame, Message fResponse, QueryLog fQueryLog) {
        this.sendMessage(ctx, frame, fResponse).addListener(x -> this.notifyQueryListeners(fQueryLog, true));
    }

    private class ActionHandler
    implements TimerTask {
        private final Action action;
        private final ChannelHandlerContext ctx;
        private final Frame frame;
        private final CompletableFuture<Void> doneFuture;

        ActionHandler(Action action, ChannelHandlerContext ctx, Frame frame, CompletableFuture<Void> doneFuture) {
            this.action = action;
            this.ctx = ctx;
            this.frame = frame;
            this.doneFuture = doneFuture;
        }

        public void run(Timeout timeout) {
            CompletableFuture future;
            if (this.action instanceof MessageResponseAction) {
                MessageResponseAction mAction = (MessageResponseAction)this.action;
                future = ChannelUtils.completable(BoundNode.this.sendMessage(this.ctx, this.frame, mAction.getMessage()));
            } else if (this.action instanceof DisconnectAction) {
                DisconnectAction cAction = (DisconnectAction)this.action;
                switch (cAction.getScope()) {
                    case CONNECTION: {
                        future = BoundNode.this.closeConnectionAsync(this.ctx.channel().remoteAddress(), cAction.getCloseType()).toCompletableFuture().thenApply(v -> null);
                        break;
                    }
                    default: {
                        Stream<BoundNode> nodes = cAction.getScope() == DisconnectAction.Scope.NODE ? Stream.of(BoundNode.this) : (cAction.getScope() == DisconnectAction.Scope.CLUSTER ? ((BoundCluster)BoundNode.this.getCluster()).getNodes().stream() : ((BoundDataCenter)BoundNode.this.getDataCenter()).getNodes().stream());
                        future = BoundNode.closeNodes(nodes, cAction.getCloseType());
                        break;
                    }
                }
            } else if (this.action instanceof NoResponseAction) {
                future = new CompletableFuture<Object>();
                future.complete(null);
            } else {
                logger.warn("Got action {} that we don't know how to handle.", (Object)this.action);
                future = new CompletableFuture();
                future.complete(null);
            }
            future.whenComplete((v, t) -> {
                if (t != null) {
                    this.doneFuture.completeExceptionally((Throwable)t);
                } else {
                    this.doneFuture.complete((Void)v);
                }
            });
        }
    }

    private static class RejectState {
        private final RejectScope scope;
        private volatile int rejectAfter;
        private volatile boolean listeningForNewConnections;

        RejectState() {
            this(true, Integer.MIN_VALUE, null);
        }

        RejectState(boolean listeningForNewConnections, int rejectAfter, RejectScope scope) {
            this.listeningForNewConnections = listeningForNewConnections;
            this.rejectAfter = rejectAfter;
            this.scope = scope;
        }
    }
}

