/*
 * Decompiled with CFR 0.152.
 */
package com.volcengine.service.tls;

import com.volcengine.model.tls.LogItem;
import com.volcengine.model.tls.exception.LogException;
import com.volcengine.model.tls.pb.PutLogRequest;
import com.volcengine.model.tls.producer.BatchLog;
import com.volcengine.model.tls.producer.CallBack;
import com.volcengine.model.tls.producer.ProducerConfig;
import com.volcengine.model.tls.util.AdaptorUtil;
import com.volcengine.service.tls.BatchHandler;
import com.volcengine.service.tls.LogDispatcher;
import com.volcengine.service.tls.Mover;
import com.volcengine.service.tls.Producer;
import com.volcengine.service.tls.RetryManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ProducerImpl
implements Producer {
    private static final Log LOG = LogFactory.getLog(ProducerImpl.class);
    private ProducerConfig producerConfig;
    private final LogDispatcher dispatcher;
    private static final AtomicInteger INSTANCE_ID = new AtomicInteger(0);
    private final String name;
    private final Semaphore memoryLock;
    private final BatchHandler successHandler;
    private final BatchHandler failHandler;
    private final RetryManager retryManager;
    private final AtomicInteger batchCount = new AtomicInteger(0);
    private final Mover mover;

    public ProducerImpl(ProducerConfig producerConfig) throws LogException {
        producerConfig.validConfig();
        this.producerConfig = producerConfig;
        this.name = "TLS-" + INSTANCE_ID.incrementAndGet();
        LinkedBlockingQueue<BatchLog> successQueue = new LinkedBlockingQueue<BatchLog>();
        LinkedBlockingQueue<BatchLog> failureQueue = new LinkedBlockingQueue<BatchLog>();
        this.memoryLock = new Semaphore(producerConfig.getTotalSizeInBytes());
        this.retryManager = new RetryManager();
        this.dispatcher = new LogDispatcher(producerConfig, this.name, successQueue, failureQueue, this.memoryLock, this.batchCount, this.retryManager);
        this.successHandler = new BatchHandler("success batch handler-" + this.name, this.memoryLock, successQueue, this.batchCount);
        this.failHandler = new BatchHandler("fail batch handler-" + this.name, this.memoryLock, failureQueue, this.batchCount);
        this.mover = new Mover(this.name + "-mover", producerConfig, this.dispatcher, this.retryManager, successQueue, failureQueue);
    }

    public static Producer defaultProducer(String endpoint, String region, String accessKey, String accessSecret, String token) throws LogException {
        return new ProducerImpl(new ProducerConfig(endpoint, region, accessKey, accessSecret, token));
    }

    @Override
    @Deprecated
    public void sendLog(String hashKey, String topicId, String source, String filename, PutLogRequest.Log log, CallBack callBack) throws InterruptedException, LogException {
        if (topicId == null || log == null) {
            throw new LogException("InvalidArgument", String.format("topic id: %s, log: %s", topicId, log), null);
        }
        PutLogRequest.LogGroup logGroup = PutLogRequest.LogGroup.newBuilder().setFileName(filename).setSource(source).addLogs(log).build();
        this.sendLogGroup(hashKey, topicId, source, filename, logGroup, callBack);
    }

    @Override
    @Deprecated
    public void sendLogGroup(String hashKey, String topicId, String source, String filename, PutLogRequest.LogGroup logGroup, CallBack callBack) throws InterruptedException, LogException {
        if (topicId == null || logGroup == null || logGroup.getLogsList() == null || logGroup.getLogsList().size() == 0) {
            throw new LogException("InvalidArgument", String.format("topic id: %s, log group is empty", topicId), null);
        }
        if (logGroup.getLogsList().size() > 10000) {
            throw new LogException("InvalidArgument", String.format("log list size %d is greater than threshold %d", logGroup.getLogsList().size(), 10000), null);
        }
        this.dispatcher.addBatch(hashKey, topicId, source, filename, logGroup, callBack);
    }

    @Override
    public void sendLogV2(String hashKey, String topicId, String source, String filename, LogItem log, CallBack callBack) throws InterruptedException, LogException {
        if (log == null) {
            return;
        }
        ArrayList<LogItem> items = new ArrayList<LogItem>();
        items.add(log);
        this.sendLogsV2(hashKey, topicId, source, filename, items, callBack);
    }

    @Override
    public void sendLogsV2(String hashKey, String topicId, String source, String filename, List<LogItem> logs, CallBack callBack) throws InterruptedException, LogException {
        if (topicId == null || logs == null || logs.size() == 0) {
            throw new LogException("InvalidArgument", String.format("topic id: %s, log group: %s", topicId, logs), null);
        }
        if (logs.size() > 10000) {
            throw new LogException("InvalidArgument", String.format("log list size %d is greater than threshold %d", logs.size(), 10000), null);
        }
        this.dispatcher.addBatch(hashKey, topicId, source, filename, AdaptorUtil.logItems2PbGroup(filename, source, logs), callBack);
    }

    @Override
    public void resetAccessKeyToken(String accessKey, String secretKey, String securityToken) throws LogException {
        if (StringUtils.isEmpty((CharSequence)accessKey) || StringUtils.isEmpty((CharSequence)secretKey)) {
            throw new LogException("InvalidArgument", String.format("reset producer %s access key failed, accessKey is %s, secretKey is %s, token is %s", this.name, accessKey, secretKey, securityToken), null);
        }
        this.dispatcher.resetAccessKeyToken(accessKey, secretKey, securityToken);
    }

    @Override
    public void start() throws LogException {
        this.dispatcher.start();
        this.retryManager.start();
        this.successHandler.start();
        this.failHandler.start();
        this.mover.start();
        LOG.info((Object)String.format("producer %s started", this.name));
    }

    @Override
    public void close() throws InterruptedException, LogException {
        this.close(30000L);
    }

    @Override
    public void close(long timeoutMs) throws InterruptedException, LogException {
        LogException feedbackException;
        block11: {
            block10: {
                block9: {
                    feedbackException = null;
                    try {
                        timeoutMs = this.closeMover(timeoutMs);
                    }
                    catch (LogException e) {
                        feedbackException = e;
                    }
                    try {
                        timeoutMs = this.closeExecutorService(timeoutMs);
                    }
                    catch (LogException e) {
                        if (feedbackException != null) break block9;
                        feedbackException = e;
                    }
                }
                try {
                    timeoutMs = this.closeSuccessHandler(timeoutMs);
                }
                catch (LogException e) {
                    if (feedbackException != null) break block10;
                    feedbackException = e;
                }
            }
            try {
                timeoutMs = this.closeFailureHandler(timeoutMs);
            }
            catch (LogException e) {
                if (feedbackException != null) break block11;
                feedbackException = e;
            }
        }
        if (feedbackException != null) {
            throw feedbackException;
        }
        this.dispatcher.getClient().close();
        LOG.info((Object)String.format("producer %s closed", this.name));
    }

    private long closeMover(long timeoutMs) throws InterruptedException, LogException {
        long startMs = System.currentTimeMillis();
        this.dispatcher.close();
        this.retryManager.close();
        this.mover.close();
        this.mover.join(timeoutMs);
        if (this.mover.isAlive()) {
            LOG.warn((Object)"producer mover thread is still alive");
            throw new LogException("Producer Error", "producer mover thread is still alive", null);
        }
        LOG.info((Object)"producer mover is closed");
        long nowMs = System.currentTimeMillis();
        return Math.max(0L, timeoutMs - nowMs + startMs);
    }

    private long closeExecutorService(long timeoutMs) throws InterruptedException, LogException {
        long startMs = System.currentTimeMillis();
        ExecutorService executorService = this.dispatcher.getExecutorService();
        executorService.shutdown();
        if (!executorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            LOG.warn((Object)"producer executor is not terminated normally");
            executorService.shutdownNow();
            throw new LogException("Producer Error", "producer executor is not terminated normally", null);
        }
        LOG.info((Object)"producer executor service is closed");
        long nowMs = System.currentTimeMillis();
        return Math.max(0L, timeoutMs - nowMs + startMs);
    }

    private long closeSuccessHandler(long timeoutMs) throws InterruptedException, LogException {
        boolean invokedFromCallback;
        long startMs = System.currentTimeMillis();
        this.successHandler.close();
        boolean bl = invokedFromCallback = Thread.currentThread() == this.successHandler;
        if (invokedFromCallback) {
            LOG.warn((Object)"Skip join success batch handler since you have incorrectly invoked close from the producer callback");
            return timeoutMs;
        }
        this.successHandler.join(timeoutMs);
        if (this.successHandler.isAlive()) {
            LOG.warn((Object)"producer success handler thread is still alive");
            throw new LogException("Producer Error", "producer success handler thread is still alive", null);
        }
        LOG.info((Object)"producer success handler is closed");
        long nowMs = System.currentTimeMillis();
        return Math.max(0L, timeoutMs - nowMs + startMs);
    }

    private long closeFailureHandler(long timeoutMs) throws InterruptedException, LogException {
        boolean invokedFromCallback;
        long startMs = System.currentTimeMillis();
        this.failHandler.close();
        boolean bl = invokedFromCallback = Thread.currentThread() == this.successHandler || Thread.currentThread() == this.failHandler;
        if (invokedFromCallback) {
            LOG.warn((Object)"Skip join failure batch handler since you have incorrectly invoked close from the producer callback");
            return timeoutMs;
        }
        this.failHandler.join(timeoutMs);
        if (this.failHandler.isAlive()) {
            LOG.warn((Object)"producer failure handler thread is still alive");
            throw new LogException("Producer Error", "producer failure handler thread is still alive", null);
        }
        LOG.info((Object)"producer failure handler is closed");
        long nowMs = System.currentTimeMillis();
        return Math.max(0L, timeoutMs - nowMs + startMs);
    }

    @Override
    public void closeNow() throws InterruptedException, LogException {
        this.dispatcher.closeNow();
        this.retryManager.close();
        this.mover.close();
        this.successHandler.close();
        this.failHandler.close();
        LOG.info((Object)String.format("producer %s closed now", this.name));
    }

    @Override
    public void config(ProducerConfig producerConfig) throws LogException {
        if (producerConfig != null) {
            this.producerConfig = producerConfig;
            producerConfig.validConfig();
            LOG.info((Object)String.format("producer %s configured, config: %s", this.name, producerConfig));
        }
    }
}

