/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.collect.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionInitializationContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.junit.Assert;

public class CollectSinkFunctionTestWrapper<IN> {
    public static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
    private static final int SOCKET_TIMEOUT_MILLIS = 1000;
    private static final int FUTURE_TIMEOUT_MILLIS = 10000;
    private static final int MAX_RETIRES = 100;
    private final TypeSerializer<IN> serializer;
    private final int maxBytesPerBatch;
    private final IOManager ioManager;
    private final StreamingRuntimeContext runtimeContext;
    private final MockOperatorEventGateway gateway;
    private final CollectSinkOperatorCoordinator coordinator;
    private final MockFunctionInitializationContext functionInitializationContext;
    private CollectSinkFunction<IN> function;

    public CollectSinkFunctionTestWrapper(TypeSerializer<IN> serializer, int maxBytesPerBatch) throws Exception {
        this.serializer = serializer;
        this.maxBytesPerBatch = maxBytesPerBatch;
        this.ioManager = new IOManagerAsync();
        MockEnvironment environment = new MockEnvironmentBuilder().setTaskName("mockTask").setManagedMemorySize(131072L).setIOManager(this.ioManager).build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
        this.gateway = new MockOperatorEventGateway();
        this.coordinator = new CollectSinkOperatorCoordinator(1000);
        this.coordinator.start();
        this.functionInitializationContext = new MockFunctionInitializationContext();
    }

    public void closeWrapper() throws Exception {
        this.coordinator.close();
        this.ioManager.close();
    }

    public CollectSinkOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    public void openFunction() throws Exception {
        this.function = new CollectSinkFunction(this.serializer, (long)this.maxBytesPerBatch, ACCUMULATOR_NAME);
        this.function.setRuntimeContext((RuntimeContext)this.runtimeContext);
        this.function.setOperatorEventGateway((OperatorEventGateway)this.gateway);
        this.function.open(DefaultOpenContext.INSTANCE);
        this.coordinator.handleEventFromOperator(0, 0, this.gateway.getNextEvent());
    }

    public void openFunctionWithState() throws Exception {
        this.functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
        this.function = new CollectSinkFunction(this.serializer, (long)this.maxBytesPerBatch, ACCUMULATOR_NAME);
        this.function.setRuntimeContext((RuntimeContext)this.runtimeContext);
        this.function.setOperatorEventGateway((OperatorEventGateway)this.gateway);
        this.function.initializeState((FunctionInitializationContext)this.functionInitializationContext);
        this.function.open(DefaultOpenContext.INSTANCE);
        this.coordinator.handleEventFromOperator(0, 0, this.gateway.getNextEvent());
    }

    public void invoke(IN record) throws Exception {
        this.function.invoke(record, null);
    }

    public void checkpointFunction(long checkpointId) throws Exception {
        this.function.snapshotState((FunctionSnapshotContext)new MockFunctionSnapshotContext(checkpointId));
        this.functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
    }

    public void checkpointComplete(long checkpointId) {
        this.function.notifyCheckpointComplete(checkpointId);
        this.functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
    }

    public void closeFunctionNormally() throws Exception {
        this.function.accumulateFinalResults();
        this.function.close();
    }

    public void closeFunctionAbnormally() throws Exception {
        this.function.close();
        this.coordinator.executionAttemptFailed(0, 0, null);
    }

    public CollectCoordinationResponse sendRequestAndGetResponse(String version, long offset) throws Exception {
        for (int i = 0; i < 100; ++i) {
            CollectCoordinationResponse response = this.sendRequest(version, offset);
            if (response.getLastCheckpointedOffset() < 0L) continue;
            return response;
        }
        throw new RuntimeException("Too many retries in sendRequestAndGetValidResponse");
    }

    private CollectCoordinationResponse sendRequest(String version, long offset) throws Exception {
        CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
        return (CollectCoordinationResponse)this.coordinator.handleCoordinationRequest((CoordinationRequest)request).get(10000L, TimeUnit.MILLISECONDS);
    }

    public Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws Exception {
        ArrayList<byte[]> accLocalValue = this.getAccumulatorLocalValue();
        List serializedResults = SerializedListAccumulator.deserializeList(accLocalValue, (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        Assert.assertEquals((long)1L, (long)serializedResults.size());
        byte[] serializedResult = (byte[])serializedResults.get(0);
        return CollectSinkFunction.deserializeAccumulatorResult((byte[])serializedResult);
    }

    public ArrayList<byte[]> getAccumulatorLocalValue() {
        Accumulator accumulator = this.runtimeContext.getAccumulator(ACCUMULATOR_NAME);
        return ((SerializedListAccumulator)accumulator).getLocalValue();
    }

    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }
}

