/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.cassandra.outbound;

import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.core.InsertOptions;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.UpdateOptions;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.TypeLocator;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.integration.expression.ExpressionEvalMap;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Message handler that performs Cassandra write operations through
 * {@link ReactiveCassandraOperations}.
 *
 * <p>The operation applied to a request message is selected by the configured
 * {@link Type}: entity {@code INSERT}, {@code UPDATE} or {@code DELETE}, or a
 * {@code STATEMENT} execution. A payload that is itself a {@link Statement} is
 * always executed as a statement, regardless of the configured mode.
 *
 * <p>The handler is asynchronous by default (the constructor calls
 * {@code setAsync(true)}). When {@link #setProducesReply(boolean)} is enabled,
 * the reply is the resulting {@link Mono} in async mode, or the blocked-for
 * {@link WriteResult} otherwise. When no reply is produced, the operation is
 * subscribed to (async) or blocked on (sync) and {@code null} is returned.
 */
public class CassandraMessageHandler extends AbstractReplyProducingMessageHandler {

    private static final String INGEST_PAYLOAD_TYPE_MESSAGE =
            "to perform 'ingest' the 'payload' must be of 'List<List<?>>' type.";

    private final Map<String, Expression> parameterExpressions = new HashMap<>();

    private final ReactiveCassandraOperations cassandraOperations;

    private Type mode;

    private boolean producesReply;

    private @Nullable String ingestQuery;

    private WriteOptions writeOptions;

    /** Set by {@link #setQuery} or {@link #setStatementProcessor}; otherwise {@code null}. */
    private @Nullable ReactiveSessionMessageCallback sessionMessageCallback;

    /** Created in {@link #doInit()}; only read lazily from the message-time callbacks. */
    private EvaluationContext evaluationContext;

    /**
     * Create a handler in {@link Type#INSERT} mode.
     * @param cassandraOperations the operations used to talk to Cassandra; must not be null.
     */
    public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations) {
        this(cassandraOperations, Type.INSERT);
    }

    /**
     * Create a handler for the given operation type and initialize the default
     * write options matching that type.
     * @param cassandraOperations the operations used to talk to Cassandra; must not be null.
     * @param queryType the operation mode; must not be null.
     * @throws IllegalArgumentException if either argument is null.
     */
    public CassandraMessageHandler(ReactiveCassandraOperations cassandraOperations, Type queryType) {
        Assert.notNull(cassandraOperations, "'cassandraOperations' must not be null.");
        Assert.notNull(queryType, "'queryType' must not be null.");
        this.cassandraOperations = cassandraOperations;
        this.mode = queryType;
        setAsync(true);
        // Exhaustive switch over the enum itself: adding a constant becomes a compile
        // error here instead of a runtime failure keyed on ordinal positions.
        this.writeOptions = switch (queryType) {
            case INSERT -> InsertOptions.empty();
            case UPDATE -> UpdateOptions.empty();
            case DELETE, STATEMENT -> WriteOptions.empty();
        };
    }

    /**
     * Set the CQL statement to prepare for ingesting {@code List<List<?>>} payloads,
     * where every inner list supplies the bind values for one row.
     * Also switches this handler to {@link Type#INSERT} mode.
     * @param ingestQuery the CQL to prepare; must contain text.
     */
    public void setIngestQuery(String ingestQuery) {
        Assert.hasText(ingestQuery, "'ingestQuery' must not be empty");
        this.ingestQuery = ingestQuery;
        this.mode = Type.INSERT;
    }

    /**
     * Replace the write options applied to the operations.
     * <p>Note that single-entity inserts cast these options to {@link InsertOptions}
     * and single-entity updates cast them to {@link UpdateOptions}, so the supplied
     * instance has to be of the type matching the configured mode in those cases.
     * @param writeOptions the options to use; must not be null.
     */
    public void setWriteOptions(WriteOptions writeOptions) {
        Assert.notNull(writeOptions, "'writeOptions' must not be null");
        this.writeOptions = writeOptions;
    }

    /**
     * Configure whether the result of the operation is returned as a reply.
     * @param producesReply {@code true} to return the operation result from request handling.
     */
    public void setProducesReply(boolean producesReply) {
        this.producesReply = producesReply;
    }

    /**
     * Parse the given expression string and delegate to
     * {@link #setStatementExpression(Expression)}.
     * @param statementExpression the expression source evaluating to a {@link Statement}.
     */
    public void setStatementExpressionString(String statementExpression) {
        setStatementExpression(EXPRESSION_PARSER.parseExpression(statementExpression));
    }

    /**
     * Use the given expression, evaluated against the request message, to build the
     * {@link Statement} to execute. Switches this handler to {@link Type#STATEMENT} mode.
     * @param statementExpression the expression evaluating to a {@link Statement}.
     */
    @SuppressWarnings("unchecked")
    public void setStatementExpression(Expression statementExpression) {
        // Class literals cannot carry type arguments, hence the double cast to obtain
        // a Class<Statement<?>> for the properly parameterized processor.
        Class<Statement<?>> statementType = (Class<Statement<?>>) (Class<?>) Statement.class;
        setStatementProcessor(
                new ExpressionEvaluatingMessageProcessor<Statement<?>>(statementExpression, statementType) {

                    @Override
                    protected StandardEvaluationContext getEvaluationContext() {
                        // Share the handler's context so the imports registered in doInit() apply.
                        return (StandardEvaluationContext) CassandraMessageHandler.this.evaluationContext;
                    }
                });
    }

    /**
     * Set a CQL query to execute for every request message. Its named parameters are
     * resolved by evaluating the configured parameter expressions against the message.
     * Switches this handler to {@link Type#STATEMENT} mode.
     * @param query the CQL query; must contain text.
     */
    public void setQuery(String query) {
        Assert.hasText(query, "'query' must not be empty");
        // The evaluation context is read when a message arrives, not here, because it
        // is only created in doInit().
        this.sessionMessageCallback = (session, requestMessage) ->
                session.execute(query,
                        ExpressionEvalMap.from(this.parameterExpressions)
                                .usingEvaluationContext(this.evaluationContext)
                                .withRoot(requestMessage)
                                .build());
        this.mode = Type.STATEMENT;
    }

    /**
     * Replace the expressions used to compute the parameters of the query set via
     * {@link #setQuery(String)}. Map keys are the parameter names.
     * @param parameterExpressions the parameter name to expression mapping; must not be empty.
     */
    public void setParameterExpressions(Map<String, Expression> parameterExpressions) {
        Assert.notEmpty(parameterExpressions, "'parameterExpressions' must not be empty.");
        this.parameterExpressions.clear();
        this.parameterExpressions.putAll(parameterExpressions);
    }

    /**
     * Use the given processor to build the {@link Statement} to execute from the request
     * message. The configured write options are applied to the produced statement.
     * Switches this handler to {@link Type#STATEMENT} mode.
     * @param statementProcessor the statement-producing processor; must not be null.
     */
    public void setStatementProcessor(MessageProcessor<Statement<?>> statementProcessor) {
        Assert.notNull(statementProcessor, "'statementProcessor' must not be null.");
        this.sessionMessageCallback = (session, requestMessage) -> {
            Statement<?> statement = statementProcessor.processMessage(requestMessage);
            Assert.notNull(statement, "Statement must not be null");
            return session.execute(QueryOptionsUtil.addQueryOptions(statement, this.writeOptions));
        };
        this.mode = Type.STATEMENT;
    }

    @Override
    public String getComponentType() {
        return "cassandra:outbound-" + (this.producesReply ? "gateway" : "channel-adapter");
    }

    /**
     * Create the evaluation context and register the {@link QueryBuilder} package as an
     * import on its type locator, when that locator is a {@link StandardTypeLocator}.
     */
    @Override
    protected void doInit() {
        super.doInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        TypeLocator typeLocator = this.evaluationContext.getTypeLocator();
        if (typeLocator instanceof StandardTypeLocator standardTypeLocator) {
            standardTypeLocator.registerImport(QueryBuilder.class.getPackage().getName());
        }
    }

    /**
     * Dispatch the request to the operation selected by the effective mode.
     * @param requestMessage the message whose payload (or derived statement) is written.
     * @return the result {@link Mono} (async) or blocked-for result (sync) when a reply
     * is produced; otherwise {@code null}.
     */
    @Override
    protected @Nullable Object handleRequestMessage(Message<?> requestMessage) {
        Object payload = requestMessage.getPayload();
        // A Statement payload always wins over the configured mode.
        Type modeToUse = payload instanceof Statement ? Type.STATEMENT : this.mode;

        Mono<? extends WriteResult> result = switch (modeToUse) {
            case INSERT -> handleInsert(payload);
            case UPDATE -> handleUpdate(payload);
            case DELETE -> handleDelete(payload);
            case STATEMENT -> handleStatement(requestMessage);
        };

        if (this.producesReply) {
            return isAsync() ? result : result.block();
        }
        if (isAsync()) {
            result.subscribe();
        }
        else {
            result.block();
        }
        return null;
    }

    /**
     * Insert the payload: as a batch of bound prepared statements when an ingest query
     * is configured, as an entity batch when the payload is a {@link List}, or as a
     * single entity otherwise.
     */
    private Mono<? extends WriteResult> handleInsert(Object payload) {
        String localIngestQuery = this.ingestQuery;
        if (localIngestQuery != null) {
            List<List<?>> rows = toIngestRows(payload);
            // The cast selects the ReactiveSessionCallback overload of execute(...).
            return this.cassandraOperations.getReactiveCqlOperations()
                    .execute((ReactiveSessionCallback<WriteResult>) session ->
                            session.prepare(
                                            QueryOptionsUtil.addQueryOptions(
                                                    SimpleStatement.newInstance(localIngestQuery),
                                                    this.writeOptions))
                                    .flatMapMany(prepared ->
                                            Flux.fromIterable(rows)
                                                    .map(row -> prepared.bind(row.toArray())))
                                    .collect(() -> new BatchStatementBuilder(BatchType.UNLOGGED),
                                            BatchStatementBuilder::addStatement)
                                    .map(BatchStatementBuilder::build)
                                    .flatMap(session::execute)
                                    .transform(this::transformToWriteResult))
                    .next();
        }
        if (payload instanceof List<?> entities) {
            return this.cassandraOperations.batchOps()
                    .insert(entities, this.writeOptions)
                    .execute();
        }
        return this.cassandraOperations.insert(payload, (InsertOptions) this.writeOptions);
    }

    /**
     * Verify that the payload is a list whose elements are all lists and view it as such.
     * @throws IllegalArgumentException if the payload or one of its elements is not a {@link List}.
     */
    @SuppressWarnings("unchecked")
    private static List<List<?>> toIngestRows(Object payload) {
        Assert.isInstanceOf(List.class, payload, INGEST_PAYLOAD_TYPE_MESSAGE);
        for (Object row : (List<?>) payload) {
            Assert.isInstanceOf(List.class, row, INGEST_PAYLOAD_TYPE_MESSAGE);
        }
        // Safe: every element has just been checked to be a List.
        return (List<List<?>>) payload;
    }

    /** Update the payload as an entity batch when it is a {@link List}, or as a single entity. */
    private Mono<? extends WriteResult> handleUpdate(Object payload) {
        if (payload instanceof List<?> entities) {
            return this.cassandraOperations.batchOps()
                    .update(entities, this.writeOptions)
                    .execute();
        }
        return this.cassandraOperations.update(payload, (UpdateOptions) this.writeOptions);
    }

    /** Delete the payload as an entity batch when it is a {@link List}, or as a single entity. */
    private Mono<WriteResult> handleDelete(Object payload) {
        if (payload instanceof List<?> entities) {
            // NOTE(review): unlike batch insert/update, the write options are not passed
            // to the batch delete - confirm whether that is intentional.
            return this.cassandraOperations.batchOps()
                    .delete(entities)
                    .execute();
        }
        return this.cassandraOperations.delete(payload, this.writeOptions);
    }

    /**
     * Execute the payload directly when it is a {@link Statement}; otherwise run the
     * callback configured through {@link #setQuery} or {@link #setStatementProcessor}.
     * @throws IllegalStateException if a non-statement payload arrives and no callback
     * has been configured.
     */
    private Mono<WriteResult> handleStatement(Message<?> requestMessage) {
        Object payload = requestMessage.getPayload();
        Mono<ReactiveResultSet> resultSetMono;
        if (payload instanceof Statement<?> statement) {
            resultSetMono = this.cassandraOperations.getReactiveCqlOperations().queryForResultSet(statement);
        }
        else {
            ReactiveSessionMessageCallback callback = this.sessionMessageCallback;
            // Fail with a descriptive error up front instead of a NullPointerException
            // raised later inside the reactive session callback.
            Assert.state(callback != null,
                    "A 'query', 'statementExpression' or 'statementProcessor' must be configured "
                            + "to handle non-Statement payloads in the 'STATEMENT' mode.");
            // The cast selects the ReactiveSessionCallback overload of execute(...).
            resultSetMono = this.cassandraOperations.getReactiveCqlOperations()
                    .execute((ReactiveSessionCallback<ReactiveResultSet>) session ->
                            callback.doInSession(session, requestMessage))
                    .next();
        }
        return resultSetMono.transform(this::transformToWriteResult);
    }

    /** Adapt the reactive result set into a {@link WriteResult}. */
    private Mono<WriteResult> transformToWriteResult(Mono<ReactiveResultSet> resultSetMono) {
        return resultSetMono.map(ReactiveWriteResult::new);
    }

    /**
     * The operation mode of the handler.
     */
    public enum Type {

        /** Insert the payload entity, entities, or ingest rows. */
        INSERT,

        /** Update the payload entity or entities. */
        UPDATE,

        /** Delete the payload entity or entities. */
        DELETE,

        /** Execute a statement taken from the payload or built from the message. */
        STATEMENT

    }

    /** Callback executing a message-derived operation against a reactive session. */
    @FunctionalInterface
    private interface ReactiveSessionMessageCallback {

        Mono<ReactiveResultSet> doInSession(ReactiveSession session, Message<?> requestMessage)
                throws DriverException, DataAccessException;

    }

    /** {@link WriteResult} built from the state of a {@link ReactiveResultSet}. */
    private static final class ReactiveWriteResult extends WriteResult {

        ReactiveWriteResult(ReactiveResultSet reactiveResultSet) {
            // Rows come from availableRows(), drained into a list via toIterable().
            super(reactiveResultSet.getAllExecutionInfo(),
                    reactiveResultSet.wasApplied(),
                    StreamSupport.stream(reactiveResultSet.availableRows().toIterable().spliterator(), false)
                            .toList());
        }

    }

}

