package com.taobao.txc.resourcemanager.rt;

import com.taobao.txc.common.CommitMode;
import com.taobao.txc.common.LoggerInit;
import com.taobao.txc.common.LoggerWrap;
import com.taobao.txc.common.exception.TxcErrCode;
import com.taobao.txc.common.exception.TxcException;
import com.taobao.txc.common.message.ResultCode;
import com.taobao.txc.resourcemanager.IRmRpcClient;
import com.taobao.txc.resourcemanager.jdbc.TxcAtomDataSourceHelper;
import com.taobao.txc.resourcemanager.jdbc.api.ITxcDataSource;
import com.taobao.txc.rpc.impl.RegisterRmMessage;
import com.taobao.txc.rpc.impl.RegisterRmResultMessage;
import com.taobao.txc.rpc.impl.RpcClient;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@ChannelHandler.Sharable
/* loaded from: input_file:com/taobao/txc/resourcemanager/rt/RtRpcClient.class */
public class RtRpcClient extends RpcClient implements IRmRpcClient {
    private static final LoggerWrap logger = LoggerInit.logger;
    private static RtRpcClient instance;

    private RtRpcClient(ThreadPoolExecutor threadPoolExecutor) {
        super(threadPoolExecutor);
    }

    public static RtRpcClient getInstance(ThreadPoolExecutor threadPoolExecutor) {
        if (instance == null) {
            instance = new RtRpcClient(threadPoolExecutor);
        }
        return instance;
    }

    @Override // com.taobao.txc.rpc.impl.RpcClient, com.taobao.txc.rpc.impl.RpcEndpoint, com.taobao.txc.resourcemanager.IRmRpcClient
    public void init() {
        this.timerExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.taobao.txc.resourcemanager.rt.RtRpcClient.1
            @Override // java.lang.Runnable
            public void run() {
                Iterator<Map.Entry<ITxcDataSource, Map<String, String>>> it = TxcAtomDataSourceHelper.getRtServerJournelMap().entrySet().iterator();
                while (it.hasNext()) {
                    Iterator<Map.Entry<String, String>> it2 = it.next().getValue().entrySet().iterator();
                    while (it2.hasNext()) {
                        RtRpcClient.this.connect(it2.next().getKey());
                    }
                }
            }
        }, 30L, 60L, TimeUnit.SECONDS);
        super.init(30L, 60L);
    }

    @Override // com.taobao.txc.rpc.impl.RpcClient
    public void setClientAppName(String str) {
        super.setClientAppName(str + "_rt");
    }

    @Override // com.taobao.txc.rpc.impl.RpcClient
    protected String getTargetServerAddress(String str) {
        throw new RuntimeException("unsupprted method!");
    }

    @Override // com.taobao.txc.rpc.impl.RpcClient
    public Channel connect(String str) {
        Channel _connect;
        Channel channel = this.channels.get(str);
        if (channel != null) {
            if (channel.isActive()) {
                return channel;
            }
            for (int i = 0; i < 1000; i++) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
                if (channel.isActive()) {
                    return channel;
                }
            }
            try {
                logger.warn("channel " + channel + " is not active after long wait, close it.");
                Object obj = this.channelLocks.get(str);
                if (obj == null) {
                    this.channelLocks.putIfAbsent(str, new Object());
                    obj = this.channelLocks.get(str);
                }
                synchronized (obj) {
                    Channel channel2 = this.channels.get(str);
                    if (channel2 != null && channel2.compareTo(channel) == 0) {
                        this.channels.remove(str);
                        channel.disconnect();
                        channel.close();
                    }
                }
            } catch (Exception e2) {
            }
        }
        String dbKeysFromSet = TxcAtomDataSourceHelper.getDbKeysFromSet();
        if (dbKeysFromSet == null || dbKeysFromSet.length() == 0) {
            throw new TxcException(ResultCode.SYSTEMERROR.getValue(), "can not register RM because no dbkey found.");
        }
        logger.info("connect to " + str);
        Object obj2 = this.channelLocks.get(str);
        if (obj2 == null) {
            this.channelLocks.putIfAbsent(str, new Object());
            obj2 = this.channelLocks.get(str);
        }
        synchronized (obj2) {
            _connect = super._connect(str);
            try {
                logger.info("RM will register dbkey:" + dbKeysFromSet);
                RegisterRmMessage registerRmMessage = new RegisterRmMessage(dbKeysFromSet);
                registerRmMessage.setType((short) CommitMode.COMMIT_RETRY_MODE.getValue());
                Object invoke = super.invoke(null, _connect, registerRmMessage, 30000L);
                if (!(invoke instanceof RegisterRmResultMessage)) {
                    throw new TxcException(ResultCode.SYSTEMERROR.getValue(), "can not register RM.");
                }
                this.channels.put(str, _connect);
                if (((RegisterRmResultMessage) invoke).isResult()) {
                    logger.info("register RM sucesss. server version:" + ((RegisterRmResultMessage) invoke).getVersion());
                } else {
                    logger.info("register RM failed. server version:" + ((RegisterRmResultMessage) invoke).getVersion());
                }
            } catch (Exception e3) {
                logger.error(TxcErrCode.RegistRM.errCode, "register RM failed.", e3);
                throw new TxcException(ResultCode.SYSTEMERROR.getValue(), "can not register RM.");
            }
        }
        return _connect;
    }

    @Override // com.taobao.txc.rpc.impl.RpcClient, com.taobao.txc.rpc.api.TxcClientMessageSender
    public void sendResponse(long j, String str, Object obj) {
        logger.info("RmRpcClient sendResponse" + obj);
        super.sendResponse(j, connect(str), obj);
    }
}
