/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.samza.operators.MessageStream;

/**
 * Translates a {@link Flatten.PCollections} transform by combining the message streams of
 * all of the node's inputs into a single stream registered for the transform's output.
 *
 * @param <T> element type of the flattened collections
 */
class FlattenPCollectionsTranslator<T>
implements TransformTranslator<Flatten.PCollections<T>> {

    /**
     * Registers the output stream of a flatten.
     *
     * <ul>
     *   <li>No inputs: a stream derived from the context's dummy stream whose operator
     *       body is empty.
     *   <li>One input: that input's stream is registered unchanged.
     *   <li>Several inputs: the result of {@link MessageStream#mergeAll} over all input
     *       streams.
     * </ul>
     *
     * @param transform the flatten transform being translated
     * @param node the hierarchy node whose inputs are flattened
     * @param ctx the translation context used to look up and register message streams
     * @throws IllegalArgumentException if any input of {@code node} is not a {@link PCollection}
     */
    @Override
    public void translate(Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        PCollection<?> output = (PCollection<?>) ctx.getOutput(transform);

        List<MessageStream<?>> inputStreams = new ArrayList<>();
        for (Map.Entry<?, ?> taggedInput : node.getInputs().entrySet()) {
            Object value = taggedInput.getValue();
            if (!(value instanceof PCollection)) {
                // Null-safe so that a null input still reports the descriptive error below
                // rather than failing while the message is being built.
                Object valueType = value == null ? null : value.getClass();
                throw new IllegalArgumentException(String.format("Got non-PCollection input for flatten. Tag: %s. Input: %s. Type: %s", taggedInput.getKey(), value, valueType));
            }
            inputStreams.add(ctx.getMessageStream((PCollection<?>) value));
        }

        if (inputStreams.isEmpty()) {
            // Nothing to flatten: register a stream built from the dummy stream with an
            // operator that does nothing for each element.
            MessageStream<?> noOpStream = ctx.getDummyStream().flatMap(OpAdapter.adapt((inputElement, emitter) -> {}));
            register(ctx, output, noOpStream);
            return;
        }
        if (inputStreams.size() == 1) {
            register(ctx, output, inputStreams.get(0));
            return;
        }

        Set<MessageStream<?>> streamsToMerge = new HashSet<>();
        for (MessageStream<?> stream : inputStreams) {
            if (!streamsToMerge.add(stream)) {
                // The same stream is an input more than once and the set would collapse it.
                // An identity map yields another stream to add in its place
                // (presumably a distinct, non-equal instance -- confirm against MessageStream).
                streamsToMerge.add(stream.map(m -> m));
            }
        }
        register(ctx, output, MessageStream.mergeAll(streamsToMerge));
    }

    /**
     * Registers {@code stream} as the message stream of {@code output}.
     *
     * <p>The message type of the streams is not nameable with the types this file imports,
     * so the single unchecked conversion needed by the context API is confined to this helper.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void register(TranslationContext ctx, PCollection<?> output, MessageStream<?> stream) {
        ctx.registerMessageStream(output, (MessageStream) stream);
    }
}

