/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class EosBetaUpgradeIntegrationTest {
    @Parameterized.Parameter
    public boolean injectError;
    private static final int NUM_BROKERS = 3;
    private static final int MAX_POLL_INTERVAL_MS = 100000;
    private static final int MAX_WAIT_TIME_MS = 60000;
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE = Collections.unmodifiableList(Arrays.asList(KeyValue.pair((Object)KafkaStreams.State.RUNNING, (Object)KafkaStreams.State.PENDING_SHUTDOWN), KeyValue.pair((Object)KafkaStreams.State.PENDING_SHUTDOWN, (Object)KafkaStreams.State.NOT_RUNNING)));
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CRASH = Collections.unmodifiableList(Collections.singletonList(KeyValue.pair((Object)KafkaStreams.State.RUNNING, (Object)KafkaStreams.State.ERROR)));
    private static final List<KeyValue<KafkaStreams.State, KafkaStreams.State>> CLOSE_CRASHED = Collections.unmodifiableList(Arrays.asList(KeyValue.pair((Object)KafkaStreams.State.ERROR, (Object)KafkaStreams.State.PENDING_SHUTDOWN), KeyValue.pair((Object)KafkaStreams.State.PENDING_SHUTDOWN, (Object)KafkaStreams.State.NOT_RUNNING)));
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false")));
    private static String applicationId;
    private static final int NUM_TOPIC_PARTITIONS = 4;
    private static final String CONSUMER_GROUP_ID = "readCommitted";
    private static final String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
    private static final String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
    private final String storeName = "store";
    private final IntegrationTestUtils.StableAssignmentListener assignmentListener = new IntegrationTestUtils.StableAssignmentListener();
    private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false);
    private final AtomicBoolean errorInjectedClient2 = new AtomicBoolean(false);
    private final AtomicBoolean commitErrorInjectedClient1 = new AtomicBoolean(false);
    private final AtomicBoolean commitErrorInjectedClient2 = new AtomicBoolean(false);
    private final AtomicInteger commitCounterClient1 = new AtomicInteger(-1);
    private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
    private final AtomicInteger commitRequested = new AtomicInteger(0);
    private final AtomicBoolean requestCommit = new AtomicBoolean(false);
    private Throwable uncaughtException;
    private int testNumber = 0;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Boolean[]> data() {
        return Arrays.asList({false}, {true});
    }

    @Before
    public void createTopics() throws Exception {
        applicationId = "appId-" + ++this.testNumber;
        CLUSTER.deleteTopicsAndWait(MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC, applicationId + "-" + "store" + "-changelog");
        CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, 4, 1);
        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, 4, 1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
        LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>> stateTransitions1 = new LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>>();
        KafkaStreams streams1Alpha = null;
        KafkaStreams streams1Beta = null;
        KafkaStreams streams1BetaTwo = null;
        LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>> stateTransitions2 = new LinkedList<KeyValue<KafkaStreams.State, KafkaStreams.State>>();
        KafkaStreams streams2Alpha = null;
        KafkaStreams streams2AlphaTwo = null;
        KafkaStreams streams2Beta = null;
        try {
            List<KeyValue<Long, Long>> expectedCommittedResultAfterFailure;
            List<KeyValue<Long, Long>> dataFailingKey;
            HashMap<Long, Long> uncommittedState;
            Long failingKey;
            Object otherKey;
            Iterator<Long> it;
            List<KeyValue<Long, Long>> expectedCommittedResultBeforeFailure;
            Set<Long> keysSecondClient;
            Set<Long> keysFirstClient;
            streams1Alpha = this.getKafkaStreams("appDir1", "exactly_once");
            streams1Alpha.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair((Object)oldState, (Object)newState)));
            this.assignmentListener.prepareForRebalance();
            streams1Alpha.cleanUp();
            streams1Alpha.start();
            this.assignmentListener.waitForNextStableAssignment(60000L);
            this.waitForRunning(stateTransitions1);
            streams2Alpha = this.getKafkaStreams("appDir2", "exactly_once");
            streams2Alpha.setStateListener((newState, oldState) -> stateTransitions2.add(KeyValue.pair((Object)oldState, (Object)newState)));
            stateTransitions1.clear();
            this.assignmentListener.prepareForRebalance();
            streams2Alpha.cleanUp();
            streams2Alpha.start();
            this.assignmentListener.waitForNextStableAssignment(60000L);
            this.waitForRunning(stateTransitions1);
            this.waitForRunning(stateTransitions2);
            List<KeyValue<Long, Long>> committedInputDataBeforeUpgrade = this.prepareData(0L, 10L, 0L, 1L, 2L, 3L);
            this.writeInputData(committedInputDataBeforeUpgrade);
            TestUtils.waitForCondition(() -> this.commitRequested.get() == 4, (long)60000L, (String)"SteamsTasks did not request commit.");
            HashMap<Long, Long> committedState = new HashMap<Long, Long>();
            List<KeyValue<Long, Long>> expectedUncommittedResult = this.computeExpectedResult(committedInputDataBeforeUpgrade, committedState);
            this.verifyCommitted(expectedUncommittedResult);
            Set cleanKeys = Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L});
            Set<Long> keyFilterFirstClient = this.keysFromInstance(streams1Alpha);
            long potentiallyFirstFailingKey = keyFilterFirstClient.iterator().next();
            cleanKeys.remove(potentiallyFirstFailingKey);
            LinkedList<KeyValue<Long, Long>> uncommittedInputDataBeforeFirstUpgrade = new LinkedList<KeyValue<Long, Long>>();
            if (!this.injectError) {
                uncommittedInputDataBeforeFirstUpgrade.addAll(this.prepareData(10L, 15L, 0L, 1L, 2L, 3L));
                this.writeInputData(uncommittedInputDataBeforeFirstUpgrade);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            } else {
                LinkedList<KeyValue<Long, Long>> uncommittedInputDataWithoutFailingKey = new LinkedList<KeyValue<Long, Long>>();
                Iterator iterator = cleanKeys.iterator();
                while (iterator.hasNext()) {
                    long key = (Long)iterator.next();
                    uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(10L, 15L, key));
                }
                uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(10L, 14L, potentiallyFirstFailingKey));
                uncommittedInputDataBeforeFirstUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
                this.writeInputData(uncommittedInputDataWithoutFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataWithoutFailingKey, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            }
            stateTransitions2.clear();
            this.assignmentListener.prepareForRebalance();
            if (!this.injectError) {
                stateTransitions1.clear();
                streams1Alpha.close();
                this.waitForStateTransition(stateTransitions1, CLOSE);
            } else {
                this.errorInjectedClient1.set(true);
                List<KeyValue<Long, Long>> dataPotentiallyFirstFailingKey = this.prepareData(14L, 15L, potentiallyFirstFailingKey);
                uncommittedInputDataBeforeFirstUpgrade.addAll(dataPotentiallyFirstFailingKey);
                this.writeInputData(dataPotentiallyFirstFailingKey);
            }
            this.assignmentListener.waitForNextStableAssignment(60000L);
            this.waitForRunning(stateTransitions2);
            if (!this.injectError) {
                List<KeyValue<Long, Long>> committedInputDataDuringFirstUpgrade = uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> keyFilterFirstClient.contains(pair.key)).collect(Collectors.toList());
                List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringFirstUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResult);
            } else {
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> keyFilterFirstClient.contains(pair.key)).collect(Collectors.toList()), new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
                this.errorInjectedClient1.set(false);
                stateTransitions1.clear();
                streams1Alpha.close();
                this.waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
            }
            this.requestCommit.set(true);
            TestUtils.waitForCondition(() -> !this.requestCommit.get(), (String)"Punctuator did not request commit for running client");
            this.commitRequested.set(0);
            stateTransitions1.clear();
            stateTransitions2.clear();
            streams1Beta = this.getKafkaStreams("appDir1", "exactly_once_beta");
            streams1Beta.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair((Object)oldState, (Object)newState)));
            this.assignmentListener.prepareForRebalance();
            streams1Beta.start();
            this.assignmentListener.waitForNextStableAssignment(60000L);
            this.waitForRunning(stateTransitions1);
            this.waitForRunning(stateTransitions2);
            Set committedKeys = Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L});
            if (!this.injectError) {
                committedKeys.removeAll(keyFilterFirstClient);
            }
            List<KeyValue<Long, Long>> expectedCommittedResultAfterRestartFirstClient = this.computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade.stream().filter(pair -> committedKeys.contains(pair.key)).collect(Collectors.toList()), committedState);
            this.verifyCommitted(expectedCommittedResultAfterRestartFirstClient);
            this.commitCounterClient1.set(0);
            if (!this.injectError) {
                List<KeyValue<Long, Long>> committedInputDataDuringUpgrade = this.prepareData(15L, 20L, 0L, 1L, 2L, 3L);
                this.writeInputData(committedInputDataDuringUpgrade);
                List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResult);
                expectedUncommittedResult.addAll(expectedCommittedResult);
            } else {
                keysFirstClient = this.keysFromInstance(streams1Beta);
                keysSecondClient = this.keysFromInstance(streams2Alpha);
                List<KeyValue<Long, Long>> committedInputDataAfterFirstUpgrade = this.prepareData(15L, 20L, keysFirstClient.toArray(new Long[0]));
                this.writeInputData(committedInputDataAfterFirstUpgrade);
                expectedCommittedResultBeforeFailure = this.computeExpectedResult(committedInputDataAfterFirstUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResultBeforeFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultBeforeFailure);
                this.commitCounterClient2.set(0);
                it = keysSecondClient.iterator();
                otherKey = it.next();
                failingKey = it.next();
                List<KeyValue<Long, Long>> uncommittedInputDataAfterFirstUpgrade = this.prepareData(15L, 19L, keysSecondClient.toArray(new Long[0]));
                uncommittedInputDataAfterFirstUpgrade.addAll(this.prepareData(19L, 20L, new Long[]{otherKey}));
                this.writeInputData(uncommittedInputDataAfterFirstUpgrade);
                uncommittedState = new HashMap<Long, Long>(committedState);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                stateTransitions1.clear();
                stateTransitions2.clear();
                this.assignmentListener.prepareForRebalance();
                this.commitCounterClient1.set(0);
                this.commitErrorInjectedClient2.set(true);
                dataFailingKey = this.prepareData(19L, 20L, failingKey);
                uncommittedInputDataAfterFirstUpgrade.addAll(dataFailingKey);
                this.writeInputData(dataFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(dataFailingKey, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                this.assignmentListener.waitForNextStableAssignment(60000L);
                this.waitForStateTransition(stateTransitions2, CRASH);
                this.commitErrorInjectedClient2.set(false);
                stateTransitions2.clear();
                streams2Alpha.close();
                this.waitForStateTransition(stateTransitions2, CLOSE_CRASHED);
                expectedCommittedResultAfterFailure = this.computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResultAfterFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultAfterFailure);
            }
            if (!this.injectError) {
                streams2AlphaTwo = streams2Alpha;
            } else {
                this.commitCounterClient1.set(0);
                this.commitCounterClient2.set(-1);
                stateTransitions1.clear();
                stateTransitions2.clear();
                streams2AlphaTwo = this.getKafkaStreams("appDir2", "exactly_once");
                streams2AlphaTwo.setStateListener((newState, oldState) -> stateTransitions2.add(KeyValue.pair((Object)oldState, (Object)newState)));
                this.assignmentListener.prepareForRebalance();
                streams2AlphaTwo.start();
                this.assignmentListener.waitForNextStableAssignment(60000L);
                this.waitForRunning(stateTransitions1);
                this.waitForRunning(stateTransitions2);
                keysFirstClient = this.keysFromInstance(streams1Beta);
                keysSecondClient = this.keysFromInstance(streams2AlphaTwo);
                List<KeyValue<Long, Long>> committedInputDataBetweenUpgrades = this.prepareData(20L, 30L, keysSecondClient.toArray(new Long[0]));
                this.writeInputData(committedInputDataBetweenUpgrades);
                expectedCommittedResultBeforeFailure = this.computeExpectedResult(committedInputDataBetweenUpgrades, committedState);
                this.verifyCommitted(expectedCommittedResultBeforeFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultBeforeFailure);
                this.commitCounterClient2.set(0);
                it = keysFirstClient.iterator();
                otherKey = it.next();
                failingKey = it.next();
                List<KeyValue<Long, Long>> uncommittedInputDataBetweenUpgrade = this.prepareData(20L, 29L, keysFirstClient.toArray(new Long[0]));
                uncommittedInputDataBetweenUpgrade.addAll(this.prepareData(29L, 30L, new Long[]{otherKey}));
                this.writeInputData(uncommittedInputDataBetweenUpgrade);
                uncommittedState = new HashMap<Long, Long>(committedState);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBetweenUpgrade, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                stateTransitions1.clear();
                stateTransitions2.clear();
                this.assignmentListener.prepareForRebalance();
                this.commitCounterClient2.set(0);
                this.commitErrorInjectedClient1.set(true);
                dataFailingKey = this.prepareData(29L, 30L, failingKey);
                uncommittedInputDataBetweenUpgrade.addAll(dataFailingKey);
                this.writeInputData(dataFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(dataFailingKey, uncommittedState));
                this.verifyUncommitted(expectedUncommittedResult);
                this.assignmentListener.waitForNextStableAssignment(60000L);
                this.waitForStateTransition(stateTransitions1, CRASH);
                this.commitErrorInjectedClient1.set(false);
                stateTransitions1.clear();
                streams1Beta.close();
                this.waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
                expectedCommittedResultAfterFailure = this.computeExpectedResult(uncommittedInputDataBetweenUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResultAfterFailure);
                expectedUncommittedResult.addAll(expectedCommittedResultAfterFailure);
                stateTransitions1.clear();
                stateTransitions2.clear();
                streams1BetaTwo = this.getKafkaStreams("appDir1", "exactly_once_beta");
                streams1BetaTwo.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair((Object)oldState, (Object)newState)));
                this.assignmentListener.prepareForRebalance();
                streams1BetaTwo.start();
                this.assignmentListener.waitForNextStableAssignment(60000L);
                this.waitForRunning(stateTransitions1);
                this.waitForRunning(stateTransitions2);
            }
            cleanKeys.addAll(Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L}));
            Set<Long> keyFilterSecondClient = this.keysFromInstance(streams2AlphaTwo);
            long potentiallySecondFailingKey = keyFilterSecondClient.iterator().next();
            cleanKeys.remove(potentiallySecondFailingKey);
            LinkedList<KeyValue<Long, Long>> uncommittedInputDataBeforeSecondUpgrade = new LinkedList<KeyValue<Long, Long>>();
            if (!this.injectError) {
                uncommittedInputDataBeforeSecondUpgrade.addAll(this.prepareData(30L, 35L, 0L, 1L, 2L, 3L));
                this.writeInputData(uncommittedInputDataBeforeSecondUpgrade);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            } else {
                LinkedList<KeyValue<Long, Long>> uncommittedInputDataWithoutFailingKey = new LinkedList<KeyValue<Long, Long>>();
                otherKey = cleanKeys.iterator();
                while (otherKey.hasNext()) {
                    long key = (Long)otherKey.next();
                    uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(30L, 35L, key));
                }
                uncommittedInputDataWithoutFailingKey.addAll(this.prepareData(30L, 34L, potentiallySecondFailingKey));
                uncommittedInputDataBeforeSecondUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
                this.writeInputData(uncommittedInputDataWithoutFailingKey);
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataWithoutFailingKey, new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
            }
            stateTransitions1.clear();
            this.assignmentListener.prepareForRebalance();
            if (!this.injectError) {
                stateTransitions2.clear();
                streams2AlphaTwo.close();
                this.waitForStateTransition(stateTransitions2, CLOSE);
            } else {
                this.errorInjectedClient2.set(true);
                List<KeyValue<Long, Long>> dataPotentiallySecondFailingKey = this.prepareData(34L, 35L, potentiallySecondFailingKey);
                uncommittedInputDataBeforeSecondUpgrade.addAll(dataPotentiallySecondFailingKey);
                this.writeInputData(dataPotentiallySecondFailingKey);
            }
            this.assignmentListener.waitForNextStableAssignment(60000L);
            this.waitForRunning(stateTransitions1);
            if (!this.injectError) {
                List<KeyValue<Long, Long>> committedInputDataDuringSecondUpgrade = uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> keyFilterSecondClient.contains(pair.key)).collect(Collectors.toList());
                List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataDuringSecondUpgrade, committedState);
                this.verifyCommitted(expectedCommittedResult);
            } else {
                expectedUncommittedResult.addAll(this.computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> keyFilterSecondClient.contains(pair.key)).collect(Collectors.toList()), new HashMap<Long, Long>(committedState)));
                this.verifyUncommitted(expectedUncommittedResult);
                this.errorInjectedClient2.set(false);
                stateTransitions2.clear();
                streams2AlphaTwo.close();
                this.waitForStateTransition(stateTransitions2, CLOSE_CRASHED);
            }
            this.requestCommit.set(true);
            TestUtils.waitForCondition(() -> !this.requestCommit.get(), (String)"Punctuator did not request commit for running client");
            this.commitRequested.set(0);
            stateTransitions1.clear();
            stateTransitions2.clear();
            streams2Beta = this.getKafkaStreams("appDir1", "exactly_once_beta");
            streams2Beta.setStateListener((newState, oldState) -> stateTransitions2.add(KeyValue.pair((Object)oldState, (Object)newState)));
            this.assignmentListener.prepareForRebalance();
            streams2Beta.start();
            this.assignmentListener.waitForNextStableAssignment(60000L);
            this.waitForRunning(stateTransitions1);
            this.waitForRunning(stateTransitions2);
            committedKeys.addAll(Utils.mkSet((Object[])new Long[]{0L, 1L, 2L, 3L}));
            if (!this.injectError) {
                committedKeys.removeAll(keyFilterSecondClient);
            }
            List<KeyValue<Long, Long>> expectedCommittedResultAfterRestartSecondClient = this.computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade.stream().filter(pair -> committedKeys.contains(pair.key)).collect(Collectors.toList()), committedState);
            this.verifyCommitted(expectedCommittedResultAfterRestartSecondClient);
            this.commitCounterClient1.set(-1);
            this.commitCounterClient2.set(-1);
            List<KeyValue<Long, Long>> committedInputDataAfterUpgrade = this.prepareData(35L, 40L, 0L, 1L, 2L, 3L);
            this.writeInputData(committedInputDataAfterUpgrade);
            List<KeyValue<Long, Long>> expectedCommittedResult = this.computeExpectedResult(committedInputDataAfterUpgrade, committedState);
            this.verifyCommitted(expectedCommittedResult);
        }
        finally {
            if (streams1Alpha != null) {
                streams1Alpha.close();
            }
            if (streams1Beta != null) {
                streams1Beta.close();
            }
            if (streams1BetaTwo != null) {
                streams1BetaTwo.close();
            }
            if (streams2Alpha != null) {
                streams2Alpha.close();
            }
            if (streams2AlphaTwo != null) {
                streams2AlphaTwo.close();
            }
            if (streams2Beta != null) {
                streams2Beta.close();
            }
        }
    }

    private KafkaStreams getKafkaStreams(String appDir, String processingGuarantee) {
        StreamsBuilder builder = new StreamsBuilder();
        String[] storeNames = new String[]{"store"};
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"), (Serde)Serdes.Long(), (Serde)Serdes.Long()).withCachingEnabled();
        builder.addStateStore(storeBuilder);
        KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
        input.transform((TransformerSupplier)new TransformerSupplier<Long, Long, KeyValue<Long, Long>>(){

            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
                return new Transformer<Long, Long, KeyValue<Long, Long>>(){
                    ProcessorContext context;
                    KeyValueStore<Long, Long> state = null;
                    AtomicBoolean crash;
                    AtomicInteger sharedCommit;
                    Cancellable punctuator;

                    public void init(ProcessorContext context) {
                        this.context = context;
                        this.state = (KeyValueStore)context.getStateStore("store");
                        String clientId = context.appConfigs().get("client.id").toString();
                        if ("appDir1".equals(clientId)) {
                            this.crash = EosBetaUpgradeIntegrationTest.this.errorInjectedClient1;
                            this.sharedCommit = EosBetaUpgradeIntegrationTest.this.commitCounterClient1;
                        } else {
                            this.crash = EosBetaUpgradeIntegrationTest.this.errorInjectedClient2;
                            this.sharedCommit = EosBetaUpgradeIntegrationTest.this.commitCounterClient2;
                        }
                        this.punctuator = context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, (Punctuator)new CommitPunctuator(context, EosBetaUpgradeIntegrationTest.this.requestCommit));
                    }

                    public KeyValue<Long, Long> transform(Long key, Long value) {
                        Long sum;
                        if ((value + 1L) % 10L == 0L) {
                            if (this.sharedCommit.get() < 0 || this.sharedCommit.incrementAndGet() == 2) {
                                this.context.commit();
                            }
                            EosBetaUpgradeIntegrationTest.this.commitRequested.incrementAndGet();
                        }
                        sum = (sum = (Long)this.state.get((Object)key)) == null ? value : Long.valueOf(sum + value);
                        this.state.put((Object)key, (Object)sum);
                        this.state.flush();
                        if (value % 10L == 4L && this.crash != null && this.crash.compareAndSet(true, false)) {
                            throw new RuntimeException("Injected test exception.");
                        }
                        return new KeyValue((Object)key, this.state.get((Object)key));
                    }

                    public void close() {
                        this.punctuator.cancel();
                    }
                };
            }
        }, storeNames).to(MULTI_PARTITION_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("client.id", appDir);
        properties.put("processing.guarantee", processingGuarantee);
        properties.put("commit.interval.ms", (Object)Long.MAX_VALUE);
        properties.put(StreamsConfig.consumerPrefix((String)"metadata.max.age.ms"), "1000");
        properties.put(StreamsConfig.consumerPrefix((String)"auto.offset.reset"), "earliest");
        properties.put(StreamsConfig.consumerPrefix((String)"request.timeout.ms"), (Object)5000);
        properties.put(StreamsConfig.consumerPrefix((String)"session.timeout.ms"), (Object)4999);
        properties.put(StreamsConfig.consumerPrefix((String)"max.poll.interval.ms"), (Object)100000);
        properties.put(StreamsConfig.producerPrefix((String)"partitioner.class"), KeyPartitioner.class);
        properties.put("cache.max.bytes.buffering", (Object)0);
        properties.put("state.dir", TestUtils.tempDirectory().getPath() + File.separator + appDir);
        properties.put("__assignment.listener__", this.assignmentListener);
        Properties config = StreamsTestUtils.getStreamsConfig(applicationId, CLUSTER.bootstrapServers(), Serdes.LongSerde.class.getName(), Serdes.LongSerde.class.getName(), properties);
        KafkaStreams streams = new KafkaStreams(builder.build(), config, (KafkaClientSupplier)new TestKafkaClientSupplier());
        streams.setUncaughtExceptionHandler((t, e) -> {
            if (this.uncaughtException != null) {
                e.printStackTrace(System.err);
                Assert.fail((String)"Should only get one uncaught exception from Streams.");
            }
            this.uncaughtException = e;
        });
        return streams;
    }

    private void waitForRunning(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed) throws Exception {
        TestUtils.waitForCondition(() -> !observed.isEmpty() && ((KafkaStreams.State)((KeyValue)observed.get((int)(observed.size() - 1))).value).equals((Object)KafkaStreams.State.RUNNING), (long)60000L, () -> "Client did not startup on time. Observers transitions: " + observed);
    }

    private void waitForStateTransition(List<KeyValue<KafkaStreams.State, KafkaStreams.State>> observed, List<KeyValue<KafkaStreams.State, KafkaStreams.State>> expected) throws Exception {
        TestUtils.waitForCondition(() -> observed.equals(expected), (long)60000L, () -> "Client did not startup on time. Observers transitions: " + observed);
    }

    private List<KeyValue<Long, Long>> prepareData(long fromInclusive, long toExclusive, Long ... keys) {
        ArrayList<KeyValue<Long, Long>> data = new ArrayList<KeyValue<Long, Long>>();
        for (Long k : keys) {
            for (long v = fromInclusive; v < toExclusive; ++v) {
                data.add((KeyValue<Long, Long>)new KeyValue((Object)k, (Object)v));
            }
        }
        return data;
    }

    private void writeInputData(List<KeyValue<Long, Long>> records) {
        Properties config = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class);
        config.setProperty("partitioner.class", KeyPartitioner.class.getName());
        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC, records, config, (Time)EosBetaUpgradeIntegrationTest.CLUSTER.time);
    }

    private void verifyCommitted(List<KeyValue<Long, Long>> expectedResult) throws Exception {
        List<KeyValue<Long, Long>> committedOutput = this.readResult(expectedResult.size(), true);
        this.checkResultPerKey(committedOutput, expectedResult);
    }

    private void verifyUncommitted(List<KeyValue<Long, Long>> expectedResult) throws Exception {
        List<KeyValue<Long, Long>> uncommittedOutput = this.readResult(expectedResult.size(), false);
        this.checkResultPerKey(uncommittedOutput, expectedResult);
    }

    private List<KeyValue<Long, Long>> readResult(int numberOfRecords, boolean readCommitted) throws Exception {
        if (readCommitted) {
            return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)CONSUMER_GROUP_ID, LongDeserializer.class, LongDeserializer.class, (Properties)Utils.mkProperties(Collections.singletonMap("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))), MULTI_PARTITION_OUTPUT_TOPIC, numberOfRecords);
        }
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class), MULTI_PARTITION_OUTPUT_TOPIC, numberOfRecords);
    }

    private void checkResultPerKey(List<KeyValue<Long, Long>> result, List<KeyValue<Long, Long>> expectedResult) {
        HashSet<Long> allKeys = new HashSet<Long>();
        this.addAllKeys(allKeys, result);
        this.addAllKeys(allKeys, expectedResult);
        for (Long key : allKeys) {
            MatcherAssert.assertThat(this.getAllRecordPerKey(key, result), (Matcher)CoreMatchers.equalTo(this.getAllRecordPerKey(key, expectedResult)));
        }
    }

    private void addAllKeys(Set<Long> allKeys, List<KeyValue<Long, Long>> records) {
        for (KeyValue<Long, Long> record : records) {
            allKeys.add((Long)record.key);
        }
    }

    private List<KeyValue<Long, Long>> getAllRecordPerKey(Long key, List<KeyValue<Long, Long>> records) {
        ArrayList<KeyValue<Long, Long>> recordsPerKey = new ArrayList<KeyValue<Long, Long>>(records.size());
        for (KeyValue<Long, Long> record : records) {
            if (!((Long)record.key).equals(key)) continue;
            recordsPerKey.add(record);
        }
        return recordsPerKey;
    }

    private List<KeyValue<Long, Long>> computeExpectedResult(List<KeyValue<Long, Long>> input, Map<Long, Long> currentState) {
        ArrayList<KeyValue<Long, Long>> expectedResult = new ArrayList<KeyValue<Long, Long>>(input.size());
        for (KeyValue<Long, Long> record : input) {
            long sum = currentState.getOrDefault(record.key, 0L);
            currentState.put((Long)record.key, sum + (Long)record.value);
            expectedResult.add((KeyValue<Long, Long>)new KeyValue(record.key, (Object)(sum + (Long)record.value)));
        }
        return expectedResult;
    }

    private Set<Long> keysFromInstance(KafkaStreams streams) throws Exception {
        ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(60000L, streams, StoreQueryParameters.fromNameAndType((String)"store", (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
        HashSet<Long> keys = new HashSet<Long>();
        try (KeyValueIterator it = store.all();){
            while (it.hasNext()) {
                KeyValue row = (KeyValue)it.next();
                keys.add((Long)row.key);
            }
        }
        return keys;
    }

    private class ErrorInjector
    extends KafkaProducer<byte[], byte[]> {
        private final AtomicBoolean crash;

        public ErrorInjector(Map<String, Object> configs) {
            super(configs, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
            String clientId = configs.get("client.id").toString();
            this.crash = clientId.contains("appDir1") ? EosBetaUpgradeIntegrationTest.this.commitErrorInjectedClient1 : EosBetaUpgradeIntegrationTest.this.commitErrorInjectedClient2;
        }

        public void commitTransaction() throws ProducerFencedException {
            super.flush();
            if (this.crash.compareAndSet(true, false)) {
                throw new RuntimeException("Injected producer commit exception.");
            }
            super.commitTransaction();
        }
    }

    private class TestKafkaClientSupplier
    extends DefaultKafkaClientSupplier {
        private TestKafkaClientSupplier() {
        }

        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            return new ErrorInjector(config);
        }
    }

    public static class KeyPartitioner
    implements Partitioner {
        private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % 4;
        }

        public void close() {
        }

        public void configure(Map<String, ?> configs) {
        }
    }

    private static class CommitPunctuator
    implements Punctuator {
        final ProcessorContext context;
        final AtomicBoolean requestCommit;

        public CommitPunctuator(ProcessorContext context, AtomicBoolean requestCommit) {
            this.context = context;
            this.requestCommit = requestCommit;
        }

        public void punctuate(long timestamp) {
            if (this.requestCommit.get()) {
                this.context.commit();
                this.requestCommit.set(false);
            }
        }
    }
}

