/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.cassandra.core;

import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.querybuilder.Truncate;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import lombok.NonNull;
import org.reactivestreams.Publisher;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.ReactiveSession;
import org.springframework.data.cassandra.ReactiveSessionFactory;
import org.springframework.data.cassandra.core.EntityOperations;
import org.springframework.data.cassandra.core.EntityQueryUtils;
import org.springframework.data.cassandra.core.EntityWriteResult;
import org.springframework.data.cassandra.core.InsertOptions;
import org.springframework.data.cassandra.core.ReactiveCassandraBatchOperations;
import org.springframework.data.cassandra.core.ReactiveCassandraBatchTemplate;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import org.springframework.data.cassandra.core.ReactiveDeleteOperation;
import org.springframework.data.cassandra.core.ReactiveDeleteOperationSupport;
import org.springframework.data.cassandra.core.ReactiveInsertOperation;
import org.springframework.data.cassandra.core.ReactiveInsertOperationSupport;
import org.springframework.data.cassandra.core.ReactiveSelectOperation;
import org.springframework.data.cassandra.core.ReactiveSelectOperationSupport;
import org.springframework.data.cassandra.core.ReactiveUpdateOperation;
import org.springframework.data.cassandra.core.ReactiveUpdateOperationSupport;
import org.springframework.data.cassandra.core.StatementFactory;
import org.springframework.data.cassandra.core.UpdateOptions;
import org.springframework.data.cassandra.core.WriteResult;
import org.springframework.data.cassandra.core.convert.CassandraConverter;
import org.springframework.data.cassandra.core.convert.MappingCassandraConverter;
import org.springframework.data.cassandra.core.convert.QueryMapper;
import org.springframework.data.cassandra.core.convert.UpdateMapper;
import org.springframework.data.cassandra.core.cql.CassandraAccessor;
import org.springframework.data.cassandra.core.cql.CqlIdentifier;
import org.springframework.data.cassandra.core.cql.CqlProvider;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.ReactiveCqlOperations;
import org.springframework.data.cassandra.core.cql.ReactiveCqlTemplate;
import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback;
import org.springframework.data.cassandra.core.cql.RowMapper;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.cql.session.DefaultReactiveSessionFactory;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.CassandraPersistentProperty;
import org.springframework.data.cassandra.core.mapping.event.AfterConvertEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterLoadEvent;
import org.springframework.data.cassandra.core.mapping.event.AfterSaveEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeDeleteEvent;
import org.springframework.data.cassandra.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.cassandra.core.mapping.event.CassandraMappingEvent;
import org.springframework.data.cassandra.core.mapping.event.ReactiveBeforeConvertCallback;
import org.springframework.data.cassandra.core.mapping.event.ReactiveBeforeSaveCallback;
import org.springframework.data.cassandra.core.query.Columns;
import org.springframework.data.cassandra.core.query.Query;
import org.springframework.data.cassandra.core.query.Update;
import org.springframework.data.convert.EntityWriter;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

/**
 * Reactive, entity-oriented facade over {@link ReactiveCqlOperations}.
 *
 * <p>Statements are created through a {@link StatementFactory} (or the driver's {@link QueryBuilder}),
 * executed through the configured {@link ReactiveCqlOperations}, and rows are converted with the
 * configured {@link CassandraConverter}. Mapping events are published only when an
 * {@link ApplicationEventPublisher} has been set; entity callbacks are invoked only when
 * {@link ReactiveEntityCallbacks} have been set.
 */
public class ReactiveCassandraTemplate
implements ReactiveCassandraOperations,
ApplicationEventPublisherAware,
ApplicationContextAware {
    /** Target for mapping events; {@code null} disables event publication. */
    @Nullable
    private ApplicationEventPublisher eventPublisher;
    /** Before-convert / before-save callbacks; {@code null} disables callback invocation. */
    @Nullable
    private ReactiveEntityCallbacks entityCallbacks;
    private final CassandraConverter converter;
    private final EntityOperations entityOperations;
    private final ReactiveCqlOperations cqlOperations;
    private final SpelAwareProxyProjectionFactory projectionFactory;
    private final StatementFactory statementFactory;

    /**
     * Creates a template for the given session using a freshly initialized {@link MappingCassandraConverter}.
     *
     * @param session the session to execute statements with.
     */
    public ReactiveCassandraTemplate(ReactiveSession session) {
        this(session, newConverter());
    }

    /**
     * Creates a template for the given session, wrapping it in a {@link DefaultReactiveSessionFactory}.
     *
     * @param session the session to execute statements with.
     * @param converter the converter used for reading and writing entities.
     */
    public ReactiveCassandraTemplate(ReactiveSession session, CassandraConverter converter) {
        this(new DefaultReactiveSessionFactory(session), converter);
    }

    /**
     * Creates a template that executes statements through a new {@link ReactiveCqlTemplate} built on the
     * given session factory.
     *
     * @param sessionFactory the factory providing sessions.
     * @param converter the converter used for reading and writing entities.
     */
    public ReactiveCassandraTemplate(ReactiveSessionFactory sessionFactory, CassandraConverter converter) {
        this(new ReactiveCqlTemplate(sessionFactory), converter);
    }

    /**
     * Creates a template on top of existing {@link ReactiveCqlOperations}.
     *
     * @param reactiveCqlOperations the CQL operations to delegate to; must not be {@code null}.
     * @param converter the converter used for reading and writing entities; must not be {@code null}.
     */
    public ReactiveCassandraTemplate(ReactiveCqlOperations reactiveCqlOperations, CassandraConverter converter) {
        Assert.notNull(reactiveCqlOperations, "ReactiveCqlOperations must not be null");
        Assert.notNull(converter, "CassandraConverter must not be null");
        this.converter = converter;
        this.cqlOperations = reactiveCqlOperations;
        this.entityOperations = new EntityOperations((MappingContext<? extends CassandraPersistentEntity<?>, CassandraPersistentProperty>)converter.getMappingContext());
        this.projectionFactory = new SpelAwareProxyProjectionFactory();
        this.statementFactory = new StatementFactory(new QueryMapper(converter), new UpdateMapper(converter));
    }

    /** Returns a new batch template bound to this template. */
    @Override
    public ReactiveCassandraBatchOperations batchOps() {
        return new ReactiveCassandraBatchTemplate(this);
    }

    /** Sets the publisher that receives mapping events. */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    /**
     * Wires the projection factory to the application context and, unless entity callbacks were already
     * configured explicitly, creates them from the context.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Explicitly configured callbacks win over the ones discovered in the context.
        if (this.entityCallbacks == null) {
            this.setEntityCallbacks(ReactiveEntityCallbacks.create((BeanFactory)applicationContext));
        }
        this.projectionFactory.setBeanFactory((BeanFactory)applicationContext);
        this.projectionFactory.setBeanClassLoader(applicationContext.getClassLoader());
    }

    /**
     * Sets the entity callbacks invoked before conversion and before saving.
     *
     * @param entityCallbacks the callbacks to use; {@code null} disables callback invocation.
     */
    public void setEntityCallbacks(@Nullable ReactiveEntityCallbacks entityCallbacks) {
        this.entityCallbacks = entityCallbacks;
    }

    @Override
    public CassandraConverter getConverter() {
        return this.converter;
    }

    /** Returns the {@link EntityOperations} created from the converter's mapping context. */
    protected EntityOperations getEntityOperations() {
        return this.entityOperations;
    }

    /** Returns the factory used to create interface projections. */
    protected SpelAwareProxyProjectionFactory getProjectionFactory() {
        return this.projectionFactory;
    }

    @Override
    public ReactiveCqlOperations getReactiveCqlOperations() {
        return this.cqlOperations;
    }

    /** Resolves the persistent entity for {@code entityType} through {@link EntityOperations}. */
    private CassandraPersistentEntity<?> getRequiredPersistentEntity(Class<?> entityType) {
        return this.getEntityOperations().getRequiredPersistentEntity(entityType);
    }

    /** Returns the factory used to build statements from queries, updates and entities. */
    protected StatementFactory getStatementFactory() {
        return this.statementFactory;
    }

    /** Resolves the table name for {@code entityClass} through {@link EntityOperations}. */
    CqlIdentifier getTableName(Class<?> entityClass) {
        return this.getEntityOperations().getTableName(entityClass);
    }

    /**
     * Runs the given CQL as a {@link SimpleStatement} and maps every row to {@code entityClass}.
     *
     * @param cql the CQL to execute; must contain text.
     * @param entityClass the type to map rows to.
     */
    @Override
    public <T> Flux<T> select(String cql, Class<T> entityClass) {
        Assert.hasText(cql, "CQL must not be empty");
        return this.select(new SimpleStatement(cql), entityClass);
    }

    /** Same as {@link #select(String, Class)} but emits at most the first mapped row. */
    @Override
    public <T> Mono<T> selectOne(String cql, Class<T> entityClass) {
        return this.select(cql, entityClass).next();
    }

    /**
     * Runs the given statement and maps every row to {@code entityClass}. The table name used for
     * events is derived from the statement via {@link EntityQueryUtils#getTableName}.
     */
    @Override
    public <T> Flux<T> select(Statement statement, Class<T> entityClass) {
        Assert.notNull(statement, "Statement must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        Function<Row, T> mapper = this.getMapper(entityClass, entityClass, EntityQueryUtils.getTableName(statement));
        return this.getReactiveCqlOperations().query(statement, (row, rowNum) -> mapper.apply(row));
    }

    /** Same as {@link #select(Statement, Class)} but emits at most the first mapped row. */
    @Override
    public <T> Mono<T> selectOne(Statement statement, Class<T> entityClass) {
        return this.select(statement, entityClass).next();
    }

    /**
     * Runs the given statement and reads the currently available rows into a {@link Slice}. The page
     * size is the effective fetch size (see {@link #getEffectiveFetchSize(Statement)}); an empty slice
     * is emitted when the pipeline completes without a value.
     */
    @Override
    public <T> Mono<Slice<T>> slice(Statement statement, Class<T> entityClass) {
        Assert.notNull(statement, "Statement must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        Mono<ReactiveResultSet> resultSetMono = this.getReactiveCqlOperations().queryForResultSet(statement);
        Mono<Integer> effectiveFetchSizeMono = this.getEffectiveFetchSize(statement);
        RowMapper<T> rowMapper = (row, i) -> this.getConverter().read(entityClass, row);
        return resultSetMono.zipWith(effectiveFetchSizeMono).flatMap(tuple -> {
            ReactiveResultSet resultSet = tuple.getT1();
            Integer effectiveFetchSize = tuple.getT2();
            return resultSet.availableRows().collectList().map(it -> EntityQueryUtils.readSlice(it, resultSet.getExecutionInfo().getPagingState(), rowMapper, 1, effectiveFetchSize));
        }).defaultIfEmpty(new SliceImpl<>(Collections.emptyList()));
    }

    /** Selects entities of {@code entityClass} matching {@code query2} from the entity's table. */
    @Override
    public <T> Flux<T> select(Query query2, Class<T> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        return this.doSelect(query2, entityClass, this.getTableName(entityClass), entityClass);
    }

    /**
     * Builds and runs a SELECT for {@code query2} against {@code tableName}, restricting columns to what
     * the projection onto {@code returnType} requires, and maps each row to {@code returnType}.
     */
    <T> Flux<T> doSelect(Query query2, Class<?> entityClass, CqlIdentifier tableName, Class<T> returnType) {
        CassandraPersistentEntity<?> persistentEntity = this.getRequiredPersistentEntity(entityClass);
        Columns columns = this.getStatementFactory().computeColumnsForProjection(query2.getColumns(), persistentEntity, returnType);
        Query queryToUse = query2.columns(columns);
        RegularStatement select2 = this.getStatementFactory().select(queryToUse, persistentEntity, tableName);
        Function<Row, T> mapper = this.getMapper(entityClass, returnType, tableName);
        return this.getReactiveCqlOperations().query(select2, (row, rowNum) -> mapper.apply(row));
    }

    /** Same as {@link #select(Query, Class)} but emits at most the first mapped row. */
    @Override
    public <T> Mono<T> selectOne(Query query2, Class<T> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        return this.select(query2, entityClass).next();
    }

    /** Builds a SELECT for {@code query2} and delegates to {@link #slice(Statement, Class)}. */
    @Override
    public <T> Mono<Slice<T>> slice(Query query2, Class<T> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        RegularStatement select2 = this.getStatementFactory().select(query2, this.getRequiredPersistentEntity(entityClass));
        return this.slice(select2, entityClass);
    }

    /**
     * Applies {@code update} to the rows of {@code entityClass}'s table selected by {@code query2}.
     *
     * @return whether the write result reports the statement as applied.
     */
    @Override
    public Mono<Boolean> update(Query query2, Update update, Class<?> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(update, "Update must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        return this.doUpdate(query2, update, entityClass, this.getTableName(entityClass)).map(WriteResult::wasApplied);
    }

    /** Builds the UPDATE statement for the given query/update pair and executes it. */
    Mono<WriteResult> doUpdate(Query query2, Update update, Class<?> entityClass, CqlIdentifier tableName) {
        RegularStatement statement = this.getStatementFactory().update(query2, update, this.getRequiredPersistentEntity(entityClass), tableName);
        return this.getReactiveCqlOperations().execute(new StatementCallback(statement)).next();
    }

    /**
     * Deletes the rows of {@code entityClass}'s table selected by {@code query2}.
     *
     * @return whether the write result reports the statement as applied.
     */
    @Override
    public Mono<Boolean> delete(Query query2, Class<?> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        return this.doDelete(query2, entityClass, this.getTableName(entityClass)).map(WriteResult::wasApplied);
    }

    /**
     * Builds and executes the DELETE for {@code query2}. A {@link BeforeDeleteEvent} is published on
     * subscription and an {@link AfterDeleteEvent} once the write result arrives.
     */
    Mono<WriteResult> doDelete(Query query2, Class<?> entityClass, CqlIdentifier tableName) {
        RegularStatement delete = this.getStatementFactory().delete(query2, this.getRequiredPersistentEntity(entityClass), tableName);
        Mono<WriteResult> writeResult = this.getReactiveCqlOperations().execute(new StatementCallback(delete)).doOnSubscribe(it -> this.maybeEmitEvent(new BeforeDeleteEvent((Statement)delete, entityClass, tableName))).next();
        return writeResult.doOnNext(it -> this.maybeEmitEvent(new AfterDeleteEvent((Statement)delete, entityClass, tableName)));
    }

    /** Counts all rows in the table mapped to {@code entityClass}. */
    @Override
    public Mono<Long> count(Class<?> entityClass) {
        Assert.notNull(entityClass, "Entity type must not be null");
        Select select2 = QueryBuilder.select().countAll().from(this.getTableName(entityClass).toCql());
        return this.getReactiveCqlOperations().queryForObject(select2, Long.class);
    }

    /** Counts the rows of {@code entityClass}'s table matching {@code query2}. */
    @Override
    public Mono<Long> count(Query query2, Class<?> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        return this.doCount(query2, entityClass, this.getTableName(entityClass));
    }

    /** Builds and runs the COUNT statement for {@code query2}; emits {@code 0} when no value is returned. */
    Mono<Long> doCount(Query query2, Class<?> entityClass, CqlIdentifier tableName) {
        RegularStatement count = this.getStatementFactory().count(query2, this.getRequiredPersistentEntity(entityClass), tableName);
        return this.getReactiveCqlOperations().queryForObject(count, Long.class).switchIfEmpty(Mono.just(0L));
    }

    /**
     * Checks whether a row with the given id exists. The converter writes the id into the WHERE clause
     * of a SELECT on the entity's table; the result is whether any row is returned.
     */
    @Override
    public Mono<Boolean> exists(Object id, Class<?> entityClass) {
        Assert.notNull(id, "Id must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        CassandraPersistentEntity<?> entity = this.getRequiredPersistentEntity(entityClass);
        Select select2 = QueryBuilder.select().from(entity.getTableName().toCql());
        this.getConverter().write(id, select2.where(), entity);
        return this.getReactiveCqlOperations().queryForRows(select2).hasElements();
    }

    /** Checks whether any row of {@code entityClass}'s table matches {@code query2}. */
    @Override
    public Mono<Boolean> exists(Query query2, Class<?> entityClass) throws DataAccessException {
        Assert.notNull(query2, "Query must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        return this.doExists(query2, entityClass, this.getTableName(entityClass));
    }

    /** Runs {@code query2} limited to one row and reports whether anything came back. */
    Mono<Boolean> doExists(Query query2, Class<?> entityClass, CqlIdentifier tableName) {
        RegularStatement select2 = this.getStatementFactory().select(query2.limit(1L), this.getRequiredPersistentEntity(entityClass), tableName);
        return this.getReactiveCqlOperations().queryForRows(select2).hasElements();
    }

    /**
     * Loads a single entity by id. The converter writes the id into the WHERE clause of a
     * {@code SELECT *} on the entity's table.
     */
    @Override
    public <T> Mono<T> selectOneById(Object id, Class<T> entityClass) {
        Assert.notNull(id, "Id must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        CassandraPersistentEntity<?> entity = this.getRequiredPersistentEntity(entityClass);
        Select select2 = QueryBuilder.select().all().from(entity.getTableName().toCql());
        this.getConverter().write(id, select2.where(), entity);
        return this.selectOne(select2, entityClass);
    }

    /** Inserts the entity with empty {@link InsertOptions} and emits the entity from the write result. */
    @Override
    public <T> Mono<T> insert(T entity) {
        return this.insert(entity, InsertOptions.empty()).map(EntityWriteResult::getEntity);
    }

    /** Inserts the entity into the table mapped to its class using the given options. */
    @Override
    public <T> Mono<EntityWriteResult<T>> insert(T entity, InsertOptions options) {
        Assert.notNull(entity, "Entity must not be null");
        Assert.notNull(options, "InsertOptions must not be null");
        return this.doInsert(entity, options, this.getTableName(entity.getClass()));
    }

    /**
     * Runs the before-convert callback and inserts the entity it returns. Versioned entities get their
     * version property initialized and are inserted with {@code IF NOT EXISTS}.
     */
    <T> Mono<EntityWriteResult<T>> doInsert(T entity, WriteOptions options, CqlIdentifier tableName) {
        return this.maybeCallBeforeConvert(entity, tableName).flatMap(entityToInsert -> {
            EntityOperations.AdaptibleEntity<T> source = this.entityOperations.forEntity(entityToInsert, this.getConverter().getConversionService());
            CassandraPersistentEntity<?> persistentEntity = this.getRequiredPersistentEntity(entityToInsert.getClass());
            T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entityToInsert;
            Insert insert = EntityQueryUtils.createInsertQuery(tableName.toCql(), entityToUse, options, this.getConverter(), persistentEntity);
            return source.isVersionedEntity() ? this.doInsertVersioned(insert.ifNotExists(), entityToUse, source, tableName) : this.doInsert(insert, entityToUse, tableName);
        });
    }

    /**
     * Executes a conditional insert and signals {@link OptimisticLockingFailureException} when the
     * statement was not applied.
     */
    private <T> Mono<EntityWriteResult<T>> doInsertVersioned(Insert insert, T entity, EntityOperations.AdaptibleEntity<T> source, CqlIdentifier tableName) {
        return this.executeSave(entity, tableName, insert, (result, sink) -> {
            if (!result.wasApplied()) {
                sink.error(new OptimisticLockingFailureException(String.format("Cannot insert entity %s with version %s into table %s as it already exists", entity, source.getVersion(), tableName)));
                return;
            }
            sink.next(result);
        });
    }

    /** Executes an unconditional insert. */
    private <T> Mono<EntityWriteResult<T>> doInsert(Insert insert, T entity, CqlIdentifier tableName) {
        return this.executeSave(entity, tableName, insert);
    }

    /** Updates the entity with empty {@link UpdateOptions} and emits the entity from the write result. */
    @Override
    public <T> Mono<T> update(T entity) {
        return this.update(entity, UpdateOptions.empty()).map(EntityWriteResult::getEntity);
    }

    /**
     * Runs the before-convert callback and updates the entity it returns, applying an optimistic
     * locking condition when the entity is versioned.
     */
    @Override
    public <T> Mono<EntityWriteResult<T>> update(T entity, UpdateOptions options) {
        Assert.notNull(entity, "Entity must not be null");
        Assert.notNull(options, "UpdateOptions must not be null");
        EntityOperations.AdaptibleEntity<T> source = this.entityOperations.forEntity(entity, this.getConverter().getConversionService());
        CassandraPersistentEntity<?> persistentEntity = this.getRequiredPersistentEntity(entity.getClass());
        CqlIdentifier tableName = persistentEntity.getTableName();
        // Persist the callback's result, not the original argument: a before-convert callback may
        // hand back a different (e.g. immutable, modified) instance.
        return this.maybeCallBeforeConvert(entity, tableName).flatMap(entityToUpdate -> source.isVersionedEntity() ? this.doUpdateVersioned(entityToUpdate, options, tableName, persistentEntity) : this.doUpdate(entityToUpdate, options, tableName, persistentEntity));
    }

    /**
     * Increments the entity's version, issues an UPDATE conditioned on the previous version and
     * signals {@link OptimisticLockingFailureException} when the statement was not applied.
     */
    private <T> Mono<EntityWriteResult<T>> doUpdateVersioned(T entity, UpdateOptions options, CqlIdentifier tableName, CassandraPersistentEntity<?> persistentEntity) {
        EntityOperations.AdaptibleEntity<T> source = this.getEntityOperations().forEntity(entity, this.getConverter().getConversionService());
        // Capture the version before incrementing; it becomes the condition of the UPDATE.
        Number previousVersion = source.getVersion();
        T toSave = source.incrementVersion();
        com.datastax.driver.core.querybuilder.Update update = this.getStatementFactory().update(toSave, options, (EntityWriter<Object, Object>)this.getConverter(), persistentEntity, tableName);
        return this.executeSave(toSave, tableName, source.appendVersionCondition(update, previousVersion), (result, sink) -> {
            if (!result.wasApplied()) {
                sink.error(new OptimisticLockingFailureException(String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", toSave, source.getVersion(), tableName)));
                return;
            }
            sink.next(result);
        });
    }

    /** Builds and executes an unconditional UPDATE for the entity. */
    private <T> Mono<EntityWriteResult<T>> doUpdate(T entity, UpdateOptions options, CqlIdentifier tableName, CassandraPersistentEntity<?> persistentEntity) {
        com.datastax.driver.core.querybuilder.Update update = this.getStatementFactory().update(entity, options, (EntityWriter<Object, Object>)this.getConverter(), persistentEntity, tableName);
        return this.executeSave(entity, tableName, update);
    }

    /** Deletes the entity with empty {@link QueryOptions} and emits the given entity afterwards. */
    @Override
    public <T> Mono<T> delete(T entity) {
        return this.delete(entity, QueryOptions.empty()).map(reactiveWriteResult -> entity);
    }

    /**
     * Deletes the given entity from the table mapped to its class, applying an optimistic locking
     * condition when the entity is versioned.
     */
    @Override
    public Mono<WriteResult> delete(Object entity, QueryOptions options) {
        Assert.notNull(entity, "Entity must not be null");
        Assert.notNull(options, "QueryOptions must not be null");
        EntityOperations.AdaptibleEntity<Object> source = this.entityOperations.forEntity(entity, this.getConverter().getConversionService());
        CassandraPersistentEntity<?> persistentEntity = this.getRequiredPersistentEntity(entity.getClass());
        CqlIdentifier tableName = persistentEntity.getTableName();
        Delete delete = this.getStatementFactory().delete(entity, options, (EntityWriter<Object, Object>)this.getConverter(), persistentEntity, tableName);
        return source.isVersionedEntity() ? this.doDeleteVersioned(delete, entity, source, tableName) : this.doDelete(delete, entity, tableName);
    }

    /**
     * Executes a DELETE carrying the version condition and signals
     * {@link OptimisticLockingFailureException} when the statement was not applied.
     */
    private Mono<WriteResult> doDeleteVersioned(Delete delete, Object entity, EntityOperations.AdaptibleEntity<Object> source, CqlIdentifier tableName) {
        return this.executeDelete(entity, tableName, source.appendVersionCondition(delete), (result, sink) -> {
            if (!result.wasApplied()) {
                sink.error(new OptimisticLockingFailureException(String.format("Cannot delete entity %s with version %s in table %s. Has it been modified meanwhile?", entity, source.getVersion(), tableName)));
                return;
            }
            sink.next(result);
        });
    }

    /** Executes an unconditional DELETE for the entity. */
    private Mono<WriteResult> doDelete(Delete delete, Object entity, CqlIdentifier tableName) {
        return this.executeDelete(entity, tableName, delete, (result, sink) -> sink.next(result));
    }

    /**
     * Deletes the row identified by {@code id} from the table mapped to {@code entityClass}. A
     * {@link BeforeDeleteEvent} is published on subscription and an {@link AfterDeleteEvent} once the
     * result arrives.
     */
    @Override
    public Mono<Boolean> deleteById(Object id, Class<?> entityClass) {
        Assert.notNull(id, "Id must not be null");
        Assert.notNull(entityClass, "Entity type must not be null");
        CassandraPersistentEntity<?> entity = this.getRequiredPersistentEntity(entityClass);
        CqlIdentifier tableName = entity.getTableName();
        Delete delete = QueryBuilder.delete().from(tableName.toCql());
        this.getConverter().write(id, delete.where(), entity);
        Mono<Boolean> result = this.getReactiveCqlOperations().execute((Statement)delete).doOnSubscribe(it -> this.maybeEmitEvent(new BeforeDeleteEvent((Statement)delete, entityClass, tableName)));
        return result.doOnNext(it -> this.maybeEmitEvent(new AfterDeleteEvent((Statement)delete, entityClass, tableName)));
    }

    /**
     * Truncates the table mapped to {@code entityClass}. A {@link BeforeDeleteEvent} is published on
     * subscription and an {@link AfterDeleteEvent} once the result arrives.
     */
    @Override
    public Mono<Void> truncate(Class<?> entityClass) {
        Assert.notNull(entityClass, "Entity type must not be null");
        CqlIdentifier tableName = this.getTableName(entityClass);
        Truncate truncate = QueryBuilder.truncate(tableName.toCql());
        Mono<Boolean> result = this.getReactiveCqlOperations().execute((Statement)truncate).doOnSubscribe(it -> this.maybeEmitEvent(new BeforeDeleteEvent((Statement)truncate, entityClass, tableName)));
        return result.doOnNext(it -> this.maybeEmitEvent(new AfterDeleteEvent((Statement)truncate, entityClass, tableName))).then();
    }

    /** Starts a fluent delete for {@code domainType}. */
    @Override
    public ReactiveDeleteOperation.ReactiveDelete delete(Class<?> domainType) {
        return new ReactiveDeleteOperationSupport(this).delete(domainType);
    }

    /** Starts a fluent insert for {@code domainType}. */
    @Override
    public <T> ReactiveInsertOperation.ReactiveInsert<T> insert(Class<T> domainType) {
        return new ReactiveInsertOperationSupport(this).insert(domainType);
    }

    /** Starts a fluent query for {@code domainType}. */
    @Override
    public <T> ReactiveSelectOperation.ReactiveSelect<T> query(Class<T> domainType) {
        return new ReactiveSelectOperationSupport(this).query(domainType);
    }

    /** Starts a fluent update for {@code domainType}. */
    @Override
    public ReactiveUpdateOperation.ReactiveUpdate update(Class<?> domainType) {
        return new ReactiveUpdateOperationSupport(this).update(domainType);
    }

    /** Executes a save statement, passing every write result through unchanged. */
    private <T> Mono<EntityWriteResult<T>> executeSave(T entity, CqlIdentifier tableName, Statement statement) {
        return this.executeSave(entity, tableName, statement, (writeResult, sink) -> sink.next(writeResult));
    }

    /**
     * Executes a save statement. Per subscription: publishes a {@link BeforeSaveEvent}, runs the
     * before-save callback, executes the statement, lets {@code handler} accept or reject the write
     * result, and publishes an {@link AfterSaveEvent} for every result the handler lets through.
     */
    private <T> Mono<EntityWriteResult<T>> executeSave(T entity, CqlIdentifier tableName, Statement statement, BiConsumer<EntityWriteResult<T>, SynchronousSink<EntityWriteResult<T>>> handler) {
        return Mono.defer(() -> {
            this.maybeEmitEvent(new BeforeSaveEvent<Object>(entity, tableName, statement));
            return this.maybeCallBeforeSave(entity, tableName, statement).flatMapMany(entityToSave -> {
                Flux<WriteResult> execute = this.getReactiveCqlOperations().execute(new StatementCallback(statement));
                return execute.map(it -> EntityWriteResult.of(it, entityToSave)).handle(handler).doOnNext(it -> this.maybeEmitEvent(new AfterSaveEvent<Object>(entityToSave, tableName)));
            }).next();
        });
    }

    /**
     * Executes a delete statement. Per subscription: publishes a {@link BeforeDeleteEvent}, executes
     * the statement, lets {@code handler} accept or reject the write result, and publishes an
     * {@link AfterDeleteEvent} for every result the handler lets through.
     */
    private Mono<WriteResult> executeDelete(Object entity, CqlIdentifier tableName, Statement statement, BiConsumer<WriteResult, SynchronousSink<WriteResult>> handler) {
        Flux<WriteResult> execute = this.getReactiveCqlOperations().execute(new StatementCallback(statement));
        // The before-event is tied to subscription (like the other delete paths in this class) so that
        // an unsubscribed Mono publishes nothing, and it is a delete event rather than a save event.
        return execute.map(it -> EntityWriteResult.of(it, entity)).handle(handler).doOnSubscribe(it -> this.maybeEmitEvent(new BeforeDeleteEvent(statement, entity.getClass(), tableName))).doOnNext(it -> this.maybeEmitEvent(new AfterDeleteEvent(statement, entity.getClass(), tableName))).next();
    }

    /**
     * Determines the fetch size to assume for {@code statement}: the statement's own fetch size when
     * positive, otherwise the fetch size of the CQL operations when they are a {@link CassandraAccessor}
     * with a value other than {@code -1}, otherwise the cluster's configured query-option fetch size.
     */
    private Mono<Integer> getEffectiveFetchSize(Statement statement) {
        if (statement.getFetchSize() > 0) {
            return Mono.just(statement.getFetchSize());
        }
        if (this.getReactiveCqlOperations() instanceof CassandraAccessor) {
            CassandraAccessor accessor = (CassandraAccessor)((Object)this.getReactiveCqlOperations());
            if (accessor.getFetchSize() != -1) {
                return Mono.just(accessor.getFetchSize());
            }
        }
        return this.getReactiveCqlOperations().execute(session -> Mono.just(session.getCluster().getConfiguration().getQueryOptions().getFetchSize())).single();
    }

    /**
     * Creates the row mapper used by the select methods. For each row it publishes an
     * {@link AfterLoadEvent}, converts the row, wraps the converted object in a projection when
     * {@code targetType} is an interface, and publishes an {@link AfterConvertEvent} for non-null results.
     */
    private <T> Function<Row, T> getMapper(Class<?> entityType, Class<T> targetType, CqlIdentifier tableName) {
        Class<?> typeToRead = this.resolveTypeToRead(entityType, targetType);
        return row -> {
            this.maybeEmitEvent(new AfterLoadEvent(row, targetType, tableName));
            Object source = this.getConverter().read(typeToRead, row);
            // Safe by construction: either a projection of targetType, or an instance of the type that
            // resolveTypeToRead selected for targetType.
            @SuppressWarnings("unchecked")
            T result = (T)(targetType.isInterface() ? this.getProjectionFactory().createProjection(targetType, source) : source);
            if (result != null) {
                this.maybeEmitEvent(new AfterConvertEvent<Object>(row, result, tableName));
            }
            return result;
        };
    }

    /**
     * Chooses the type the converter should materialize: the entity type when the target is an
     * interface or a supertype of the entity, otherwise the target type itself.
     */
    private Class<?> resolveTypeToRead(Class<?> entityType, Class<?> targetType) {
        return targetType.isInterface() || targetType.isAssignableFrom(entityType) ? entityType : targetType;
    }

    /** Creates a {@link MappingCassandraConverter} and runs its {@code afterPropertiesSet()} initialization. */
    private static MappingCassandraConverter newConverter() {
        MappingCassandraConverter converter = new MappingCassandraConverter();
        converter.afterPropertiesSet();
        return converter;
    }

    /** Publishes {@code event} if an {@link ApplicationEventPublisher} is configured; otherwise does nothing. */
    protected <E extends CassandraMappingEvent<T>, T> void maybeEmitEvent(E event) {
        if (this.eventPublisher != null) {
            this.eventPublisher.publishEvent(event);
        }
    }

    /**
     * Invokes {@link ReactiveBeforeConvertCallback}s when entity callbacks are configured.
     *
     * @return the callback result, or {@code object} itself when no callbacks are configured.
     */
    protected <T> Mono<T> maybeCallBeforeConvert(T object, CqlIdentifier tableName) {
        if (null != this.entityCallbacks) {
            return this.entityCallbacks.callback(ReactiveBeforeConvertCallback.class, object, tableName);
        }
        return Mono.just(object);
    }

    /**
     * Invokes {@link ReactiveBeforeSaveCallback}s when entity callbacks are configured.
     *
     * @return the callback result, or {@code object} itself when no callbacks are configured.
     */
    protected <T> Mono<T> maybeCallBeforeSave(T object, CqlIdentifier tableName, Statement statement) {
        if (null != this.entityCallbacks) {
            return this.entityCallbacks.callback(ReactiveBeforeSaveCallback.class, object, tableName, statement);
        }
        return Mono.just(object);
    }

    /**
     * Session callback that executes a single statement and collects the outcome into a
     * {@link WriteResult}. Also exposes the statement's string form through {@link CqlProvider}.
     */
    static final class StatementCallback
    implements ReactiveSessionCallback<WriteResult>,
    CqlProvider {
        @NonNull
        private final Statement statement;

        /**
         * @param statement the statement to execute.
         * @throws NullPointerException if {@code statement} is {@code null}.
         */
        public StatementCallback(@NonNull Statement statement) {
            if (statement == null) {
                throw new NullPointerException("statement is marked non-null but is null");
            }
            this.statement = statement;
        }

        /** Executes the statement on the given session and converts the result set to a {@link WriteResult}. */
        @Override
        public Publisher<WriteResult> doInSession(ReactiveSession session) throws DriverException, DataAccessException {
            return session.execute(this.statement).flatMap(StatementCallback::toWriteResult);
        }

        /** Returns {@code toString()} of the wrapped statement. */
        @Override
        public String getCql() {
            return this.statement.toString();
        }

        /** Collects all rows of the result set together with its execution info and applied flag. */
        private static Mono<WriteResult> toWriteResult(ReactiveResultSet resultSet) {
            return resultSet.rows().collectList().map(rows -> new WriteResult(resultSet.getAllExecutionInfo(), resultSet.wasApplied(), (List<Row>)rows));
        }

        @NonNull
        public Statement getStatement() {
            return this.statement;
        }

        /** Two callbacks are equal when their statements are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof StatementCallback)) {
                return false;
            }
            Statement mine = this.getStatement();
            Statement theirs = ((StatementCallback)o).getStatement();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        @Override
        public int hashCode() {
            Statement current = this.getStatement();
            return 59 + (current == null ? 43 : current.hashCode());
        }

        @Override
        public String toString() {
            return "ReactiveCassandraTemplate.StatementCallback(statement=" + this.getStatement() + ")";
        }
    }
}

