/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.lang.invoke.LambdaMetafactory;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.streaming.ConnectionHandler;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamConnectionFactory;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSummary;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.streaming.messages.IncomingFileMessage;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.streaming.messages.PrepareMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamSession
implements IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
    public final InetAddress peer;
    private final int index;
    public final InetAddress connecting;
    private StreamResultFuture streamResult;
    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();
    @VisibleForTesting
    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap();
    private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap<UUID, StreamReceiveTask>();
    private final StreamingMetrics metrics;
    private final StreamConnectionFactory factory;
    public final ConnectionHandler handler;
    private AtomicBoolean isAborted = new AtomicBoolean(false);
    private final boolean keepSSTableLevel;
    private final boolean isIncremental;
    private volatile State state = State.INITIALIZED;
    private volatile boolean completeSent = false;

    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental) {
        this.peer = peer;
        this.connecting = connecting;
        this.index = index;
        this.factory = factory;
        this.handler = new ConnectionHandler(this);
        this.metrics = StreamingMetrics.get(connecting);
        this.keepSSTableLevel = keepSSTableLevel;
        this.isIncremental = isIncremental;
    }

    public UUID planId() {
        return this.streamResult == null ? null : this.streamResult.planId;
    }

    public int sessionIndex() {
        return this.index;
    }

    public String description() {
        return this.streamResult == null ? null : this.streamResult.description;
    }

    public boolean keepSSTableLevel() {
        return this.keepSSTableLevel;
    }

    public boolean isIncremental() {
        return this.isIncremental;
    }

    StreamReceiveTask getReceivingTask(UUID cfId) {
        assert (this.receivers.containsKey(cfId));
        return this.receivers.get(cfId);
    }

    public void init(StreamResultFuture streamResult) {
        this.streamResult = streamResult;
    }

    public void start() {
        if (this.requests.isEmpty() && this.transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", (Object)this.planId());
            this.closeSession(State.COMPLETE);
            return;
        }
        try {
            logger.info("[Stream #{}] Starting streaming to {}{}", new Object[]{this.planId(), this.peer, this.peer.equals(this.connecting) ? "" : " through " + this.connecting});
            this.handler.initiate();
            this.onInitializationComplete();
        }
        catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            this.onError(e);
        }
    }

    public Socket createConnection() throws IOException {
        assert (this.factory != null);
        return this.factory.createConnection(this.connecting);
    }

    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt) {
        this.requests.add(new StreamRequest(keyspace, ranges, columnFamilies, repairedAt));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt) {
        this.failIfFinished();
        Collection<ColumnFamilyStore> stores = this.getColumnFamilyStores(keyspace, columnFamilies);
        if (flushTables) {
            this.flushSSTables(stores);
        }
        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
        List<SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, this.isIncremental);
        try {
            this.addTransferFiles(sections);
        }
        catch (Throwable throwable) {
            for (SSTableStreamingSections release : sections) {
                release.ref.release();
            }
            throw throwable;
        }
        for (SSTableStreamingSections release : sections) {
            release.ref.release();
        }
    }

    private void failIfFinished() {
        if (this.state().isFinalState()) {
            throw new RuntimeException(String.format("Stream %s is finished with state %s", this.planId(), this.state().name()));
        }
    }

    private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies) {
        HashSet<ColumnFamilyStore> stores = new HashSet<ColumnFamilyStore>();
        if (columnFamilies.isEmpty()) {
            stores.addAll(Keyspace.open(keyspace).getColumnFamilyStores());
        } else {
            for (String cf : columnFamilies) {
                stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
            }
        }
        return stores;
    }

    @VisibleForTesting
    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental) {
        Refs<SSTableReader> refs = new Refs<SSTableReader>();
        try {
            for (ColumnFamilyStore cfStore : stores) {
                ArrayList<Range<PartitionPosition>> keyRanges = new ArrayList<Range<PartitionPosition>>(ranges.size());
                for (Range<Token> range : ranges) {
                    keyRanges.add(Range.makeRowRange(range));
                }
                refs.addAll(cfStore.selectAndReference((Function<View, Iterable<SSTableReader>>)(Function)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$getSSTableSectionsForRanges$0(java.util.List boolean org.apache.cassandra.db.lifecycle.View ), (Lorg/apache/cassandra/db/lifecycle/View;)Ljava/lang/Iterable;)(keyRanges, (boolean)isIncremental)).refs);
            }
            ArrayList<SSTableStreamingSections> sections = new ArrayList<SSTableStreamingSections>(refs.size());
            for (SSTableReader sstable : refs) {
                long repairedAt = overriddenRepairedAt;
                if (overriddenRepairedAt == 0L) {
                    repairedAt = sstable.getSSTableMetadata().repairedAt;
                }
                sections.add(new SSTableStreamingSections(refs.get(sstable), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges), repairedAt));
            }
            return sections;
        }
        catch (Throwable t) {
            refs.release();
            throw t;
        }
    }

    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails) {
        this.failIfFinished();
        Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
        while (iter.hasNext()) {
            StreamTransferTask newTask;
            SSTableStreamingSections details = iter.next();
            if (details.sections.isEmpty()) {
                details.ref.release();
                iter.remove();
                continue;
            }
            UUID cfId = details.ref.get().metadata.cfId;
            StreamTransferTask task = this.transfers.get(cfId);
            if (task == null && (task = this.transfers.putIfAbsent(cfId, newTask = new StreamTransferTask(this, cfId))) == null) {
                task = newTask;
            }
            task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt);
            iter.remove();
        }
    }

    private synchronized void closeSession(State finalState) {
        if (this.isAborted.compareAndSet(false, true)) {
            this.state(finalState);
            if (finalState == State.FAILED) {
                for (StreamTask task : Iterables.concat(this.receivers.values(), this.transfers.values())) {
                    task.abort();
                }
            }
            this.handler.close();
            this.streamResult.handleSessionComplete(this);
        }
    }

    public void state(State newState) {
        this.state = newState;
    }

    public State state() {
        return this.state;
    }

    public boolean isSuccess() {
        return this.state == State.COMPLETE;
    }

    public void messageReceived(StreamMessage message) {
        switch (message.type) {
            case PREPARE: {
                PrepareMessage msg = (PrepareMessage)message;
                this.prepare(msg.requests, msg.summaries);
                break;
            }
            case FILE: {
                this.receive((IncomingFileMessage)message);
                break;
            }
            case RECEIVED: {
                ReceivedMessage received = (ReceivedMessage)message;
                this.received(received.cfId, received.sequenceNumber);
                break;
            }
            case COMPLETE: {
                this.complete();
                break;
            }
            case SESSION_FAILED: {
                this.sessionFailed();
            }
        }
    }

    public void onInitializationComplete() {
        this.state(State.PREPARING);
        PrepareMessage prepare = new PrepareMessage();
        prepare.requests.addAll(this.requests);
        for (StreamTransferTask task : this.transfers.values()) {
            prepare.summaries.add(task.getSummary());
        }
        this.handler.sendMessage(prepare);
        if (this.requests.isEmpty()) {
            this.startStreamingFiles();
        }
    }

    public void onError(Throwable e) {
        if (e instanceof SocketTimeoutException) {
            logger.error("[Stream #{}] Streaming socket timed out. This means the session peer stopped responding or is still processing received data. If there is no sign of failure in the other end or a very dense table is being transferred you may want to increase streaming_socket_timeout_in_ms property. Current value is {}ms.", new Object[]{this.planId(), DatabaseDescriptor.getStreamingSocketTimeout(), e});
        } else {
            logger.error("[Stream #{}] Streaming error occurred", (Object)this.planId(), (Object)e);
        }
        if (this.handler.isOutgoingConnected()) {
            this.handler.sendMessage(new SessionFailedMessage());
        }
        this.closeSession(State.FAILED);
    }

    public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries) {
        this.state(State.PREPARING);
        for (StreamRequest request : requests) {
            this.addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true, request.repairedAt);
        }
        for (StreamSummary summary : summaries) {
            this.prepareReceiving(summary);
        }
        if (!requests.isEmpty()) {
            PrepareMessage prepare = new PrepareMessage();
            for (StreamTransferTask task : this.transfers.values()) {
                prepare.summaries.add(task.getSummary());
            }
            this.handler.sendMessage(prepare);
        }
        if (!this.maybeCompleted()) {
            this.startStreamingFiles();
        }
    }

    public void fileSent(FileMessageHeader header) {
        long headerSize = header.size();
        StreamingMetrics.totalOutgoingBytes.inc(headerSize);
        this.metrics.outgoingBytes.inc(headerSize);
        StreamTransferTask task = this.transfers.get(header.cfId);
        if (task != null) {
            task.scheduleTimeout(header.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    public void receive(IncomingFileMessage message) {
        long headerSize = message.header.size();
        StreamingMetrics.totalIncomingBytes.inc(headerSize);
        this.metrics.incomingBytes.inc(headerSize);
        this.handler.sendMessage(new ReceivedMessage(message.header.cfId, message.header.sequenceNumber));
        this.receivers.get(message.header.cfId).received(message.sstable);
    }

    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total) {
        ProgressInfo progress = new ProgressInfo(this.peer, this.index, desc.filenameFor(Component.DATA), direction, bytes, total);
        this.streamResult.handleProgress(progress);
    }

    public void received(UUID cfId, int sequenceNumber) {
        this.transfers.get(cfId).complete(sequenceNumber);
    }

    public synchronized void complete() {
        if (this.state == State.WAIT_COMPLETE) {
            if (!this.completeSent) {
                this.handler.sendMessage(new CompleteMessage());
                this.completeSent = true;
            }
            this.closeSession(State.COMPLETE);
        } else {
            this.state(State.WAIT_COMPLETE);
            this.handler.closeIncoming();
        }
    }

    public synchronized void sessionFailed() {
        logger.error("[Stream #{}] Remote peer {} failed stream session.", (Object)this.planId(), (Object)this.peer.getHostAddress());
        this.closeSession(State.FAILED);
    }

    public SessionInfo getSessionInfo() {
        ArrayList receivingSummaries = Lists.newArrayList();
        for (StreamTask streamTask : this.receivers.values()) {
            receivingSummaries.add(streamTask.getSummary());
        }
        ArrayList transferSummaries = Lists.newArrayList();
        for (StreamTask streamTask : this.transfers.values()) {
            transferSummaries.add(streamTask.getSummary());
        }
        return new SessionInfo(this.peer, this.index, this.connecting, receivingSummaries, transferSummaries, this.state);
    }

    public synchronized void taskCompleted(StreamReceiveTask completedTask) {
        this.receivers.remove(completedTask.cfId);
        this.maybeCompleted();
    }

    public synchronized void taskCompleted(StreamTransferTask completedTask) {
        this.transfers.remove(completedTask.cfId);
        this.maybeCompleted();
    }

    @Override
    public void onRemove(InetAddress endpoint) {
        logger.error("[Stream #{}] Session failed because remote peer {} has left.", (Object)this.planId(), (Object)this.peer.getHostAddress());
        this.closeSession(State.FAILED);
    }

    @Override
    public void onRestart(InetAddress endpoint, EndpointState epState) {
        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", (Object)this.planId(), (Object)this.peer.getHostAddress());
        this.closeSession(State.FAILED);
    }

    private boolean maybeCompleted() {
        boolean completed;
        boolean bl = completed = this.receivers.isEmpty() && this.transfers.isEmpty();
        if (completed) {
            if (this.state == State.WAIT_COMPLETE) {
                if (!this.completeSent) {
                    this.handler.sendMessage(new CompleteMessage());
                    this.completeSent = true;
                }
                this.closeSession(State.COMPLETE);
            } else {
                this.handler.sendMessage(new CompleteMessage());
                this.completeSent = true;
                this.state(State.WAIT_COMPLETE);
                this.handler.closeOutgoing();
            }
        }
        return completed;
    }

    private void flushSSTables(Iterable<ColumnFamilyStore> stores) {
        ArrayList<ListenableFuture<ReplayPosition>> flushes = new ArrayList<ListenableFuture<ReplayPosition>>();
        for (ColumnFamilyStore cfs : stores) {
            flushes.add(cfs.forceFlush());
        }
        FBUtilities.waitOnFutures(flushes);
    }

    private synchronized void prepareReceiving(StreamSummary summary) {
        this.failIfFinished();
        if (summary.files > 0) {
            this.receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
        }
    }

    private void startStreamingFiles() {
        this.streamResult.handleSessionPrepared(this);
        this.state(State.STREAMING);
        for (StreamTransferTask task : this.transfers.values()) {
            Collection<OutgoingFileMessage> messages = task.getFileMessages();
            if (messages.size() > 0) {
                this.handler.sendMessages(messages);
                continue;
            }
            this.taskCompleted(task);
        }
    }

    private static /* synthetic */ Iterable lambda$getSSTableSectionsForRanges$0(List keyRanges, boolean isIncremental, View view) {
        HashSet sstables = Sets.newHashSet();
        SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
        for (Range keyRange : keyRanges) {
            for (SSTableReader sstable : View.sstablesInBounds((PartitionPosition)keyRange.left, (PartitionPosition)keyRange.right, intervalTree)) {
                if (isIncremental && sstable.isRepaired()) continue;
                sstables.add(sstable);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("ViewFilter for {}/{} sstables", (Object)sstables.size(), (Object)Iterables.size(view.select(SSTableSet.CANONICAL)));
        }
        return sstables;
    }

    public static class SSTableStreamingSections {
        public final Ref<SSTableReader> ref;
        public final List<Pair<Long, Long>> sections;
        public final long estimatedKeys;
        public final long repairedAt;

        public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt) {
            this.ref = ref;
            this.sections = sections;
            this.estimatedKeys = estimatedKeys;
            this.repairedAt = repairedAt;
        }
    }

    public static enum State {
        INITIALIZED(false),
        PREPARING(false),
        STREAMING(false),
        WAIT_COMPLETE(false),
        COMPLETE(true),
        FAILED(true);

        private final boolean finalState;

        private State(boolean finalState) {
            this.finalState = finalState;
        }

        public boolean isFinalState() {
            return this.finalState;
        }
    }
}

