/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.assertj.core.api.AbstractAssert;

/**
 * AssertJ assertion that validates the records produced by an iterator against the records that
 * were expected from each source split.
 *
 * <p>Records belonging to the same split must show up in the result in the order in which they
 * appear in that split's list; records of different splits may interleave freely. Matching is
 * performed by keeping one cursor per split and advancing the first split whose current record
 * equals the record just read from the result iterator.
 *
 * @param <T> type of the records being compared
 */
public class CollectIteratorAssert<T>
        extends AbstractAssert<CollectIteratorAssert<T>, Iterator<T>> {
    /** Iterator over the records that were actually collected. */
    private final Iterator<T> collectorIterator;
    /** Expected records, one cursor-carrying entry per split. */
    private final List<RecordsFromSplit<T>> recordsFromSplits = new ArrayList<>();
    /** Sum of the sizes of all expected split lists. */
    private int totalNumRecords;
    /** Optional cap on the number of records to validate; {@code null} means "validate all". */
    private Integer limit = null;

    /**
     * Creates an assertion over the given result iterator.
     *
     * @param collectorIterator iterator over the records that were actually collected
     */
    protected CollectIteratorAssert(Iterator<T> collectorIterator) {
        super(collectorIterator, CollectIteratorAssert.class);
        this.collectorIterator = collectorIterator;
    }

    /**
     * Restricts validation to a bounded number of records. Once the limit has been reached the
     * comparison stops reading from the result iterator, and the check that every expected record
     * was seen is skipped.
     *
     * @param limit number of records after which validation stops
     * @return this assertion, for chaining
     */
    public CollectIteratorAssert<T> withNumRecordsLimit(int limit) {
        this.limit = limit;
        return this;
    }

    /**
     * Compares the collected records with the expected records of every split.
     *
     * @param recordsBySplitsFromSource expected records, one list per split
     * @param semantic delivery guarantee to validate against
     * @throws IllegalArgumentException if a configured limit exceeds the total number of expected
     *     records, or if {@code semantic} is neither {@code AT_LEAST_ONCE} nor {@code EXACTLY_ONCE}
     */
    public void matchesRecordsFromSource(List<List<T>> recordsBySplitsFromSource, CheckpointingMode semantic) {
        // Reset the expected state so that a repeated call (for example after the limit check
        // below rejected the arguments) does not validate against stale or duplicated splits.
        this.recordsFromSplits.clear();
        this.totalNumRecords = 0;
        for (List<T> recordsFromSplit : recordsBySplitsFromSource) {
            this.recordsFromSplits.add(new RecordsFromSplit<>(recordsFromSplit));
            this.totalNumRecords += recordsFromSplit.size();
        }
        if (this.limit != null && this.limit > this.totalNumRecords) {
            throw new IllegalArgumentException("Limit validation size should be less than total number of records from source");
        }
        switch (semantic) {
            case AT_LEAST_ONCE: {
                this.compareWithAtLeastOnceSemantic(this.collectorIterator);
                break;
            }
            case EXACTLY_ONCE: {
                this.compareWithExactlyOnceSemantic(this.collectorIterator);
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unrecognized semantic \"%s\"", semantic));
            }
        }
    }

    /**
     * At-least-once validation: records that do not match any split cursor are set aside and
     * afterwards only required to exist somewhere in the expected data. The counter tracks matched
     * records only, so the limit applies to matched records.
     */
    private void compareWithAtLeastOnceSemantic(Iterator<T> resultIterator) {
        List<T> duplicateRead = new LinkedList<>();
        int recordCounter = 0;
        while (resultIterator.hasNext()) {
            T record = resultIterator.next();
            if (this.matchThenNext(record)) {
                recordCounter++;
            } else {
                duplicateRead.add(record);
            }
            if (this.limit != null && recordCounter >= this.limit) {
                break;
            }
        }
        if (this.limit == null && !this.hasReachedEnd()) {
            this.failWithDescription(this.generateMismatchDescription(String.format("Expected to have at least %d records in result, but only received %d records", this.totalNumRecords, recordCounter), resultIterator));
        } else {
            this.confirmDuplicateRead(duplicateRead);
        }
    }

    /**
     * Exactly-once validation: every record read must match a split cursor, and without a limit
     * every expected record must have been consumed when the result iterator is exhausted.
     */
    private void compareWithExactlyOnceSemantic(Iterator<T> resultIterator) {
        int recordCounter = 0;
        while (resultIterator.hasNext()) {
            T record = resultIterator.next();
            if (!this.matchThenNext(record)) {
                if (recordCounter >= this.totalNumRecords) {
                    this.failWithDescription(this.generateMismatchDescription(String.format("Expected to have exactly %d records in result, but received more records", this.totalNumRecords), resultIterator));
                } else {
                    this.failWithDescription(this.generateMismatchDescription(String.format("Unexpected record '%s' at position %d", record, recordCounter), resultIterator));
                }
            }
            // Count every consumed record regardless of whether a limit is configured; the
            // position and "received N records" messages rely on this counter.
            recordCounter++;
            if (this.limit != null && recordCounter >= this.limit) {
                break;
            }
        }
        if (this.limit == null && !this.hasReachedEnd()) {
            this.failWithDescription(this.generateMismatchDescription(String.format("Expected to have exactly %d records in result, but only received %d records", this.totalNumRecords, recordCounter), resultIterator));
        }
    }

    /** Fails the assertion if any unmatched record is absent from all expected splits. */
    private void confirmDuplicateRead(List<T> duplicateRead) {
        for (T record : duplicateRead) {
            boolean found = false;
            for (RecordsFromSplit<T> recordsFromSplit : this.recordsFromSplits) {
                if (recordsFromSplit.records.contains(record)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                this.failWithDescription(String.format("Unexpected duplicate record '%s'", record));
            }
        }
    }

    /**
     * Tries to match the record against the current position of each split, in split order, and
     * advances the first split that matches.
     *
     * @return {@code true} if a split cursor was advanced, {@code false} if no split matched
     */
    private boolean matchThenNext(T record) {
        for (RecordsFromSplit<T> recordsFromSplit : this.recordsFromSplits) {
            if (!recordsFromSplit.hasNext()) {
                continue;
            }
            T expected = recordsFromSplit.current();
            // Null-safe comparison so a null record yields a mismatch (or matches an expected
            // null) instead of a NullPointerException.
            boolean matches = record == null ? expected == null : record.equals(expected);
            if (matches) {
                recordsFromSplit.forward();
                return true;
            }
        }
        return false;
    }

    /** Returns {@code true} when the cursor of every split has moved past its last record. */
    private boolean hasReachedEnd() {
        for (RecordsFromSplit<T> recordsFromSplit : this.recordsFromSplits) {
            if (recordsFromSplit.hasNext()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Builds a multi-line failure description: the reason, every split with an arrow marking its
     * current cursor position, and any records still left in the result iterator.
     *
     * <p>Note that this drains {@code resultIterator}.
     */
    private String generateMismatchDescription(String reason, Iterator<T> resultIterator) {
        StringBuilder sb = new StringBuilder();
        sb.append(reason).append("\n");
        sb.append("Current progress of multiple split test data validation:\n");
        int splitCounter = 0;
        for (RecordsFromSplit<T> recordsFromSplit : this.recordsFromSplits) {
            int numRecords = recordsFromSplit.records.size();
            sb.append(String.format("Split %d (%d/%d): \n", splitCounter++, recordsFromSplit.offset, numRecords));
            for (int recordIndex = 0; recordIndex < numRecords; ++recordIndex) {
                sb.append(recordsFromSplit.records.get(recordIndex));
                if (recordIndex == recordsFromSplit.offset) {
                    sb.append("\t<----");
                }
                sb.append("\n");
            }
        }
        if (resultIterator.hasNext()) {
            sb.append("Remaining received elements after the unexpected one: \n");
            while (resultIterator.hasNext()) {
                sb.append(resultIterator.next()).append("\n");
            }
        }
        return sb.toString();
    }

    /**
     * Fails the assertion with an already-formatted description. The text is passed as an argument
     * to a {@code "%s"} format rather than as the format string itself, so '%' characters coming
     * from record contents are not interpreted as format specifiers.
     */
    private void failWithDescription(String description) {
        this.failWithMessage("%s", description);
    }

    /** Expected records of a single split together with a cursor into them. */
    private static class RecordsFromSplit<T> {
        /** Index of the next expected record. */
        private int offset = 0;
        private final List<T> records;

        public RecordsFromSplit(List<T> records) {
            this.records = records;
        }

        /** Returns the record at the cursor, or {@code null} if the split is exhausted. */
        public T current() {
            if (!this.hasNext()) {
                return null;
            }
            return this.records.get(this.offset);
        }

        /** Moves the cursor to the next record. */
        public void forward() {
            ++this.offset;
        }

        /** Returns {@code true} while the cursor still points at an expected record. */
        public boolean hasNext() {
            return this.offset < this.records.size();
        }
    }
}

