/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.protobuf.ProtocolStringList;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker thread that repeatedly takes a batch of WAL entries from a
 * {@link ReplicationSourceWALReader}, hands it to the owning source's
 * {@link ReplicationEndpoint}, and then records the shipped WAL position through the
 * source manager.
 *
 * <p>The worker is controlled from outside through {@link #setWALReader},
 * {@link #stopWorker()} and {@link #setWorkerState}; the fields those methods write are
 * {@code volatile} so the run loop observes the changes.
 */
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);

    protected final Configuration conf;
    protected final String walGroupId;
    protected final PriorityBlockingQueue<Path> queue;
    protected final ReplicationSourceInterface source;

    /** Last position passed to {@link #updateLogPosition(long)}; {@code -1} until the first update. */
    protected long lastLoggedPosition = -1L;

    /** Last WAL path of the batch most recently given to {@link #shipEdits}. */
    protected volatile Path currentPath;

    /** Lifecycle state; volatile because it is set via public setters and polled by the run loop. */
    private volatile WorkerState state;

    /** Source of entry batches; volatile because it is installed via {@link #setWALReader} while {@link #run()} polls for it. */
    protected volatile ReplicationSourceWALReader entryReader;

    /** Base sleep in milliseconds between retries ({@code replication.source.sleepforretries}, default 1000). */
    protected final long sleepForRetries;

    /** Multiplier value at which {@link #sleepForRetries(String, int)} starts returning {@code false} ({@code replication.source.maxretriesmultiplier}, default 300). */
    protected final int maxRetriesMultiplier;

    /**
     * Creates a shipper; the thread is not started here (see {@link #startup}).
     *
     * @param conf       configuration the retry settings are read from
     * @param walGroupId identifier of the WAL group this worker ships
     * @param queue      queue of WAL paths, exposed through {@link #getLogQueue()}
     * @param source     owning replication source
     */
    public ReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
        this.conf = conf;
        this.walGroupId = walGroupId;
        this.queue = queue;
        this.source = source;
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    }

    /**
     * Main loop: marks the worker RUNNING, then ships batches for as long as
     * {@link #isActive()} holds. On exit the state becomes STOPPED unless it is already
     * FINISHED.
     */
    @Override
    public void run() {
        this.setWorkerState(WorkerState.RUNNING);
        while (this.isActive()) {
            if (!this.source.isPeerEnabled()) {
                // Fixed one-interval sleep while the peer is disabled; there is no back-off here.
                this.sleepForRetries("Replication is disabled", 1);
                continue;
            }
            ReplicationSourceWALReader reader = this.awaitEntryReader();
            if (reader == null) {
                // Worker became inactive before a reader was installed; the loop condition ends the run.
                continue;
            }
            try {
                this.shipEdits(reader.take());
            } catch (InterruptedException e) {
                LOG.trace("Interrupted while waiting for next replication entry batch", (Throwable) e);
                Thread.currentThread().interrupt();
            }
        }
        if (this.state != WorkerState.FINISHED) {
            this.setWorkerState(WorkerState.STOPPED);
        }
    }

    /**
     * Waits, sleeping with a growing multiplier, until a reader has been installed or the
     * worker is no longer active.
     *
     * @return the installed reader, or {@code null} if the worker stopped being active first
     */
    private ReplicationSourceWALReader awaitEntryReader() {
        int sleepMultiplier = 1;
        ReplicationSourceWALReader reader = this.entryReader;
        // Checking isActive() keeps a stopped/interrupted worker from spinning here forever.
        while (reader == null && this.isActive()) {
            if (this.sleepForRetries("Replication WAL entry reader thread not initialized", sleepMultiplier)) {
                ++sleepMultiplier;
            }
            reader = this.entryReader;
        }
        return reader;
    }

    /**
     * Ships one batch through the replication endpoint, retrying until the endpoint reports
     * success or the worker is no longer active.
     *
     * <p>An empty batch only advances the logged position (when it changed) and refreshes
     * the age-of-last-shipped metric with the current time.
     *
     * @param entryBatch batch obtained from the WAL reader
     */
    protected void shipEdits(ReplicationSourceWALReader.WALEntryBatch entryBatch) {
        List<WAL.Entry> entries = entryBatch.getWalEntries();
        long lastReadPosition = entryBatch.getLastWalPosition();
        this.currentPath = entryBatch.getLastWalPath();
        if (entries.isEmpty()) {
            if (this.lastLoggedPosition != lastReadPosition) {
                this.updateLogPosition(lastReadPosition);
                this.source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), this.walGroupId);
            }
            return;
        }
        int sleepMultiplier = 0;
        int currentSize = (int) entryBatch.getHeapSize();
        while (this.isActive()) {
            try {
                try {
                    this.source.tryThrottle(currentSize);
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted while sleeping for throttling control");
                    Thread.currentThread().interrupt();
                    // Re-evaluate isActive(); the restored interrupt flag ends the loop.
                    continue;
                }
                ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
                replicateContext.setEntries(entries).setSize(currentSize);
                replicateContext.setWalGroupId(this.walGroupId);
                long startTimeNs = System.nanoTime();
                boolean replicated = this.source.getReplicationEndpoint().replicate(replicateContext);
                long endTimeNs = System.nanoTime();
                if (!replicated) {
                    // Endpoint reported failure without throwing: retry straight away.
                    continue;
                }
                sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
                if (this.lastLoggedPosition != lastReadPosition) {
                    for (WAL.Entry entry : entries) {
                        this.cleanUpHFileRefs(entry.getEdit());
                    }
                    this.updateLogPosition(lastReadPosition);
                }
                this.source.postShipEdits(entries, currentSize);
                this.source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
                this.source.getSourceMetrics().setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), this.walGroupId);
                LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000L);
                break;
            } catch (Exception ex) {
                LOG.warn(this.source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:" + StringUtils.stringifyException((Throwable) ex));
                if (this.sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
                    ++sleepMultiplier;
                }
            }
        }
    }

    /**
     * For every bulk-load cell in the edit, asks the source manager to clean up the
     * referenced store files for this peer and decrements the HFile-refs queue metric.
     *
     * @param edit WAL edit whose cells are inspected
     * @throws IOException if a bulk load descriptor cannot be read from a cell
     */
    private void cleanUpHFileRefs(WALEdit edit) throws IOException {
        String peerId = this.source.getPeerId();
        // Keep only the part before the first '-'. indexOf/substring avoids the regex in
        // String.split and cannot fail on an id made up solely of dashes.
        int dash = peerId.indexOf('-');
        if (dash >= 0) {
            peerId = peerId.substring(0, dash);
        }
        List<Cell> cells = edit.getCells();
        for (Cell cell : cells) {
            if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                continue;
            }
            WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            List<WALProtos.StoreDescriptor> stores = bld.getStoresList();
            for (WALProtos.StoreDescriptor store : stores) {
                List<String> storeFileList = store.getStoreFileList();
                this.source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
                this.source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
            }
        }
    }

    /**
     * Reports the given position for {@link #currentPath} to the source manager and
     * remembers it as the last logged position.
     *
     * @param lastReadPosition position to record
     */
    protected void updateLogPosition(long lastReadPosition) {
        // NOTE(review): the meaning of the two trailing 'false' flags is defined by the source manager - confirm there.
        this.source.getSourceManager().logPositionAndCleanOldLogs(this.currentPath, this.source.getPeerClusterZnode(), lastReadPosition, false, false);
        this.lastLoggedPosition = lastReadPosition;
    }

    /**
     * Names this thread after the calling thread, the WAL group and the peer cluster znode,
     * and starts it as a daemon through {@link Threads#setDaemonThreadRunning}.
     *
     * @param handler handler for uncaught exceptions of this thread
     */
    public void startup(Thread.UncaughtExceptionHandler handler) {
        String name = Thread.currentThread().getName();
        Threads.setDaemonThreadRunning(this, name + ".replicationSource." + this.walGroupId + "," + this.source.getPeerClusterZnode(), handler);
    }

    /** @return the queue of WAL paths given to the constructor */
    public PriorityBlockingQueue<Path> getLogQueue() {
        return this.queue;
    }

    /**
     * @return the path currently reported by the WAL reader, or {@code null} if no reader
     *         has been installed yet
     */
    public Path getCurrentPath() {
        ReplicationSourceWALReader reader = this.entryReader;
        return reader == null ? null : reader.getCurrentPath();
    }

    /** @return the last position recorded by {@link #updateLogPosition(long)}, or {@code -1} if none */
    public long getCurrentPosition() {
        return this.lastLoggedPosition;
    }

    /**
     * Installs the reader that {@link #run()} takes batches from.
     *
     * @param entryReader reader to use
     */
    public void setWALReader(ReplicationSourceWALReader entryReader) {
        this.entryReader = entryReader;
    }

    /** @return the start position; this implementation always returns {@code 0} */
    public long getStartPosition() {
        return 0L;
    }

    /**
     * @return {@code true} while the source is active, the worker state is RUNNING and this
     *         thread has not been interrupted
     */
    protected boolean isActive() {
        return this.source.isSourceActive() && this.state == WorkerState.RUNNING && !this.isInterrupted();
    }

    /** @param state new worker state */
    public void setWorkerState(WorkerState state) {
        this.state = state;
    }

    /** @return the current worker state */
    public WorkerState getWorkerState() {
        return this.state;
    }

    /** Requests the run loop to end by setting the state to STOPPED. */
    public void stopWorker() {
        this.setWorkerState(WorkerState.STOPPED);
    }

    /** @return {@code true} if the worker state is FINISHED */
    public boolean isFinished() {
        return this.state == WorkerState.FINISHED;
    }

    /**
     * Sleeps for the configured retry interval times {@code sleepMultiplier}. If the sleep
     * is interrupted the thread's interrupt flag is restored.
     *
     * @param msg             reason for sleeping, used in the trace log
     * @param sleepMultiplier factor applied to the base retry interval
     * @return {@code true} if {@code sleepMultiplier} is still below the configured maximum
     */
    public boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            LOG.trace("{}, sleeping {} times {}", msg, this.sleepForRetries, sleepMultiplier);
            Thread.sleep(this.sleepForRetries * (long) sleepMultiplier);
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping between retries");
            Thread.currentThread().interrupt();
        }
        return sleepMultiplier < this.maxRetriesMultiplier;
    }

    /** Lifecycle states of a shipper worker. */
    public static enum WorkerState {
        /** Set at the start of {@link ReplicationSourceShipper#run()}; required for the worker to be active. */
        RUNNING,
        /** Set by {@link ReplicationSourceShipper#stopWorker()} or when the run loop exits without FINISHED. */
        STOPPED,
        /** Never set in this class; when set by other code, {@code run()} leaves it in place on exit. */
        FINISHED;
    }
}

