/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.translation.SamzaPublishView;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.samza.operators.MessageStream;

class SamzaPublishViewTranslator<ElemT, ViewT>
implements TransformTranslator<SamzaPublishView<ElemT, ViewT>> {
    SamzaPublishViewTranslator() {
    }

    @Override
    public void translate(SamzaPublishView<ElemT, ViewT> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        PCollection input = (PCollection)ctx.getInput(transform);
        MessageStream inputStream = ctx.getMessageStream((PValue)input);
        Coder elementCoder = SamzaCoders.of(input);
        MessageStream elementStream = inputStream.filter(msg -> msg.getType() == OpMessage.Type.ELEMENT).map(OpMessage::getElement);
        MessageStream broadcastStream = ctx.getPipelineOptions().getMaxSourceParallelism() == 1 ? elementStream : elementStream.broadcast(SamzaCoders.toSerde(elementCoder), "view-" + ctx.getCurrentTopologicalId());
        MessageStream outputStream = broadcastStream.map(element -> OpMessage.ofSideInput(ctx.getViewId(transform.getView()), element));
        ctx.registerViewStream(transform.getView(), outputStream);
    }
}

