dubboソース(一)--サービス登録

42009 ワード

要旨:ほとんどのインターネット会社はdubboをマイクロサービスアーキテクチャの中間部品として選択しており、このブログは1回の故障からdubboソース分析に拡張され、他の人の参考にしています.
あるサービス移行では、サービスをサーバクラスタAからサーバクラスタBに移行する必要があり、サービスのスムーズな移行を保証するために、サーバクラスタAとBが同時に存在する時期があり、私のサービスproviderはクラスタAとBに同時にサービスを登録し、サービスを提供する.次のように構成されています.
    <dubbo:registry address="A ip address" />
    <dubbo:registry address="B ip address"/>

ある日、クラスタAが期限切れになり、完全に使用できなくなりました.このときproviderの登録に失敗し、大量のwarnログが発生しました.翌日、日常的に発表されたとき、dubboサービスの発表に失敗し、サービスをすべて停止しました.
ログを見ると、dubboサービスはサービス提供時にクラスタAにサービスを登録し続け、クラスタAが使用できないため、登録に失敗したことがわかります.その後dubbo構成のAのregistryを外すと問題が解決します.
しかし,深層原因を分析するためにdubbo登録サービスに関するソースコードを分析した.
従来の使用ではdubboはspring beanとspring contextモジュールと組み合わせて使用されることが多いが、以下は最も簡単なdubboとspringの結合の例である.
public class Application {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-provider.xml");
        context.start();
        System.in.read();
    }
}

providerのxmlプロファイルは次のとおりです.
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    
    <dubbo:application name="demo-provider"/>

    <dubbo:registry address="multicast://224.5.6.7:1234" />

    
    <dubbo:protocol name="dubbo"/>

    
    <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>

    
    <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>

beans>

その中のdubbo:serviceラベルはspringによってServiceBeanオブジェクトとして登録され、その原理はspringがカスタムラベルを作成する能力を提供し、dubboのxsdファイルがラベルの解析を実現し、beanを生成することである.
ServiceBeanソースを見てみましょう
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

    // ..           
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        SpringExtensionFactory.addApplicationContext(applicationContext);
        supportedApplicationListener = addApplicationListener(applicationContext, this);
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!isExported() && !isUnexported()) {
            //  spring       bean ,     。         ,   export  
            export();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //  service bean          ,       、     dubbo  , bean           。
    }


    @Override
    public void export() {
        super.export();//     ServiceConfig export  
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

    /**
     * @since 2.6.5
     */
    private void publishExportEvent() {
        ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
        applicationEventPublisher.publishEvent(exportEvent);
    }

}

EXportメソッドでServiceConfigのexportメソッドに入りました
class ServiceConfig {
 public synchronized void export() {
        checkAndUpdateSubConfigs();

        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }
		//               ,   delay   ,    
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
}

DoExportメソッドへのアクセス->doExportUrlsメソッドへのアクセス
private void doExportUrls() {
		//            
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        
        // export service
        String contextPath = protocolConfig.getContextpath();
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (StringUtils.isEmpty(contextPath) ? "" : contextPath + "/") + path, map);

        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(Constants.PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                        }

                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
						//            
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
                /**
                 * @since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if ((metadataReportService = getMetadataReportService()) != null) {
                    metadataReportService.publishProvider(url);
                }
            }
        }
        this.urls.add(url);
    }

Invoker> invoker = proxyFactory.getInvoker; Exporter> exporter = protocol.export(wrapperInvoker); この2つは最も核心的なコードですInvokerはサービスproviderが外にサービスを提供するエージェントクラスで、一般的にJavassistProxyFactoryによって実現され、JavassistProxyFactoryは簡単です.
 @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

彼はまずあなたのクラスをWrapperクラスで包装し、wrapperクラスのinvokeMethodメソッドを呼び出します.Wrapperパッケージのプロセスも簡単で、反射によってあなたのクラスの属性、方法、これらに基づいて、そっくりの方法属性を生成し、簡単に言えば、Wrapperクラスはあなたのクラスのサブクラスだと考えることができます.次に、Exporter>exporter=protocolです.export(wrapperInvoker);ここでは、dubboサービスを使用している場合は、DubboProtocolにアクセスするexportメソッドの後ろに、登録センターに接続し、メッセージを購読する操作があり、サービス登録はここで基本的に終了します.
本明細書の先頭に戻ると、2つの登録センターがあり、1つの登録センターが使用できない場合、サービス登録時に何が起こりますか?dubboの公式バージョンでは、1つの登録センターが3秒以内に接続できない場合、プログラム全体が停止し、すべての登録センターの接続を閉じます.
[24/09/19 08:24:42:628 CST] main-SendThread(172.x.x.x:x)  INFO zookeeper.ClientCnxn: Opening socket connection to server 172.x.x.x/172.x.x.x:x. Will not attempt to authenticate using SASL (unknown error)
Exception in thread "main" java.lang.IllegalStateException: Failed to connect to config center (zookeeper): 172.x.x.x:x in 30000ms.
	at org.apache.dubbo.configcenter.support.zookeeper.ZookeeperDynamicConfiguration.<init>(ZookeeperDynamicConfiguration.java:75)
	at org.apache.dubbo.configcenter.support.zookeeper.ZookeeperDynamicConfigurationFactory.createDynamicConfiguration(ZookeeperDynamicConfigurationFa

しかし、私たちが使用しているバージョンではdubboが再試行され続け、正常な登録センターに登録されている接続はオフになりません.zkに登録する接続は再試行されます.zkに正常に登録されているサービスは、サービスステータスをチェックするためのサービスであるため、サービスの正確性チェック時にdubboサービスの起動に失敗することは発見されなかった.