TomcatのJio Ented Pointは、HTTP要求を処理します。
もっと読む
TomcatのConnector(Protocol、CoyoteAdapter Adapter、ArEndPoint)初期化と要求処理プロセス:
http://donald-draper.iteye.com/blog/2330139
前の記事ではConnector(Protocol、CoyoteAdapter Adapter、ArEndPoint)初期化と要求処理過程を話しています。
その中のEntPointはArEntdPointです。この記事について、Jio Ent Pointについてお話しします。
また来ます
//スレッド実行器の作成
TomcatのConnector(Protocol、CoyoteAdapter Adapter、ArEndPoint)初期化と要求処理プロセス:
http://donald-draper.iteye.com/blog/2330139
前の記事ではConnector(Protocol、CoyoteAdapter Adapter、ArEndPoint)初期化と要求処理過程を話しています。
その中のEntPointはArEntdPointです。この記事について、Jio Ent Pointについてお話しします。
public class Http11Protocol extends AbstractHttp11JsseProtocol {
protected Http11ConnectionHandler cHandler;
private int disableKeepAlivePercentage = 75;
public Http11Protocol() {
// EndPoint JIoEndpoint
endpoint = new JIoEndpoint();
cHandler = new Http11ConnectionHandler(this);
// endpoint handler
((JIoEndpoint) endpoint).setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
}
Http 11 Protocolから見れば、EndPointのデフォルトはJIoEnd pointです。/**
* Handle incoming TCP connections.
*
* This class implement a simple server model: one listener thread accepts on a socket and
* creates a new worker thread for each incoming connection.
* More advanced Endpoints will reuse the threads, use queues, etc.
*/
public class JIoEndpoint extends AbstractEndpoint {
/**
* Associated server socket.
*/
protected ServerSocket serverSocket = null;
public JIoEndpoint() {
// Set maxConnections to zero so we can tell if the user has specified
// their own value on the connector when we reach bind()
setMaxConnections(0);
// Reduce the executor timeout for BIO as threads in keep-alive will not
// terminate when the executor interrupts them.
setExecutorTerminationTimeoutMillis(0);
}
// ------------------------------------------------------------- Properties
/**
* Handling of accepted sockets.
*/
// Http11ConnectionHandler
protected Handler handler = null;
/**
* Server socket factory.
*/
protected ServerSocketFactory serverSocketFactory = null;
@Override
// serverSocketFactory serverSocket
public void bind() throws Exception {
// Initialize thread count defaults for acceptor
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1;
}
// Initialize maxConnections
if (getMaxConnections() == 0) {
// User hasn't set a value - use the default
setMaxConnections(getMaxThreadsExecutor(true));
}
if (serverSocketFactory == null) {
if (isSSLEnabled()) {
serverSocketFactory =
handler.getSslImplementation().getServerSocketFactory(this);
} else {
// serverSocketFactory
serverSocketFactory = new DefaultServerSocketFactory(this);
}
}
if (serverSocket == null) {
try {
if (getAddress() == null) {
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog());
} else {
//serverSocketFactory ip Port serverSocket
serverSocket = serverSocketFactory.createSocket(getPort(),
getBacklog(), getAddress());
}
} catch (BindException orig) {
String msg;
if (getAddress() == null)
msg = orig.getMessage() + " :" + getPort();
else
msg = orig.getMessage() + " " +
getAddress().toString() + ":" + getPort();
BindException be = new BindException(msg);
be.initCause(orig);
throw be;
}
}
}
}
//Default ServerSocketFactorypublic class DefaultServerSocketFactory implements ServerSocketFactory {
/**
*
* @param endpoint Unused in this implementation.
*/
public DefaultServerSocketFactory(AbstractEndpoint> endpoint) {
}
@Override
public ServerSocket createSocket (int port) throws IOException {
return new ServerSocket (port);
}
@Override
public ServerSocket createSocket (int port, int backlog)
throws IOException {
return new ServerSocket (port, backlog);
}
@Override
//serverSocketFactory ip Port serverSocket
public ServerSocket createSocket (int port, int backlog,
InetAddress ifAddress) throws IOException {
return new ServerSocket (port, backlog, ifAddress);
}
@Override
public Socket acceptSocket(ServerSocket socket) throws IOException {
return socket.accept();
}
@Override
public void handshake(Socket sock) throws IOException {
// NOOP
}
}
bind方法から実際にserverSocketFactoryおよびserverSocketを初期化することが分かる。また来ます
public class JIoEndpoint extends AbstractEndpoint {
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
// Create worker collection
if (getExecutor() == null) {
//
createExecutor();
}
//
initializeConnectionLatch();
startAcceptorThreads();
// Start async timeout thread
Thread timeoutThread = new Thread(new AsyncTimeout(),
getName() + "-AsyncTimeout");
timeoutThread.setPriority(threadPriority);
timeoutThread.setDaemon(true);
timeoutThread.start();
}
}
}
まずcreateExectorを見て、この方法はAbstractEntPointの中にあります。//スレッド実行器の作成
public void createExecutor() {
internalExecutor = true;
//
TaskQueue taskqueue = new TaskQueue();
//
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
//
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
//Task Queをチェックする/**
* As task queue specifically designed to run with a thread pool executor.
* The task queue is optimised to properly utilize threads within
* a thread pool executor. If you use a normal queue, the executor will spawn threads
* when there are idle threads and you wont be able to force items unto the queue itself
*
*/
TaskQueue LinkedBlockingQueue
public class TaskQueue extends LinkedBlockingQueue {
private ThreadPoolExecutor parent = null;
// no need to be volatile, the one times when we change and read it occur in
// a single thread (the one that did stop a context and fired listeners)
private Integer forcedRemainingCapacity = null;
@Override
// , Runnable BlockingQueue , BlockingQueue , true, false。
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()
// TaskThreadFactory
/**
* Simple task thread factory to use to create threads for an executor implementation.
*/
public class TaskThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final boolean daemon;
private final int threadPriority;
public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
this.daemon = daemon;
this.threadPriority = priority;
}
@Override
public Thread newThread(Runnable r) {
TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(threadPriority);
return t;
}
}
ThreadPool Exectorを してください。
ThreadPool Exectorはjava.util.co ncurrent.ThreadPool Exectorを き いでいます。public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {}
JIoEndpointのstartInternalに るinitialize Connection Latch
AbstractEndPointではprotected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null;
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections());
}
return connectionLimitLatch;
}
//LimitLatch
/**
* Shared latch that allows the latch to be acquired a limited number of times
* after which all subsequent requests to acquire the latch will be placed in a
* FIFO queue until one of the shares is returned.
*/
public class LimitLatch {
private final Sync sync;
private final AtomicLong count;
private volatile long limit;
private volatile boolean released = false;
/**
* Instantiates a LimitLatch object with an initial limit.
* @param limit - maximum number of concurrent acquisitions of this latch
*/
public LimitLatch(long limit) {
this.limit = limit;
this.count = new AtomicLong(0);
this.sync = new Sync();
}
//
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
//
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
JIoEndpointのstartInternalのstartAccent Threadsに る
この はAbstractEntPointにあります。
//Socketを して、 スレッドAccepttorを け けてください。 protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
/**
* Hook to allow Endpoints to provide a specific Acceptor implementation.
*/
//
protected abstract Acceptor createAcceptor();
JIoEndpointのcreateAcceptの を べます。
@Override
protected AbstractEndpoint.Acceptor createAcceptor() {
return new Acceptor();
}
このアクセルは、JIoEndpointの タイプです。
// --------------------------------------------------- Acceptor Inner Class
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
// TCP/IP, processor
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
Socket socket = null;
try {
// Accept the next incoming connection from the server
// socket
// TCP/IP
socket = serverSocketFactory.acceptSocket(serverSocket);
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused && setSocketOptions(socket)) {
// Hand this socket off to an appropriate processor
// SOCKET
if (!processSocket(socket)) {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} else {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
}
state = AcceptorState.ENDED;
}
}
//JIoEndpointのprocessSocketの を する/**
* Process a new connection from a new client. Wraps the socket so
* keep-alive and other attributes can be tracked and then passes the socket
* to the executor for processing.
*
* @param socket The socket associated with the client.
*
* @return true
if the socket is passed to the
* executor, false
if something went wrong or
* if the endpoint is shutting down. Returning
* false
is an indication to close the socket
* immediately.
*/
protected boolean processSocket(Socket socket) {
// Process the request from this socket
try {
SocketWrapper wrapper = new SocketWrapper(socket);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
wrapper.setSecure(isSSLEnabled());
// During shutdown, executor may be null - avoid NPE
if (!running) {
return false;
}
// SOCKET
getExecutor().execute(new SocketProcessor(wrapper));
}
return true;
}
//SocketProcessor、JIOEndPoint /**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {
protected SocketWrapper socket = null;
protected SocketStatus status = null;
public SocketProcessor(SocketWrapper socket) {
if (socket==null) throw new NullPointerException();
this.socket = socket;
}
public SocketProcessor(SocketWrapper socket, SocketStatus status) {
this(socket);
this.status = status;
}
@Override
public void run() {
boolean launch = false;
synchronized (socket) {
try {
SocketState state = SocketState.OPEN;
try {
// SSL handshake
//SSL ,SSL AbstractEndPoint.init
serverSocketFactory.handshake(socket.getSocket());
}
if ((state != SocketState.CLOSED)) {
if (status == null) {
state = handler.process(socket, SocketStatus.OPEN_READ);
} else {
//Socket null, , Socket
state = handler.process(socket,status);
}
}
if (state == SocketState.CLOSED) {
// Close socket
if (log.isTraceEnabled()) {
log.trace("Closing socket:"+socket);
}
countDownConnection();
try {
socket.getSocket().close();
} catch (IOException e) {
// Ignore
}
} else if (state == SocketState.OPEN ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT ||
state == SocketState.UPGRADED){
socket.setKeptAlive(true);
socket.access();
launch = true;
} else if (state == SocketState.LONG) {
socket.access();
waitingRequests.add(socket);
}
} finally {
if (launch) {
try {
getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
} catch (RejectedExecutionException x) {
log.warn("Socket reprocessing request was rejected for:"+socket,x);
try {
//unable to handle connection at this time
handler.process(socket, SocketStatus.DISCONNECT);
} finally {
countDownConnection();
}
}
}
socket = null;
// Finish up this request
}
}
//Socket はnullではなく、クローズ ではなく、Socket を するstate = handler.process(socket,status);
から して、ハンドル
Http 11 Protocolの において
ppublic Http11Protocol() {
// EndPoint JIoEndpoint
endpoint = new JIoEndpoint();
cHandler = new Http11ConnectionHandler(this);
// endpoint handler
((JIoEndpoint) endpoint).setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
なお、HandlerはHttp 11 Connection Handlerである。
Http 11 Connection Handlerを します。
Http 11 Connection HandlerはHttp 11 Protocolの クラスです。// ----------------------------------- Http11ConnectionHandler Inner Class
protected static class Http11ConnectionHandler
extends AbstractConnectionHandler implements Handler {
}
Http 11 Connection Handlerはprocessがないので、この はすでにHandlerにあります。 な です。
AbstractConnection Handlerを て、AbstractProtocolの クラスです。// ------------------------------------------- Connection handler base class
protected abstract static class AbstractConnectionHandler>
implements AbstractEndpoint.Handler {
// Socket, Processor
protected final Map> connections = new ConcurrentHashMap>();
@SuppressWarnings("deprecation")
// Old HTTP upgrade method has been deprecated
// HTTP
public SocketState process(SocketWrapper wrapper,SocketStatus status) {
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
if (socket == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
Processor processor = connections.get(socket);
if (status == SocketStatus.DISCONNECT && processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
wrapper.setAsync(false);
ContainerThreadMarker.markAsContainerThread();
try {
if (processor == null) {
processor = recycledProcessors.poll();
}
if (processor == null) {
//
processor = createProcessor();
}
// SSL,
initSsl(wrapper, processor);
SocketState state = SocketState.CLOSED;
do {
if (status == SocketStatus.CLOSE_NOW) {
processor.errorDispatch();
state = SocketState.CLOSED;
} else if (status == SocketStatus.DISCONNECT &&
!processor.isComet()) {
// Do nothing here, just wait for it to get recycled
// Don't do this for Comet we need to generate an end
// event (see BZ 54022)
} else if (processor.isAsync() || state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
if (state == SocketState.OPEN) {
// release() won't get called so in case this request
// takes a long time to process, remove the socket from
// the waiting requests now else the async timeout will
// fire
getProtocol().endpoint.removeWaitingRequest(wrapper);
// There may be pipe-lined data to read. If the data
// isn't processed now, execution will exit this
// loop and call release() which will recycle the
// processor (and input buffer) deleting any
// pipe-lined data. To avoid this, process it now.
state = processor.process(wrapper);
}
} else if (processor.isComet()) {
state = processor.event(status);
} {
state = processor.process(wrapper);
}
}
//
protected abstract P createProcessor();
protected abstract void initSsl(SocketWrapper socket,
Processor processor);
protected abstract void longPoll(SocketWrapper socket,
Processor processor);
protected abstract void release(SocketWrapper socket,
Processor processor, boolean socketClosing,
boolean addToPoller);
}
// Http11ConnectionHandler,createProcessor
@Override
protected Http11AprProcessor createProcessor() {
Http11AprProcessor processor = new Http11AprProcessor(
proto.getMaxHttpHeaderSize(), (AprEndpoint)proto.endpoint,
proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(),
proto.getMaxExtensionSize(), proto.getMaxSwallowSize());
// , connector ,
//adapter = new CoyoteAdapter(this);
//protocolHandler.setAdapter(adapter);
//
processor.setAdapter(proto.adapter);
processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
processor.setConnectionUploadTimeout(
proto.getConnectionUploadTimeout());
processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
processor.setCompressionMinSize(proto.getCompressionMinSize());
processor.setCompression(proto.getCompression());
processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
processor.setSocketBuffer(proto.getSocketBuffer());
processor.setMaxSavePostSize(proto.getMaxSavePostSize());
processor.setServer(proto.getServer());
processor.setClientCertProvider(proto.getClientCertProvider());
register(processor);
return processor;
}
Http 11 A prProcessorを します。public class Http11AprProcessor extends AbstractHttp11Processor {}
プロcess(socket,status)がありません。
AbstractHttp 11 Processorを てください。public abstract class AbstractHttp11Processor extends AbstractProcessor {
/**
* Process pipelined HTTP requests using the specified input and output
* streams.
* @param socketWrapper Socket from which the HTTP requests will be read
* and the HTTP responses will be written.
* @throws IOException error during an I/O operation
*/
// HTTP
@Override
public SocketState process(SocketWrapper socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint);
getOutputBuffer().init(socketWrapper, endpoint);
while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
upgradeInbound == null &&
httpUpgradeHandler == null && !endpoint.isPaused()) {
// Parsing the request header
try {
setRequestLineReadTimeout();
//503
if (endpoint.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
// Currently only NIO will ever return false here
if (!getInputBuffer().parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
setSocketTimeout(connectionUploadTimeout);
}
}
catch (Throwable t) {
// 400 - Bad Request
response.setStatus(400);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
//
prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
getAdapter().log(request, response, 0);
}
}
//503,400,500, ?
// Process the request in the adapter
// , request, response
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
// ,service , adapter,
// CoyoteAdapter
adapter.service(request, response);
}
/**
* After reading the request headers, we have to setup the request filters.
*/
protected void prepareRequest() {
MimeHeaders headers = request.getMimeHeaders();
// Check for a full URI (including protocol://host:port/)
ByteChunk uriBC = request.requestURI().getByteChunk();
if (uriBC.startsWithIgnoreCase("http", 0)) {
int pos = uriBC.indexOf("://", 0, 3, 4);
int uriBCStart = uriBC.getStart();
int slashPos = -1;
if (pos != -1) {
byte[] uriB = uriBC.getBytes();
slashPos = uriBC.indexOf('/', pos + 3);
if (slashPos == -1) {
slashPos = uriBC.getLength();
// Set URI as "/"
request.requestURI().setBytes
(uriB, uriBCStart + pos + 1, 1);
} else {
request.requestURI().setBytes
(uriB, uriBCStart + slashPos,
uriBC.getLength() - slashPos);
}
MessageBytes hostMB = headers.setValue("host");
hostMB.setBytes(uriB, uriBCStart + pos + 3,
slashPos - pos - 3);
}
}
}
}
AbstractHttp 11 Processのprocessから、
Httpプロセッサは、まずHTTPのヘッダプロトコル、フィルタ、 び を して、Adapterに して します。
CoyoteAdapterを に ましたpublic class CoyoteAdapter implements Adapter {
/**
* Construct a new CoyoteProcessor associated with the specified connector.
*
* @param connector CoyoteConnector that owns this processor
*/
// Adapter Connector
public CoyoteAdapter(Connector connector) {
super();
this.connector = connector;
}
private Connector connector = null;
/**
* Encoder for the Location URL in HTTP redirects.
*/
protected static URLEncoder urlEncoder;
//url
static {
urlEncoder = new URLEncoder();
urlEncoder.addSafeCharacter('-');
urlEncoder.addSafeCharacter('_');
urlEncoder.addSafeCharacter('.');
urlEncoder.addSafeCharacter('*');
urlEncoder.addSafeCharacter('/');
}
**
* Service method.
*/
@Override
public void service(org.apache.coyote.Request req,
org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
try {
// Parse and set Catalina and configuration specific
// request parameters
req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
// ,Servlet request
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
}
}
の で したことがありますが、 connectorは つのServiceに しています。 Serviceには つのEngineがあります。 Engineには のHostがあります。
HostはPipelineがありますが、PipelineはStanddardWrappeValveチェーンです。StanddardWrappeValveは されます。
RequestのServlet は、この を するべきでしょう。connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
JioEndpに って、JIoEndpointのstartInternalの スレッド に ります。// Start async timeout thread
Thread timeoutThread = new Thread(new AsyncTimeout(),
getName() + "-AsyncTimeout");
timeoutThread.setPriority(threadPriority);
timeoutThread.setDaemon(true);
timeoutThread.start();
Aync Timeout、Jio クラスを べます。/**
* Async timeout thread
*/
protected class AsyncTimeout implements Runnable {
/**
* The background thread that checks async requests and fires the
* timeout if there has been no activity.
*/
@Override
public void run() {
// Loop until we receive a shutdown command
while (running) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
long now = System.currentTimeMillis();
Iterator> sockets =
waitingRequests.iterator();
while (sockets.hasNext()) {
SocketWrapper socket = sockets.next();
long access = socket.getLastAccess();
if (socket.getTimeout() > 0 &&
(now-access)>socket.getTimeout()) {
// Prevent multiple timeouts
socket.setTimeout(-1);
// Socket
processSocketAsync(socket,SocketStatus.TIMEOUT);
}
}
// Loop if endpoint is paused
while (paused && running) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
}
}
}
JIoEndpointのprocessSocketAync を べます。/**
* Process an existing async connection. If processing is required, passes
* the wrapped socket to an executor for processing.
*
* @param socket The socket associated with the client.
* @param status Only OPEN and TIMEOUT are used. The others are used for
* Comet requests that are not supported by the BIO (JIO)
* Connector.
*/
@Override
public void processSocketAsync(SocketWrapper socket,
SocketStatus status) {
try {
synchronized (socket) {
// TIMEOUT
if (waitingRequests.remove(socket)) {
SocketProcessor proc = new SocketProcessor(socket,status);
ClassLoader loader = Thread.currentThread().getContextClassLoader();
try {
//threads should not be created by the webapp classloader
if (Constants.IS_SECURITY_ENABLED) {
PrivilegedAction pa = new PrivilegedSetTccl(
getClass().getClassLoader());
AccessController.doPrivileged(pa);
} else {
Thread.currentThread().setContextClassLoader(
getClass().getClassLoader());
}
// During shutdown, executor may be null - avoid NPE
if (!running) {
return;
}
getExecutor().execute(proc);
//TODO gotta catch RejectedExecutionException and properly handle it
} finally {
if (Constants.IS_SECURITY_ENABLED) {
PrivilegedAction pa = new PrivilegedSetTccl(loader);
AccessController.doPrivileged(pa);
} else {
Thread.currentThread().setContextClassLoader(loader);
}
}
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
}
}
//waitingRequests
protected ConcurrentLinkedQueue> waitingRequests =
new ConcurrentLinkedQueue>();
まとめ:
Connect corは し、Http 11 Protocolを し、AbstractProtocolを し、AbstractProtocolを し、 する Abstract EntPoint はbindメソッドを び し、bind Jion EntPoint、bind は、 にserstractverctorry Faverctorryを します。Connectが し、Http 11 Protocolが し、AbstractProtoctocolが し、AbstractProtocolがスタートしてstartメソッドを し、Abstract EntPointのstartメソッドを し、startメソッドがstartInternalメソッドを し、startInternalテープ が され、Jionent Point Point、startnetInternalの スレッドを します。その TCP/IP を し、 スレッドAccepttorを し、Accectは にJio Entの クラスであり、Abstract Ent Pointの クラスAcctを し、 に TCP/IP を し、 スレッドAccepttorを する。AccepterはTCP/IP を した 、SocketProcesssorスレッドを してSOCKET を し、SocketProcessorsorはまずSSL hadshakeを い、その 、SocketProcessorstorは の をHttp 11 Connection Handlerに し、 にAbstractConnection Handlerのprocess(Sockets)で する。Http 11 ProcessorはAbstractHttp 11 Processorを し、AbstractHttp 11 Processのprocess(SocketWrapper socketWrapper)の を し、 は の を し、CoyoteAdapter(Http 11 ProtoctocorderのAdaptvice)のSerptdapterを び すことによって、Coder Coptdaptdaptdapterを します。ServletのService を に び します。つまりconnector.get Service().get Continer.get Pipeline().get First().invoke(request,reponse); の とこの を て、Jio EntitPointは にAccepttorを して を しています。そしてJavaSocketはjava bioに づいて しています。 を るごとにスレッドを り てて、データを んでブロックしているのが です。ArEndPointはPoller により、Apache Portable Runtimeを いて し、 native を び すことにより、より い が られますが、 なプラットフォームに することを します。