/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.freon;

import com.codahale.metrics.Timer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.concurrent.TimedSemaphore;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.freon.BaseAppendLogGenerator;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.grpc.Channel;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name="falg", aliases={"follower-append-log-generator"}, description={"Generate append log entries to a follower server"}, versionProvider=HddsVersionProvider.class, mixinStandardHelpOptions=true, showDefaultValues=true)
public class FollowerAppendLogEntryGenerator
extends BaseAppendLogGenerator
implements Callable<Void>,
StreamObserver<RaftProtos.AppendEntriesReplyProto> {
    public static final String FAKE_LEADER_ADDDRESS = "localhost:1234";
    private static final Logger LOG = LoggerFactory.getLogger(FollowerAppendLogEntryGenerator.class);
    private static final String FAKE_LEADER_ID = "ffffffff-df33-4a20-8e1f-ffffffff6be5";
    @CommandLine.Option(names={"-l", "--pipeline"}, description={"Pipeline to use. By default the first RATIS/THREE pipeline will be used."}, defaultValue="96714307-4bd7-42b5-a65d-e1b13b4ca5c0")
    private String pipelineId = "96714307-4bd7-42b5-a65d-e1b13b4ca5c0";
    @CommandLine.Option(names={"-s", "--size"}, description={"Size of the generated chunks (in bytes)"}, defaultValue="1024")
    private int chunkSize;
    @CommandLine.Option(names={"-b", "--batching"}, description={"Number of write chunks requests in one AppendLogEntry"}, defaultValue="2")
    private int batching;
    @CommandLine.Option(names={"-i", "--next-index"}, description={"The next index in the term 2 to continue a test. (If zero, a new ratis ring will be intialized with configureGroup call and vote)"}, defaultValue="0")
    private long nextIndex;
    @CommandLine.Option(names={"--rate-limit"}, description={"Maximum number of requests per second (if bigger than 0)"}, defaultValue="0")
    private int rateLimit;
    private TimedSemaphore rateLimiter;
    private RaftProtos.RaftPeerProto requestor;
    private long term = 2L;
    private RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub stub;
    private Random callIdRandom = new Random();
    private ByteString dataToWrite;
    private Timer timer;
    private StreamObserver<RaftProtos.AppendEntriesRequestProto> sender;

    @Override
    public Void call() throws Exception {
        this.inFlightMessages = new LinkedBlockingQueue(this.inflightLimit);
        this.timer = this.getMetrics().timer("append-entry");
        byte[] data = RandomStringUtils.randomAscii((int)this.chunkSize).getBytes(StandardCharsets.UTF_8);
        this.dataToWrite = ByteString.copyFrom((byte[])data);
        OzoneConfiguration conf = this.createOzoneConfiguration();
        this.setServerIdFromFile(conf);
        Preconditions.assertTrue((this.getThreadNo() == 1 ? 1 : 0) != 0, (Object)"This test should be executed from one thread");
        this.requestor = RaftProtos.RaftPeerProto.newBuilder().setId(RaftPeerId.valueOf((String)FAKE_LEADER_ID).toByteString()).setAddress(FAKE_LEADER_ADDDRESS).build();
        NettyChannelBuilder channelBuilder = (NettyChannelBuilder)NettyChannelBuilder.forTarget((String)this.serverAddress).proxyDetector(uri -> null);
        channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
        ManagedChannel build = channelBuilder.build();
        this.stub = RaftServerProtocolServiceGrpc.newStub((Channel)build);
        if (this.rateLimit != 0) {
            this.rateLimiter = new TimedSemaphore(1L, TimeUnit.SECONDS, this.rateLimit);
        }
        this.init();
        this.sender = this.stub.appendEntries((StreamObserver)this);
        if (this.nextIndex == 0L) {
            this.configureGroup();
            RaftProtos.RequestVoteReplyProto vote = this.requestVote().get(1000L, TimeUnit.SECONDS);
            LOG.info("Datanode answered to the vote request: {}", (Object)vote);
            if (!vote.getServerReply().getSuccess()) {
                throw new RuntimeException("Datanode didn't vote to the fake freon leader.");
            }
            long callId = this.callIdRandom.nextLong();
            this.inFlightMessages.put(callId);
            this.sender.onNext((Object)this.createInitialLogEntry(callId));
            this.nextIndex = 1L;
        }
        this.runTests(this::sendAppendLogEntryRequest);
        if (this.rateLimiter != null) {
            this.rateLimiter.shutdown();
        }
        return null;
    }

    private void sendAppendLogEntryRequest(long sequence) {
        this.timer.time(() -> {
            try {
                long callId = this.callIdRandom.nextLong();
                this.inFlightMessages.put(callId);
                this.sender.onNext((Object)this.createAppendLogEntry(sequence, callId));
            }
            catch (InterruptedException e) {
                LOG.error("Error while sending new append entry request (HB) to the follower", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
    }

    private RaftProtos.AppendEntriesRequestProto createAppendLogEntry(long sequence, long callId) {
        RaftProtos.AppendEntriesRequestProto.Builder requestBuilder = RaftProtos.AppendEntriesRequestProto.newBuilder();
        if (this.rateLimiter != null) {
            try {
                this.rateLimiter.acquire();
            }
            catch (InterruptedException e) {
                LOG.error("Rate limiter acquire has been interrupted", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
        long previousLog = this.nextIndex - 1L;
        for (int i = 0; i < this.batching; ++i) {
            long index;
            ++this.nextIndex;
            long chunkId = (long)this.batching * sequence + (long)i;
            long blockId = chunkId / 1000L;
            long containerId = blockId / 1000L;
            ByteString payload = ContainerProtos.ContainerCommandRequestProto.newBuilder().setContainerID(containerId).setCmdType(ContainerProtos.Type.WriteChunk).setDatanodeUuid(this.serverId).setWriteChunk(ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.EMPTY).setBlockID(ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(containerId).setLocalID(blockId).build()).setChunkData(ContainerProtos.ChunkInfo.newBuilder().setChunkName("chunk" + chunkId).setLen((long)this.dataToWrite.size()).setOffset(0L).setChecksumData(ContainerProtos.ChecksumData.newBuilder().setBytesPerChecksum(0).setType(ContainerProtos.ChecksumType.NONE).build()).build()).build()).build().toByteString();
            RaftProtos.StateMachineLogEntryProto.Builder stateMachinelogEntry = RaftProtos.StateMachineLogEntryProto.newBuilder().setCallId(callId).setClientId(ClientId.randomId().toByteString()).setLogData(payload).setStateMachineEntry(RaftProtos.StateMachineEntryProto.newBuilder().setStateMachineData(this.dataToWrite).build());
            RaftProtos.LogEntryProto logEntry = RaftProtos.LogEntryProto.newBuilder().setTerm(this.term).setIndex(index).setStateMachineLogEntry(stateMachinelogEntry).build();
            requestBuilder.addEntries(logEntry);
        }
        requestBuilder.setPreviousLog(RaftProtos.TermIndexProto.newBuilder().setTerm(this.term).setIndex(previousLog).build()).setLeaderCommit(Math.max(0L, previousLog - (long)this.batching)).addCommitInfos(RaftProtos.CommitInfoProto.newBuilder().setServer(this.requestor).setCommitIndex(Math.max(0L, previousLog - (long)this.batching)).build()).setLeaderTerm(this.term).setServerRequest(this.createServerRequest(callId)).build();
        return requestBuilder.build();
    }

    private void configureGroup() throws IOException {
        ClientId clientId = ClientId.randomId();
        RaftGroupId groupId = RaftGroupId.valueOf((UUID)UUID.fromString(this.pipelineId));
        RaftPeerId peerId = RaftPeerId.getRaftPeerId((String)this.serverId);
        RaftGroup group = RaftGroup.valueOf((RaftGroupId)groupId, (RaftPeer[])new RaftPeer[]{RaftPeer.newBuilder().setId(this.serverId).setAddress(this.serverAddress).build(), RaftPeer.newBuilder().setId(RaftPeerId.valueOf((String)FAKE_LEADER_ID)).setAddress(FAKE_LEADER_ADDDRESS).build()});
        RaftClient client = RaftClient.newBuilder().setClientId(clientId).setProperties(new RaftProperties()).setRaftGroup(group).build();
        RaftClientReply raftClientReply = client.getGroupManagementApi(peerId).add(group);
        LOG.info("Group is configured in the RAFT server (one follower, one fake leader): {}", (Object)raftClientReply);
    }

    public void onNext(RaftProtos.AppendEntriesReplyProto reply) {
        long currentIndex;
        long lastCommit;
        long callId = reply.getServerReply().getCallId();
        if (!this.inFlightMessages.remove(callId)) {
            LOG.warn("Received message with callId which was not used to send message: {}", (Object)callId);
            LOG.info("{}", (Object)reply);
        }
        if ((lastCommit = reply.getFollowerCommit()) % 1000L == 0L && (currentIndex = this.getAttemptCount()) - lastCommit > (long)(this.batching * 3)) {
            LOG.warn("Last committed index ({}) is behind the current index ({}) on the client side.", (Object)lastCommit, (Object)currentIndex);
        }
    }

    public void onError(Throwable t) {
        LOG.error("Error on sending message", t);
    }

    public void onCompleted() {
    }

    private RaftProtos.AppendEntriesRequestProto createInitialLogEntry(long callId) {
        RaftProtos.RaftRpcRequestProto serverRequest = this.createServerRequest(callId);
        long index = 0L;
        RaftProtos.LogEntryProto logEntry = RaftProtos.LogEntryProto.newBuilder().setTerm(this.term).setIndex(index).setConfigurationEntry(RaftProtos.RaftConfigurationProto.newBuilder().addPeers(RaftProtos.RaftPeerProto.newBuilder().setId(RaftPeerId.valueOf((String)this.serverAddress).toByteString()).setAddress(this.serverAddress).build()).addPeers(this.requestor).build()).build();
        return RaftProtos.AppendEntriesRequestProto.newBuilder().setLeaderTerm(this.term).addEntries(logEntry).setServerRequest(serverRequest).build();
    }

    private CompletableFuture<RaftProtos.RequestVoteReplyProto> requestVote() {
        final CompletableFuture<RaftProtos.RequestVoteReplyProto> response = new CompletableFuture<RaftProtos.RequestVoteReplyProto>();
        RaftProtos.RequestVoteRequestProto voteRequest = RaftProtos.RequestVoteRequestProto.newBuilder().setServerRequest(this.createServerRequest(this.callIdRandom.nextLong())).setCandidateLastEntry(RaftProtos.TermIndexProto.newBuilder().setIndex(0L).setTerm(this.term).build()).build();
        this.stub.requestVote(voteRequest, (StreamObserver)new StreamObserver<RaftProtos.RequestVoteReplyProto>(){

            public void onNext(RaftProtos.RequestVoteReplyProto value) {
                response.complete(value);
            }

            public void onError(Throwable t) {
                response.completeExceptionally(t);
            }

            public void onCompleted() {
            }
        });
        return response;
    }

    private RaftProtos.RaftRpcRequestProto createServerRequest(long callId) {
        RaftGroupId raftGroupId = RaftGroupId.valueOf((UUID)UUID.fromString(this.pipelineId));
        return RaftProtos.RaftRpcRequestProto.newBuilder().setRaftGroupId(RaftProtos.RaftGroupIdProto.newBuilder().setId(raftGroupId.toByteString()).build()).setRequestorId(this.requestor.getId()).setCallId(callId).build();
    }
}

