/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.sql.impl.operation;

import com.hazelcast.internal.nio.Connection;
import com.hazelcast.internal.nio.Packet;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.sql.impl.NodeServiceProvider;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.QueryId;
import com.hazelcast.sql.impl.exec.CreateExecPlanNodeVisitor;
import com.hazelcast.sql.impl.exec.CreateExecPlanNodeVisitorHook;
import com.hazelcast.sql.impl.exec.Exec;
import com.hazelcast.sql.impl.exec.io.InboundHandler;
import com.hazelcast.sql.impl.exec.io.OutboundHandler;
import com.hazelcast.sql.impl.exec.io.flowcontrol.FlowControlFactory;
import com.hazelcast.sql.impl.operation.QueryBatchExchangeOperation;
import com.hazelcast.sql.impl.operation.QueryCancelOperation;
import com.hazelcast.sql.impl.operation.QueryCheckOperation;
import com.hazelcast.sql.impl.operation.QueryCheckResponseOperation;
import com.hazelcast.sql.impl.operation.QueryExecuteOperation;
import com.hazelcast.sql.impl.operation.QueryExecuteOperationFragment;
import com.hazelcast.sql.impl.operation.QueryFlowControlExchangeOperation;
import com.hazelcast.sql.impl.operation.QueryOperation;
import com.hazelcast.sql.impl.operation.QueryOperationChannel;
import com.hazelcast.sql.impl.operation.QueryOperationChannelImpl;
import com.hazelcast.sql.impl.operation.QueryOperationHandler;
import com.hazelcast.sql.impl.state.QueryState;
import com.hazelcast.sql.impl.state.QueryStateCompletionCallback;
import com.hazelcast.sql.impl.state.QueryStateRegistry;
import com.hazelcast.sql.impl.worker.QueryFragmentExecutable;
import com.hazelcast.sql.impl.worker.QueryFragmentWorkerPool;
import com.hazelcast.sql.impl.worker.QueryOperationExecutable;
import com.hazelcast.sql.impl.worker.QueryOperationWorkerPool;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

public class QueryOperationHandlerImpl
implements QueryOperationHandler,
QueryStateCompletionCallback {
    private final NodeServiceProvider nodeServiceProvider;
    private final InternalSerializationService serializationService;
    private final QueryStateRegistry stateRegistry;
    private final QueryFragmentWorkerPool fragmentPool;
    private final QueryOperationWorkerPool operationPool;
    private final int outboxBatchSize;
    private final FlowControlFactory flowControlFactory;
    private volatile CreateExecPlanNodeVisitorHook execHook;

    public QueryOperationHandlerImpl(String instanceName, NodeServiceProvider nodeServiceProvider, InternalSerializationService serializationService, QueryStateRegistry stateRegistry, int outboxBatchSize, FlowControlFactory flowControlFactory, int threadCount, int operationThreadCount) {
        this.nodeServiceProvider = nodeServiceProvider;
        this.serializationService = serializationService;
        this.stateRegistry = stateRegistry;
        this.outboxBatchSize = outboxBatchSize;
        this.flowControlFactory = flowControlFactory;
        this.fragmentPool = new QueryFragmentWorkerPool(instanceName, threadCount, nodeServiceProvider.getLogger(QueryFragmentWorkerPool.class));
        this.operationPool = new QueryOperationWorkerPool(instanceName, operationThreadCount, nodeServiceProvider, this, serializationService, nodeServiceProvider.getLogger(QueryOperationWorkerPool.class));
    }

    public void shutdown() {
        this.fragmentPool.stop();
        this.operationPool.stop();
    }

    @Override
    public boolean submit(UUID localMemberId, UUID targetMemberId, QueryOperation operation) {
        if (targetMemberId.equals(localMemberId)) {
            this.submitLocal(localMemberId, operation);
            return true;
        }
        Connection connection = this.getConnection(targetMemberId);
        if (connection == null) {
            return false;
        }
        return this.submitRemote(localMemberId, connection, operation, false);
    }

    public void submitLocal(UUID callerId, QueryOperation operation) {
        operation.setCallerId(callerId);
        this.operationPool.submit(operation.getPartition(), QueryOperationExecutable.local(operation));
    }

    public boolean submitRemote(UUID callerId, Connection connection, QueryOperation operation, boolean ordered) {
        operation.setCallerId(callerId);
        byte[] bytes = this.serializeOperation(operation);
        Packet packet = new Packet(bytes, operation.getPartition()).setPacketType(Packet.Type.SQL);
        if (ordered) {
            return connection.writeOrdered(packet);
        }
        return connection.write(packet);
    }

    @Override
    public QueryOperationChannel createChannel(UUID sourceMemberId, UUID targetMemberId) {
        if (targetMemberId.equals(this.getLocalMemberId())) {
            return new QueryOperationChannelImpl(this, sourceMemberId, null);
        }
        Connection connection = this.getConnection(targetMemberId);
        if (connection == null) {
            throw QueryException.memberConnection(targetMemberId);
        }
        return new QueryOperationChannelImpl(this, sourceMemberId, connection);
    }

    @Override
    public void execute(QueryOperation operation) {
        if (operation instanceof QueryExecuteOperation) {
            this.handleExecute((QueryExecuteOperation)operation);
        } else if (operation instanceof QueryBatchExchangeOperation) {
            this.handleBatch((QueryBatchExchangeOperation)operation);
        } else if (operation instanceof QueryCancelOperation) {
            this.handleCancel((QueryCancelOperation)operation);
        } else if (operation instanceof QueryFlowControlExchangeOperation) {
            this.handleFlowControl((QueryFlowControlExchangeOperation)operation);
        } else if (operation instanceof QueryCheckOperation) {
            this.handleCheck((QueryCheckOperation)operation);
        } else if (operation instanceof QueryCheckResponseOperation) {
            this.handleCheckResponse((QueryCheckResponseOperation)operation);
        }
    }

    private void handleExecute(QueryExecuteOperation operation) {
        UUID localMemberId = this.getLocalMemberId();
        if (!operation.getPartitionMap().containsKey(localMemberId)) {
            return;
        }
        QueryState state = this.stateRegistry.onDistributedQueryStarted(localMemberId, operation.getQueryId(), this);
        if (state == null) {
            return;
        }
        ArrayList<QueryFragmentExecutable> fragmentExecutables = new ArrayList<QueryFragmentExecutable>(operation.getFragments().size());
        for (QueryExecuteOperationFragment fragmentDescriptor : operation.getFragments()) {
            if (fragmentDescriptor.getNode() == null) continue;
            CreateExecPlanNodeVisitor visitor = new CreateExecPlanNodeVisitor(this, this.nodeServiceProvider, this.serializationService, localMemberId, operation, this.flowControlFactory, operation.getPartitionMap().get(localMemberId), this.outboxBatchSize, this.execHook);
            fragmentDescriptor.getNode().visit(visitor);
            Exec exec = visitor.getExec();
            Map<Integer, InboundHandler> inboxes = visitor.getInboxes();
            Map<Integer, Map<UUID, OutboundHandler>> outboxes = visitor.getOutboxes();
            QueryFragmentExecutable fragmentExecutable = new QueryFragmentExecutable(state, operation.getArguments(), exec, inboxes, outboxes, this.fragmentPool);
            fragmentExecutables.add(fragmentExecutable);
        }
        state.getDistributedState().onStart(fragmentExecutables);
        for (QueryFragmentExecutable fragmentExecutable : fragmentExecutables) {
            fragmentExecutable.schedule();
        }
    }

    private void handleBatch(QueryBatchExchangeOperation operation) {
        UUID localMemberId = this.getLocalMemberId();
        if (!localMemberId.equals(operation.getTargetMemberId())) {
            return;
        }
        QueryState state = this.stateRegistry.onDistributedQueryStarted(localMemberId, operation.getQueryId(), this);
        if (state == null) {
            return;
        }
        QueryFragmentExecutable fragmentExecutable = state.getDistributedState().onOperation(operation);
        if (fragmentExecutable != null) {
            fragmentExecutable.schedule();
        }
    }

    private void handleCancel(QueryCancelOperation operation) {
        QueryId queryId = operation.getQueryId();
        QueryState state = this.stateRegistry.getState(queryId);
        if (state == null) {
            return;
        }
        QueryException error = QueryException.error(operation.getErrorCode(), operation.getErrorMessage(), operation.getOriginatingMemberId());
        state.cancel(error, false);
    }

    private void handleFlowControl(QueryFlowControlExchangeOperation operation) {
        QueryState state = this.stateRegistry.getState(operation.getQueryId());
        if (state == null) {
            return;
        }
        QueryFragmentExecutable fragmentExecutable = state.getDistributedState().onOperation(operation);
        if (fragmentExecutable != null) {
            fragmentExecutable.schedule();
        }
    }

    private void handleCheck(QueryCheckOperation operation) {
        ArrayList<QueryId> inactiveQueryIds = new ArrayList<QueryId>(operation.getQueryIds().size());
        for (QueryId queryId : operation.getQueryIds()) {
            boolean active = this.stateRegistry.getState(queryId) != null;
            if (active) continue;
            inactiveQueryIds.add(queryId);
        }
        QueryCheckResponseOperation responseOperation = new QueryCheckResponseOperation(inactiveQueryIds);
        this.submit(this.getLocalMemberId(), operation.getCallerId(), responseOperation);
    }

    private void handleCheckResponse(QueryCheckResponseOperation operation) {
        if (operation.getQueryIds().isEmpty()) {
            return;
        }
        QueryException error = QueryException.error(-1, "Query is no longer active on coordinator.", operation.getCallerId());
        for (QueryId queryId : operation.getQueryIds()) {
            QueryState state = this.stateRegistry.getState(queryId);
            if (state == null) continue;
            state.cancel(error, false);
        }
    }

    @Override
    public void onCompleted(QueryId queryId) {
        this.stateRegistry.onQueryCompleted(queryId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onError(QueryId queryId, int errCode, String errMessage, UUID originatingMemberId, Collection<UUID> memberIds) {
        try {
            if (memberIds.isEmpty()) {
                return;
            }
            QueryCancelOperation operation = new QueryCancelOperation(queryId, errCode, errMessage, originatingMemberId);
            for (UUID memberId : memberIds) {
                this.submit(this.getLocalMemberId(), memberId, operation);
            }
        }
        finally {
            this.stateRegistry.onQueryCompleted(queryId);
        }
    }

    public void onPacket(Packet packet) {
        int partition = packet.hasPartitionHash() ? packet.getPartitionId() : -1;
        this.operationPool.submit(partition, QueryOperationExecutable.remote(packet));
    }

    private Connection getConnection(UUID memberId) {
        return this.nodeServiceProvider.getConnection(memberId);
    }

    private UUID getLocalMemberId() {
        return this.nodeServiceProvider.getLocalMemberId();
    }

    private byte[] serializeOperation(QueryOperation operation) {
        try {
            return this.serializationService.toBytes(operation);
        }
        catch (Exception e) {
            throw QueryException.error("Failed to serialize " + operation.getClass().getSimpleName() + ": " + e.getMessage(), e);
        }
    }

    public void setExecHook(CreateExecPlanNodeVisitorHook execHook) {
        this.execHook = execHook;
    }
}

