/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job.metrics;

import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.util.NoOpExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link JobVertexWatermarksHandler}.
 *
 * <p>Each test stubs the per-subtask {@code currentInputWatermark} entries of a mocked
 * {@link MetricStore.TaskMetricStore} and checks which metrics the handler puts into its
 * {@link MetricCollectionResponseBody} for a job vertex with two subtasks (indices 0 and 1).
 */
public class JobVertexWatermarksHandlerTest {
    private static final JobID TEST_JOB_ID = new JobID();
    private static final JobVertexID TEST_VERTEX_ID = new JobVertexID();

    private MetricFetcher metricFetcher;
    private MetricStore.TaskMetricStore taskMetricStore;
    private JobVertexWatermarksHandler watermarkHandler;
    private HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request;
    private AccessExecutionJobVertex vertex;

    /**
     * Builds the mocked metric store chain, the handler under test, a request carrying the test
     * job/vertex ids as path parameters, and a mocked job vertex exposing two subtasks.
     */
    @Before
    public void before() throws Exception {
        // Metric lookup chain: fetcher -> store -> task store for (TEST_JOB_ID, TEST_VERTEX_ID).
        taskMetricStore = Mockito.mock(MetricStore.TaskMetricStore.class);
        MetricStore metricStore = Mockito.mock(MetricStore.class);
        Mockito.when(
                        metricStore.getTaskMetricStore(
                                TEST_JOB_ID.toString(), TEST_VERTEX_ID.toString()))
                .thenReturn(taskMetricStore);

        metricFetcher = Mockito.mock(MetricFetcher.class);
        Mockito.when(metricFetcher.getMetricStore()).thenReturn(metricStore);

        watermarkHandler =
                new JobVertexWatermarksHandler(
                        Mockito.mock(LeaderGatewayRetriever.class),
                        Time.seconds(1L),
                        Collections.emptyMap(),
                        metricFetcher,
                        NoOpExecutionGraphCache.INSTANCE,
                        Mockito.mock(Executor.class));

        HashMap<String, String> pathParameters = new HashMap<>();
        pathParameters.put("jobid", TEST_JOB_ID.toString());
        pathParameters.put("vertexid", TEST_VERTEX_ID.toString());
        // Diamond instead of a raw type so the assignment to the typed field is checked.
        request =
                new HandlerRequest<>(
                        EmptyRequestBody.getInstance(),
                        new JobVertexMessageParameters(),
                        pathParameters,
                        Collections.emptyMap());

        vertex = Mockito.mock(AccessExecutionJobVertex.class);
        Mockito.when(vertex.getJobVertexId()).thenReturn(TEST_VERTEX_ID);

        AccessExecutionVertex firstTask = Mockito.mock(AccessExecutionVertex.class);
        AccessExecutionVertex secondTask = Mockito.mock(AccessExecutionVertex.class);
        Mockito.when(firstTask.getParallelSubtaskIndex()).thenReturn(0);
        Mockito.when(secondTask.getParallelSubtaskIndex()).thenReturn(1);

        AccessExecutionVertex[] accessExecutionVertices = {firstTask, secondTask};
        Mockito.when(vertex.getTaskVertices()).thenReturn(accessExecutionVertices);
    }

    /**
     * Runs after every test and verifies that {@code update()} was invoked on the metric fetcher
     * (Mockito's default verification mode, i.e. exactly once).
     */
    @After
    public void after() {
        Mockito.verify(metricFetcher).update();
    }

    /** Both subtasks have a watermark stored: both must be reported, in any order. */
    @Test
    public void testWatermarksRetrieval() throws Exception {
        Mockito.when(taskMetricStore.getMetric("0.currentInputWatermark")).thenReturn("23");
        Mockito.when(taskMetricStore.getMetric("1.currentInputWatermark")).thenReturn("42");

        MetricCollectionResponseBody response = watermarkHandler.handleRequest(request, vertex);

        Assert.assertThat(
                response.getMetrics(),
                Matchers.containsInAnyOrder(
                        new MetricMatcher("0.currentInputWatermark", "23"),
                        new MetricMatcher("1.currentInputWatermark", "42")));
    }

    /** Only subtask 0 has a watermark stored: the response must contain just that metric. */
    @Test
    public void testPartialWatermarksAvailable() throws Exception {
        Mockito.when(taskMetricStore.getMetric("0.currentInputWatermark")).thenReturn("23");
        Mockito.when(taskMetricStore.getMetric("1.currentInputWatermark")).thenReturn(null);

        MetricCollectionResponseBody response = watermarkHandler.handleRequest(request, vertex);

        Assert.assertThat(
                response.getMetrics(),
                Matchers.contains(new MetricMatcher("0.currentInputWatermark", "23")));
    }

    /** No subtask has a watermark stored: the response must contain no metrics. */
    @Test
    public void testNoWatermarksAvailable() throws Exception {
        Mockito.when(taskMetricStore.getMetric("0.currentInputWatermark")).thenReturn(null);
        Mockito.when(taskMetricStore.getMetric("1.currentInputWatermark")).thenReturn(null);

        MetricCollectionResponseBody response = watermarkHandler.handleRequest(request, vertex);

        Assert.assertThat(response.getMetrics(), Matchers.is(Matchers.empty()));
    }

    /**
     * Hamcrest matcher that accepts a {@link Metric} whose id equals the expected id and whose
     * value equals the expected value (null-safe comparison on the value).
     */
    private static class MetricMatcher extends BaseMatcher<Metric> {
        private final String id;
        @Nullable
        private final String value;

        MetricMatcher(String id, @Nullable String value) {
            this.id = id;
            this.value = value;
        }

        /** Returns {@code false} for anything that is not a {@link Metric}. */
        @Override
        public boolean matches(Object o) {
            if (!(o instanceof Metric)) {
                return false;
            }
            Metric actual = (Metric) o;
            return actual.getId().equals(id) && Objects.equals(value, actual.getValue());
        }

        /** Describes the expectation as a {@link Metric} built from the expected id and value. */
        @Override
        public void describeTo(Description description) {
            description.appendValue(new Metric(id, value));
        }
    }
}

