/*
 * Decompiled with CFR 0.152.
 */
package oracle.jdbc.driver;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Phaser;
import java.util.logging.Level;
import oracle.jdbc.diagnostics.CommonDiagnosable;
import oracle.jdbc.diagnostics.SecurityLabel;
import oracle.jdbc.driver.BufferedPublisher;
import oracle.jdbc.driver.DatabaseError;
import oracle.jdbc.internal.CompletionStageUtil;
import oracle.jdbc.internal.Monitor;

/**
 * A {@link Flow.Subscriber} that copies every segment it receives into a
 * {@link LobSegmentBuffer} and hands the buffered content to
 * {@link #flushBufferAsync(int)} whenever the buffer cannot take the rest of a
 * segment, and once more on completion if anything is still buffered.
 *
 * <p>Segments are requested one at a time: a new {@code request(1)} is only
 * issued after the previous segment has been fully pushed into the buffer.
 *
 * <p>Unless {@link #NO_OUTCOME_SUBSCRIBER} is supplied, the length of every
 * flushed buffer is offered to the outcome subscriber through a
 * {@link BufferedPublisher}, which is terminated when this subscriber
 * terminates.
 *
 * <p>All signal handlers run while holding {@code signalMonitor}.
 *
 * @param <T> the segment type (the factories in {@link LobSegmentBuffer}
 *            provide {@code byte[]} and {@code String} variants)
 */
abstract class LobSegmentSubscriber<T> implements Flow.Subscriber<T> {
    /** Fully qualified name of this class. */
    private static final String CLASS_NAME = LobSegmentSubscriber.class.getName();

    /**
     * Sentinel meaning "nobody wants outcomes". The constructor compares against
     * it by identity and, on a match, creates no outcome publisher at all.
     * Every callback is a no-op.
     */
    static final Flow.Subscriber<Long> NO_OUTCOME_SUBSCRIBER = new Flow.Subscriber<Long>() {

        @Override
        public void onSubscribe(Flow.Subscription ignoredSubscription) {
            // Intentionally empty.
        }

        @Override
        public void onNext(Long ignoredOutcome) {
            // Intentionally empty.
        }

        @Override
        public void onError(Throwable ignoredFailure) {
            // Intentionally empty.
        }

        @Override
        public void onComplete() {
            // Intentionally empty.
        }
    };

    /** Lock taken by every signal handler and by {@link #cancelSubscription()}. */
    private final Monitor signalMonitor = Monitor.newInstance();

    /** Destination that incoming segments are copied into before being flushed. */
    private final LobSegmentBuffer<T> buffer;

    /** Publishes flushed lengths; {@code null} when no outcome subscriber was given. */
    private final BufferedPublisher<Long> outcomePublisher;

    /** Callback run once this subscriber reaches a terminal state. */
    private final Runnable terminalAction;

    /** Executor used for calls into the upstream subscription. */
    private final Executor userCodeExecutor;

    /** The accepted upstream subscription; {@code null} until {@code onSubscribe}. */
    private Flow.Subscription subscription;

    /** Set once by {@code onComplete}, {@code onError} or {@link #cancelSubscription()}. */
    private boolean isTerminated = false;

    /** Stage tracking the push of the most recent segment; terminal work is chained onto it. */
    private volatile CompletionStage<Void> pushSegmentStage = CompletableFuture.completedFuture(null);

    /**
     * Creates a subscriber writing into {@code buffer}.
     *
     * @param buffer            buffer that segments are copied into
     * @param outcomeSubscriber receiver of flushed lengths, or
     *                          {@link #NO_OUTCOME_SUBSCRIBER} to publish nothing
     * @param terminalAction    run when this subscriber terminates
     * @param userCodeExecutor  executor for subscription calls; also passed to the
     *                          outcome publisher
     * @param joinPhaser        passed through to the outcome publisher
     */
    LobSegmentSubscriber(LobSegmentBuffer<T> buffer, Flow.Subscriber<Long> outcomeSubscriber, Runnable terminalAction, Executor userCodeExecutor, Phaser joinPhaser) {
        this.userCodeExecutor = userCodeExecutor;
        this.buffer = buffer;

        BufferedPublisher<Long> publisher = null;
        if (outcomeSubscriber != NO_OUTCOME_SUBSCRIBER) {
            publisher = BufferedPublisher.newInstance(Flow.defaultBufferSize(), joinPhaser, userCodeExecutor);
            publisher.subscribe(outcomeSubscriber);
        }
        this.outcomePublisher = publisher;

        this.terminalAction = terminalAction;
    }

    /**
     * Accepts the first subscription and requests one segment from it. Any later
     * subscription is cancelled; if it equals the one already held, this
     * subscriber is cancelled as well.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        try (Monitor.CloseableLock ignored = this.signalMonitor.acquireCloseableLock()) {
            if (this.subscription == null) {
                this.subscription = subscription;
                this.userCodeExecutor.execute(() -> subscription.request(1L));
                return;
            }

            // A subscription is already held: reject the newcomer.
            subscription.cancel();
            if (subscription.equals(this.subscription)) {
                // NOTE(review): cancelSubscription() re-acquires signalMonitor while it
                // is held here, so the monitor is presumably reentrant - confirm.
                this.cancelSubscription();
            }
        }
    }

    /**
     * Pushes {@code nextSegment} into the buffer and, once that has finished,
     * requests one more segment unless this subscriber has terminated meanwhile.
     * Ignored after termination.
     *
     * @param nextSegment the segment to buffer
     * @throws NullPointerException if {@code nextSegment} is {@code null}
     */
    @Override
    public final void onNext(T nextSegment) {
        Objects.requireNonNull(nextSegment);
        try (Monitor.CloseableLock ignored = this.signalMonitor.acquireCloseableLock()) {
            if (!this.isTerminated) {
                CompletionStage<Void> segmentPushed = this.pushSegment(nextSegment, 0);
                this.pushSegmentStage = segmentPushed.thenAccept(unused -> {
                    // NOTE(review): when the push completes asynchronously this runs
                    // outside the try block above, and isTerminated is not volatile -
                    // confirm visibility is guaranteed by other means.
                    if (this.isTerminated) {
                        return;
                    }
                    // The lambda reads this.subscription when the task runs, not now.
                    this.userCodeExecutor.execute(() -> this.subscription.request(1L));
                });
            }
        }
    }

    /**
     * Copies {@code segment}, starting at {@code offset}, into the buffer. When
     * the buffer could not take everything, flushes it (waiting for the outcome
     * to be offered) and then continues with the remainder.
     *
     * @param segment the segment being copied
     * @param offset  index of the first element of {@code segment} not yet copied
     * @return a stage that completes once the whole segment has been buffered
     */
    private CompletionStage<Void> pushSegment(T segment, int offset) {
        int copied = this.buffer.putSegment(segment, offset);
        int nextOffset = offset + copied;
        if (this.buffer.getSegmentLength(segment) == nextOffset) {
            return CompletionStageUtil.VOID_COMPLETED_FUTURE;
        }
        return this.flushBufferAsync(true).thenCompose(unused -> this.pushSegment(segment, nextOffset));
    }

    /**
     * Marks this subscriber terminated. After the pending segment push, flushes
     * whatever is still buffered, then terminates the outcome publisher normally
     * (only if nothing failed) and runs the terminal action. Ignored if already
     * terminated.
     */
    @Override
    public final void onComplete() {
        try (Monitor.CloseableLock ignored = this.signalMonitor.acquireCloseableLock()) {
            if (this.isTerminated) {
                return;
            }
            this.isTerminated = true;

            CompletionStage<Void> finalFlush = this.pushSegmentStage.thenCompose(unused -> {
                if (this.buffer.getPosition() <= 0) {
                    return CompletionStageUtil.VOID_COMPLETED_FUTURE;
                }
                // The final flush does not wait for the outcome item to be offered.
                return this.flushBufferAsync(false);
            });

            this.pushSegmentStage = finalFlush.whenComplete((unused, failure) -> {
                boolean finishedCleanly = failure == null;
                if (finishedCleanly && this.outcomePublisher != null) {
                    this.outcomePublisher.terminate(null);
                }
                // The terminal action runs whether or not the flush succeeded.
                this.terminalAction.run();
            });
        }
    }

    /**
     * Logs the upstream error and marks this subscriber terminated. After the
     * pending segment push, runs the terminal action and, if that push did not
     * itself fail, terminates the outcome publisher with a SQL exception (error
     * code 1713) wrapping {@code error}. Ignored if already terminated.
     *
     * @param error the upstream failure
     * @throws NullPointerException if {@code error} is {@code null}
     */
    @Override
    public final void onError(Throwable error) {
        Objects.requireNonNull(error);
        // NOTE(review): the diagnostic is tagged with OracleBlob / flushBufferAsync
        // rather than this class and method - confirm that is intended.
        CommonDiagnosable.getInstance().debug(Level.FINEST, SecurityLabel.UNKNOWN, "oracle/jdbc/driver/OracleBlob", "flushBufferAsync", "LOB Subscriber onError(Throwable) was invoked with:", null, error);
        try (Monitor.CloseableLock ignored = this.signalMonitor.acquireCloseableLock()) {
            if (this.isTerminated) {
                return;
            }
            this.isTerminated = true;

            // Unlike onComplete, the resulting stage is not stored back into pushSegmentStage.
            this.pushSegmentStage.whenComplete((unused, pushFailure) -> {
                this.terminalAction.run();
                if (this.outcomePublisher == null || pushFailure != null) {
                    return;
                }
                this.outcomePublisher.terminate(DatabaseError.createSqlException(null, 1713, null, error).fillInStackTrace());
            });
        }
    }

    /**
     * Schedules cancellation of the held subscription on the executor and, if
     * this subscriber was not yet terminated, marks it terminated and runs the
     * terminal action.
     */
    private void cancelSubscription() {
        try (Monitor.CloseableLock ignored = this.signalMonitor.acquireCloseableLock()) {
            // The method reference binds the current this.subscription immediately.
            this.userCodeExecutor.execute(this.subscription::cancel);
            if (this.isTerminated) {
                return;
            }
            this.isTerminated = true;
            this.terminalAction.run();
        }
    }

    /**
     * Flushes the current buffer content through {@link #flushBufferAsync(int)}.
     * On success the buffer is reset; on failure the subscription is cancelled
     * and the outcome publisher, if any, is terminated with the unwrapped error.
     * When an outcome publisher exists, the flushed length is offered to it
     * after a successful flush.
     *
     * @param awaitOutcomeBuffer if {@code true} and an outcome publisher exists,
     *                           the returned stage also waits for the outcome item
     *                           to be offered; otherwise it only tracks the flush
     * @return the stage described above
     */
    private CompletionStage<Void> flushBufferAsync(boolean awaitOutcomeBuffer) {
        // Captured before the flush because a successful flush resets the position.
        int flushedLength = this.buffer.getPosition();

        CompletionStage<Void> flushed = this.flushBufferAsync(flushedLength).whenComplete((unused, failure) -> {
            if (failure == null) {
                this.buffer.reset();
                return;
            }
            this.cancelSubscription();
            if (this.outcomePublisher != null) {
                this.outcomePublisher.terminate(CompletionStageUtil.unwrapCompletionException(failure));
            }
        });

        if (this.outcomePublisher == null) {
            return flushed;
        }

        // The offer is always chained, even when the caller does not wait for it.
        CompletionStage<Void> outcomeOffered = flushed.thenCompose(unused -> this.outcomePublisher.offerItem(Long.valueOf(flushedLength)));
        return awaitOutcomeBuffer ? outcomeOffered : flushed;
    }

    /**
     * Writes out the first {@code var1} elements currently held in the buffer.
     *
     * @param var1 the buffer position at the time of the flush, i.e. the number
     *             of buffered elements
     * @return a stage that completes when the flush has finished
     */
    abstract CompletionStage<Void> flushBufferAsync(int var1);

    /**
     * Tracks a write position and delegates the actual copying to subclasses.
     * The factory methods return implementations backed by a caller-supplied
     * array.
     *
     * @param <T> the segment type
     */
    static abstract class LobSegmentBuffer<T> {
        /** Index at which the next copy starts; also the number of buffered elements. */
        private volatile int position = 0;

        LobSegmentBuffer() {
        }

        /**
         * Creates a buffer for {@code byte[]} segments backed by {@code buffer}.
         *
         * @param buffer the backing array that segments are copied into
         * @return a new byte-segment buffer
         */
        static LobSegmentBuffer<byte[]> newByteBuffer(final byte[] buffer) {
            return new LobSegmentBuffer<byte[]>() {

                @Override
                protected int copyToBuffer(byte[] source, int sourceOffset, int targetPosition) {
                    int freeSpace = buffer.length - targetPosition;
                    int pending = source.length - sourceOffset;
                    int count = Math.min(freeSpace, pending);
                    System.arraycopy(source, sourceOffset, buffer, targetPosition, count);
                    return count;
                }

                @Override
                protected int getSegmentLength(byte[] source) {
                    return source.length;
                }
            };
        }

        /**
         * Creates a buffer for {@code String} segments backed by {@code buffer}.
         *
         * @param buffer the backing array that characters are copied into
         * @return a new character-segment buffer
         */
        static LobSegmentBuffer<String> newCharacterBuffer(final char[] buffer) {
            return new LobSegmentBuffer<String>() {

                @Override
                protected int copyToBuffer(String source, int sourceOffset, int targetPosition) {
                    int freeSpace = buffer.length - targetPosition;
                    int pending = source.length() - sourceOffset;
                    int count = Math.min(freeSpace, pending);
                    source.getChars(sourceOffset, sourceOffset + count, buffer, targetPosition);
                    return count;
                }

                @Override
                protected int getSegmentLength(String source) {
                    return source.length();
                }
            };
        }

        /** Returns the current write position. */
        private int getPosition() {
            return this.position;
        }

        /** Moves the write position back to the start. */
        private void reset() {
            this.position = 0;
        }

        /**
         * Copies as much of {@code segment} as {@link #copyToBuffer} accepts,
         * starting at {@code offset}, and advances the position accordingly.
         *
         * @return the number of elements copied
         */
        private int putSegment(T segment, int offset) {
            int copied = this.copyToBuffer(segment, offset, this.position);
            this.position = this.position + copied;
            return copied;
        }

        /**
         * Copies elements of the segment {@code var1}, starting at segment index
         * {@code var2}, into the buffer starting at position {@code var3}.
         *
         * @return the number of elements copied
         */
        protected abstract int copyToBuffer(T var1, int var2, int var3);

        /** Returns the number of elements in the segment {@code var1}. */
        protected abstract int getSegmentLength(T var1);
    }
}

