/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.*;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.SolrException;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
/**
* LBHttpSolrServer or "LoadBalanced HttpSolrServer" is a load balancing wrapper around
* {@link org.apache.solr.client.solrj.impl.HttpSolrServer}. This is useful when you
* have multiple SolrServers and the requests need to be Load Balanced among them.
*
* Do <b>NOT</b> use this class for indexing in master/slave scenarios since documents must be sent to the
* correct master; no inter-node routing is done.
*
* In SolrCloud (leader/replica) scenarios, this class may be used for updates since updates will be forwarded
* to the appropriate leader.
*
* Also see the <a href="http://wiki.apache.org/solr/LBHttpSolrServer">wiki</a> page.
*
* <p/>
* It offers automatic failover when a server goes down and it detects when the server comes back up.
* <p/>
* Load balancing is done using a simple round-robin on the list of servers.
* <p/>
* If a request to a server fails by an IOException due to a connection timeout or read timeout then the host is taken
* off the list of live servers and moved to a 'dead server list' and the request is resent to the next live server.
* This process is continued till it tries all the live servers. If at least one server is alive, the request succeeds,
* and if not it fails.
* <blockquote><pre>
* SolrServer lbHttpSolrServer = new LBHttpSolrServer("http://host1:8080/solr/","http://host2:8080/solr","http://host3:8080/solr");
* //or if you wish to pass the HttpClient do as follows
* HttpClient httpClient = HttpClientUtil.createClient(new ModifiableSolrParams());
* SolrServer lbHttpSolrServer = new LBHttpSolrServer(httpClient,"http://host1:8080/solr/","http://host2:8080/solr","http://host3:8080/solr");
* </pre></blockquote>
* This detects if a dead server comes alive automatically. The check is done in fixed intervals in a dedicated thread.
* This interval can be set using {@link #setAliveCheckInterval} , the default is set to one minute.
* <p/>
* <b>When to use this?</b><br/> This can be used as a software load balancer when you do not wish to setup an external
* load balancer. Alternatives to this code are to use
* a dedicated hardware load balancer or using Apache httpd with mod_proxy_balancer as a load balancer. See <a
* href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load balancing on Wikipedia</a>
*
* @since solr 1.4
*/
public class LBHttpSolrServer extends SolrServer {
// Keys of both server maps are base URLs of the form "http://localhost:8983/solr",
// which should be equivalent to HttpSolrServer.getBaseURL() (see ServerWrapper.getKey()).
// Access to aliveServers must be synchronized on the map itself.
private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<String, ServerWrapper>();
protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<String, ServerWrapper>();
// Changes to aliveServers are reflected in this array (see updateAliveList());
// the reference is volatile, so readers do not need to synchronize.
private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
// Created lazily by startAliveCheckExecutor(). It is volatile because it is also read
// without holding a lock: the first null check in startAliveCheckExecutor() and the
// reads in shutdown() would otherwise have no visibility guarantee for the assignment.
private volatile ScheduledExecutorService aliveCheckExecutor;
private final HttpClient httpClient;
// true when this instance created httpClient itself; shutdown() then also closes it
private final boolean clientIsInternal;
// round-robin position, incremented once per attempt in request(SolrRequest)
private final AtomicInteger counter = new AtomicInteger(-1);
// "*:*" query with rows=0, sent to zombie servers to check whether they are back
private static final SolrQuery solrQuery = new SolrQuery("*:*");
private volatile ResponseParser parser;
private volatile RequestWriter requestWriter;
private Set<String> queryParams;
static {
  solrQuery.setRows(0);
}
/**
 * Couples an {@link HttpSolrServer} with the bookkeeping the load balancer keeps for it.
 * Two wrappers are equal when their {@link #getKey() keys} (the servers' base URLs) are equal.
 */
protected static class ServerWrapper {
  final HttpSolrServer solrServer;

  /** Time in milliseconds ({@code System.currentTimeMillis()}) of the last real request. */
  long lastUsed;

  /** Time in milliseconds ({@code System.currentTimeMillis()}) of the last liveness check. */
  long lastChecked;

  /**
   * "Standard" servers are used by default. They normally live in the alive list
   * and move to the zombie list when unavailable. When they become available again,
   * they move back to the alive list.
   */
  boolean standard = true;

  /** Liveness checks that have failed since the counter was last reset. */
  int failedPings = 0;

  public ServerWrapper(HttpSolrServer solrServer) {
    this.solrServer = solrServer;
  }

  /** @return the base URL of the wrapped server, used as the key in the server maps */
  public String getKey() {
    return solrServer.getBaseURL();
  }

  @Override
  public String toString() {
    return solrServer.getBaseURL();
  }

  @Override
  public int hashCode() {
    String key = this.getKey();
    return key.hashCode();
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (obj instanceof ServerWrapper) {
      ServerWrapper other = (ServerWrapper) obj;
      return this.getKey().equals(other.getKey());
    }
    return false;
  }
}
/**
* A request bundled with the ordered list of server URLs it may be sent to;
* the argument type of {@link LBHttpSolrServer#request(Req)}.
*/
public static class Req {
protected SolrRequest request;
protected List<String> servers;
// upper bound on how many skipped dead servers are retried; see setNumDeadServersToTry
protected int numDeadServersToTry;
/**
* @param request the request to send
* @param servers the candidate server URLs; its size also becomes the initial
* value of {@code numDeadServersToTry}
*/
public Req(SolrRequest request, List<String> servers) {
this.request = request;
this.servers = servers;
this.numDeadServersToTry = servers.size();
}
/** @return the request to send */
public SolrRequest getRequest() {
return request;
}
/** @return the candidate server URLs, as passed to the constructor */
public List<String> getServers() {
return servers;
}
/** @return the number of dead servers to try if there are no live servers left */
public int getNumDeadServersToTry() {
return numDeadServersToTry;
}
/** @param numDeadServersToTry The number of dead servers to try if there are no live servers left.
* Defaults to the number of servers in this request. */
public void setNumDeadServersToTry(int numDeadServersToTry) {
this.numDeadServersToTry = numDeadServersToTry;
}
}
/**
* Result holder returned by {@link LBHttpSolrServer#request(Req)}: the raw response plus
* the URL of a server the request was sent to. Both fields are filled in by that method.
*/
public static class Rsp {
protected String server;
protected NamedList<Object> rsp;
/** The response from the server */
public NamedList<Object> getResponse() {
return rsp;
}
/** The server that returned the response */
public String getServer() {
return server;
}
}
/**
 * Creates a load balancer over the given servers. No HttpClient is supplied, so one is
 * created internally and closed again by {@link #shutdown()}.
 *
 * @param solrServerUrls base URLs of the servers to balance over
 * @throws MalformedURLException declared by {@link #makeServer(String)}
 */
public LBHttpSolrServer(String... solrServerUrls) throws MalformedURLException {
  this(null, solrServerUrls);
}

/**
 * Creates a load balancer that uses a {@link BinaryResponseParser}.
 * The provided httpClient should use a multi-threaded connection manager.
 *
 * @param httpClient the client to use, or null to have one created internally
 * @param solrServerUrl base URLs of the servers to balance over
 * @throws MalformedURLException declared by {@link #makeServer(String)}
 */
public LBHttpSolrServer(HttpClient httpClient, String... solrServerUrl)
    throws MalformedURLException {
  this(httpClient, new BinaryResponseParser(), solrServerUrl);
}

/**
 * Creates a load balancer with an explicit response parser.
 * The provided httpClient should use a multi-threaded connection manager.
 *
 * @param httpClient the client to use; when null a client is created with
 *        {@code HttpClientUtil.PROP_USE_RETRY} set to false and is owned by this instance
 * @param parser the parser handed to every {@link HttpSolrServer} built by {@link #makeServer(String)}
 * @param solrServerUrl base URLs of the servers; all of them start out in the alive list
 * @throws MalformedURLException declared by {@link #makeServer(String)}
 */
public LBHttpSolrServer(HttpClient httpClient, ResponseParser parser, String... solrServerUrl)
    throws MalformedURLException {
  this.parser = parser;
  this.clientIsInternal = httpClient == null;
  if (clientIsInternal) {
    ModifiableSolrParams clientParams = new ModifiableSolrParams();
    clientParams.set(HttpClientUtil.PROP_USE_RETRY, false);
    this.httpClient = HttpClientUtil.createClient(clientParams);
  } else {
    this.httpClient = httpClient;
  }
  for (int i = 0; i < solrServerUrl.length; i++) {
    ServerWrapper wrapper = new ServerWrapper(makeServer(solrServerUrl[i]));
    aliveServers.put(wrapper.getKey(), wrapper);
  }
  // publish the initial alive set to the array used for round-robin
  updateAliveList();
}
/**
* @return the set last passed to {@link #setQueryParams(Set)}; null if it was never set
* (the field has no initializer)
*/
public Set<String> getQueryParams() {
return queryParams;
}
/**
* Expert Method.
* Only stores the set; {@link #makeServer(String)} passes it on to each
* {@link HttpSolrServer} it creates afterwards.
* @param queryParams set of param keys to only send via the query string
*/
public void setQueryParams(Set<String> queryParams) {
this.queryParams = queryParams;
}
/**
 * Strips a single trailing {@code '/'} from a server URL so it can be used as a map key.
 *
 * @param server the server URL; must not be null
 * @return {@code server} without its last character if that character is a slash,
 *         otherwise {@code server} unchanged
 */
public static String normalize(String server) {
  return server.endsWith("/")
      ? server.substring(0, server.length() - 1)
      : server;
}
/**
 * Builds an {@link HttpSolrServer} for one URL, sharing this load balancer's HttpClient
 * and response parser. The request writer and the query-param set are applied only
 * when they have been set (non-null).
 *
 * @param server base URL of the Solr server
 * @return the newly created server object
 * @throws MalformedURLException declared for callers and overriding implementations
 */
protected HttpSolrServer makeServer(String server) throws MalformedURLException {
  HttpSolrServer solrServer = new HttpSolrServer(server, httpClient, parser);
  if (null != requestWriter) {
    solrServer.setRequestWriter(requestWriter);
  }
  if (null != queryParams) {
    solrServer.setQueryParams(queryParams);
  }
  return solrServer;
}
/**
 * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
 * A server is moved to the dead pool when the request to it fails with a SocketException, a
 * SocketTimeoutException, a SolrServerException whose root cause is an IOException, or a
 * SolrException with status 403, 404, 500 or 503. Any other failure is rethrown immediately.
 *
 * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
 * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
 * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
 *
 * If no live servers are found a SolrServerException is thrown.
 *
 * @param req contains both the request as well as the list of servers to query
 *
 * @return the result of the request; {@link Rsp#getServer()} names the server that answered
 *
 * @throws SolrServerException if no server could handle the request, or a server failed with
 *         an error that is not treated as "server down"
 * @throws IOException If there is a low-level I/O error, e.g. a MalformedURLException
 *         from {@link #makeServer(String)}.
 */
public Rsp request(Req req) throws SolrServerException, IOException {
  Rsp rsp = new Rsp();
  Exception ex = null;
  // Default capacity on purpose: numDeadServersToTry is caller-supplied, and using it as the
  // initial capacity would throw on a negative value and over-allocate on a very large one.
  List<ServerWrapper> skipped = new ArrayList<ServerWrapper>();

  for (String serverStr : req.getServers()) {
    serverStr = normalize(serverStr);
    // if the server is currently a zombie, just skip to the next one
    ServerWrapper wrapper = zombieServers.get(serverStr);
    if (wrapper != null) {
      if (skipped.size() < req.getNumDeadServersToTry()) {
        skipped.add(wrapper);
      }
      continue;
    }
    rsp.server = serverStr;
    HttpSolrServer server = makeServer(serverStr);
    try {
      rsp.rsp = server.request(req.getRequest());
      return rsp; // SUCCESS
    } catch (SolrException e) {
      // we retry on 403, 404, 500 or 503 - you can see this on solr shutdown
      if (!isRetryableStatus(e.code())) {
        // Server is alive but the request was likely malformed or invalid
        throw e;
      }
      ex = addZombie(server, e);
      // TODO: consider using below above - currently does cause a problem with distrib updates:
      // seems to match up against a failed forward to leader exception as well...
      //     || e.getMessage().contains("java.net.SocketException")
      //     || e.getMessage().contains("java.net.ConnectException")
    } catch (SocketException e) {
      ex = addZombie(server, e);
    } catch (SocketTimeoutException e) {
      ex = addZombie(server, e);
    } catch (SolrServerException e) {
      Throwable rootCause = e.getRootCause();
      if (rootCause instanceof IOException) {
        ex = addZombie(server, e);
      } else {
        throw e;
      }
    } catch (Exception e) {
      throw new SolrServerException(e);
    }
  }

  // try the servers we previously skipped
  for (ServerWrapper wrapper : skipped) {
    // record the server actually being asked; otherwise a successful response would be
    // attributed to the last live server attempted above (or to no server at all)
    rsp.server = wrapper.getKey();
    try {
      rsp.rsp = wrapper.solrServer.request(req.getRequest());
      zombieServers.remove(wrapper.getKey());
      return rsp; // SUCCESS
    } catch (SolrException e) {
      // we retry on 403, 404, 500 or 503 - you can see this on solr shutdown
      if (!isRetryableStatus(e.code())) {
        // Server is alive but the request was malformed or invalid
        zombieServers.remove(wrapper.getKey());
        throw e;
      }
      ex = e;
      // already a zombie, no need to re-add
    } catch (SocketException e) {
      ex = e;
    } catch (SocketTimeoutException e) {
      ex = e;
    } catch (SolrServerException e) {
      Throwable rootCause = e.getRootCause();
      if (rootCause instanceof IOException) {
        ex = e;
        // already a zombie, no need to re-add
      } else {
        throw e;
      }
    } catch (Exception e) {
      throw new SolrServerException(e);
    }
  }

  if (ex == null) {
    throw new SolrServerException("No live SolrServers available to handle this request");
  } else {
    throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
  }
}

/** Returns true for the SolrException status codes on which the next server is tried: 403, 404, 500 and 503. */
private static boolean isRetryableStatus(int code) {
  return code == 404 || code == 403 || code == 503 || code == 500;
}
/**
 * Registers the given server in the zombie map as a non-standard server and makes sure
 * the alive-check executor is running.
 *
 * @param server the server whose request just failed
 * @param e the failure that caused the server to be marked dead
 * @return {@code e}, unchanged, so callers can record it in one statement
 */
protected Exception addZombie(HttpSolrServer server, Exception e) {
  ServerWrapper zombie = new ServerWrapper(server);
  zombie.lastUsed = System.currentTimeMillis();
  // not part of the standard server set, so it never enters the alive list
  zombie.standard = false;
  zombieServers.put(zombie.getKey(), zombie);
  startAliveCheckExecutor();
  return e;
}
/**
 * Rebuilds {@code aliveServerList} from the current values of {@code aliveServers},
 * holding the map's lock while copying.
 */
private void updateAliveList() {
  synchronized (aliveServers) {
    Collection<ServerWrapper> live = aliveServers.values();
    ServerWrapper[] snapshot = new ServerWrapper[aliveServers.size()];
    aliveServerList = live.toArray(snapshot);
  }
}
/**
 * Removes the server with the given key from the alive map and, if it was present,
 * refreshes the alive array.
 *
 * @param key base URL of the server
 * @return the removed wrapper, or null if no server was registered under {@code key}
 */
private ServerWrapper removeFromAlive(String key) {
  synchronized (aliveServers) {
    ServerWrapper removed = aliveServers.remove(key);
    if (removed == null) {
      return null;
    }
    updateAliveList();
    return removed;
  }
}
/**
 * Puts the wrapper into the alive map under its key, replacing any existing entry,
 * and refreshes the alive array.
 *
 * @param wrapper the server to mark as alive
 */
private void addToAlive(ServerWrapper wrapper) {
  synchronized (aliveServers) {
    // TODO: warn if there was a previous entry?
    aliveServers.put(wrapper.getKey(), wrapper);
    updateAliveList();
  }
}
/**
 * Creates a server object for the given URL and adds it to the alive list.
 *
 * @param server base URL of the Solr server to add
 * @throws MalformedURLException declared by {@link #makeServer(String)}
 */
public void addSolrServer(String server) throws MalformedURLException {
  ServerWrapper wrapper = new ServerWrapper(makeServer(server));
  addToAlive(wrapper);
}
/**
 * Removes a server from both the alive and the zombie list. The URL is passed through
 * {@link URL#toExternalForm()} and stripped of one trailing slash before lookup.
 *
 * @param server URL of the server to remove
 * @return always null
 * @throws RuntimeException wrapping the MalformedURLException if {@code server} is not a valid URL
 */
public String removeSolrServer(String server) {
  final String key;
  try {
    key = normalize(new URL(server).toExternalForm());
  } catch (MalformedURLException e) {
    throw new RuntimeException(e);
  }
  // there is a small race condition here - if the server is in the process of being moved between
  // lists, we could fail to remove it.
  removeFromAlive(key);
  zombieServers.remove(key);
  return null;
}
/**
* Sets the connection timeout on the HttpClient used by this load balancer, by delegating
* to HttpClientUtil.setConnectionTimeout.
*
* @param timeout the timeout value (presumably milliseconds - confirm against HttpClientUtil)
*/
public void setConnectionTimeout(int timeout) {
HttpClientUtil.setConnectionTimeout(httpClient, timeout);
}
/**
* Sets soTimeout (read timeout) on the HttpClient used by this load balancer, by delegating
* to HttpClientUtil.setSoTimeout. This is desirable for queries, but probably
* not for indexing.
*
* @param timeout the timeout value (presumably milliseconds - confirm against HttpClientUtil)
*/
public void setSoTimeout(int timeout) {
HttpClientUtil.setSoTimeout(httpClient, timeout);
}
/**
 * Stops the alive-check executor if one was started, and closes the HttpClient's
 * connection manager when the client was created by this instance.
 */
@Override
public void shutdown() {
  ScheduledExecutorService executor = aliveCheckExecutor;
  if (executor != null) {
    executor.shutdownNow();
  }
  if (!clientIsInternal) {
    // a client supplied by the caller is left open
    return;
  }
  httpClient.getConnectionManager().shutdown();
}
/**
 * Tries to query a live server, picking servers round-robin from the alive list.
 * If a request fails with a SolrServerException whose root cause is an IOException, the server is
 * moved to the dead pool and the request is retried on the next live server. After the live servers
 * are exhausted, the "standard" servers in the dead pool that did not fail during this call are
 * tried; one that answers is moved back to the alive list. A SolrServerException is thrown if
 * no server could handle the request.
 *
 * A SolrException is rethrown unchanged (the server is alive but the request was malformed or
 * invalid). Any other exception is wrapped in a SolrServerException and thrown without a retry.
 *
 * @param request the SolrRequest.
 *
 * @return response
 *
 * @throws SolrServerException if all servers are dead or a non-retryable error occurred
 * @throws IOException If there is a low-level I/O error.
 */
@Override
public NamedList<Object> request(final SolrRequest request)
    throws SolrServerException, IOException {
  Exception ex = null;
  // work on one snapshot of the alive servers for the whole call
  ServerWrapper[] serverList = aliveServerList;
  int maxTries = serverList.length;
  Map<String,ServerWrapper> justFailed = null;

  for (int attempts = 0; attempts < maxTries; attempts++) {
    // Mask off the sign bit: the counter wraps around to negative values after
    // Integer.MAX_VALUE increments, and a negative left operand would make the
    // remainder below negative, i.e. an invalid array index.
    int count = counter.incrementAndGet() & Integer.MAX_VALUE;
    ServerWrapper wrapper = serverList[count % serverList.length];
    wrapper.lastUsed = System.currentTimeMillis();
    try {
      return wrapper.solrServer.request(request);
    } catch (SolrException e) {
      // Server is alive but the request was malformed or invalid
      throw e;
    } catch (SolrServerException e) {
      if (e.getRootCause() instanceof IOException) {
        ex = e;
        moveAliveToDead(wrapper);
        if (justFailed == null) {
          justFailed = new HashMap<String,ServerWrapper>();
        }
        justFailed.put(wrapper.getKey(), wrapper);
      } else {
        throw e;
      }
    } catch (Exception e) {
      throw new SolrServerException(e);
    }
  }

  // try other standard servers that we didn't try just now
  for (ServerWrapper wrapper : zombieServers.values()) {
    boolean failedJustNow = justFailed != null && justFailed.containsKey(wrapper.getKey());
    if (!wrapper.standard || failedJustNow) {
      continue;
    }
    try {
      NamedList<Object> rsp = wrapper.solrServer.request(request);
      // remove from zombie list *before* adding to alive to avoid a race that could lose a server
      zombieServers.remove(wrapper.getKey());
      addToAlive(wrapper);
      return rsp;
    } catch (SolrException e) {
      // Server is alive but the request was malformed or invalid
      throw e;
    } catch (SolrServerException e) {
      if (e.getRootCause() instanceof IOException) {
        ex = e;
        // still dead
      } else {
        throw e;
      }
    } catch (Exception e) {
      throw new SolrServerException(e);
    }
  }

  if (ex == null) {
    throw new SolrServerException("No live SolrServers available to handle this request");
  } else {
    throw new SolrServerException("No live SolrServers available to handle this request", ex);
  }
}
/**
 * Checks one server from the dead pool for aliveness by sending it the shared ping query.
 * On a response with status 0 the server is removed from the dead pool and, if it is a
 * "standard" server, put back into the alive list. When the ping throws, the failure is
 * counted; a non-standard server is dropped from the dead pool once it has reached
 * NONSTANDARD_PING_LIMIT failed pings.
 *
 * @param zombieServer a server in the dead pool
 */
private void checkAZombieServer(ServerWrapper zombieServer) {
  long now = System.currentTimeMillis();
  try {
    zombieServer.lastChecked = now;
    QueryResponse pingResponse = zombieServer.solrServer.query(solrQuery);
    if (pingResponse.getStatus() != 0) {
      return;
    }
    // server has come back up.
    // make sure to remove from zombies before adding to alive to avoid a race condition
    // where another thread could mark it down, move it back to zombie, and then we delete
    // from zombie and lose it forever.
    ServerWrapper revived = zombieServers.remove(zombieServer.getKey());
    if (revived == null) {
      // something else already moved the server from zombie to alive
      return;
    }
    revived.failedPings = 0;
    if (revived.standard) {
      addToAlive(revived);
    }
  } catch (Exception e) {
    // Expected. The server is still down.
    zombieServer.failedPings++;
    // If the server doesn't belong in the standard set belonging to this load balancer
    // then simply drop it after a certain number of failed pings.
    boolean overLimit = zombieServer.failedPings >= NONSTANDARD_PING_LIMIT;
    if (!zombieServer.standard && overLimit) {
      zombieServers.remove(zombieServer.getKey());
    }
  }
}
/**
 * Moves a server from the alive list to the dead pool and makes sure the alive-check
 * executor is running. Does nothing if the server is no longer in the alive list.
 *
 * @param wrapper the server whose request just failed
 */
private void moveAliveToDead(ServerWrapper wrapper) {
  ServerWrapper removed = removeFromAlive(wrapper.getKey());
  if (removed != null) {
    zombieServers.put(removed.getKey(), removed);
    startAliveCheckExecutor();
  }
  // removed == null: another thread already detected the failure and removed it
}
/** Delay in milliseconds between two rounds of dead-server checks; read when the check executor is started. */
private int interval = CHECK_INTERVAL;

/**
 * LBHttpSolrServer keeps pinging the dead servers at a fixed interval to find out whether they
 * are alive again. Use this to set that interval. The value is read by
 * startAliveCheckExecutor() at the moment the executor is created.
 *
 * @param interval time in milliseconds
 * @throws IllegalArgumentException if {@code interval} is zero or negative
 */
public void setAliveCheckInterval(int interval) {
  if (interval < 1) {
    throw new IllegalArgumentException(
        "Alive check interval must be positive, specified value = " + interval);
  }
  this.interval = interval;
}
/**
 * Lazily creates the single-threaded scheduler that checks the dead servers every
 * {@code interval} milliseconds. Calls made after the executor exists return immediately.
 */
private void startAliveCheckExecutor() {
  // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
  // if it's not null.
  if (aliveCheckExecutor != null) {
    return;
  }
  synchronized (this) {
    if (aliveCheckExecutor != null) {
      return;
    }
    aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(
        new SolrjNamedThreadFactory("aliveCheckExecutor"));
    // the task only holds a weak reference to this load balancer
    Runnable checker = getAliveCheckRunner(new WeakReference<LBHttpSolrServer>(this));
    aliveCheckExecutor.scheduleAtFixedRate(
        checker, this.interval, this.interval, TimeUnit.MILLISECONDS);
  }
}
/**
 * Builds the periodic task that checks every server currently in the dead pool.
 * The task reaches the load balancer only through the weak reference and does nothing
 * once that reference has been cleared.
 *
 * @param lbRef weak reference to the load balancer whose dead servers are checked
 * @return the task to schedule
 */
private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrServer> lbRef) {
  return new Runnable() {
    @Override
    public void run() {
      LBHttpSolrServer balancer = lbRef.get();
      if (balancer == null || balancer.zombieServers == null) {
        return;
      }
      for (ServerWrapper candidate : balancer.zombieServers.values()) {
        balancer.checkAZombieServer(candidate);
      }
    }
  };
}
/**
* @return the HttpClient shared by all servers of this load balancer: either the one passed
* to the constructor or the one created internally
*/
public HttpClient getHttpClient() {
return httpClient;
}
/** @return the response parser currently handed to servers created by {@link #makeServer(String)} */
public ResponseParser getParser() {
return parser;
}
/**
* Stores the response parser that {@link #makeServer(String)} passes to each
* {@link HttpSolrServer} it creates from now on. This method itself does not touch
* server objects that already exist.
*
* @param parser the parser to use for newly created servers
*/
public void setParser(ResponseParser parser) {
this.parser = parser;
}
/**
* Stores the request writer that {@link #makeServer(String)} applies to each
* {@link HttpSolrServer} it creates from now on (only when non-null). This method itself
* does not touch server objects that already exist.
*
* @param requestWriter the writer to use for newly created servers
*/
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
/** @return the request writer last set via {@link #setRequestWriter(RequestWriter)}; null if none was set */
public RequestWriter getRequestWriter() {
return requestWriter;
}
/**
 * Safety net for instances that were never shut down explicitly: stops the alive-check
 * executor, if one was started, before delegating to {@code super.finalize()}.
 */
@Override
protected void finalize() throws Throwable {
  try {
    ScheduledExecutorService executor = this.aliveCheckExecutor;
    if (executor != null) {
      executor.shutdownNow();
    }
  } finally {
    super.finalize();
  }
}
// defaults
// Initial value of the alive-check interval, in milliseconds (the scheduler in
// startAliveCheckExecutor() is given TimeUnit.MILLISECONDS).
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
// Compared against ServerWrapper.failedPings in checkAZombieServer(): a dead server that is
// not "standard" is dropped from the zombie map once it has this many failed pings.
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
}