package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventhubs.ReceiverRuntimeInformation;
import com.microsoft.azure.eventhubs.impl.ReceivePump;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnknownDescribedType;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes.dex */
/**
 * {@link PartitionReceiver} implementation backed by a single {@link MessageReceiver} link.
 *
 * <p>Instances are obtained through {@link #create}; the returned future completes only after the
 * underlying {@link MessageReceiver} has been created and stored, so the accessors that delegate to
 * it can rely on it being present.
 *
 * <p>The class also acts as the {@link ReceiverSettingsProvider} for its own link, supplying the
 * filter, link properties and desired capabilities.
 */
public final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettingsProvider, PartitionReceiver {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionReceiverImpl.class);

    /** Smallest prefetch count accepted by {@link #setPrefetchCount(int)}. */
    private static final int LOWEST_ALLOWED_PREFETCH_COUNT = 10;

    private final String consumerGroupName;
    private final Long epoch;
    private final String eventHubName;
    private final EventPositionImpl eventPosition;
    /** Assigned asynchronously by {@link #createInternalReceiver()}; null until that stage has run. */
    private volatile MessageReceiver internalReceiver;
    private final boolean isEpochReceiver;
    private final String partitionId;
    /** Guards every read and write of {@link #receivePump}. */
    private final Object receiveHandlerLock;
    private ReceivePump receivePump;
    private final ReceiverOptions receiverOptions;
    /** Non-null only when runtime metrics were enabled in the options at construction time. */
    private final ReceiverRuntimeInformation runtimeInformation;
    private final MessagingFactory underlyingFactory;

    private PartitionReceiverImpl(MessagingFactory messagingFactory, String eventHubName, String consumerGroupName, String partitionId, EventPositionImpl eventPosition, Long epoch, boolean isEpochReceiver, ReceiverOptions receiverOptions, Executor executor) {
        super(null, null, executor);
        this.underlyingFactory = messagingFactory;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.partitionId = partitionId;
        this.eventPosition = eventPosition;
        this.epoch = epoch;
        this.isEpochReceiver = isEpochReceiver;
        this.receiveHandlerLock = new Object();
        this.receiverOptions = receiverOptions;
        if (receiverOptions != null && receiverOptions.getReceiverRuntimeMetricEnabled()) {
            this.runtimeInformation = new ReceiverRuntimeInformation(partitionId);
        } else {
            this.runtimeInformation = null;
        }
    }

    /**
     * Validates the arguments, builds a receiver and asynchronously opens its underlying link.
     *
     * @param messagingFactory factory handed to {@link MessageReceiver#create}
     * @param eventHubName first segment of the receive path
     * @param consumerGroupName consumer group segment of the receive path; must not be null or blank
     * @param partitionId partition segment of the receive path
     * @param eventPosition starting position; must be an {@link EventPositionImpl}
     * @param epoch epoch value; must be zero or positive
     * @param isEpochReceiver whether the epoch is published in the link properties
     * @param receiverOptions optional settings; may be null
     * @param executor executor used for the asynchronous continuations
     * @return a future that yields the receiver once its internal link has been created
     * @throws IllegalArgumentException if {@code epoch} is negative or {@code consumerGroupName} is blank
     * @throws EventHubException declared for callers; not thrown by the code in this method
     */
    public static CompletableFuture<PartitionReceiver> create(MessagingFactory messagingFactory, String eventHubName, String consumerGroupName, String partitionId, EventPosition eventPosition, long epoch, boolean isEpochReceiver, ReceiverOptions receiverOptions, Executor executor) throws EventHubException {
        if (epoch < 0) {
            throw new IllegalArgumentException("epoch cannot be a negative value. Please specify a zero or positive long value.");
        }
        if (StringUtil.isNullOrWhiteSpace(consumerGroupName)) {
            throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'");
        }
        final PartitionReceiverImpl receiver = new PartitionReceiverImpl(messagingFactory, eventHubName, consumerGroupName, partitionId, (EventPositionImpl) eventPosition, Long.valueOf(epoch), isEpochReceiver, receiverOptions, executor);
        // A static method has no enclosing instance, so the continuation must hand back the
        // local instance that was just constructed.
        return receiver.createInternalReceiver().<PartitionReceiver>thenApplyAsync(ignored -> receiver, executor);
    }

    /**
     * Creates the underlying {@link MessageReceiver} for this partition and stores it in
     * {@link #internalReceiver} when the creation future completes.
     */
    private CompletableFuture<Void> createInternalReceiver() {
        final String linkName = StringUtil.getRandomString();
        final String receivePath = String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId);
        return MessageReceiver.create(this.underlyingFactory, linkName, receivePath, PartitionReceiver.DEFAULT_PREFETCH_COUNT, this)
                .thenAcceptAsync(created -> {
                    this.internalReceiver = created;
                }, this.executor);
    }

    /**
     * @return a single-element array requesting runtime metrics when they are enabled in the
     *         options, otherwise {@code null}
     */
    @Override // com.microsoft.azure.eventhubs.impl.ReceiverSettingsProvider
    public Symbol[] getDesiredCapabilities() {
        final boolean metricsEnabled = this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled();
        if (!metricsEnabled) {
            return null;
        }
        return new Symbol[]{AmqpConstants.ENABLE_RECEIVER_RUNTIME_METRIC_NAME};
    }

    /** @return the epoch value supplied at creation. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public final long getEpoch() {
        return this.epoch.longValue();
    }

    /**
     * Builds the filter map for the receive link.
     *
     * <p>When {@code message} is non-null the filter expression is derived from that message's
     * offset annotation; otherwise the expression of the configured starting position is used.
     *
     * @param message message whose offset annotation seeds the filter, or null
     * @return a singleton map keyed by {@code AmqpConstants.STRING_FILTER}
     */
    @Override // com.microsoft.azure.eventhubs.impl.ReceiverSettingsProvider
    public Map<Symbol, UnknownDescribedType> getFilter(Message message) {
        final String expression;
        if (message != null) {
            final String offset = message.getMessageAnnotations().getValue().get(AmqpConstants.OFFSET).toString();
            expression = String.format(AmqpConstants.AMQP_ANNOTATION_FORMAT, AmqpConstants.OFFSET_ANNOTATION_NAME, "", offset);
        } else {
            expression = this.eventPosition.getExpression();
        }

        if (TRACE_LOGGER.isInfoEnabled()) {
            // Read the volatile field once so the null check and the dereference see the same value.
            final MessageReceiver receiver = this.internalReceiver;
            final String receivePathText = receiver == null
                    ? "receiverPath[RECEIVER IS NULL]"
                    : "receiverPath[" + receiver.getReceivePath() + "]";
            TRACE_LOGGER.info(String.format("%s, action[createReceiveLink], %s", receivePathText, this.eventPosition));
        }

        return Collections.singletonMap(AmqpConstants.STRING_FILTER, new UnknownDescribedType(AmqpConstants.STRING_FILTER, expression));
    }

    /** @return the partition id supplied at creation. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public final String getPartitionId() {
        return this.partitionId;
    }

    /** @return the prefetch count reported by the underlying receiver. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public final int getPrefetchCount() {
        return this.internalReceiver.getPrefetchCount();
    }

    /**
     * Builds the link properties: the epoch when this is an epoch receiver, and the receiver
     * identifier when one is configured.
     *
     * @return the property map, or {@code null} when neither property applies
     */
    @Override // com.microsoft.azure.eventhubs.impl.ReceiverSettingsProvider
    public Map<Symbol, Object> getProperties() {
        final String identifier = this.receiverOptions != null ? this.receiverOptions.getIdentifier() : null;
        if (!this.isEpochReceiver && identifier == null) {
            return null;
        }
        final Map<Symbol, Object> properties = new HashMap<>();
        if (this.isEpochReceiver) {
            properties.put(AmqpConstants.EPOCH, this.epoch);
        }
        if (identifier != null) {
            properties.put(AmqpConstants.RECEIVER_IDENTIFIER_NAME, identifier);
        }
        return properties;
    }

    /** @return the receive timeout reported by the underlying receiver. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public final Duration getReceiveTimeout() {
        return this.internalReceiver.getReceiveTimeout();
    }

    /** @return the runtime information holder, or {@code null} when metrics were not enabled at creation. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public final ReceiverRuntimeInformation getRuntimeInformation() {
        return this.runtimeInformation;
    }

    /** @return the starting position supplied at creation. */
    final EventPosition getStartingPosition() {
        return this.eventPosition;
    }

    /**
     * Stops the receive pump if one is running, then closes the underlying receiver.
     *
     * @return the underlying receiver's close future, or an already-completed future when no
     *         receiver was ever created
     */
    @Override // com.microsoft.azure.eventhubs.impl.ClientEntity
    public CompletableFuture<Void> onClose() {
        synchronized (this.receiveHandlerLock) {
            if (this.receivePump != null && this.receivePump.isRunning()) {
                // The stop future is intentionally not awaited here.
                this.receivePump.stop();
            }
        }
        // Single volatile read: the null check and close() operate on the same reference.
        final MessageReceiver receiver = this.internalReceiver;
        if (receiver == null) {
            return CompletableFuture.completedFuture(null);
        }
        return receiver.close();
    }

    /**
     * Receives up to the requested number of messages from the underlying receiver and converts
     * them to {@link EventData} on this entity's executor.
     *
     * @param maxEventCount value passed through to the underlying receiver's {@code receive}
     * @return a future yielding the converted events
     */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public CompletableFuture<Iterable<EventData>> receive(int maxEventCount) {
        return this.internalReceiver.receive(maxEventCount)
                .<Iterable<EventData>>thenApplyAsync(this::toEventDataAndTrackRuntimeInfo, this.executor);
    }

    /**
     * Converts a received batch to {@link EventData} and, when runtime metrics are active,
     * refreshes {@link #runtimeInformation} from the delivery annotations of the message that the
     * conversion stored in the pass-by-reference holder.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // PassByRef's type parameter is not visible from this file
    private Iterable<EventData> toEventDataAndTrackRuntimeInfo(Collection<Message> messages) {
        // runtimeInformation is fixed at construction while the options object is re-read on every
        // call; requiring both avoids dereferencing a null runtimeInformation when the option is
        // switched on after this receiver was created.
        final boolean trackRuntimeInfo = this.runtimeInformation != null
                && this.receiverOptions != null
                && this.receiverOptions.getReceiverRuntimeMetricEnabled();
        final PassByRef lastMessageRef = trackRuntimeInfo ? new PassByRef() : null;

        final LinkedList<EventData> events = EventDataUtil.toEventDataCollection(messages, lastMessageRef);

        if (lastMessageRef != null && lastMessageRef.get() != null) {
            final DeliveryAnnotations deliveryAnnotations = ((Message) lastMessageRef.get()).getDeliveryAnnotations();
            if (deliveryAnnotations != null && deliveryAnnotations.getValue() != null) {
                final Map<Symbol, Object> annotations = deliveryAnnotations.getValue();
                this.runtimeInformation.setRuntimeInformation(
                        ((Long) annotations.get(ClientConstants.LAST_ENQUEUED_SEQUENCE_NUMBER)).longValue(),
                        ((Date) annotations.get(ClientConstants.LAST_ENQUEUED_TIME_UTC)).toInstant(),
                        (String) annotations.get(ClientConstants.LAST_ENQUEUED_OFFSET));
            }
        }
        return events;
    }

    // Compiler-generated forwarder to the interface's default receiveSync; kept exactly as emitted
    // because the companion class it targets is not visible from this file.
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public /* synthetic */ Iterable receiveSync(int i) {
        return PartitionReceiver.CC.$default$receiveSync(this, i);
    }

    /**
     * Sets the prefetch count on the underlying receiver.
     *
     * @param prefetchCount new prefetch count; must be at least 10
     * @throws IllegalArgumentException if {@code prefetchCount} is below 10
     * @throws EventHubException declared by the interface; propagated from the underlying receiver
     */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public final void setPrefetchCount(int prefetchCount) throws EventHubException {
        if (prefetchCount < LOWEST_ALLOWED_PREFETCH_COUNT) {
            throw new IllegalArgumentException(String.format(Locale.US, "PrefetchCount has to be above %s", LOWEST_ALLOWED_PREFETCH_COUNT));
        }
        this.internalReceiver.setPrefetchCount(prefetchCount);
    }

    /** Equivalent to {@code setReceiveHandler(partitionReceiveHandler, false)}. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public CompletableFuture<Void> setReceiveHandler(PartitionReceiveHandler partitionReceiveHandler) {
        return setReceiveHandler(partitionReceiveHandler, false);
    }

    /**
     * Registers or removes the receive handler.
     *
     * <p>A null handler stops the running pump, if any. A non-null handler starts a new
     * {@link ReceivePump} on a dedicated thread, provided no pump is currently running.
     *
     * @param partitionReceiveHandler handler to register, or null to stop the current pump
     * @param invokeWhenNoEvents flag forwarded unchanged to the {@link ReceivePump} constructor
     * @return the pump's stop future when a running pump is stopped, otherwise a completed future
     * @throws IllegalArgumentException if a handler is supplied while a pump is still running
     */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public CompletableFuture<Void> setReceiveHandler(PartitionReceiveHandler partitionReceiveHandler, boolean invokeWhenNoEvents) {
        synchronized (this.receiveHandlerLock) {
            final boolean pumpRunning = this.receivePump != null && this.receivePump.isRunning();

            if (partitionReceiveHandler == null) {
                if (pumpRunning) {
                    return this.receivePump.stop();
                }
                return CompletableFuture.completedFuture(null);
            }

            if (pumpRunning) {
                throw new IllegalArgumentException("Unexpected value for parameter 'receiveHandler'. PartitionReceiver was already registered with a PartitionReceiveHandler instance. Only 1 instance can be registered.");
            }

            final ReceivePump pump = new ReceivePump(new ReceivePump.IPartitionReceiver() {
                @Override // com.microsoft.azure.eventhubs.impl.ReceivePump.IPartitionReceiver
                public String getPartitionId() {
                    return PartitionReceiverImpl.this.getPartitionId();
                }

                @Override // com.microsoft.azure.eventhubs.impl.ReceivePump.IPartitionReceiver
                public Iterable<EventData> receive(int maxEventCount) throws EventHubException {
                    return PartitionReceiverImpl.this.receiveSync(maxEventCount);
                }
            }, partitionReceiveHandler, invokeWhenNoEvents);
            this.receivePump = pump;

            // Run the pump captured above rather than re-reading the mutable field from the new
            // thread, which happens outside the lock and could observe a later replacement.
            new Thread(() -> pump.run()).start();

            return CompletableFuture.completedFuture(null);
        }
    }

    /** Sets the receive timeout on the underlying receiver. */
    @Override // com.microsoft.azure.eventhubs.PartitionReceiver
    public void setReceiveTimeout(Duration duration) {
        this.internalReceiver.setReceiveTimeout(duration);
    }
}
