/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamHook;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.PrepareAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynAckMessage;
import org.apache.cassandra.streaming.messages.PrepareSynMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamSession
implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
    private final StreamOperation streamOperation;
    public final InetAddressAndPort peer;
    private final OutboundConnectionSettings template;
    private final int index;
    private StreamResultFuture streamResult;
    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
    @VisibleForTesting
    protected final ConcurrentHashMap<TableId, StreamTransferTask> transfers = new ConcurrentHashMap();
    private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<TableId, StreamReceiveTask>();
    private final StreamingMetrics metrics;
    final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<String, Set<Range<Token>>>();
    private final NettyStreamingMessageSender messageSender;
    private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<ChannelId, Channel>();
    private final AtomicBoolean isAborted = new AtomicBoolean(false);
    private final UUID pendingRepair;
    private final PreviewKind previewKind;
    private volatile State state = State.INITIALIZED;
    private volatile boolean completeSent = false;

    public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, StreamConnectionFactory factory, int index, UUID pendingRepair, PreviewKind previewKind) {
        this(streamOperation, new OutboundConnectionSettings(peer), factory, index, pendingRepair, previewKind);
    }

    public StreamSession(StreamOperation streamOperation, OutboundConnectionSettings template, StreamConnectionFactory factory, int index, UUID pendingRepair, PreviewKind previewKind) {
        this.streamOperation = streamOperation;
        this.peer = template.to;
        this.template = template;
        this.index = index;
        this.messageSender = new NettyStreamingMessageSender(this, template, factory, 12, previewKind.isPreview());
        this.metrics = StreamingMetrics.get(this.peer);
        this.pendingRepair = pendingRepair;
        this.previewKind = previewKind;
        logger.debug("Creating stream session to {}", (Object)template);
    }

    public UUID planId() {
        return this.streamResult == null ? null : this.streamResult.planId;
    }

    public int sessionIndex() {
        return this.index;
    }

    public StreamOperation streamOperation() {
        return this.streamResult == null ? null : this.streamResult.streamOperation;
    }

    public StreamOperation getStreamOperation() {
        return this.streamOperation;
    }

    public UUID getPendingRepair() {
        return this.pendingRepair;
    }

    public boolean isPreview() {
        return this.previewKind.isPreview();
    }

    public PreviewKind getPreviewKind() {
        return this.previewKind;
    }

    public StreamReceiver getAggregator(TableId tableId) {
        assert (this.receivers.containsKey(tableId)) : "Missing tableId " + tableId;
        return this.receivers.get(tableId).getReceiver();
    }

    public void init(StreamResultFuture streamResult) {
        this.streamResult = streamResult;
        StreamHook.instance.reportStreamFuture(this, streamResult);
    }

    public boolean attach(Channel channel) {
        if (!this.messageSender.hasControlChannel()) {
            this.messageSender.injectControlMessageChannel(channel);
        }
        return this.incomingChannels.putIfAbsent(channel.id(), channel) == null;
    }

    public void start() {
        if (this.requests.isEmpty() && this.transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", (Object)this.planId());
            this.closeSession(State.COMPLETE);
            return;
        }
        try {
            logger.info("[Stream #{}] Starting streaming to {}{}", new Object[]{this.planId(), this.peer, this.template.connectTo == null ? "" : " through " + this.template.connectTo});
            this.messageSender.initialize();
            this.onInitializationComplete();
        }
        catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            this.onError(e);
        }
    }

    public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies) {
        assert (Iterables.all((Iterable)fullRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(fullRanges)) : fullRanges.toString();
        assert (Iterables.all((Iterable)transientRanges, Replica::isSelf) || RangesAtEndpoint.isDummyList(transientRanges)) : transientRanges.toString();
        this.requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
    }

    synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, Collection<String> columnFamilies, boolean flushTables) {
        this.failIfFinished();
        Collection<ColumnFamilyStore> stores = this.getColumnFamilyStores(keyspace, columnFamilies);
        if (flushTables) {
            this.flushSSTables(stores);
        }
        RangesAtEndpoint unwrappedRanges = replicas.unwrap();
        List<OutgoingStream> streams = this.getOutgoingStreamsForRanges(unwrappedRanges, stores, this.pendingRepair, this.previewKind);
        this.addTransferStreams(streams);
        Set<Range<Token>> toBeUpdated = this.transferredRangesPerKeyspace.get(keyspace);
        if (toBeUpdated == null) {
            toBeUpdated = new HashSet<Range<Token>>();
        }
        toBeUpdated.addAll(replicas.ranges());
        this.transferredRangesPerKeyspace.put(keyspace, toBeUpdated);
    }

    private void failIfFinished() {
        if (this.state() == State.COMPLETE || this.state() == State.FAILED) {
            throw new RuntimeException(String.format("Stream %s is finished with state %s", this.planId(), this.state().name()));
        }
    }

    private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies) {
        HashSet<ColumnFamilyStore> stores = new HashSet<ColumnFamilyStore>();
        if (columnFamilies.isEmpty()) {
            stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores());
        } else {
            for (String cf : columnFamilies) {
                stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
            }
        }
        return stores;
    }

    @VisibleForTesting
    public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint replicas, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) {
        ArrayList<OutgoingStream> streams = new ArrayList<OutgoingStream>();
        try {
            for (ColumnFamilyStore cfs : stores) {
                streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, replicas, pendingRepair, previewKind));
            }
        }
        catch (Throwable t) {
            streams.forEach(OutgoingStream::finish);
            throw t;
        }
        return streams;
    }

    synchronized void addTransferStreams(Collection<OutgoingStream> streams) {
        this.failIfFinished();
        for (OutgoingStream stream : streams) {
            StreamTransferTask newTask;
            TableId tableId = stream.getTableId();
            StreamTransferTask task = this.transfers.get(tableId);
            if (task == null && (task = this.transfers.putIfAbsent(tableId, newTask = new StreamTransferTask(this, tableId))) == null) {
                task = newTask;
            }
            task.addTransferStream(stream);
        }
    }

    private synchronized Future closeSession(State finalState) {
        Object abortedTasksFuture = null;
        if (this.isAborted.compareAndSet(false, true)) {
            this.state(finalState);
            if (finalState == State.FAILED) {
                abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
            }
            this.incomingChannels.values().stream().map(channel -> channel.close());
            this.messageSender.close();
            this.streamResult.handleSessionComplete(this);
        }
        return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null);
    }

    private void abortTasks() {
        try {
            this.receivers.values().forEach(StreamReceiveTask::abort);
            this.transfers.values().forEach(StreamTransferTask::abort);
        }
        catch (Exception e) {
            logger.warn("failed to abort some streaming tasks", (Throwable)e);
        }
    }

    public void state(State newState) {
        this.state = newState;
    }

    public State state() {
        return this.state;
    }

    public NettyStreamingMessageSender getMessageSender() {
        return this.messageSender;
    }

    public boolean isSuccess() {
        return this.state == State.COMPLETE;
    }

    public void messageReceived(StreamMessage message) {
        switch (message.type) {
            case STREAM_INIT: {
                break;
            }
            case PREPARE_SYN: {
                PrepareSynMessage msg = (PrepareSynMessage)message;
                this.prepare(msg.requests, msg.summaries);
                break;
            }
            case PREPARE_SYNACK: {
                this.prepareSynAck((PrepareSynAckMessage)message);
                break;
            }
            case PREPARE_ACK: {
                this.prepareAck((PrepareAckMessage)message);
                break;
            }
            case STREAM: {
                this.receive((IncomingStreamMessage)message);
                break;
            }
            case RECEIVED: {
                ReceivedMessage received = (ReceivedMessage)message;
                this.received(received.tableId, received.sequenceNumber);
                break;
            }
            case COMPLETE: {
                this.complete();
                break;
            }
            case KEEP_ALIVE: {
                break;
            }
            case SESSION_FAILED: {
                this.sessionFailed();
                break;
            }
            default: {
                throw new AssertionError((Object)("unhandled StreamMessage type: " + message.getClass().getName()));
            }
        }
    }

    public void onInitializationComplete() {
        this.state(State.PREPARING);
        PrepareSynMessage prepare = new PrepareSynMessage();
        prepare.requests.addAll(this.requests);
        for (StreamTransferTask task : this.transfers.values()) {
            prepare.summaries.add(task.getSummary());
        }
        this.messageSender.sendMessage(prepare);
    }

    public Future onError(Throwable e) {
        this.logError(e);
        if (this.messageSender.connected()) {
            this.messageSender.sendMessage(new SessionFailedMessage());
        }
        return this.closeSession(State.FAILED);
    }

    private void logError(Throwable e) {
        if (e instanceof SocketTimeoutException) {
            logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? If not, maybe try increasing streaming_keep_alive_period_in_secs.", new Object[]{this.planId(), this.peer.getHostAddress(true), this.template.connectTo == null ? "" : " through " + this.template.connectTo.getHostAddress(true), 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(), e});
        } else {
            logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", new Object[]{this.planId(), this.peer.getHostAddress(true), this.template.connectTo == null ? "" : " through " + this.template.connectTo.getHostAddress(true), e});
        }
    }

    public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) {
        this.state(State.PREPARING);
        ScheduledExecutors.nonPeriodicTasks.execute(() -> this.prepareAsync(requests, summaries));
    }

    private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) {
        for (StreamRequest request : requests) {
            this.addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true);
        }
        for (StreamSummary summary : summaries) {
            this.prepareReceiving(summary);
        }
        PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
        if (!this.peer.equals(FBUtilities.getBroadcastAddressAndPort())) {
            for (StreamTransferTask task : this.transfers.values()) {
                prepareSynAck.summaries.add(task.getSummary());
            }
        }
        this.messageSender.sendMessage(prepareSynAck);
        this.streamResult.handleSessionPrepared(this);
        this.maybeCompleted();
    }

    private void prepareSynAck(PrepareSynAckMessage msg) {
        if (!msg.summaries.isEmpty()) {
            for (StreamSummary summary : msg.summaries) {
                this.prepareReceiving(summary);
            }
            this.messageSender.sendMessage(new PrepareAckMessage());
        }
        if (this.isPreview()) {
            this.completePreview();
        } else {
            this.startStreamingFiles(true);
        }
    }

    private void prepareAck(PrepareAckMessage msg) {
        if (this.isPreview()) {
            this.completePreview();
        } else {
            this.startStreamingFiles(true);
        }
    }

    public void streamSent(OutgoingStreamMessage message) {
        long headerSize = message.stream.getSize();
        StreamingMetrics.totalOutgoingBytes.inc(headerSize);
        this.metrics.outgoingBytes.inc(headerSize);
        StreamTransferTask task = this.transfers.get(message.header.tableId);
        if (task != null) {
            task.scheduleTimeout(message.header.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    public void receive(IncomingStreamMessage message) {
        if (this.isPreview()) {
            throw new RuntimeException("Cannot receive files for preview session");
        }
        long headerSize = message.stream.getSize();
        StreamingMetrics.totalIncomingBytes.inc(headerSize);
        this.metrics.incomingBytes.inc(headerSize);
        this.messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
        StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
        this.receivers.get(message.header.tableId).received(message.stream);
    }

    public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total) {
        ProgressInfo progress = new ProgressInfo(this.peer, this.index, filename, direction, bytes, total);
        this.streamResult.handleProgress(progress);
    }

    public void received(TableId tableId, int sequenceNumber) {
        this.transfers.get(tableId).complete(sequenceNumber);
    }

    public synchronized void complete() {
        logger.debug("handling Complete message, state = {}, completeSent = {}", (Object)this.state, (Object)this.completeSent);
        if (this.state == State.WAIT_COMPLETE) {
            if (!this.completeSent) {
                this.messageSender.sendMessage(new CompleteMessage());
                this.completeSent = true;
            }
            this.closeSession(State.COMPLETE);
        } else {
            this.state(State.WAIT_COMPLETE);
        }
    }

    public synchronized void sessionFailed() {
        logger.error("[Stream #{}] Remote peer {} failed stream session.", (Object)this.planId(), (Object)this.peer.toString());
        this.closeSession(State.FAILED);
    }

    public SessionInfo getSessionInfo() {
        ArrayList receivingSummaries = Lists.newArrayList();
        for (StreamTask streamTask : this.receivers.values()) {
            receivingSummaries.add(streamTask.getSummary());
        }
        ArrayList transferSummaries = Lists.newArrayList();
        for (StreamTask streamTask : this.transfers.values()) {
            transferSummaries.add(streamTask.getSummary());
        }
        return new SessionInfo(this.peer, this.index, this.template.connectTo == null ? this.peer : this.template.connectTo, receivingSummaries, transferSummaries, this.state);
    }

    public synchronized void taskCompleted(StreamReceiveTask completedTask) {
        this.receivers.remove(completedTask.tableId);
        this.maybeCompleted();
    }

    public synchronized void taskCompleted(StreamTransferTask completedTask) {
        this.transfers.remove(completedTask.tableId);
        this.maybeCompleted();
    }

    @Override
    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {
    }

    @Override
    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    @Override
    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {
    }

    @Override
    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {
    }

    @Override
    public void onDead(InetAddressAndPort endpoint, EndpointState state) {
    }

    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        logger.error("[Stream #{}] Session failed because remote peer {} has left.", (Object)this.planId(), (Object)this.peer.toString());
        this.closeSession(State.FAILED);
    }

    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState epState) {
        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", (Object)this.planId(), (Object)this.peer.toString());
        this.closeSession(State.FAILED);
    }

    private void completePreview() {
        try {
            this.state(State.WAIT_COMPLETE);
            this.closeSession(State.COMPLETE);
        }
        finally {
            for (StreamTask task : Iterables.concat(this.receivers.values(), this.transfers.values())) {
                task.abort();
            }
        }
    }

    private boolean maybeCompleted() {
        boolean completed;
        boolean bl = completed = this.receivers.isEmpty() && this.transfers.isEmpty();
        if (completed) {
            if (this.state == State.WAIT_COMPLETE) {
                if (!this.completeSent) {
                    this.messageSender.sendMessage(new CompleteMessage());
                    this.completeSent = true;
                }
                this.closeSession(State.COMPLETE);
            } else {
                this.messageSender.sendMessage(new CompleteMessage());
                this.completeSent = true;
                this.state(State.WAIT_COMPLETE);
            }
        }
        return completed;
    }

    private void flushSSTables(Iterable<ColumnFamilyStore> stores) {
        ArrayList<ListenableFuture<CommitLogPosition>> flushes = new ArrayList<ListenableFuture<CommitLogPosition>>();
        for (ColumnFamilyStore cfs : stores) {
            flushes.add(cfs.forceFlush());
        }
        FBUtilities.waitOnFutures(flushes);
    }

    @VisibleForTesting
    public synchronized void prepareReceiving(StreamSummary summary) {
        this.failIfFinished();
        if (summary.files > 0) {
            this.receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
        }
    }

    private void startStreamingFiles(boolean notifyPrepared) {
        if (notifyPrepared) {
            this.streamResult.handleSessionPrepared(this);
        }
        this.state(State.STREAMING);
        for (StreamTransferTask task : this.transfers.values()) {
            Collection<OutgoingStreamMessage> messages = task.getFileMessages();
            if (!messages.isEmpty()) {
                for (OutgoingStreamMessage ofm : messages) {
                    ofm.header.addSessionInfo(this);
                    this.messageSender.sendMessage(ofm);
                }
                continue;
            }
            this.taskCompleted(task);
        }
        this.maybeCompleted();
    }

    @VisibleForTesting
    public int getNumRequests() {
        return this.requests.size();
    }

    @VisibleForTesting
    public int getNumTransfers() {
        return this.transferredRangesPerKeyspace.size();
    }

    public static enum State {
        INITIALIZED,
        PREPARING,
        STREAMING,
        WAIT_COMPLETE,
        COMPLETE,
        FAILED;

    }
}

