/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.index.shard.recovery;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.recovery.RecoverFilesRecoveryException;
import org.elasticsearch.index.shard.recovery.RecoveryCleanFilesRequest;
import org.elasticsearch.index.shard.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.index.shard.recovery.RecoveryFilesInfoRequest;
import org.elasticsearch.index.shard.recovery.RecoveryFinalizeRecoveryRequest;
import org.elasticsearch.index.shard.recovery.RecoveryPrepareForTranslogOperationsRequest;
import org.elasticsearch.index.shard.recovery.RecoveryResponse;
import org.elasticsearch.index.shard.recovery.RecoveryTranslogOperationsRequest;
import org.elasticsearch.index.shard.recovery.StartRecoveryRequest;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;

public class RecoverySource
extends AbstractComponent {
    private final ThreadPool threadPool;
    private final TransportService transportService;
    private final IndicesService indicesService;
    private final ByteSizeValue fileChunkSize;
    private final boolean compress;
    private final int translogOps;
    private final ByteSizeValue translogSize;
    private final ExecutorService concurrentStreamPool;

    @Inject
    public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.indicesService = indicesService;
        int concurrentStreams = this.componentSettings.getAsInt("concurrent_streams", 5);
        this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5L).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
        this.fileChunkSize = this.componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100L, ByteSizeUnit.KB));
        this.translogOps = this.componentSettings.getAsInt("translog_ops", 1000);
        this.translogSize = this.componentSettings.getAsBytesSize("translog_size", new ByteSizeValue(100L, ByteSizeUnit.KB));
        this.compress = this.componentSettings.getAsBoolean("compress", true);
        this.logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", concurrentStreams, this.fileChunkSize, this.translogSize, this.translogOps, this.compress);
        transportService.registerHandler("index/shard/recovery/startRecovery", new StartRecoveryTransportRequestHandler());
    }

    public void close() {
        this.concurrentStreamPool.shutdown();
    }

    private RecoveryResponse recover(final StartRecoveryRequest request) {
        final InternalIndexShard shard = (InternalIndexShard)this.indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
        this.logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
        final RecoveryResponse response = new RecoveryResponse();
        shard.recover(new Engine.RecoveryHandler(){

            @Override
            public void phase1(final SnapshotIndexCommit snapshot) throws ElasticSearchException {
                long totalSize = 0L;
                long existingTotalSize = 0L;
                try {
                    StopWatch stopWatch = new StopWatch().start();
                    for (String name : snapshot.getFiles()) {
                        StoreFileMetaData md = shard.store().metaData(name);
                        boolean useExisting = false;
                        if (request.existingFiles().containsKey(name) && !name.startsWith("segments") && md.isSame(request.existingFiles().get(name))) {
                            response.phase1ExistingFileNames.add(name);
                            response.phase1ExistingFileSizes.add(md.length());
                            existingTotalSize += md.length();
                            useExisting = true;
                            if (RecoverySource.this.logger.isTraceEnabled()) {
                                RecoverySource.this.logger.trace("[{}][{}] recovery [phase1] to {}: not recovering [{}], exists in local store and has checksum [{}], size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, md.checksum(), md.length());
                            }
                        }
                        if (!useExisting) {
                            if (request.existingFiles().containsKey(name)) {
                                RecoverySource.this.logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name, request.existingFiles().get(name), md);
                            } else {
                                RecoverySource.this.logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], does not exists in remote", request.shardId().index().name(), request.shardId().id(), request.targetNode(), name);
                            }
                            response.phase1FileNames.add(name);
                            response.phase1FileSizes.add(md.length());
                        }
                        totalSize += md.length();
                    }
                    response.phase1TotalSize = totalSize;
                    response.phase1ExistingTotalSize = existingTotalSize;
                    RecoverySource.this.logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
                    RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
                    RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/filesInfo", recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                    final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
                    final AtomicReference lastException = new AtomicReference();
                    for (final String name : response.phase1FileNames) {
                        RecoverySource.this.concurrentStreamPool.execute(new Runnable(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void run() {
                                IndexInput indexInput = null;
                                try {
                                    int toRead;
                                    int BUFFER_SIZE = (int)RecoverySource.this.fileChunkSize.bytes();
                                    byte[] buf = new byte[BUFFER_SIZE];
                                    StoreFileMetaData md = shard.store().metaData(name);
                                    indexInput = snapshot.getDirectory().openInput(name);
                                    long len = indexInput.length();
                                    for (long readCount = 0L; readCount < len; readCount += (long)toRead) {
                                        if (shard.state() == IndexShardState.CLOSED) {
                                            throw new IndexShardClosedException(shard.shardId());
                                        }
                                        toRead = readCount + (long)BUFFER_SIZE > len ? (int)(len - readCount) : BUFFER_SIZE;
                                        long position = indexInput.getFilePointer();
                                        indexInput.readBytes(buf, 0, toRead, false);
                                        RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/fileChunk", new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead), TransportRequestOptions.options().withCompress(RecoverySource.this.compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                                    }
                                    indexInput.close();
                                }
                                catch (Exception e) {
                                    lastException.set(e);
                                }
                                finally {
                                    if (indexInput != null) {
                                        try {
                                            indexInput.close();
                                        }
                                        catch (IOException iOException) {}
                                    }
                                    latch.countDown();
                                }
                            }
                        });
                    }
                    latch.await();
                    if (lastException.get() != null) {
                        throw (Exception)lastException.get();
                    }
                    HashSet<String> snapshotFiles = Sets.newHashSet(snapshot.getFiles());
                    RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/cleanFiles", new RecoveryCleanFilesRequest(shard.shardId(), snapshotFiles), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                    stopWatch.stop();
                    RecoverySource.this.logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
                    response.phase1Time = stopWatch.totalTime().millis();
                }
                catch (Throwable e) {
                    throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
                }
            }

            @Override
            public void phase2(Translog.Snapshot snapshot) throws ElasticSearchException {
                if (shard.state() == IndexShardState.CLOSED) {
                    throw new IndexShardClosedException(request.shardId());
                }
                RecoverySource.this.logger.trace("[{}][{}] recovery [phase2] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
                StopWatch stopWatch = new StopWatch().start();
                RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/prepareTranslog", new RecoveryPrepareForTranslogOperationsRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                int totalOperations = this.sendSnapshot(snapshot);
                stopWatch.stop();
                RecoverySource.this.logger.trace("[{}][{}] recovery [phase2] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
                response.phase2Time = stopWatch.totalTime().millis();
                response.phase2Operations = totalOperations;
            }

            @Override
            public void phase3(Translog.Snapshot snapshot) throws ElasticSearchException {
                if (shard.state() == IndexShardState.CLOSED) {
                    throw new IndexShardClosedException(request.shardId());
                }
                RecoverySource.this.logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", request.shardId().index().name(), request.shardId().id(), request.targetNode());
                StopWatch stopWatch = new StopWatch().start();
                int totalOperations = this.sendSnapshot(snapshot);
                RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/finalize", new RecoveryFinalizeRecoveryRequest(request.shardId()), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                if (request.markAsRelocated()) {
                    try {
                        shard.relocated("to " + request.targetNode());
                    }
                    catch (IllegalIndexShardStateException e) {
                        // empty catch block
                    }
                }
                stopWatch.stop();
                RecoverySource.this.logger.trace("[{}][{}] recovery [phase3] to {}: took [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), stopWatch.totalTime());
                response.phase3Time = stopWatch.totalTime().millis();
                response.phase3Operations = totalOperations;
            }

            private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
                int ops = 0;
                long size = 0L;
                int totalOperations = 0;
                ArrayList<Translog.Operation> operations = Lists.newArrayList();
                while (snapshot.hasNext()) {
                    if (shard.state() == IndexShardState.CLOSED) {
                        throw new IndexShardClosedException(request.shardId());
                    }
                    Translog.Operation operation = snapshot.next();
                    operations.add(operation);
                    ++totalOperations;
                    if (++ops < RecoverySource.this.translogOps && (size += operation.estimateSize()) < RecoverySource.this.translogSize.bytes()) continue;
                    RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
                    RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/translogOps", translogOperationsRequest, TransportRequestOptions.options().withCompress(RecoverySource.this.compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                    ops = 0;
                    size = 0L;
                    operations.clear();
                }
                if (!operations.isEmpty()) {
                    RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
                    RecoverySource.this.transportService.submitRequest(request.targetNode(), "index/shard/recovery/translogOps", translogOperationsRequest, TransportRequestOptions.options().withCompress(RecoverySource.this.compress).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
                }
                return totalOperations;
            }
        });
        return response;
    }

    class StartRecoveryTransportRequestHandler
    extends BaseTransportRequestHandler<StartRecoveryRequest> {
        StartRecoveryTransportRequestHandler() {
        }

        @Override
        public StartRecoveryRequest newInstance() {
            return new StartRecoveryRequest();
        }

        @Override
        public String executor() {
            return "cached";
        }

        @Override
        public void messageReceived(StartRecoveryRequest request, TransportChannel channel) throws Exception {
            RecoveryResponse response = RecoverySource.this.recover(request);
            channel.sendResponse(response);
        }
    }

    public static class Actions {
        public static final String START_RECOVERY = "index/shard/recovery/startRecovery";
    }
}

