/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.debezium.inbound;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.jspecify.annotations.Nullable;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.debezium.support.DefaultDebeziumHeaderMapper;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.HeaderMapper;
import org.springframework.util.Assert;

public class DebeziumMessageProducer
extends MessageProducerSupport {
    private final DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumEngineBuilder;
    private DebeziumEngine<ChangeEvent<byte[], byte[]>> debeziumEngine;
    private TaskExecutor taskExecutor;
    private String contentType = "application/json";
    private HeaderMapper<List<Header<Object>>> headerMapper = new DefaultDebeziumHeaderMapper();
    private boolean enableEmptyPayload = false;
    private boolean enableBatch = false;
    private volatile CountDownLatch lifecycleLatch = new CountDownLatch(0);

    public DebeziumMessageProducer(DebeziumEngine.Builder<ChangeEvent<byte[], byte[]>> debeziumBuilder) {
        Assert.notNull(debeziumBuilder, (String)"'debeziumBuilder' must not be null");
        this.debeziumEngineBuilder = debeziumBuilder;
    }

    public void setEnableBatch(boolean enable) {
        this.enableBatch = enable;
    }

    public void setEnableEmptyPayload(boolean enabled) {
        this.enableEmptyPayload = enabled;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        Assert.notNull((Object)taskExecutor, (String)"'taskExecutor' must not be null");
        this.taskExecutor = taskExecutor;
    }

    public void setContentType(String contentType) {
        Assert.hasText((String)contentType, (String)"'contentType' must not be empty");
        this.contentType = contentType;
    }

    public void setHeaderMapper(HeaderMapper<List<Header<Object>>> headerMapper) {
        Assert.notNull(headerMapper, (String)"'headerMapper' must not be null.");
        this.headerMapper = headerMapper;
    }

    public String getComponentType() {
        return "debezium:inbound-channel-adapter";
    }

    protected void onInit() {
        super.onInit();
        if (this.taskExecutor == null) {
            this.taskExecutor = new SimpleAsyncTaskExecutor(this.getComponentName() + "-thread-");
        }
        if (!this.enableBatch) {
            this.debeziumEngineBuilder.notifying(new StreamChangeEventConsumer());
        } else {
            this.debeziumEngineBuilder.notifying(new BatchChangeEventConsumer());
        }
        this.debeziumEngine = this.debeziumEngineBuilder.build();
    }

    protected void doStart() {
        if (this.lifecycleLatch.getCount() > 0L) {
            return;
        }
        this.lifecycleLatch = new CountDownLatch(1);
        this.taskExecutor.execute(() -> {
            try {
                this.debeziumEngine.run();
            }
            finally {
                this.lifecycleLatch.countDown();
            }
        });
    }

    protected void doStop() {
        try {
            this.debeziumEngine.close();
        }
        catch (IOException e) {
            this.logger.warn((Throwable)e, (CharSequence)"Debezium failed to close!");
        }
        try {
            if (!this.lifecycleLatch.await(5L, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Failed to stop " + String.valueOf((Object)this));
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private <T> @Nullable Message<?> toMessage(ChangeEvent<T, T> changeEvent) {
        Object key = changeEvent.key();
        Optional payload = changeEvent.value();
        String destination = changeEvent.destination();
        if (payload == null && this.enableEmptyPayload) {
            payload = Optional.empty();
        }
        if (payload == null) {
            this.logger.info(() -> "Dropped null payload message for Change Event key: " + String.valueOf(key));
            return null;
        }
        return this.getMessageBuilderFactory().withPayload(payload).setHeader("debezium_key", key).setHeader("debezium_destination", (Object)destination).setHeader("contentType", (Object)this.contentType).copyHeaders((Map)this.headerMapper.toHeaders((Object)changeEvent.headers())).build();
    }

    final class StreamChangeEventConsumer<T>
    implements Consumer<ChangeEvent<T, T>> {
        StreamChangeEventConsumer() {
        }

        @Override
        public void accept(ChangeEvent<T, T> changeEvent) {
            Message<?> message = DebeziumMessageProducer.this.toMessage(changeEvent);
            if (message != null) {
                DebeziumMessageProducer.this.sendMessage(message);
            }
        }
    }

    final class BatchChangeEventConsumer<T>
    implements DebeziumEngine.ChangeConsumer<ChangeEvent<T, T>> {
        BatchChangeEventConsumer() {
        }

        public void handleBatch(List<ChangeEvent<T, T>> changeEvents, DebeziumEngine.RecordCommitter<ChangeEvent<T, T>> committer) throws InterruptedException {
            Message message = DebeziumMessageProducer.this.getMessageBuilderFactory().withPayload(changeEvents).build();
            DebeziumMessageProducer.this.sendMessage(message);
            for (ChangeEvent<T, T> event : changeEvents) {
                committer.markProcessed(event);
            }
            committer.markBatchFinished();
        }
    }
}

