/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.changelog.inmemory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
    private final Random random = new Random();
    @TempDir
    public File temporaryFolder;

    public static Stream<Boolean> parameters() {
        return Stream.of(Boolean.valueOf(true));
    }

    @Disabled(value="FLINK-30729")
    @MethodSource(value={"parameters"})
    @ParameterizedTest(name="compression = {0}")
    public void testNoAppendAfterClose(boolean compression) throws IOException {
        Assertions.assertThatThrownBy(() -> {
            StateChangelogWriter writer = this.getFactory(compression, this.temporaryFolder).createWriter(new OperatorID().toString(), KeyGroupRange.of((int)0, (int)0), (MailboxExecutor)new SyncMailboxExecutor());
            writer.close();
            writer.append(0, new byte[0]);
        }).isInstanceOf(IllegalStateException.class);
    }

    @MethodSource(value={"parameters"})
    @ParameterizedTest(name="compression = {0}")
    public void testWriteAndRead(boolean compression) throws Exception {
        KeyGroupRange kgRange = KeyGroupRange.of((int)0, (int)5);
        Map<Integer, List<byte[]>> appendsByKeyGroup = this.generateAppends(kgRange, 405, 20);
        try (StateChangelogStorage<T> client = this.getFactory(compression, this.temporaryFolder);
             StateChangelogWriter writer = client.createWriter(new OperatorID().toString(), kgRange, (MailboxExecutor)new SyncMailboxExecutor());){
            SequenceNumber prev = writer.initialSequenceNumber();
            for (Map.Entry<Integer, List<byte[]>> entry : appendsByKeyGroup.entrySet()) {
                Integer group = entry.getKey();
                List<byte[]> appends = entry.getValue();
                for (byte[] bytes : appends) {
                    writer.append(group.intValue(), bytes);
                }
                writer.nextSequenceNumber();
            }
            SnapshotResult res = (SnapshotResult)writer.persist(prev, 1L).get();
            ChangelogStateHandle jmHandle = (ChangelogStateHandle)res.getJobManagerOwnedSnapshot();
            StateChangelogHandleReader reader = client.createReader();
            this.assertByteMapsEqual(appendsByKeyGroup, this.extract(jmHandle, reader));
        }
    }

    private void assertByteMapsEqual(Map<Integer, List<byte[]>> expected, Map<Integer, List<byte[]>> actual) {
        Assertions.assertThat(actual).hasSameSizeAs(expected);
        for (Map.Entry<Integer, List<byte[]>> e : expected.entrySet()) {
            List<byte[]> expectedList = e.getValue();
            List<byte[]> actualList = actual.get(e.getKey());
            Iterator<byte[]> ite = expectedList.iterator();
            Iterator<byte[]> ale = actualList.iterator();
            while (ite.hasNext() && ale.hasNext()) {
                Assertions.assertThat((byte[])ale.next()).isEqualTo((Object)ite.next());
            }
            Assertions.assertThat(ite).isExhausted();
            Assertions.assertThat(ale).isExhausted();
        }
    }

    private Map<Integer, List<byte[]>> extract(T handle, StateChangelogHandleReader<T> reader) throws Exception {
        HashMap<Integer, List<byte[]>> changes = new HashMap<Integer, List<byte[]>>();
        try (CloseableIterator it = reader.getChanges(handle);){
            while (it.hasNext()) {
                StateChange change = (StateChange)it.next();
                changes.computeIfAbsent(change.getKeyGroup(), k -> new ArrayList()).add(change.getChange());
            }
        }
        return changes;
    }

    private Map<Integer, List<byte[]>> generateAppends(KeyGroupRange kgRange, int keyLen, int appendsPerGroup) {
        return StreamSupport.stream(kgRange.spliterator(), false).collect(Collectors.toMap(Function.identity(), unused -> this.generateData(appendsPerGroup, keyLen)));
    }

    private List<byte[]> generateData(int numAppends, int keyLen) {
        return Stream.generate(() -> this.randomBytes(keyLen)).limit(numAppends).collect(Collectors.toList());
    }

    private byte[] randomBytes(int len) {
        byte[] bytes = new byte[len];
        this.random.nextBytes(bytes);
        return bytes;
    }

    protected StateChangelogStorage<T> getFactory(boolean compression, File temporaryFolder) throws IOException {
        return new InMemoryStateChangelogStorage();
    }
}

