package org.apache.flink.connector.kafka.sink;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.class */
public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
    private static final String TRANSACTION_MANAGER_FIELD_NAME = "transactionManager";
    private static final String TRANSACTION_MANAGER_STATE_ENUM = "org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.TransactionManager$State";
    private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";

    @Nullable
    private String transactionalId;
    private volatile boolean inTransaction;
    private volatile boolean closed;

    public FlinkKafkaInternalProducer(Properties properties, @Nullable String str) {
        super(withTransactionalId(properties, str));
        this.transactionalId = str;
    }

    private static Properties withTransactionalId(Properties properties, @Nullable String str) {
        if (str == null) {
            return properties;
        }
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, str);
        return properties2;
    }

    @Override // org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer, org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer
    public void flush() {
        super.flush();
        if (this.inTransaction) {
            flushNewPartitions();
        }
    }

    @Override // org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer, org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer
    public void beginTransaction() throws ProducerFencedException {
        super.beginTransaction();
        this.inTransaction = true;
    }

    @Override // org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer, org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer
    public void abortTransaction() throws ProducerFencedException {
        LOG.debug("abortTransaction {}", this.transactionalId);
        Preconditions.checkState(this.inTransaction, "Transaction was not started");
        this.inTransaction = false;
        super.abortTransaction();
    }

    @Override // org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer, org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer
    public void commitTransaction() throws ProducerFencedException {
        LOG.debug("commitTransaction {}", this.transactionalId);
        Preconditions.checkState(this.inTransaction, "Transaction was not started");
        this.inTransaction = false;
        super.commitTransaction();
    }

    public boolean isInTransaction() {
        return this.inTransaction;
    }

    @Override // org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer, org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        if (this.inTransaction) {
            super.close(Duration.ZERO);
        } else {
            super.close(Duration.ofHours(1L));
        }
    }

    @Override // org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.KafkaProducer, org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.Producer
    public void close(Duration duration) {
        this.closed = true;
        super.close(duration);
    }

    public boolean isClosed() {
        return this.closed;
    }

    @Nullable
    public String getTransactionalId() {
        return this.transactionalId;
    }

    public short getEpoch() {
        return ((Short) getField(getField(getTransactionManager(), PRODUCER_ID_AND_EPOCH_FIELD_NAME), "epoch")).shortValue();
    }

    public long getProducerId() {
        return ((Long) getField(getField(getTransactionManager(), PRODUCER_ID_AND_EPOCH_FIELD_NAME), "producerId")).longValue();
    }

    public void initTransactionId(String str) {
        if (str.equals(this.transactionalId)) {
            return;
        }
        setTransactionId(str);
        initTransactions();
    }

    public void setTransactionId(String str) {
        if (str.equals(this.transactionalId)) {
            return;
        }
        Preconditions.checkState(!this.inTransaction, String.format("Another transaction %s is still open.", str));
        LOG.debug("Change transaction id from {} to {}", this.transactionalId, str);
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            setField(transactionManager, "transactionalId", str);
            setField(transactionManager, "currentState", getTransactionManagerState("UNINITIALIZED"));
            this.transactionalId = str;
        }
    }

    private void flushNewPartitions() {
        LOG.info("Flushing new partitions");
        TransactionalRequestResult enqueueNewPartitions = enqueueNewPartitions();
        invoke(getField("sender"), "wakeup", new Object[0]);
        enqueueNewPartitions.await();
    }

    private TransactionalRequestResult enqueueNewPartitions() {
        TransactionalRequestResult transactionalRequestResult;
        TransactionalRequestResult transactionalRequestResult2;
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            Object invoke = invoke(getField(transactionManager, "newPartitionsInTransaction"), "isEmpty", new Object[0]);
            if (!(invoke instanceof Boolean) || ((Boolean) invoke).booleanValue()) {
                transactionalRequestResult = new TransactionalRequestResult("AddPartitionsToTxn");
                transactionalRequestResult.done();
            } else {
                Object invoke2 = invoke(transactionManager, "addPartitionsToTransactionHandler", new Object[0]);
                invoke(transactionManager, "enqueueRequest", new Class[]{invoke2.getClass().getSuperclass()}, new Object[]{invoke2});
                transactionalRequestResult = (TransactionalRequestResult) getField(invoke2, invoke2.getClass().getSuperclass(), "result");
            }
            transactionalRequestResult2 = transactionalRequestResult;
        }
        return transactionalRequestResult2;
    }

    private static Object invoke(Object obj, String str, Object... objArr) {
        Class[] clsArr = new Class[objArr.length];
        for (int i = 0; i < objArr.length; i++) {
            clsArr[i] = objArr[i].getClass();
        }
        return invoke(obj, str, clsArr, objArr);
    }

    private static Object invoke(Object obj, String str, Class<?>[] clsArr, Object[] objArr) {
        try {
            Method declaredMethod = obj.getClass().getDeclaredMethod(str, clsArr);
            declaredMethod.setAccessible(true);
            return declaredMethod.invoke(obj, objArr);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private Object getField(String str) {
        return getField(this, KafkaProducer.class, str);
    }

    private static Object getField(Object obj, String str) {
        return getField(obj, obj.getClass(), str);
    }

    private static Object getField(Object obj, Class<?> cls, String str) {
        try {
            Field declaredField = cls.getDeclaredField(str);
            declaredField.setAccessible(true);
            return declaredField.get(obj);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    public void resumeTransaction(long j, short s) {
        Preconditions.checkState(!this.inTransaction, "Already in transaction %s", new Object[]{this.transactionalId});
        Preconditions.checkState(j >= 0 && s >= 0, "Incorrect values for producerId %s and epoch %s", new Object[]{Long.valueOf(j), Short.valueOf(s)});
        LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", new Object[]{this.transactionalId, Long.valueOf(j), Short.valueOf(s)});
        Object transactionManager = getTransactionManager();
        synchronized (transactionManager) {
            Object field = getField(transactionManager, "topicPartitionBookkeeper");
            transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
            invoke(field, "reset", new Object[0]);
            setField(transactionManager, PRODUCER_ID_AND_EPOCH_FIELD_NAME, createProducerIdAndEpoch(j, s));
            transitionTransactionManagerStateTo(transactionManager, "READY");
            transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
            setField(transactionManager, "transactionStarted", true);
            this.inTransaction = true;
        }
    }

    private static Object createProducerIdAndEpoch(long j, short s) {
        try {
            Constructor<?> declaredConstructor = TransactionManager.class.getDeclaredField(PRODUCER_ID_AND_EPOCH_FIELD_NAME).getType().getDeclaredConstructor(Long.TYPE, Short.TYPE);
            declaredConstructor.setAccessible(true);
            return declaredConstructor.newInstance(Long.valueOf(j), Short.valueOf(s));
        } catch (IllegalAccessException | InstantiationException | NoSuchFieldException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static void setField(Object obj, String str, Object obj2) {
        setField(obj, obj.getClass(), str, obj2);
    }

    private static void setField(Object obj, Class<?> cls, String str, Object obj2) {
        try {
            Field declaredField = cls.getDeclaredField(str);
            declaredField.setAccessible(true);
            declaredField.set(obj, obj2);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static Enum<?> getTransactionManagerState(String str) {
        try {
            return Enum.valueOf(Class.forName(TRANSACTION_MANAGER_STATE_ENUM), str);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private Object getTransactionManager() {
        return getField(TRANSACTION_MANAGER_FIELD_NAME);
    }

    private static void transitionTransactionManagerStateTo(Object obj, String str) {
        invoke(obj, "transitionTo", getTransactionManagerState(str));
    }

    public String toString() {
        return "FlinkKafkaInternalProducer{transactionalId='" + this.transactionalId + "', inTransaction=" + this.inTransaction + ", closed=" + this.closed + '}';
    }
}
