/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.runtime.operators.sink.CommitRetrier;
import org.apache.flink.streaming.runtime.operators.sink.CommitterHandler;
import org.apache.flink.streaming.runtime.operators.sink.ForwardCommittingHandler;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;

class CommitRetrierTest {
    CommitRetrierTest() {
    }

    @Test
    void testRetry() throws Exception {
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        CommitterHandlerWithRetries committerHandler = new CommitterHandlerWithRetries();
        CommitRetrier retryer = new CommitRetrier((ProcessingTimeService)processingTimeService, (CommitterHandler)committerHandler);
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
        committerHandler.addRetries(2);
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)2));
        MatcherAssert.assertThat((Object)retryer.retry(0L), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)2));
        MatcherAssert.assertThat((Object)retryer.retry(1L), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)1));
        MatcherAssert.assertThat((Object)retryer.retry(1L), (Matcher)CoreMatchers.equalTo((Object)false));
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)0));
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
    }

    @Test
    void testInfiniteRetry() throws Exception {
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        CommitterHandlerWithRetries committerHandler = new CommitterHandlerWithRetries();
        CommitRetrier retryer = new CommitRetrier((ProcessingTimeService)processingTimeService, (CommitterHandler)committerHandler);
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
        committerHandler.addRetries(2);
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)2));
        retryer.retryIndefinitely();
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)0));
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
    }

    @Test
    void testTimedRetry() throws Exception {
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        ManualClock manualClock = new ManualClock();
        processingTimeService.setCurrentTime(manualClock.absoluteTimeMillis());
        CommitterHandlerWithRetries committerHandler = new CommitterHandlerWithRetries();
        CommitRetrier retryer = new CommitRetrier((ProcessingTimeService)processingTimeService, (CommitterHandler)committerHandler, (Clock)manualClock);
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
        committerHandler.addRetries(2);
        retryer.retryWithDelay();
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)true));
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)2));
        processingTimeService.advance(1000L);
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)1));
        processingTimeService.advance(1000L);
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)0));
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
        processingTimeService.advance(1000L);
        MatcherAssert.assertThat((Object)committerHandler.getPendingRetries(), (Matcher)CoreMatchers.equalTo((Object)0));
        MatcherAssert.assertThat((Object)committerHandler.needsRetry(), (Matcher)CoreMatchers.equalTo((Object)false));
    }

    private static class CommitterHandlerWithRetries
    extends ForwardCommittingHandler<String> {
        private AtomicInteger retriesNeeded = new AtomicInteger(0);

        private CommitterHandlerWithRetries() {
        }

        void addRetries(int retries) {
            this.retriesNeeded.addAndGet(retries);
        }

        int getPendingRetries() {
            return this.retriesNeeded.get();
        }

        public boolean needsRetry() {
            return this.getPendingRetries() > 0;
        }

        public void retry() throws IOException, InterruptedException {
            this.retriesNeeded.decrementAndGet();
        }
    }
}

