シンプルRPCフレームワーク-Consulベースのサービス登録と発見
6831 ワード
一般的に一般的なRPCフレームワークには、次の3つの部分が含まれています.登録センタ、サービス側登録リモートサービス及びクライアント発見サービス 用サービス側は、対外的にバックグラウンドサービスを提供し、自分のサービス情報を登録センター に登録する.クライアントは、登録センターからリモートサービスの登録情報を取得し、リモートプロセス呼び出しを行う上で述べた登録センターは、実際にはサービスガバナンスに属し、登録センターがなくてもRPCの機能は完全である.以前はzookeeperベースの登録センターに接触していましたが、ここではconsulベースで登録センターの基本的な機能を実現しています.
Consulの特徴: RaftはPaxosより直接 これ以上説明しないでraftを研究していませんはデータセンターをサポートし、単一の障害などの問題を解決するために使用できる 統合はzookeeperよりも簡単(コード量が少なく、論理が明確で簡単) 健康診断、httpおよびtcp をサポートはUI管理機能を備えており、追加のサードパーティサポートは必要ありません.(zookeeperはzkuiなどのサードパーティツールを個別に導入する必要があります) key/valueストレージ をサポート
consulを起動してから管理ページにアクセス
RPC統合
サービス登録とサービス発見の2つのインタフェースを抽出し、consul実装を使用します.ここでは主にconsul-clientによって実装されます(consul-apiでも構いません).pomに導入する必要があります.
サービス登録 RegistryService提供サービスの登録と削除機能 AbstractConsulServiceconsulのベースクラスで、Conslオブジェクトを構築し、サービス側およびクライアントにサービスを提供します. ConsulRegistryServiceサービス登録実装クラスは、サービス登録とともに健康診断を指定しています.
サービスの削除は一時的に実現されませんでした
私が実現したRPCはTCPベースなので、サービス登録の健康診断もTCPと指定し、consulは指定したIPとポートで接続を確立してサービスの健康状態を判断します.httpの場合は、httpメソッドを呼び出し、健康診断アドレスを指定する必要があります.
バックグラウンドの監視情報は次のとおりです.
TCPが指定されているだけですが、何らかのメカニズムでバックグラウンドでHTTPの健康診断リクエストが発生する可能性があります.上図の第1のリクエストログです.
サービス発見 DiscoveryServiceは、登録されているすべての有効なサービス情報を取得します. ConsulDiscoveryServiceはまず有効なサービスリストを取得する:
サービス更新リスニングは、利用可能なサービスリストが変更を発見した場合に呼び出し側に通知する必要があります.
以前はクライアントのInvokerがキャッシュされていたため,サービスリストが変化した場合にキャッシュ情報を更新する必要があった.
ここでは簡単にキャッシュを直接クリア処理しますが、実は良い方法は変化のある処理だけを行うべきです. RpcClientInvokerCacheクライアントインスタンス化後のInvokerのキャッシュクラス 負荷等化同じインタフェースに複数のサービスが同時にサービスを提供する場合、クライアントはクライアントの要求をどのサーバに割り当てるかを決定するために一定の負荷等化メカニズムを必要とし、ここでは簡単なポーリング実装方式を実現する.要求回数を加算し,加算した値とサービスリストのサイズを型取り操作する.
コードの中でサービスリストを取る方法は小さい問題があって、インタフェースの情報によって取っていないで、後で完成します
Consulの特徴:
consulを起動してから管理ページにアクセス
RPC統合
サービス登録とサービス発見の2つのインタフェースを抽出し、consul実装を使用します.ここでは主にconsul-clientによって実装されます(consul-apiでも構いません).pomに導入する必要があります.
com.orbitz.consul
consul-client
0.14.1
サービス登録
public interface RegistryService { void register(RpcURL url); void unregister(RpcURL url);
}
public class AbstractConsulService { private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class); protected final static String CONSUL_NAME="consul_node_jim"; protected final static String CONSUL_ID="consul_node_id"; protected final static String CONSUL_TAGS="v3"; protected final static String CONSUL_HEALTH_INTERVAL="1s"; protected Consul buildConsul(String registryHost, int registryPort){ return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
}
}
サービスの削除は一時的に実現されませんでした
public class ConsulRegistryService extends AbstractConsulService implements RegistryService { private final static int CONSUL_CONNECT_PERIOD=1*1000; @Override
public void register(RpcURL url) { Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort()); AgentClient agent = consul.agentClient(); ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build(); ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);
agent.register(builder.build());
} @Override
public void unregister(RpcURL url) {
}
}
私が実現したRPCはTCPベースなので、サービス登録の健康診断もTCPと指定し、consulは指定したIPとポートで接続を確立してサービスの健康状態を判断します.httpの場合は、httpメソッドを呼び出し、健康診断アドレスを指定する必要があります.
ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
バックグラウンドの監視情報は次のとおりです.
TCPが指定されているだけですが、何らかのメカニズムでバックグラウンドでHTTPの健康診断リクエストが発生する可能性があります.上図の第1のリクエストログです.
サービス発見
public interface DiscoveryService { List getUrls(String registryHost, int registryPort);
}
List urls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List serviceHealths=(List)object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){ RpcURL url=new RpcURL();
url.setHost(serviceHealth.getService().getAddress());
url.setPort(serviceHealth.getService().getPort());
urls.add(url);
}
サービス更新リスニングは、利用可能なサービスリストが変更を発見した場合に呼び出し側に通知する必要があります.
try { ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
serviceHealthCache.addListener(new ConsulCache.Listener() { @Override
public void notify(Map map) {
logger.info("serviceHealthCache.addListener notify"); RpcClientInvokerCache.clear();
}
});
serviceHealthCache.start();
} catch (Exception e) {
logger.info("serviceHealthCache.start error:",e);
}
以前はクライアントのInvokerがキャッシュされていたため,サービスリストが変化した場合にキャッシュ情報を更新する必要があった.
ここでは簡単にキャッシュを直接クリア処理しますが、実は良い方法は変化のある処理だけを行うべきです.
public class RpcClientInvokerCache { private static CopyOnWriteArrayList connectedHandlers = new CopyOnWriteArrayList<>(); public static CopyOnWriteArrayList getConnectedHandlersClone(){ return (CopyOnWriteArrayList) RpcClientInvokerCache.getConnectedHandlers().clone();
} public static void addHandler(RpcClientInvoker handler) { CopyOnWriteArrayList newHandlers = getConnectedHandlersClone();
newHandlers.add(handler);
connectedHandlers=newHandlers;
} public static CopyOnWriteArrayList getConnectedHandlers(){ return connectedHandlers;
} public static RpcClientInvoker get(int i){ return connectedHandlers.get(i);
} public static int size(){ return connectedHandlers.size();
} public static void clear(){ CopyOnWriteArrayList newHandlers = getConnectedHandlersClone();
newHandlers.clear();
connectedHandlers=newHandlers;
}
}
コードの中でサービスリストを取る方法は小さい問題があって、インタフェースの情報によって取っていないで、後で完成します
public class RoundRobinLoadbalanceService implements LoadbalanceService { private AtomicInteger roundRobin = new AtomicInteger(0); private static final int MAX_VALUE=1000; private static final int MIN_VALUE=1; private AtomicInteger getRoundRobinValue(){ if(this.roundRobin.getAndAdd(1)>MAX_VALUE){ this.roundRobin.set(MIN_VALUE);
} return this.roundRobin;
} @Override
public int index(int size) { return (this.getRoundRobinValue().get() + size) % size;
}
}