/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming.async;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.net.AsyncChannelPromise;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamingMessageSender;
import org.apache.cassandra.streaming.async.StreamingInboundHandler;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyStreamingMessageSender
implements StreamingMessageSender {
    private static final Logger logger = LoggerFactory.getLogger(NettyStreamingMessageSender.class);
    private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = FBUtilities.getAvailableProcessors();
    private static final int MAX_PARALLEL_TRANSFERS = Integer.parseInt(System.getProperty("cassandra.streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
    private static final long DEFAULT_CLOSE_WAIT_IN_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    private static final Semaphore fileTransferSemaphore = new Semaphore(DEFAULT_MAX_PARALLEL_TRANSFERS, true);
    private final StreamSession session;
    private final boolean isPreview;
    private final int streamingVersion;
    private final OutboundConnectionSettings template;
    private final StreamConnectionFactory factory;
    private volatile boolean closed;
    private Channel controlMessageChannel;
    private final Collection<ScheduledFuture<?>> channelKeepAlives = new LinkedBlockingQueue();
    private final ThreadPoolExecutor fileTransferExecutor;
    private final ConcurrentMap<Thread, Channel> threadToChannelMap = new ConcurrentHashMap<Thread, Channel>();
    @VisibleForTesting
    static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR = AttributeKey.valueOf((String)"transferringFile");

    public NettyStreamingMessageSender(StreamSession session, OutboundConnectionSettings template, StreamConnectionFactory factory, int streamingVersion, boolean isPreview) {
        this.session = session;
        this.streamingVersion = streamingVersion;
        this.template = template;
        this.factory = factory;
        this.isPreview = isPreview;
        String name = session.peer.toString().replace(':', '.');
        this.fileTransferExecutor = new DebuggableThreadPoolExecutor(1, MAX_PARALLEL_TRANSFERS, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("NettyStreaming-Outbound-" + name));
        this.fileTransferExecutor.allowCoreThreadTimeOut(true);
    }

    @Override
    public void initialize() {
        StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddressAndPort(), this.session.sessionIndex(), this.session.planId(), this.session.streamOperation(), this.session.getPendingRepair(), this.session.getPreviewKind());
        this.sendMessage(message);
    }

    public boolean hasControlChannel() {
        return this.controlMessageChannel != null;
    }

    public void injectControlMessageChannel(Channel channel) {
        this.controlMessageChannel = channel;
        channel.attr(TRANSFERRING_FILE_ATTR).set((Object)Boolean.FALSE);
        this.scheduleKeepAliveTask(channel);
    }

    private void setupControlMessageChannel() throws IOException {
        if (this.controlMessageChannel == null) {
            this.controlMessageChannel = this.createChannel();
            this.scheduleKeepAliveTask(this.controlMessageChannel);
        }
    }

    private void scheduleKeepAliveTask(Channel channel) {
        int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
        if (logger.isDebugEnabled()) {
            logger.debug("{} Scheduling keep-alive task with {}s period.", (Object)NettyStreamingMessageSender.createLogTag(this.session, channel), (Object)keepAlivePeriod);
        }
        KeepAliveTask task = new KeepAliveTask(channel, this.session);
        io.netty.util.concurrent.ScheduledFuture scheduledFuture = channel.eventLoop().scheduleAtFixedRate((Runnable)task, 0L, (long)keepAlivePeriod, TimeUnit.SECONDS);
        this.channelKeepAlives.add((ScheduledFuture<?>)scheduledFuture);
        task.future = scheduledFuture;
    }

    private Channel createChannel() throws IOException {
        Channel channel = this.factory.createConnection(this.template, this.streamingVersion);
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("stream", (ChannelHandler)new StreamingInboundHandler(this.template.to, this.streamingVersion, this.session));
        channel.attr(TRANSFERRING_FILE_ATTR).set((Object)Boolean.FALSE);
        logger.debug("Creating channel id {} local {} remote {}", new Object[]{channel.id(), channel.localAddress(), channel.remoteAddress()});
        return channel;
    }

    static String createLogTag(StreamSession session, Channel channel) {
        StringBuilder sb = new StringBuilder(64);
        sb.append("[Stream");
        if (session != null) {
            sb.append(" #").append(session.planId());
        }
        if (channel != null) {
            sb.append(" channel: ").append(channel.id());
        }
        sb.append(']');
        return sb.toString();
    }

    @Override
    public void sendMessage(StreamMessage message) {
        if (this.closed) {
            throw new RuntimeException("stream has been closed, cannot send " + message);
        }
        if (message instanceof OutgoingStreamMessage) {
            if (this.isPreview) {
                throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
            }
            if (logger.isDebugEnabled()) {
                logger.debug("{} Sending {}", (Object)NettyStreamingMessageSender.createLogTag(this.session, null), (Object)message);
            }
            this.fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage)message));
            return;
        }
        try {
            this.setupControlMessageChannel();
            this.sendControlMessage(this.controlMessageChannel, message, future -> this.onControlMessageComplete(future, message));
        }
        catch (Exception e) {
            this.close();
            this.session.onError(e);
        }
    }

    private void sendControlMessage(Channel channel, StreamMessage message, GenericFutureListener listener) throws IOException {
        long messageSize;
        if (logger.isDebugEnabled()) {
            logger.debug("{} Sending {}", (Object)NettyStreamingMessageSender.createLogTag(this.session, channel), (Object)message);
        }
        if ((messageSize = StreamMessage.serializedSize(message, this.streamingVersion)) > 0x40000000L) {
            throw new IllegalStateException(String.format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s", new Object[]{NettyStreamingMessageSender.createLogTag(this.session, channel), messageSize, message.type}));
        }
        ByteBuf buf = channel.alloc().directBuffer((int)messageSize, (int)messageSize);
        ByteBuffer nioBuf = buf.nioBuffer(0, (int)messageSize);
        DataOutputBufferFixed out = new DataOutputBufferFixed(nioBuf);
        StreamMessage.serialize(message, out, this.streamingVersion, this.session);
        assert (nioBuf.position() == nioBuf.limit());
        buf.writerIndex(nioBuf.position());
        AsyncChannelPromise.writeAndFlush(channel, (Object)buf, (GenericFutureListener<? extends Future<? super Void>>)listener);
    }

    java.util.concurrent.Future onControlMessageComplete(Future<?> future, StreamMessage msg) {
        ChannelFuture channelFuture = (ChannelFuture)future;
        Throwable cause = future.cause();
        if (cause == null) {
            return null;
        }
        Channel channel = channelFuture.channel();
        logger.error("{} failed to send a stream message/data to peer {}: msg = {}", new Object[]{NettyStreamingMessageSender.createLogTag(this.session, channel), this.template.to, msg, future.cause()});
        return this.session.onError(cause);
    }

    public void setClosed() {
        this.closed = true;
    }

    void setControlMessageChannel(Channel channel) {
        this.controlMessageChannel = channel;
    }

    int semaphoreAvailablePermits() {
        return fileTransferSemaphore.availablePermits();
    }

    @Override
    public boolean connected() {
        return !this.closed && (this.controlMessageChannel == null || this.controlMessageChannel.isOpen());
    }

    @Override
    public void close() {
        this.closed = true;
        if (logger.isDebugEnabled()) {
            logger.debug("{} Closing stream connection channels on {}", (Object)NettyStreamingMessageSender.createLogTag(this.session, null), (Object)this.template.to);
        }
        for (ScheduledFuture<?> future : this.channelKeepAlives) {
            future.cancel(false);
        }
        this.channelKeepAlives.clear();
        ArrayList<ChannelFuture> futures = new ArrayList<ChannelFuture>(this.threadToChannelMap.size());
        for (Channel channel : this.threadToChannelMap.values()) {
            futures.add(channel.close());
        }
        FBUtilities.waitOnFutures(futures, 10L, TimeUnit.SECONDS);
        this.threadToChannelMap.clear();
        this.fileTransferExecutor.shutdownNow();
        if (this.controlMessageChannel != null) {
            this.controlMessageChannel.close();
        }
    }

    class KeepAliveTask
    implements Runnable {
        private final Channel channel;
        private final StreamSession session;
        ScheduledFuture<?> future;

        KeepAliveTask(Channel channel, StreamSession session) {
            this.channel = channel;
            this.session = session;
        }

        @Override
        public void run() {
            if (!this.channel.isOpen() || NettyStreamingMessageSender.this.closed) {
                this.future.cancel(false);
                return;
            }
            if (((Boolean)this.channel.attr(TRANSFERRING_FILE_ATTR).get()).booleanValue()) {
                return;
            }
            try {
                if (logger.isTraceEnabled()) {
                    logger.trace("{} Sending keep-alive to {}.", (Object)NettyStreamingMessageSender.createLogTag(this.session, this.channel), (Object)this.session.peer);
                }
                NettyStreamingMessageSender.this.sendControlMessage(this.channel, new KeepAliveMessage(), this::keepAliveListener);
            }
            catch (IOException ioe) {
                this.future.cancel(false);
            }
        }

        private void keepAliveListener(Future<? super Void> future) {
            if (future.isSuccess() || future.isCancelled()) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).", (Object)NettyStreamingMessageSender.createLogTag(this.session, this.channel), (Object)future.cause());
            }
        }
    }

    class FileStreamTask
    implements Runnable {
        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
        private final StreamMessage msg;

        FileStreamTask(OutgoingStreamMessage ofm) {
            this.msg = ofm;
        }

        FileStreamTask(StreamMessage msg) {
            this.msg = msg;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (!this.acquirePermit(3)) {
                return;
            }
            try {
                Channel channel = this.getOrCreateChannel();
                if (!channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet((Object)false, (Object)true)) {
                    throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
                }
                try (AsyncStreamingOutputPlus outPlus = new AsyncStreamingOutputPlus(channel);){
                    StreamMessage.serialize(this.msg, outPlus, NettyStreamingMessageSender.this.streamingVersion, NettyStreamingMessageSender.this.session);
                }
                finally {
                    channel.attr(TRANSFERRING_FILE_ATTR).set((Object)Boolean.FALSE);
                }
            }
            catch (Exception e) {
                NettyStreamingMessageSender.this.session.onError(e);
            }
            finally {
                fileTransferSemaphore.release();
            }
        }

        boolean acquirePermit(int logInterval) {
            long logIntervalNanos = TimeUnit.MINUTES.toNanos(logInterval);
            long timeOfLastLogging = System.nanoTime();
            while (!NettyStreamingMessageSender.this.closed) {
                try {
                    if (fileTransferSemaphore.tryAcquire(1L, TimeUnit.SECONDS)) {
                        return true;
                    }
                    long now = System.nanoTime();
                    if (now - timeOfLastLogging <= logIntervalNanos) continue;
                    timeOfLastLogging = now;
                    OutgoingStreamMessage ofm = (OutgoingStreamMessage)this.msg;
                    if (!logger.isInfoEnabled()) continue;
                    logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes", new Object[]{NettyStreamingMessageSender.createLogTag(NettyStreamingMessageSender.this.session, null), ofm.getName(), logInterval});
                }
                catch (InterruptedException interruptedException) {
                }
            }
            return false;
        }

        private Channel getOrCreateChannel() {
            Thread currentThread = Thread.currentThread();
            try {
                Channel channel = (Channel)NettyStreamingMessageSender.this.threadToChannelMap.get(currentThread);
                if (channel != null) {
                    return channel;
                }
                channel = NettyStreamingMessageSender.this.createChannel();
                NettyStreamingMessageSender.this.threadToChannelMap.put(currentThread, channel);
                return channel;
            }
            catch (Exception e) {
                throw new IOError(e);
            }
        }

        private void onError(Throwable t) {
            try {
                NettyStreamingMessageSender.this.session.onError(t).get(DEFAULT_CLOSE_WAIT_IN_MILLIS, TimeUnit.MILLISECONDS);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }

        void injectChannel(Channel channel) {
            Thread currentThread = Thread.currentThread();
            if (NettyStreamingMessageSender.this.threadToChannelMap.get(currentThread) != null) {
                throw new IllegalStateException("previous channel already set");
            }
            NettyStreamingMessageSender.this.threadToChannelMap.put(currentThread, channel);
        }

        void unsetChannel() {
            NettyStreamingMessageSender.this.threadToChannelMap.remove(Thread.currentThread());
        }
    }
}

