/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.microprofile.reactive.messaging.tck.acknowledgement;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.tck.TckBase;
import org.eclipse.microprofile.reactive.messaging.tck.acknowledgement.EmitterBean;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.shrinkwrap.api.Archive;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.Test;

/**
 * Checks acknowledgement and negative acknowledgement of messages that go through a processor
 * method consuming a {@code Message<String>} and returning a
 * {@code CompletionStage<Message<String>>}.
 *
 * <p>Ten payloads ({@code "a"} to {@code "j"}) are sent through the injected emitter. Each message
 * carries an ack callback and a nack callback that record the payload, so the tests can verify
 * which messages were acked or nacked once they have travelled through {@link MessageProcessor}
 * and {@link Sink}.
 */
public class AsynchronousMessageProcessorAckTest
extends TckBase {
    @Inject
    private EmitterBean bean;
    @Inject
    private MessageProcessor processor;
    @Inject
    private Sink sink;

    /**
     * Builds the test archive: the base archive plus the beans used by this test.
     *
     * @return the archive to deploy
     */
    @Deployment
    public static Archive<JavaArchive> deployment() {
        return getBaseArchive().addClasses(EmitterBean.class, Sink.class, MessageProcessor.class);
    }

    /**
     * With failure mode disabled, every message must reach the sink, be acked, and none nacked.
     */
    @Test
    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws InterruptedException, TimeoutException, ExecutionException {
        this.sink.reset();
        // The processor bean is application scoped, so a previous test may have left failure mode on.
        this.processor.disableFailureMode();
        Emitter<String> emitter = this.bean.getEmitter();
        Set<String> acked = ConcurrentHashMap.newKeySet();
        Set<String> nacked = ConcurrentHashMap.newKeySet();

        List<Throwable> throwables = this.run(acked, nacked, emitter);

        Awaitility.await().until(() -> this.sink.list().size() == 10);
        // The sink records a payload before its method returns, so the ack callbacks may still be
        // pending when the sink is full: wait for them instead of asserting right away.
        Awaitility.await().until(() -> acked.size() == 10);
        Assertions.assertThat(acked).hasSize(10);
        Assertions.assertThat(nacked).hasSize(0);
        // Checked after processing so that a nack reason recorded along the way is detected.
        Assertions.assertThat(throwables).isEmpty();
    }

    /**
     * With failure mode enabled, the processor nacks {@code "b"} and acks {@code "e"} itself, both
     * completing with {@code null} instead of an outgoing message. The test therefore expects
     * 8 payloads in the sink, 9 acked messages, and 1 nacked message with 1 recorded reason.
     */
    @Test
    public void testThatMessagesAreNackedAfterFailingProcessingOfMessage() throws InterruptedException, TimeoutException, ExecutionException {
        this.sink.reset();
        Emitter<String> emitter = this.bean.getEmitter();
        Set<String> acked = ConcurrentHashMap.newKeySet();
        Set<String> nacked = ConcurrentHashMap.newKeySet();
        this.processor.enableFailureMode();

        List<Throwable> throwables = this.run(acked, nacked, emitter);

        Awaitility.await().until(() -> this.sink.list().size() == 8);
        // Ack/nack callbacks may complete after the sink has seen its last payload; wait for them.
        // The nack callback records the reason before the payload, so one nacked payload implies
        // its reason is already in the list.
        Awaitility.await().until(() -> acked.size() == 9 && nacked.size() == 1);
        Assertions.assertThat(acked).hasSize(9);
        Assertions.assertThat(nacked).hasSize(1);
        Assertions.assertThat(throwables).hasSize(1);
    }

    /**
     * Sends the payloads {@code "a"} to {@code "j"} through the emitter, each from its own
     * asynchronous task, and waits up to 10 seconds for all sends to have been issued.
     *
     * @param acked   set receiving the payload of every message whose ack callback is invoked
     * @param nacked  set receiving the payload of every message whose nack callback is invoked
     * @param emitter emitter used to send the messages
     * @return a live list of the nack reasons; it may keep growing after this method returns
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the sends are not all issued within 10 seconds
     * @throws ExecutionException   if one of the send tasks fails
     */
    private List<Throwable> run(Set<String> acked, Set<String> nacked, Emitter<String> emitter) throws InterruptedException, TimeoutException, ExecutionException {
        List<Throwable> reasons = new CopyOnWriteArrayList<>();
        CompletableFuture<?>[] sends = Stream.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
            .map(payload -> CompletableFuture.runAsync(() -> {
                Message<String> message = Message.of(payload, () -> {
                    acked.add(payload);
                    return CompletableFuture.completedFuture(null);
                }, reason -> {
                    reasons.add(reason);
                    nacked.add(payload);
                    return CompletableFuture.completedFuture(null);
                });
                emitter.send(message);
            }))
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(sends).get(10L, TimeUnit.SECONDS);
        return reasons;
    }

    /**
     * Processor between the {@code data} and {@code out} channels. It upper-cases payloads and,
     * when failure mode is enabled, nacks {@code "b"} and acks {@code "e"} without producing an
     * outgoing message.
     */
    @ApplicationScoped
    public static class MessageProcessor {
        // Written by the test thread and read by the thread running process(): volatile
        // guarantees the latest value is visible.
        private volatile boolean failureModeEnabled = false;

        /** Makes {@link #process(Message)} nack {@code "b"} and ack {@code "e"} by itself. */
        public void enableFailureMode() {
            this.failureModeEnabled = true;
        }

        /** Makes {@link #process(Message)} forward every message. */
        public void disableFailureMode() {
            this.failureModeEnabled = false;
        }

        /**
         * Handles one incoming message.
         *
         * @param m the incoming message
         * @return a stage completed asynchronously with the message carrying the upper-cased
         *         payload, or, in failure mode for {@code "b"} / {@code "e"}, a stage completed
         *         with {@code null} once the message has been nacked / acked
         */
        @Incoming(value="data")
        @Outgoing(value="out")
        public CompletionStage<Message<String>> process(Message<String> m) {
            String s = m.getPayload();
            if (this.failureModeEnabled) {
                if (s.equalsIgnoreCase("b")) {
                    return m.nack(new IllegalArgumentException("b")).thenApply(x -> null);
                }
                if (s.equalsIgnoreCase("e")) {
                    return m.ack().thenApply(x -> null);
                }
            }
            return CompletableFuture.supplyAsync(() -> m.withPayload(s.toUpperCase()));
        }
    }

    /**
     * Terminal consumer of the {@code out} channel; it records every payload it receives.
     */
    @ApplicationScoped
    public static class Sink {
        private final List<String> list = new CopyOnWriteArrayList<>();

        /**
         * Records a received payload.
         *
         * @param s the payload
         */
        @Incoming(value="out")
        public void consume(String s) {
            this.list.add(s);
        }

        /**
         * @return the live list of payloads received so far
         */
        public List<String> list() {
            return this.list;
        }

        /** Forgets every payload received so far. */
        public void reset() {
            this.list.clear();
        }
    }
}

