package darabonba.core.async;

import com.aliyun.core.utils.BinaryUtils;
import com.aliyun.core.utils.FunctionalUtils;
import darabonba.core.ResponseBytes;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:BOOT-INF/lib/darabonba-java-core-0.1.13-beta.jar:darabonba/core/async/ByteArrayAsyncResponseHandler.class */
public final class ByteArrayAsyncResponseHandler<ResponseT> implements AsyncResponseHandler<ResponseT, ResponseBytes<ResponseT>> {
    private BaosSubscriber subscriber;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/darabonba-java-core-0.1.13-beta.jar:darabonba/core/async/ByteArrayAsyncResponseHandler$BaosSubscriber.class */
    public static class BaosSubscriber implements Subscriber<ByteBuffer> {
        private ByteArrayOutputStream baos = new ByteArrayOutputStream();
        private Subscription subscription;

        BaosSubscriber() {
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (this.subscription != null) {
                subscription.cancel();
            } else {
                this.subscription = subscription;
                this.subscription.request(Long.MAX_VALUE);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(ByteBuffer byteBuffer) {
            FunctionalUtils.invokeSafely(() -> {
                this.baos.write(BinaryUtils.copyBytesFrom(byteBuffer));
            });
            this.subscription.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.baos = null;
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
        }
    }

    @Override // darabonba.core.async.AsyncResponseHandler
    public void onStream(Publisher<ByteBuffer> publisher) {
        this.subscriber = new BaosSubscriber();
        publisher.subscribe(this.subscriber);
    }

    @Override // darabonba.core.async.AsyncResponseHandler
    public ResponseBytes<ResponseT> transform(ResponseT responset) {
        return ResponseBytes.fromByteArrayUnsafe(responset, this.subscriber.baos.toByteArray());
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // darabonba.core.async.AsyncResponseHandler
    public /* bridge */ /* synthetic */ Object transform(Object obj) {
        return transform((ByteArrayAsyncResponseHandler<ResponseT>) obj);
    }
}
