/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.streaming.StreamIn;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.streaming.StreamOutSession;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that streams a set of token ranges of one column family between two
 * endpoints ({@link #src} and {@link #dst}).
 *
 * <p>A task is created on its {@code owner} node through {@link #create}. When run,
 * it either starts streaming itself (if the local node is {@link #src}) or is
 * serialized and forwarded to {@link #src}, which runs it and later sends a
 * {@link StreamingRepairResponse} back to the owner.
 */
public class StreamingRepairTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);

    /**
     * Tasks created on this node through {@link #create}, keyed by task id. An entry is
     * removed when the task's wrapped callback completes, or when forwarding the task fails.
     */
    private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>();
    private static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer();

    /** Identifier of this task; also the key used in the task registry. */
    public final UUID id;
    /** Node that created the task and that is notified when a forwarded task completes. */
    private final InetAddress owner;
    /** Endpoint that initiates the streaming. */
    public final InetAddress src;
    /** Endpoint that {@link #src} streams with. */
    public final InetAddress dst;
    private final String tableName;
    private final String cfName;
    private final Collection<Range<Token>> ranges;
    /** Invoked by the streaming operations (and by the response handler); may be {@code null}. */
    private final Runnable callback;

    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback) {
        this.id = id;
        this.owner = owner;
        this.src = src;
        this.dst = dst;
        this.tableName = tableName;
        this.cfName = cfName;
        this.ranges = ranges;
        this.callback = callback;
    }

    /**
     * Creates a task owned by the local node and registers it in the task registry.
     *
     * <p>If {@code ep2} is the local broadcast address the endpoints are swapped so that
     * the local node becomes the streaming source; otherwise {@code ep1} is the source
     * and {@code ep2} the destination.
     *
     * @param ep1       first endpoint of the repair
     * @param ep2       second endpoint of the repair
     * @param tableName name of the table (keyspace) to stream
     * @param cfName    name of the column family to stream
     * @param ranges    token ranges to stream
     * @param callback  run once the task is complete; may be {@code null}
     * @return the newly registered task
     */
    public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback) {
        InetAddress local = FBUtilities.getBroadcastAddress();
        UUID id = UUIDGen.makeType1UUIDFromHost(local);
        // Prefer the local node as the source so no forwarding is needed when it takes part.
        boolean localIsSecond = ep2.equals(local);
        InetAddress src = localIsSecond ? ep2 : ep1;
        InetAddress dst = localIsSecond ? ep1 : ep2;
        Runnable wrapped = wrapCallback(callback, id, local.equals(src));
        StreamingRepairTask task = new StreamingRepairTask(id, local, src, dst, tableName, cfName, ranges, wrapped);
        tasks.put(id, task);
        return task;
    }

    /**
     * @return {@code true} if the node owning this task is also its streaming source
     */
    public boolean isLocalTask() {
        return this.owner.equals(this.src);
    }

    /**
     * Starts streaming if the local node is the source, otherwise forwards the task
     * to the source node.
     */
    @Override
    public void run() {
        if (this.src.equals(FBUtilities.getBroadcastAddress())) {
            this.initiateStreaming();
        } else {
            this.forwardToSource();
        }
    }

    /**
     * Transfers the current sstables for the task's ranges to {@link #dst} and requests
     * the same ranges back from it. The task callback is handed to both operations.
     *
     * @throws RuntimeException wrapping any exception raised while setting up the streams
     */
    private void initiateStreaming() {
        ColumnFamilyStore cfstore = Table.open(this.tableName).getColumnFamilyStore(this.cfName);
        try {
            logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", this.id, this.ranges.size(), this.dst));
            Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced();
            // Send our data for the ranges to the remote node.
            StreamOutSession outsession = StreamOutSession.create(this.tableName, this.dst, this.callback);
            StreamOut.transferSSTables(outsession, sstables, this.ranges, OperationType.AES);
            // Request the remote node's data for the same ranges.
            StreamIn.requestRanges(this.dst, this.tableName, Collections.singleton(cfstore), this.ranges, this.callback, OperationType.AES);
        }
        catch (Exception e) {
            throw new RuntimeException("Streaming repair failed", e);
        }
    }

    /**
     * Sends this task to {@link #src} so that it can initiate the streaming.
     *
     * @throws RuntimeException if the task could not be forwarded
     */
    private void forwardToSource() {
        try {
            logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", this.id, this.ranges.size(), this.src, this.dst));
            StreamingRepairRequest.send(this);
        }
        catch (IOException e) {
            // Forwarding failed, so no response is expected to come back and run the wrapped
            // callback (which is what normally unregisters the task). Unregister it here so
            // the entry does not linger in the registry. Removing an absent id is a no-op.
            tasks.remove(this.id);
            throw new RuntimeException("Error forwarding streaming task to " + this.src, e);
        }
    }

    /**
     * Builds the callback used by a task received from another node: on its second
     * invocation it sends a {@link StreamingRepairResponse} to the task owner.
     *
     * @param taskOwner node to notify
     * @param taskId    id of the completed task
     */
    private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId) {
        return new Runnable() {
            // The callback is handed to two streaming operations (outbound transfer and
            // inbound request), so two invocations are awaited before replying.
            private final AtomicInteger outstanding = new AtomicInteger(2);

            @Override
            public void run() {
                if (this.outstanding.decrementAndGet() > 0) {
                    // Still waiting on another invocation.
                    return;
                }
                try {
                    StreamingRepairResponse.reply(taskOwner, taskId);
                }
                catch (IOException e) {
                    throw new IOError(e);
                }
            }
        };
    }

    /**
     * Wraps a user callback so that the task is removed from the registry before the
     * callback runs.
     *
     * @param callback    callback to run on completion; may be {@code null}
     * @param taskid      id of the task to unregister
     * @param isLocalTask whether the owner is also the streaming source; a local task
     *                    awaits two invocations, a forwarded one awaits a single invocation
     */
    private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask) {
        return new Runnable() {
            private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);

            @Override
            public void run() {
                if (this.outstanding.decrementAndGet() > 0) {
                    // Still waiting on another invocation.
                    return;
                }
                tasks.remove(taskid);
                if (callback != null) {
                    callback.run();
                }
            }
        };
    }

    /**
     * Wire format of a task: id, owner, src, dst, table name, column family name,
     * range count, then each range.
     */
    private static class StreamingRepairTaskSerializer
    implements IVersionedSerializer<StreamingRepairTask> {
        private StreamingRepairTaskSerializer() {
        }

        @Override
        public void serialize(StreamingRepairTask task, DataOutput dos, int version) throws IOException {
            UUIDGen.write(task.id, dos);
            CompactEndpointSerializationHelper.serialize(task.owner, dos);
            CompactEndpointSerializationHelper.serialize(task.src, dos);
            CompactEndpointSerializationHelper.serialize(task.dst, dos);
            dos.writeUTF(task.tableName);
            dos.writeUTF(task.cfName);
            dos.writeInt(task.ranges.size());
            for (Range<Token> range : task.ranges) {
                AbstractBounds.serializer().serialize(range, dos, version);
            }
        }

        /**
         * Reads a task in the format written by {@link #serialize}. The resulting task
         * carries a replying callback that notifies the owner read from the stream.
         */
        @Override
        public StreamingRepairTask deserialize(DataInput dis, int version) throws IOException {
            UUID id = UUIDGen.read(dis);
            InetAddress owner = CompactEndpointSerializationHelper.deserialize(dis);
            InetAddress src = CompactEndpointSerializationHelper.deserialize(dis);
            InetAddress dst = CompactEndpointSerializationHelper.deserialize(dis);
            String tableName = dis.readUTF();
            String cfName = dis.readUTF();
            int rangesCount = dis.readInt();
            Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(rangesCount);
            for (int i = 0; i < rangesCount; ++i) {
                // serialize() only ever writes Range<Token> values, so the bounds read back
                // are expected to be token ranges.
                @SuppressWarnings("unchecked")
                Range<Token> range = (Range<Token>) ((AbstractBounds<?>) AbstractBounds.serializer().deserialize(dis, version)).toTokenBounds();
                ranges.add(range);
            }
            return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, makeReplyingCallback(owner, id));
        }

        /** Not supported by this serializer. */
        @Override
        public long serializedSize(StreamingRepairTask task, int version) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Handles the notification sent back to a task's owner once a forwarded task has
     * completed on the source node.
     */
    public static class StreamingRepairResponse
    implements IVerbHandler {
        /**
         * Reads the task id from the message body and runs the callback of the matching
         * registered task. Logs an error and returns if the id is not registered.
         */
        @Override
        public void doVerb(Message message, String id) {
            UUID taskid;
            byte[] bytes = message.getMessageBody();
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                taskid = UUIDGen.read(dis);
            }
            catch (IOException e) {
                throw new IOError(new IOException("Error reading stream repair response from " + message.getFrom(), e));
            }
            StreamingRepairTask task = tasks.get(taskid);
            if (task == null) {
                logger.error(String.format("Received a stream repair response from %s for unknown task %s (has this node been restarted recently?)", message.getFrom(), taskid));
                return;
            }
            assert (task.owner.equals(FBUtilities.getBroadcastAddress()));
            logger.info(String.format("[streaming task #%s] task succeeded", task.id));
            if (task.callback != null) {
                task.callback.run();
            }
        }

        /**
         * Sends a one-way response carrying {@code taskid} to {@code remote}.
         *
         * @throws IOException if the task id cannot be written to the message body
         */
        private static void reply(InetAddress remote, UUID taskid) throws IOException {
            logger.info(String.format("[streaming task #%s] task succeeded, forwarding response to %s", taskid, remote));
            int version = Gossiper.instance.getVersion(remote);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            UUIDGen.write(taskid, dos);
            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version);
            MessagingService.instance().sendOneWay(msg, remote);
        }
    }

    /**
     * Handles a task forwarded by its owner to the node that must initiate the streaming.
     */
    public static class StreamingRepairRequest
    implements IVerbHandler {
        /**
         * Deserializes the task from the message body and runs it.
         */
        @Override
        public void doVerb(Message message, String id) {
            StreamingRepairTask task;
            byte[] bytes = message.getMessageBody();
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
            try {
                task = serializer.deserialize(dis, message.getVersion());
            }
            catch (IOException e) {
                throw new IOError(e);
            }
            assert (task.src.equals(FBUtilities.getBroadcastAddress()));
            assert (task.owner.equals(message.getFrom()));
            logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.getFrom(), task.ranges.size(), task.dst));
            task.run();
        }

        /**
         * Serializes {@code task} with the version known for its source node and sends it
         * there as a one-way message.
         *
         * @throws IOException if the task cannot be serialized
         */
        private static void send(StreamingRepairTask task) throws IOException {
            int version = Gossiper.instance.getVersion(task.src);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            serializer.serialize(task, dos, version);
            Message msg = new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version);
            MessagingService.instance().sendOneWay(msg, task.src);
        }
    }
}

