シンプルRPCフレームワーク-Consulベースのサービス登録と発見

6831 ワード

一般的に一般的なRPCフレームワークには、次の3つの部分が含まれています.
  • 登録センタ、サービス側登録リモートサービス及びクライアント発見サービス
  • サービス側は、対外的にバックグラウンドサービスを提供し、自分のサービス情報を登録センター
  • に登録する.
  • クライアントは、登録センターからリモートサービスの登録情報を取得し、リモートプロセス呼び出しを行う上で述べた登録センターは、実際にはサービスガバナンスに属し、登録センターがなくてもRPCの機能は完全である.以前はzookeeperベースの登録センターに接触していましたが、ここではconsulベースで登録センターの基本的な機能を実現しています.

  • Consulの特徴:
  • RaftはPaxosより直接
  • これ以上説明しないでraftを研究していません
  • はデータセンターをサポートし、単一の障害などの問題を解決するために使用できる
  • 統合はzookeeperよりも簡単(コード量が少なく、論理が明確で簡単)
  • 健康診断、httpおよびtcp
  • をサポート
  • はUI管理機能を備えており、追加のサードパーティサポートは必要ありません.(zookeeperはzkuiなどのサードパーティツールを個別に導入する必要があります)
  • key/valueストレージ
  • をサポート
    consulを起動してから管理ページにアクセス
    RPC統合
    サービス登録とサービス発見の2つのインタフェースを抽出し、consul実装を使用します.ここでは主にconsul-clientによって実装されます(consul-apiでも構いません).pomに導入する必要があります.
    
    	com.orbitz.consul
    	consul-client
    	0.14.1
    

    サービス登録
  • RegistryService提供サービスの登録と削除機能
  • public interface RegistryService {    void register(RpcURL url);    void unregister(RpcURL url);
    }
  • AbstractConsulServiceconsulのベースクラスで、Conslオブジェクトを構築し、サービス側およびクライアントにサービスを提供します.
  • public class AbstractConsulService {    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);    protected final static String CONSUL_NAME="consul_node_jim";    protected final static String CONSUL_ID="consul_node_id";    protected final static String CONSUL_TAGS="v3";    protected final static String CONSUL_HEALTH_INTERVAL="1s";    protected Consul buildConsul(String registryHost, int registryPort){        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
        }
    }
  • ConsulRegistryServiceサービス登録実装クラスは、サービス登録とともに健康診断を指定しています.

  • サービスの削除は一時的に実現されませんでした
    public class ConsulRegistryService extends AbstractConsulService implements RegistryService {    private final static int CONSUL_CONNECT_PERIOD=1*1000;    @Override
        public void register(RpcURL url) {        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());        AgentClient agent = consul.agentClient();        ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
            builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);
    
            agent.register(builder.build());
    
        }    @Override
        public void unregister(RpcURL url) {
    
        }
    
    }

    私が実現したRPCはTCPベースなので、サービス登録の健康診断もTCPと指定し、consulは指定したIPとポートで接続を確立してサービスの健康状態を判断します.httpの場合は、httpメソッドを呼び出し、健康診断アドレスを指定する必要があります.
    ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();

    バックグラウンドの監視情報は次のとおりです.
    TCPが指定されているだけですが、何らかのメカニズムでバックグラウンドでHTTPの健康診断リクエストが発生する可能性があります.上図の第1のリクエストログです.
    サービス発見
  • DiscoveryServiceは、登録されているすべての有効なサービス情報を取得します.
  • public interface DiscoveryService {    List getUrls(String registryHost, int registryPort);
    }
  • ConsulDiscoveryServiceはまず有効なサービスリストを取得する:
  • List urls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List serviceHealths=(List)object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){    RpcURL url=new RpcURL();
        url.setHost(serviceHealth.getService().getAddress());
        url.setPort(serviceHealth.getService().getPort());
        urls.add(url);
    }

    サービス更新リスニングは、利用可能なサービスリストが変更を発見した場合に呼び出し側に通知する必要があります.
    try {    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
        serviceHealthCache.addListener(new ConsulCache.Listener() {        @Override
            public void notify(Map map) {
                logger.info("serviceHealthCache.addListener notify");            RpcClientInvokerCache.clear();
    
            }
        });
        serviceHealthCache.start();
    } catch (Exception e) {
        logger.info("serviceHealthCache.start error:",e);
    }

    以前はクライアントのInvokerがキャッシュされていたため,サービスリストが変化した場合にキャッシュ情報を更新する必要があった.
    ここでは簡単にキャッシュを直接クリア処理しますが、実は良い方法は変化のある処理だけを行うべきです.
  • RpcClientInvokerCacheクライアントインスタンス化後のInvokerのキャッシュクラス
  • public class RpcClientInvokerCache {    private static CopyOnWriteArrayList connectedHandlers = new CopyOnWriteArrayList<>();    public static CopyOnWriteArrayList getConnectedHandlersClone(){        return (CopyOnWriteArrayList) RpcClientInvokerCache.getConnectedHandlers().clone();
        }    public static void addHandler(RpcClientInvoker handler) {        CopyOnWriteArrayList newHandlers = getConnectedHandlersClone();
            newHandlers.add(handler);
            connectedHandlers=newHandlers;
        }    public static CopyOnWriteArrayList getConnectedHandlers(){        return connectedHandlers;
        }    public static RpcClientInvoker get(int i){        return connectedHandlers.get(i);
        }    public static int size(){        return connectedHandlers.size();
        }    public static void clear(){        CopyOnWriteArrayList newHandlers = getConnectedHandlersClone();
            newHandlers.clear();
            connectedHandlers=newHandlers;
        }
    }
  • 負荷等化同じインタフェースに複数のサービスが同時にサービスを提供する場合、クライアントはクライアントの要求をどのサーバに割り当てるかを決定するために一定の負荷等化メカニズムを必要とし、ここでは簡単なポーリング実装方式を実現する.要求回数を加算し,加算した値とサービスリストのサイズを型取り操作する.

  • コードの中でサービスリストを取る方法は小さい問題があって、インタフェースの情報によって取っていないで、後で完成します
    public class RoundRobinLoadbalanceService implements LoadbalanceService {    private AtomicInteger roundRobin = new AtomicInteger(0);    private static final int MAX_VALUE=1000;    private static final int MIN_VALUE=1;    private AtomicInteger getRoundRobinValue(){        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){            this.roundRobin.set(MIN_VALUE);
            }        return this.roundRobin;
        }    @Override
        public int index(int size) {        return  (this.getRoundRobinValue().get() + size) % size;
        }
    }