/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

public class MockApiProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VIn, KOut, VOut> {
    private final ArrayList<Record<KIn, VIn>> processed = new ArrayList();
    private final Map<KIn, ValueAndTimestamp<VIn>> lastValueAndTimestampPerKey = new HashMap<KIn, ValueAndTimestamp<VIn>>();
    private final ArrayList<Long> punctuatedStreamTime = new ArrayList();
    private final ArrayList<Long> punctuatedSystemTime = new ArrayList();
    private Cancellable scheduleCancellable;
    private final PunctuationType punctuationType;
    private final long scheduleInterval;
    private boolean commitRequested = false;
    private ProcessorContext<KOut, VOut> context;

    public MockApiProcessor(PunctuationType punctuationType, long scheduleInterval) {
        this.punctuationType = punctuationType;
        this.scheduleInterval = scheduleInterval;
    }

    public MockApiProcessor() {
        this(PunctuationType.STREAM_TIME, -1L);
    }

    public void init(ProcessorContext<KOut, VOut> context) {
        this.context = context;
        if (this.scheduleInterval > 0L) {
            this.scheduleCancellable = context.schedule(Duration.ofMillis(this.scheduleInterval), this.punctuationType, (this.punctuationType == PunctuationType.STREAM_TIME ? this.punctuatedStreamTime : this.punctuatedSystemTime)::add);
        }
    }

    public void process(Record<KIn, VIn> record) {
        Object key = record.key();
        Object value = record.value();
        KeyValueTimestamp<Object, Object> keyValueTimestamp = new KeyValueTimestamp<Object, Object>(key, value, record.timestamp());
        if (value != null) {
            this.lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make((Object)value, (long)record.timestamp()));
        } else {
            this.lastValueAndTimestampPerKey.remove(key);
        }
        this.processed.add(record);
        if (this.commitRequested) {
            this.context.commit();
            this.commitRequested = false;
        }
    }

    public void checkAndClearProcessResult(KeyValueTimestamp<?, ?> ... expected) {
        MatcherAssert.assertThat((String)("the number of outputs:" + this.processed), (Object)this.processed.size(), (Matcher)Matchers.is((Object)expected.length));
        for (int i = 0; i < expected.length; ++i) {
            Record<KIn, VIn> record = this.processed.get(i);
            MatcherAssert.assertThat((String)("output[" + i + "]:"), new KeyValueTimestamp<Object, Object>(record.key(), record.value(), record.timestamp()), (Matcher)Matchers.is(expected[i]));
        }
        this.processed.clear();
    }

    public void checkAndClearProcessedRecords(Record<?, ?> ... expected) {
        MatcherAssert.assertThat((String)("the number of outputs:" + this.processed), (Object)this.processed.size(), (Matcher)Matchers.is((Object)expected.length));
        for (int i = 0; i < expected.length; ++i) {
            MatcherAssert.assertThat((String)("output[" + i + "]:"), this.processed.get(i), (Matcher)Matchers.is(expected[i]));
        }
        this.processed.clear();
    }

    public void requestCommit() {
        this.commitRequested = true;
    }

    public void checkEmptyAndClearProcessResult() {
        MatcherAssert.assertThat((String)"the number of outputs:", (Object)this.processed.size(), (Matcher)Matchers.is((Object)0));
        this.processed.clear();
    }

    public void checkAndClearPunctuateResult(PunctuationType type, long ... expected) {
        ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? this.punctuatedStreamTime : this.punctuatedSystemTime;
        MatcherAssert.assertThat((String)"the number of outputs:", (Object)punctuated.size(), (Matcher)Matchers.is((Object)expected.length));
        for (int i = 0; i < expected.length; ++i) {
            MatcherAssert.assertThat((String)("output[" + i + "]:"), (Object)punctuated.get(i), (Matcher)Matchers.is((Object)expected[i]));
        }
        this.processed.clear();
    }

    public void addProcessorMetadata(String key, long value) {
        if (this.context instanceof InternalProcessorContext) {
            ((InternalProcessorContext)this.context).addProcessorMetadataKeyValue(key, value);
        }
    }

    public ArrayList<KeyValueTimestamp<KIn, VIn>> processed() {
        return this.processed.stream().map(r -> new KeyValueTimestamp<Object, Object>(r.key(), r.value(), r.timestamp())).collect(Collectors.toCollection(ArrayList::new));
    }

    public Map<KIn, ValueAndTimestamp<VIn>> lastValueAndTimestampPerKey() {
        return this.lastValueAndTimestampPerKey;
    }

    public List<Long> punctuatedStreamTime() {
        return this.punctuatedStreamTime;
    }

    public Cancellable scheduleCancellable() {
        return this.scheduleCancellable;
    }

    public ProcessorContext<KOut, VOut> context() {
        return this.context;
    }

    public void context(ProcessorContext<KOut, VOut> context) {
        this.context = context;
    }
}

