package reactor.core.publisher;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
import org.aspectj.weaver.model.AsmRelationshipUtils;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.FluxPublish;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:BOOT-INF/lib/reactor-core-3.1.5.RELEASE.jar:reactor/core/publisher/EmitterProcessor.class */
public final class EmitterProcessor<T> extends FluxProcessor<T, T> {
    final int prefetch;
    final boolean autoCancel;
    volatile Subscription s;
    volatile FluxPublish.PubSubInner<T>[] subscribers;
    volatile int wip;
    volatile Queue<T> queue;
    int sourceMode;
    volatile boolean done;
    volatile Throwable error;
    static final AtomicReferenceFieldUpdater<EmitterProcessor, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, Subscription.class, "s");
    static final AtomicReferenceFieldUpdater<EmitterProcessor, FluxPublish.PubSubInner[]> SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, FluxPublish.PubSubInner[].class, "subscribers");
    static final AtomicIntegerFieldUpdater<EmitterProcessor> WIP = AtomicIntegerFieldUpdater.newUpdater(EmitterProcessor.class, "wip");
    static final AtomicReferenceFieldUpdater<EmitterProcessor, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, Throwable.class, AsmRelationshipUtils.DECLARE_ERROR);

    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.1.5.RELEASE.jar:reactor/core/publisher/EmitterProcessor$EmitterInner.class */
    static final class EmitterInner<T> extends FluxPublish.PubSubInner<T> {
        final EmitterProcessor<T> parent;

        EmitterInner(CoreSubscriber<? super T> coreSubscriber, EmitterProcessor<T> emitterProcessor) {
            super(coreSubscriber);
            this.parent = emitterProcessor;
        }

        @Override // reactor.core.publisher.FluxPublish.PubSubInner
        void drainParent() {
            this.parent.drain();
        }

        @Override // reactor.core.publisher.FluxPublish.PubSubInner
        void removeAndDrainParent() {
            this.parent.remove(this);
            this.parent.drain();
        }
    }

    public static <E> EmitterProcessor<E> create() {
        return create(Queues.SMALL_BUFFER_SIZE, true);
    }

    public static <E> EmitterProcessor<E> create(boolean z) {
        return create(Queues.SMALL_BUFFER_SIZE, z);
    }

    public static <E> EmitterProcessor<E> create(int i) {
        return create(i, true);
    }

    public static <E> EmitterProcessor<E> create(int i, boolean z) {
        return new EmitterProcessor<>(z, i);
    }

    EmitterProcessor(boolean z, int i) {
        if (i < 1) {
            throw new IllegalArgumentException("bufferSize must be strictly positive, was: " + i);
        }
        this.autoCancel = z;
        this.prefetch = i;
        SUBSCRIBERS.lazySet(this, FluxPublish.PublishSubscriber.EMPTY);
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    public Stream<? extends Scannable> inners() {
        return Stream.of((Object[]) this.subscribers);
    }

    @Override // reactor.core.publisher.Flux
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "subscribe");
        EmitterInner<T> emitterInner = new EmitterInner<>(coreSubscriber, this);
        coreSubscriber.onSubscribe(emitterInner);
        if (emitterInner.isCancelled()) {
            return;
        }
        if (add(emitterInner)) {
            if (emitterInner.isCancelled()) {
                remove(emitterInner);
            }
            drain();
        } else {
            Throwable th = this.error;
            if (th != null) {
                emitterInner.actual.onError(th);
            } else {
                emitterInner.actual.onComplete();
            }
        }
    }

    public int getPending() {
        Queue<T> queue = this.queue;
        if (queue != null) {
            return queue.size();
        }
        return 0;
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (Operators.setOnce(S, this, subscription)) {
            if (subscription instanceof Fuseable.QueueSubscription) {
                Fuseable.QueueSubscription queueSubscription = (Fuseable.QueueSubscription) subscription;
                int requestFusion = queueSubscription.requestFusion(3);
                if (requestFusion == 1) {
                    this.sourceMode = requestFusion;
                    this.queue = queueSubscription;
                    this.done = true;
                    drain();
                    return;
                }
                if (requestFusion == 2) {
                    this.sourceMode = requestFusion;
                    this.queue = queueSubscription;
                    subscription.request(Operators.unboundedOrPrefetch(this.prefetch));
                    return;
                }
            }
            this.queue = (Queue) Queues.get(this.prefetch).get();
            subscription.request(Operators.unboundedOrPrefetch(this.prefetch));
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(T t) {
        if (this.done) {
            Operators.onNextDropped(t, currentContext());
            return;
        }
        if (this.sourceMode == 2) {
            drain();
            return;
        }
        Objects.requireNonNull(t, "onNext");
        Queue<T> queue = this.queue;
        if (queue == null) {
            if (Operators.setOnce(S, this, Operators.emptySubscription())) {
                queue = (Queue) Queues.get(this.prefetch).get();
                this.queue = queue;
            }
            while (!isDisposed()) {
                queue = this.queue;
                if (queue != null) {
                }
            }
            return;
        }
        while (!queue.offer(t)) {
            LockSupport.parkNanos(10L);
        }
        drain();
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        Objects.requireNonNull(th, "onError");
        if (this.done) {
            Operators.onErrorDropped(th, currentContext());
        } else if (!Exceptions.addThrowable(ERROR, this, th)) {
            Operators.onErrorDroppedMulticast(th);
        } else {
            this.done = true;
            drain();
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        drain();
    }

    @Override // reactor.core.publisher.FluxProcessor
    @Nullable
    public Throwable getError() {
        return this.error;
    }

    public boolean isCancelled() {
        return Operators.cancelledSubscription() == this.s;
    }

    @Override // reactor.core.publisher.FluxProcessor
    public final int getBufferSize() {
        return this.prefetch;
    }

    @Override // reactor.core.publisher.FluxProcessor
    public boolean isTerminated() {
        return this.done && getPending() == 0;
    }

    @Override // reactor.core.publisher.Flux
    public int getPrefetch() {
        return this.prefetch;
    }

    @Override // reactor.core.publisher.FluxProcessor, reactor.core.Scannable
    @Nullable
    public Object scanUnsafe(Scannable.Attr attr) {
        return attr == Scannable.Attr.PARENT ? this.s : attr == Scannable.Attr.BUFFERED ? Integer.valueOf(getPending()) : attr == Scannable.Attr.CANCELLED ? Boolean.valueOf(isCancelled()) : attr == Scannable.Attr.PREFETCH ? Integer.valueOf(getPrefetch()) : super.scanUnsafe(attr);
    }

    final void drain() {
        T t;
        T t2;
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }
        int i = 1;
        while (true) {
            boolean z = this.done;
            Queue<T> queue = this.queue;
            boolean z2 = queue == null || queue.isEmpty();
            if (checkTerminated(z, z2)) {
                return;
            }
            FluxPublish.PubSubInner<T>[] pubSubInnerArr = this.subscribers;
            if (pubSubInnerArr != FluxPublish.PublishSubscriber.EMPTY && !z2) {
                long j = Long.MAX_VALUE;
                int length = pubSubInnerArr.length;
                int i2 = 0;
                for (FluxPublish.PubSubInner<T> pubSubInner : pubSubInnerArr) {
                    long j2 = pubSubInner.requested;
                    if (j2 >= 0) {
                        j = Math.min(j, j2);
                    } else {
                        i2++;
                    }
                }
                if (length == i2) {
                    try {
                        t2 = queue.poll();
                    } catch (Throwable th) {
                        Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this.s, th, currentContext()));
                        z = true;
                        t2 = null;
                    }
                    if (checkTerminated(z, t2 == null)) {
                        return;
                    }
                    if (this.sourceMode != 1) {
                        this.s.request(1L);
                    }
                } else {
                    int i3 = 0;
                    while (i3 < j && i2 != Integer.MIN_VALUE) {
                        boolean z3 = this.done;
                        try {
                            t = queue.poll();
                        } catch (Throwable th2) {
                            Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this.s, th2, currentContext()));
                            z3 = true;
                            t = null;
                        }
                        z2 = t == null;
                        if (checkTerminated(z3, z2)) {
                            return;
                        }
                        if (z2) {
                            break;
                        }
                        for (FluxPublish.PubSubInner<T> pubSubInner2 : pubSubInnerArr) {
                            pubSubInner2.actual.onNext(t);
                            if (Operators.producedCancellable(FluxPublish.PublishInner.REQUESTED, pubSubInner2, 1L) == Long.MIN_VALUE) {
                                i2 = Integer.MIN_VALUE;
                            }
                        }
                        i3++;
                    }
                    if (i3 != 0 && this.sourceMode != 1) {
                        this.s.request(i3);
                    }
                    if (j != 0 && !z2) {
                    }
                }
            }
            i = WIP.addAndGet(this, -i);
            if (i == 0) {
                return;
            }
        }
    }

    FluxPublish.PubSubInner<T>[] terminate() {
        return SUBSCRIBERS.getAndSet(this, FluxPublish.PublishSubscriber.TERMINATED);
    }

    boolean checkTerminated(boolean z, boolean z2) {
        if (this.s == Operators.cancelledSubscription()) {
            if (!this.autoCancel) {
                return true;
            }
            terminate();
            Queue<T> queue = this.queue;
            if (queue == null) {
                return true;
            }
            queue.clear();
            return true;
        }
        if (!z) {
            return false;
        }
        Throwable th = this.error;
        if (th == null || th == Exceptions.TERMINATED) {
            if (!z2) {
                return false;
            }
            for (FluxPublish.PubSubInner<T> pubSubInner : terminate()) {
                pubSubInner.actual.onComplete();
            }
            return true;
        }
        Queue<T> queue2 = this.queue;
        if (queue2 != null) {
            queue2.clear();
        }
        for (FluxPublish.PubSubInner<T> pubSubInner2 : terminate()) {
            pubSubInner2.actual.onError(th);
        }
        return true;
    }

    final boolean add(EmitterInner<T> emitterInner) {
        FluxPublish.PubSubInner<T>[] pubSubInnerArr;
        FluxPublish.PubSubInner[] pubSubInnerArr2;
        do {
            pubSubInnerArr = this.subscribers;
            if (pubSubInnerArr == FluxPublish.PublishSubscriber.TERMINATED) {
                return false;
            }
            int length = pubSubInnerArr.length;
            pubSubInnerArr2 = new FluxPublish.PubSubInner[length + 1];
            System.arraycopy(pubSubInnerArr, 0, pubSubInnerArr2, 0, length);
            pubSubInnerArr2[length] = emitterInner;
        } while (!SUBSCRIBERS.compareAndSet(this, pubSubInnerArr, pubSubInnerArr2));
        return true;
    }

    final void remove(FluxPublish.PubSubInner<T> pubSubInner) {
        FluxPublish.PubSubInner[] pubSubInnerArr;
        FluxPublish.PubSubInner<T>[] pubSubInnerArr2 = this.subscribers;
        if (pubSubInnerArr2 == FluxPublish.PublishSubscriber.TERMINATED || pubSubInnerArr2 == FluxPublish.PublishSubscriber.EMPTY) {
            return;
        }
        int length = pubSubInnerArr2.length;
        int i = -1;
        int i2 = 0;
        while (true) {
            if (i2 >= length) {
                break;
            }
            if (pubSubInnerArr2[i2] == pubSubInner) {
                i = i2;
                break;
            }
            i2++;
        }
        if (i < 0) {
            return;
        }
        if (length == 1) {
            pubSubInnerArr = FluxPublish.PublishSubscriber.EMPTY;
        } else {
            pubSubInnerArr = new FluxPublish.PubSubInner[length - 1];
            System.arraycopy(pubSubInnerArr2, 0, pubSubInnerArr, 0, i);
            System.arraycopy(pubSubInnerArr2, i + 1, pubSubInnerArr, i, (length - i) - 1);
        }
        if (SUBSCRIBERS.compareAndSet(this, pubSubInnerArr2, pubSubInnerArr) && this.autoCancel && pubSubInnerArr == FluxPublish.PublishSubscriber.EMPTY && Operators.terminate(S, this) && WIP.getAndIncrement(this) == 0) {
            terminate();
            Queue<T> queue = this.queue;
            if (queue != null) {
                queue.clear();
            }
        }
    }

    @Override // reactor.core.publisher.FluxProcessor
    public long downstreamCount() {
        return this.subscribers.length;
    }
}
