/*
 * Decompiled with CFR 0.152.
 */
package io.seata.rm.datasource;

import io.seata.common.exception.NotSupportYetException;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus;
import io.seata.core.model.BranchType;
import io.seata.core.model.ResourceManagerInbound;
import io.seata.rm.DefaultResourceManager;
import io.seata.rm.datasource.DataSourceManager;
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.rm.datasource.undo.UndoLogManagerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound branch handler that acknowledges phase-two commits immediately and deletes the
 * matching undo-log rows later, in batches, from a single background timer thread.
 *
 * <p>{@link #branchCommit} only enqueues a {@link Phase2Context} into a bounded in-memory
 * buffer. After {@link #init()} has been called, a scheduled task drains that buffer roughly
 * once per second, groups the drained contexts by resource id and issues batched undo-log
 * deletions (at most {@value #UNDOLOG_DELETE_LIMIT_SIZE} ids per statement) per resource.
 *
 * <p>The buffer and the executor reference are static, i.e. shared by every instance of this
 * class in the same class loader.
 */
public class AsyncWorker
implements ResourceManagerInbound {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);

    /** Initial capacity of the map that groups drained contexts by resource id. */
    private static final int DEFAULT_RESOURCE_SIZE = 16;

    /** Maximum number of xids / branch ids handed to a single batch delete. */
    private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;

    /** Capacity of the commit buffer; read once from configuration, defaulting to 10000. */
    private static final int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt("client.rm.async.commit.buffer.limit", 10000);

    /** Bounded hand-off queue between {@link #branchCommit} callers and the timer thread. */
    private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);

    private static ScheduledExecutorService timerExecutor;

    /**
     * Queues the branch for asynchronous undo-log cleanup and reports it as committed.
     *
     * <p>The enqueue is non-blocking. When the buffer is full the context is dropped and a
     * warning is logged; the method still returns {@code PhaseTwo_Committed}.
     * NOTE(review): the log message says rejected branches are "handled by housekeeping
     * later" - that mechanism is not visible in this class; confirm it exists.
     *
     * @param branchType      type of the branch being committed
     * @param xid             global transaction id
     * @param branchId        branch transaction id
     * @param resourceId      id of the resource whose undo log should be cleaned up
     * @param applicationData opaque application data carried with the branch
     * @return always {@link BranchStatus#PhaseTwo_Committed}
     * @throws TransactionException declared by the interface; not thrown by this implementation
     */
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Starts the background task that drains the commit buffer.
     *
     * <p>Creates a single-threaded scheduled executor and schedules the drain with an initial
     * delay of 10 ms and a period of 1000 ms. Each call creates a new executor and overwrites
     * the static reference, so this is expected to be invoked once.
     */
    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
        timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    AsyncWorker.this.doBranchCommits();
                }
                catch (Throwable e) {
                    // Must not escape: an exception thrown from a fixed-rate task suppresses
                    // all of its subsequent executions. Pass the throwable so the stack
                    // trace is not lost.
                    LOGGER.warn("Failed at async committing ... " + e.getMessage(), e);
                }
            }
        }, 10L, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Drains the commit buffer and deletes the undo logs of every drained branch.
     *
     * <p>Resources are processed independently: a failure, a missing resource or an empty
     * remainder for one resource never prevents the remaining resources from being handled,
     * because their contexts have already been removed from the buffer and would otherwise
     * be lost.
     */
    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.isEmpty()) {
            return;
        }
        Map<String, List<Phase2Context>> mappedContexts = drainBufferByResourceId();
        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            deleteUndoLogs(entry.getKey(), entry.getValue());
        }
    }

    /** Empties the commit buffer and returns its contexts grouped by resource id. */
    private Map<String, List<Phase2Context>> drainBufferByResourceId() {
        Map<String, List<Phase2Context>> grouped = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        Phase2Context commitContext;
        // Loop on poll() itself rather than isEmpty()+poll(): poll() returns null once the
        // queue is empty, so a concurrent consumer can never cause a null dereference here.
        while ((commitContext = ASYNC_COMMIT_BUFFER.poll()) != null) {
            List<Phase2Context> contextsOfResource = grouped.get(commitContext.resourceId);
            if (contextsOfResource == null) {
                contextsOfResource = new ArrayList<>();
                grouped.put(commitContext.resourceId, contextsOfResource);
            }
            contextsOfResource.add(commitContext);
        }
        return grouped;
    }

    /**
     * Deletes the undo logs of all given contexts on one resource, in batches of at most
     * {@link #UNDOLOG_DELETE_LIMIT_SIZE} ids, using a single plain connection that is always
     * closed before returning.
     */
    private void deleteUndoLogs(String resourceId, List<Phase2Context> contexts) {
        DataSourceProxy dataSourceProxy;
        Connection conn = null;
        try {
            DataSourceManager resourceManager = (DataSourceManager)DefaultResourceManager.get().getResourceManager(BranchType.AT);
            dataSourceProxy = resourceManager.get(resourceId);
            if (dataSourceProxy == null) {
                // Skip only this resource; throwing here would abandon every other
                // resource group that has already been drained from the buffer.
                LOGGER.warn("Failed to find resource on " + resourceId);
                return;
            }
            conn = dataSourceProxy.getPlainConnection();
        }
        catch (SQLException sqle) {
            LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
            closeQuietly(conn);
            return;
        }
        try {
            Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            for (Phase2Context commitContext : contexts) {
                xids.add(commitContext.xid);
                branchIds.add(commitContext.branchId);
                if (Math.max(xids.size(), branchIds.size()) >= UNDOLOG_DELETE_LIMIT_SIZE) {
                    batchDeleteUndoLog(dataSourceProxy, xids, branchIds, conn);
                    xids.clear();
                    branchIds.clear();
                }
            }
            // Nothing left over (e.g. the count was an exact multiple of the batch size):
            // done with THIS resource only.
            if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                return;
            }
            batchDeleteUndoLog(dataSourceProxy, xids, branchIds, conn);
        }
        finally {
            closeQuietly(conn);
        }
    }

    /** Issues one batch delete; failures are logged and swallowed so later batches still run. */
    private void batchDeleteUndoLog(DataSourceProxy dataSourceProxy, Set<String> xids, Set<Long> branchIds, Connection conn) {
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
        }
    }

    /** Closes the connection if it is non-null, logging (not propagating) any close failure. */
    private void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        }
        catch (SQLException closeEx) {
            LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
        }
    }

    /**
     * Rollback is not handled by this worker.
     *
     * @throws NotSupportYetException always
     */
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        throw new NotSupportYetException();
    }

    /** Immutable snapshot of the arguments of one {@link #branchCommit} call. */
    private static class Phase2Context {
        final String xid;
        final long branchId;
        final String resourceId;
        final String applicationData;
        final BranchType branchType;

        public Phase2Context(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
            this.applicationData = applicationData;
            this.branchType = branchType;
        }
    }
}

