/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.microprofile.reactive.streams.operators.tck.spi;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.LongStream;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.AbstractStageVerification;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.QuietRuntimeException;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.ReactiveStreamsSpiVerification;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Verification for the coupled stage, i.e. the processor produced by
 * {@code rs.coupled(subscriber, publisher)}.
 *
 * <p>The test methods check that a termination signal arriving on one side of the
 * coupled processor is observed on the other side. The nested verification classes
 * expose coupled stages to the Reactive Streams TCK as a publisher, a subscriber and
 * an identity processor.
 */
public class CoupledStageVerification extends AbstractStageVerification {

    CoupledStageVerification(ReactiveStreamsSpiVerification.VerificationDeps deps) {
        super(deps);
    }

    /**
     * Downstream cancels: the coupled subscriber must see {@code onComplete} and the
     * upstream publisher must be terminated.
     */
    @Test
    public void coupledStageShouldCancelAndCompleteUpstreamWhenDownstreamCancels() {
        CompletableFuture<Void> subscriberCompleted = new CompletableFuture<>();
        CompletableFuture<Void> upstreamCancelled = new CompletableFuture<>();

        idlePublisher()
            .onTerminate(() -> upstreamCancelled.complete(null))
            .via(rs.coupled(
                rs.builder().onComplete(() -> subscriberCompleted.complete(null)).ignore(),
                idlePublisher()))
            .cancel()
            .run(getEngine());

        await(subscriberCompleted);
        await(upstreamCancelled);
    }

    /**
     * The coupled publisher completes (it is empty): the coupled subscriber must see
     * {@code onComplete} and the upstream publisher must be terminated.
     */
    @Test
    public void coupledStageShouldCancelAndCompleteUpstreamWhenPublisherCompletes() {
        CompletableFuture<Void> subscriberCompleted = new CompletableFuture<>();
        CompletableFuture<Void> upstreamCancelled = new CompletableFuture<>();

        idlePublisher()
            .onTerminate(() -> upstreamCancelled.complete(null))
            .via(rs.coupled(
                rs.builder().onComplete(() -> subscriberCompleted.complete(null)).ignore(),
                rs.empty()))
            .ignore()
            .run(getEngine());

        await(subscriberCompleted);
        await(upstreamCancelled);
    }

    /**
     * The coupled publisher fails: the coupled subscriber must receive the same
     * {@link QuietRuntimeException} through {@code onError} and the upstream publisher
     * must be terminated.
     */
    @Test
    public void coupledStageShouldCancelAndCompleteUpstreamWhenPublisherFails() {
        CompletableFuture<Throwable> subscriberFailed = new CompletableFuture<>();
        CompletableFuture<Void> upstreamCancelled = new CompletableFuture<>();

        idlePublisher()
            .onTerminate(() -> upstreamCancelled.complete(null))
            .via(rs.coupled(
                rs.builder().onError(subscriberFailed::complete).ignore(),
                rs.failed(new QuietRuntimeException("failed"))))
            .ignore()
            .run(getEngine());

        Assert.assertTrue(await(subscriberFailed) instanceof QuietRuntimeException);
        await(upstreamCancelled);
    }

    /**
     * Upstream completes (it is empty): the coupled publisher must be terminated and
     * downstream must see {@code onComplete}.
     */
    @Test
    public void coupledStageShouldCancelAndCompleteDownstreamWhenUpstreamCompletes() {
        CompletableFuture<Void> publisherCancelled = new CompletableFuture<>();
        CompletableFuture<Void> downstreamCompleted = new CompletableFuture<>();

        rs.empty()
            .via(rs.coupled(
                rs.builder().ignore(),
                idlePublisher().onTerminate(() -> publisherCancelled.complete(null))))
            .onComplete(() -> downstreamCompleted.complete(null))
            .ignore()
            .run(getEngine());

        await(publisherCancelled);
        await(downstreamCompleted);
    }

    /**
     * Upstream fails: the coupled publisher must be terminated and downstream must
     * receive the {@link QuietRuntimeException} through {@code onError}.
     */
    @Test
    public void coupledStageShouldCancelAndFailDownstreamWhenUpstreamFails() {
        CompletableFuture<Void> publisherCancelled = new CompletableFuture<>();
        CompletableFuture<Throwable> downstreamFailed = new CompletableFuture<>();

        rs.failed(new QuietRuntimeException("failed"))
            .via(rs.coupled(
                rs.builder().ignore(),
                idlePublisher().onTerminate(() -> publisherCancelled.complete(null))))
            .onError(downstreamFailed::complete)
            .ignore()
            .run(getEngine());

        await(publisherCancelled);
        Assert.assertTrue(await(downstreamFailed) instanceof QuietRuntimeException);
    }

    /**
     * The coupled subscriber cancels: the coupled publisher must be terminated and
     * downstream must see {@code onComplete}.
     */
    @Test
    public void coupledStageShouldCancelAndCompleteDownstreamWhenSubscriberCancels() {
        CompletableFuture<Void> publisherCancelled = new CompletableFuture<>();
        CompletableFuture<Void> downstreamCompleted = new CompletableFuture<>();

        idlePublisher()
            .via(rs.coupled(
                rs.builder().cancel(),
                idlePublisher().onTerminate(() -> publisherCancelled.complete(null))))
            .onComplete(() -> downstreamCompleted.complete(null))
            .ignore()
            .run(getEngine());

        await(publisherCancelled);
        await(downstreamCompleted);
    }

    /**
     * The same coupled builder is run twice and must emit the same elements each time.
     *
     * <p>NOTE(review): the method name misspells "Reusable"; it is left unchanged
     * because the name is the public test identifier.
     */
    @Test
    public void coupledStageShouldBeResuable() {
        ProcessorBuilder<Object, Integer> coupled = rs.coupled(rs.builder().ignore(), rs.of(1, 2, 3));

        List<Integer> firstRun = await(idlePublisher().via(coupled).toList().run(getEngine()));
        Assert.assertEquals(firstRun, Arrays.asList(1, 2, 3));

        List<Integer> secondRun = await(idlePublisher().via(coupled).toList().run(getEngine()));
        Assert.assertEquals(secondRun, Arrays.asList(1, 2, 3));
    }

    /**
     * Returns one instance of each nested Reactive Streams TCK verification.
     */
    @Override
    List<Object> reactiveStreamsTckVerifiers() {
        return Arrays.asList(
            new PublisherVerification(),
            new SubscriberVerification(),
            new ProcessorVerification());
    }

    /**
     * Runs the TCK identity-processor verification against a coupled stage whose
     * subscriber and publisher sides are the same identity processor.
     */
    public class ProcessorVerification
            extends AbstractStageVerification.StageProcessorVerification<Integer> {

        @Override
        public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
            // The explicit <Integer> witness is required: in a call chain the element
            // type would otherwise be inferred as Object.
            Processor<Integer, Integer> identity = CoupledStageVerification.this.rs
                .<Integer>builder()
                .buildRs(CoupledStageVerification.this.getEngine());

            // The same processor instance acts as both halves of the coupled stage.
            return CoupledStageVerification.this.rs
                .<Integer, Integer>coupled(identity, identity)
                .buildRs(CoupledStageVerification.this.getEngine());
        }

        @Override
        public Integer createElement(int element) {
            return element;
        }

        @Override
        public long maxElementsFromPublisher() {
            // NOTE(review): presumably 0 makes the TCK skip the tests that need the
            // publisher to emit elements and complete - confirm against the TCK docs.
            return 0L;
        }
    }

    /**
     * Runs the TCK whitebox subscriber verification against the subscriber side of a
     * coupled stage whose publisher side never emits.
     */
    public class SubscriberVerification
            extends AbstractStageVerification.StageSubscriberWhiteboxVerification<Integer> {

        @Override
        public Subscriber<Integer> createSubscriber(
                SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<Integer> probe) {
            // Forwards every signal it receives to the TCK probe and lets the probe
            // drive request/cancel through the puppet.
            Subscriber<Integer> probeSubscriber = new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {
                        @Override
                        public void triggerRequest(long elements) {
                            s.request(elements);
                        }

                        @Override
                        public void signalCancel() {
                            s.cancel();
                        }
                    });
                }

                @Override
                public void onNext(Integer integer) {
                    probe.registerOnNext(integer);
                }

                @Override
                public void onError(Throwable t) {
                    probe.registerOnError(t);
                }

                @Override
                public void onComplete() {
                    probe.registerOnComplete();
                }
            };

            return CoupledStageVerification.this.rs
                .coupled(
                    CoupledStageVerification.this.rs.<Integer>fromSubscriber(probeSubscriber),
                    CoupledStageVerification.this.idlePublisher())
                .buildRs(CoupledStageVerification.this.getEngine());
        }

        @Override
        public Integer createElement(int element) {
            return element;
        }
    }

    /**
     * Runs the TCK publisher verification against the publisher side of a coupled
     * stage whose subscriber side ignores its input.
     */
    public class PublisherVerification
            extends AbstractStageVerification.StagePublisherVerification<Long> {

        @Override
        public Publisher<Long> createPublisher(long elements) {
            // Emits 1..elements; a fresh iterator is produced for every subscription.
            return CoupledStageVerification.this.rs
                .coupled(
                    CoupledStageVerification.this.rs.builder().ignore(),
                    CoupledStageVerification.this.rs.<Long>fromIterable(
                        () -> LongStream.rangeClosed(1L, elements).boxed().iterator()))
                .buildRs(CoupledStageVerification.this.getEngine());
        }

        @Override
        public Publisher<Long> createFailedPublisher() {
            // The <Long> witness keeps the built processor assignable to Publisher<Long>;
            // without it the element type is inferred as Object.
            return CoupledStageVerification.this.rs
                .coupled(
                    CoupledStageVerification.this.rs.builder().ignore(),
                    CoupledStageVerification.this.rs.<Long>failed(new QuietRuntimeException("failed")))
                .buildRs(CoupledStageVerification.this.getEngine());
        }
    }
}

