/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageExecutionThread;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException;
import io.awspring.cloud.sqs.listener.AsyncMessageListener;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageListener;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.acknowledgement.AsyncAcknowledgementResultCallback;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler;
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
import io.awspring.cloud.sqs.listener.interceptor.MessageInterceptor;
import io.awspring.cloud.sqs.support.observation.MessageHeaderContextAccessor;
import io.micrometer.context.ContextAccessor;
import io.micrometer.context.ContextRegistry;
import io.micrometer.context.ContextSnapshot;
import io.micrometer.context.ContextSnapshotFactory;
import io.micrometer.context.Nullable;
import io.micrometer.context.ThreadLocalAccessor;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class AsyncComponentAdapters {
    private static final Logger logger = LoggerFactory.getLogger(AsyncComponentAdapters.class);

    // Not instantiable: this class only exposes static adapt(...) factory methods.
    private AsyncComponentAdapters() {
    }

    /**
     * Wraps a blocking {@link ErrorHandler} so it can be used as an {@link AsyncErrorHandler}.
     *
     * @param errorHandler the blocking error handler to delegate to.
     * @param <T> the message payload type.
     * @return the asynchronous adapter around the given handler.
     */
    public static <T> AsyncErrorHandler<T> adapt(ErrorHandler<T> errorHandler) {
        return new BlockingErrorHandlerAdapter<>(errorHandler);
    }

    /**
     * Wraps a blocking {@link MessageInterceptor} so it can be used as an
     * {@link AsyncMessageInterceptor}.
     *
     * @param messageInterceptor the blocking interceptor to delegate to.
     * @param <T> the message payload type.
     * @return the asynchronous adapter around the given interceptor.
     */
    public static <T> AsyncMessageInterceptor<T> adapt(MessageInterceptor<T> messageInterceptor) {
        return new BlockingMessageInterceptorAdapter<>(messageInterceptor);
    }

    /**
     * Wraps a blocking {@link MessageListener} so it can be used as an
     * {@link AsyncMessageListener}.
     *
     * @param messageListener the blocking listener to delegate to.
     * @param <T> the message payload type.
     * @return the asynchronous adapter around the given listener.
     */
    public static <T> AsyncMessageListener<T> adapt(MessageListener<T> messageListener) {
        return new BlockingMessageListenerAdapter<>(messageListener);
    }

    /**
     * Wraps a blocking {@link AcknowledgementResultCallback} so it can be used as an
     * {@link AsyncAcknowledgementResultCallback}.
     *
     * @param acknowledgementResultCallback the blocking callback to delegate to.
     * @param <T> the message payload type.
     * @return the asynchronous adapter around the given callback.
     */
    public static <T> AsyncAcknowledgementResultCallback<T> adapt(AcknowledgementResultCallback<T> acknowledgementResultCallback) {
        return new BlockingAcknowledgementResultCallbackAdapter<>(acknowledgementResultCallback);
    }

    /**
     * {@link AsyncErrorHandler} that delegates to a blocking {@link ErrorHandler}, running the
     * delegate through the inherited {@code execute} methods.
     *
     * <p>Single-message handling is additionally wrapped with
     * {@code withConsumerThreadLocalScope}; batch handling is executed without that wrapping.
     */
    private static class BlockingErrorHandlerAdapter<T>
    extends AbstractThreadingComponentAdapter<T>
    implements AsyncErrorHandler<T> {
        private final ErrorHandler<T> blockingErrorHandler;

        public BlockingErrorHandlerAdapter(ErrorHandler<T> blockingErrorHandler) {
            this.blockingErrorHandler = blockingErrorHandler;
        }

        /**
         * Handles an error for a single message using the blocking delegate.
         *
         * @param message the message being processed when the error occurred.
         * @param t the error to handle.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> handle(Message<T> message, Throwable t) {
            Consumer<Message<T>> delegateCall = scopedMessage -> this.blockingErrorHandler.handle(scopedMessage, t);
            Runnable scopedCall = this.withConsumerThreadLocalScope(delegateCall, message);
            return this.execute(scopedCall);
        }

        /**
         * Handles an error for a batch of messages using the blocking delegate.
         *
         * @param messages the messages being processed when the error occurred.
         * @param t the error to handle.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> handle(Collection<Message<T>> messages, Throwable t) {
            Runnable delegateCall = () -> this.blockingErrorHandler.handle(messages, t);
            return this.execute(delegateCall);
        }
    }

    /**
     * {@link AsyncMessageInterceptor} that delegates to a blocking {@link MessageInterceptor},
     * running the delegate through the inherited {@code execute} methods.
     *
     * <p>Single-message callbacks are wrapped with the inherited thread-local scope helpers;
     * batch callbacks are executed without that wrapping.
     */
    private static class BlockingMessageInterceptorAdapter<T>
    extends AbstractThreadingComponentAdapter<T>
    implements AsyncMessageInterceptor<T> {
        private final MessageInterceptor<T> blockingMessageInterceptor;

        public BlockingMessageInterceptorAdapter(MessageInterceptor<T> blockingMessageInterceptor) {
            this.blockingMessageInterceptor = blockingMessageInterceptor;
        }

        /**
         * Intercepts a single message using the blocking delegate.
         *
         * @param message the message to intercept.
         * @return a future completed with the message returned by the scoped delegate call.
         */
        @Override
        public CompletableFuture<Message<T>> intercept(Message<T> message) {
            // Method reference kept on purpose: it binds the delegate at this point, not lazily.
            Function<Message<T>, Message<T>> delegateCall = this.blockingMessageInterceptor::intercept;
            Supplier<Message<T>> scopedCall = this.withFunctionThreadLocalScope(delegateCall, message);
            return this.execute(scopedCall);
        }

        /**
         * Intercepts a batch of messages using the blocking delegate.
         *
         * @param messages the messages to intercept.
         * @return a future completed with the messages returned by the delegate.
         */
        @Override
        public CompletableFuture<Collection<Message<T>>> intercept(Collection<Message<T>> messages) {
            Supplier<Collection<Message<T>>> delegateCall = () -> this.blockingMessageInterceptor.intercept(messages);
            return this.execute(delegateCall);
        }

        /**
         * Invokes the delegate's after-processing callback for a single message.
         *
         * @param message the processed message.
         * @param t the processing error passed through to the delegate.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> afterProcessing(Message<T> message, Throwable t) {
            Consumer<Message<T>> delegateCall = scopedMessage -> this.blockingMessageInterceptor.afterProcessing(scopedMessage, t);
            Runnable scopedCall = this.withConsumerThreadLocalScope(delegateCall, message);
            return this.execute(scopedCall);
        }

        /**
         * Invokes the delegate's after-processing callback for a batch of messages.
         *
         * @param messages the processed messages.
         * @param t the processing error passed through to the delegate.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> afterProcessing(Collection<Message<T>> messages, Throwable t) {
            Runnable delegateCall = () -> this.blockingMessageInterceptor.afterProcessing(messages, t);
            return this.execute(delegateCall);
        }
    }

    /**
     * {@link AsyncMessageListener} that delegates to a blocking {@link MessageListener},
     * running the delegate through the inherited {@code execute} methods.
     *
     * <p>Single-message delivery is wrapped with {@code withConsumerThreadLocalScope};
     * batch delivery is executed without that wrapping.
     */
    private static class BlockingMessageListenerAdapter<T>
    extends AbstractThreadingComponentAdapter<T>
    implements AsyncMessageListener<T> {
        private final MessageListener<T> blockingMessageListener;

        public BlockingMessageListenerAdapter(MessageListener<T> blockingMessageListener) {
            this.blockingMessageListener = blockingMessageListener;
        }

        /**
         * Delivers a single message to the blocking delegate.
         *
         * @param message the message to process.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> onMessage(Message<T> message) {
            // Method reference kept on purpose: it binds the delegate at this point, not lazily.
            Consumer<Message<T>> delegateCall = this.blockingMessageListener::onMessage;
            Runnable scopedCall = this.withConsumerThreadLocalScope(delegateCall, message);
            return this.execute(scopedCall);
        }

        /**
         * Delivers a batch of messages to the blocking delegate.
         *
         * @param messages the messages to process.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> onMessage(Collection<Message<T>> messages) {
            Runnable delegateCall = () -> this.blockingMessageListener.onMessage(messages);
            return this.execute(delegateCall);
        }
    }

    /**
     * {@link AsyncAcknowledgementResultCallback} that delegates to a blocking
     * {@link AcknowledgementResultCallback}, running the delegate through the inherited
     * {@code execute} method.
     */
    private static class BlockingAcknowledgementResultCallbackAdapter<T>
    extends AbstractThreadingComponentAdapter<T>
    implements AsyncAcknowledgementResultCallback<T> {
        private final AcknowledgementResultCallback<T> blockingAcknowledgementResultCallback;

        public BlockingAcknowledgementResultCallbackAdapter(AcknowledgementResultCallback<T> blockingAcknowledgementResultCallback) {
            this.blockingAcknowledgementResultCallback = blockingAcknowledgementResultCallback;
        }

        /**
         * Notifies the blocking delegate of a successful acknowledgement.
         *
         * @param messages the acknowledged messages.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> onSuccess(Collection<Message<T>> messages) {
            Runnable delegateCall = () -> this.blockingAcknowledgementResultCallback.onSuccess(messages);
            return this.execute(delegateCall);
        }

        /**
         * Notifies the blocking delegate of a failed acknowledgement.
         *
         * @param messages the messages whose acknowledgement failed.
         * @param t the acknowledgement error.
         * @return a future representing the delegate's execution.
         */
        @Override
        public CompletableFuture<Void> onFailure(Collection<Message<T>> messages, Throwable t) {
            Runnable delegateCall = () -> this.blockingAcknowledgementResultCallback.onFailure(messages, t);
            return this.execute(delegateCall);
        }
    }

    private static class MicrometerObservationThreadLocalWrapper<MessageType>
    implements ThreadLocalScope<MessageType> {
        private static final ContextSnapshotFactory CONTEXT_SNAPSHOT_FACTORY;

        private MicrometerObservationThreadLocalWrapper() {
        }

        @Override
        public ThreadLocalScope.CloseableScope<MessageType> openScope(Message<MessageType> message) {
            ContextSnapshot.Scope scope = CONTEXT_SNAPSHOT_FACTORY.captureFrom(new Object[]{message.getHeaders()}).setThreadLocals();
            return new ObservationAwareCloseableScope<MessageType>(scope, message);
        }

        static {
            ContextRegistry registry = new ContextRegistry();
            registry.registerContextAccessor((ContextAccessor)new MessageHeaderContextAccessor());
            registry.registerThreadLocalAccessor((ThreadLocalAccessor)new ObservationThreadLocalAccessor());
            CONTEXT_SNAPSHOT_FACTORY = ContextSnapshotFactory.builder().contextRegistry(registry).build();
        }

        private static class ObservationAwareCloseableScope<MessageType>
        implements ThreadLocalScope.CloseableScope<MessageType> {
            private final ContextSnapshot.Scope scope;
            @Nullable
            private final Object observationContext;

            public ObservationAwareCloseableScope(ContextSnapshot.Scope scope, Message<MessageType> message) {
                this.scope = scope;
                this.observationContext = message.getHeaders().get((Object)"micrometer.observation");
            }

            @Override
            public void close() {
                this.scope.close();
            }

            @Override
            public Message<MessageType> beforeExecution(Message<MessageType> message) {
                return MessageHeaderUtils.removeHeaderIfPresent(message, "micrometer.observation");
            }

            @Override
            public Message<MessageType> afterExecution(Message<MessageType> message) {
                return this.observationContext != null ? MessageHeaderUtils.addHeaderIfAbsent(message, "micrometer.observation", this.observationContext) : message;
            }

            @Override
            public Message<MessageType> onExecutionError(Message<MessageType> message, Throwable t) {
                if (this.observationContext != null && ListenerExecutionFailedException.hasListenerException(t)) {
                    Message failedMessage = Objects.requireNonNull(ListenerExecutionFailedException.unwrapMessage(t), "Message not found in Listener Exception.");
                    Message messageWithHeader = MessageHeaderUtils.addHeaderIfAbsent(failedMessage, "micrometer.observation", this.observationContext);
                    throw new ListenerExecutionFailedException(t.getMessage(), t.getCause(), messageWithHeader);
                }
                return message;
            }
        }
    }

    private static interface ThreadLocalScope<MessageType> {
        public CloseableScope<MessageType> openScope(Message<MessageType> var1);

        public static interface CloseableScope<MessageType>
        extends AutoCloseable {
            default public Message<MessageType> beforeExecution(Message<MessageType> message) {
                return message;
            }

            default public Message<MessageType> afterExecution(Message<MessageType> message) {
                return message;
            }

            default public Message<MessageType> onExecutionError(Message<MessageType> message, Throwable t) {
                return message;
            }

            @Override
            public void close();
        }
    }

    /**
     * Base class for adapters that run a blocking component and expose the outcome as a
     * {@link CompletableFuture}.
     *
     * <p>When the calling thread is a {@link MessageExecutionThread} the blocking code runs
     * in the calling thread; otherwise it is submitted to the configured {@link TaskExecutor}.
     * Failures are reported through the returned future, wrapped in an
     * {@link AsyncAdapterBlockingExecutionFailedException}.
     *
     * @param <MessageType> the message payload type.
     */
    protected static class AbstractThreadingComponentAdapter<MessageType>
    implements TaskExecutorAware {
        private TaskExecutor taskExecutor;
        private final ThreadLocalScope<MessageType> threadLocalScope = new MicrometerObservationThreadLocalWrapper<>();

        protected AbstractThreadingComponentAdapter() {
        }

        /**
         * Sets the executor used when the caller is not a {@link MessageExecutionThread}.
         *
         * @param taskExecutor the executor to submit blocking work to.
         */
        @Override
        public void setTaskExecutor(TaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }

        /**
         * Runs a value-returning blocking action, in the calling thread or in the executor.
         *
         * @param executable the blocking action.
         * @param <T> the result type.
         * @return a future completed with the action's result, or failed with an
         * {@link AsyncAdapterBlockingExecutionFailedException}.
         * @throws IllegalArgumentException if a thread switch is needed and no executor is set.
         */
        protected <T> CompletableFuture<T> execute(Supplier<T> executable) {
            if (this.isInMessageExecutionThread()) {
                return this.supplyInSameThread(executable);
            }
            Assert.notNull(this.taskExecutor, "Task executor not set");
            return this.supplyInNewThread(executable);
        }

        /**
         * Runs a blocking action with no result, in the calling thread or in the executor.
         *
         * @param executable the blocking action.
         * @return a future completed when the action finishes, or failed with an
         * {@link AsyncAdapterBlockingExecutionFailedException}.
         * @throws IllegalArgumentException if a thread switch is needed and no executor is set.
         */
        protected CompletableFuture<Void> execute(Runnable executable) {
            if (this.isInMessageExecutionThread()) {
                return this.runInSameThread(executable);
            }
            Assert.notNull(this.taskExecutor, "Task executor not set");
            return this.runInNewThread(executable);
        }

        /** Tells whether the calling thread is a {@link MessageExecutionThread}, tracing the decision. */
        private boolean isInMessageExecutionThread() {
            String threadTypeName = MessageExecutionThread.class.getSimpleName();
            if (Thread.currentThread() instanceof MessageExecutionThread) {
                logger.trace("Already in a {}, not switching", threadTypeName);
                return true;
            }
            logger.trace("Not in a {}, submitting to executor", threadTypeName);
            return false;
        }

        private CompletableFuture<Void> runInSameThread(Runnable blockingProcess) {
            try {
                blockingProcess.run();
                return CompletableFuture.completedFuture(null);
            }
            catch (Exception e) {
                return CompletableFutures.failedFuture(this.wrapWithBlockingException(e));
            }
        }

        private CompletableFuture<Void> runInNewThread(Runnable blockingProcess) {
            try {
                return CompletableFutures.exceptionallyCompose(
                        CompletableFuture.runAsync(blockingProcess, this.taskExecutor),
                        t -> CompletableFutures.failedFuture(this.wrapWithBlockingException(t)));
            }
            catch (Exception e) {
                // Exceptions thrown while submitting are reported through the future as well.
                return CompletableFutures.failedFuture(this.wrapWithBlockingException(e));
            }
        }

        private <T> CompletableFuture<T> supplyInSameThread(Supplier<T> blockingProcess) {
            try {
                return CompletableFuture.completedFuture(blockingProcess.get());
            }
            catch (Exception e) {
                return CompletableFutures.failedFuture(this.wrapWithBlockingException(e));
            }
        }

        private <T> CompletableFuture<T> supplyInNewThread(Supplier<T> blockingProcess) {
            try {
                return CompletableFutures.exceptionallyCompose(
                        CompletableFuture.supplyAsync(blockingProcess, this.taskExecutor),
                        t -> CompletableFutures.failedFuture(this.wrapWithBlockingException(t)));
            }
            catch (Exception e) {
                // Exceptions thrown while submitting are reported through the future as well.
                return CompletableFutures.failedFuture(this.wrapWithBlockingException(e));
            }
        }

        /**
         * Wraps the given error, unwrapping a {@link CompletionException} to its cause. If the
         * {@code CompletionException} has no cause, it is kept as the cause itself so the
         * failure information is not lost.
         */
        private AsyncAdapterBlockingExecutionFailedException wrapWithBlockingException(Throwable t) {
            Throwable cause = t;
            if (t instanceof CompletionException && t.getCause() != null) {
                cause = t.getCause();
            }
            return new AsyncAdapterBlockingExecutionFailedException("Error executing action in " + this.getClass().getSimpleName(), cause);
        }

        /**
         * Decorates a message-returning action so it runs inside a thread-local scope opened
         * for the given message.
         *
         * <p>The scope is always closed, including when {@code beforeExecution} throws.
         * {@code onExecutionError} is invoked, with the scope still open, only for exceptions
         * thrown by the action or by {@code afterExecution}; the exception is then rethrown
         * unless the hook itself throws.
         *
         * @param executable the action to decorate.
         * @param message the message the scope is opened for.
         * @return a supplier that runs the action inside the scope.
         */
        protected Supplier<Message<MessageType>> withFunctionThreadLocalScope(Function<Message<MessageType>, Message<MessageType>> executable, Message<MessageType> message) {
            return () -> {
                try (ThreadLocalScope.CloseableScope<MessageType> closeableScope = this.threadLocalScope.openScope(message)) {
                    Message<MessageType> messageToExecute = closeableScope.beforeExecution(message);
                    try {
                        Message<MessageType> returnedMessage = executable.apply(messageToExecute);
                        return closeableScope.afterExecution(returnedMessage);
                    }
                    catch (Exception e) {
                        closeableScope.onExecutionError(messageToExecute, e);
                        throw e;
                    }
                }
            };
        }

        /**
         * Decorates a message-consuming action so it runs inside a thread-local scope opened
         * for the given message.
         *
         * <p>The scope is always closed, including when {@code beforeExecution} throws.
         * {@code onExecutionError} is invoked, with the scope still open, only for exceptions
         * thrown by the action or by {@code afterExecution}; the exception is then rethrown
         * unless the hook itself throws.
         *
         * @param executable the action to decorate.
         * @param message the message the scope is opened for.
         * @return a runnable that runs the action inside the scope.
         */
        protected Runnable withConsumerThreadLocalScope(Consumer<Message<MessageType>> executable, Message<MessageType> message) {
            return () -> {
                try (ThreadLocalScope.CloseableScope<MessageType> closeableScope = this.threadLocalScope.openScope(message)) {
                    Message<MessageType> messageToExecute = closeableScope.beforeExecution(message);
                    try {
                        executable.accept(messageToExecute);
                        closeableScope.afterExecution(messageToExecute);
                    }
                    catch (Exception e) {
                        closeableScope.onExecutionError(messageToExecute, e);
                        throw e;
                    }
                }
            };
        }
    }
}

