05-async
- Servlet3.0规范新增了对异步请求的支持,SpringMVC也在此基础之上对异步请求提供了方便。异步请求是在处理比较耗时的业务时先让处理request的容器线程返回,然后另起线程处理耗时的业务,处理完之后再将结果返回给用户
- HTTP协议是单向的,只能客户端自己拉,不能服务器主动推,Servlet对异步请求的支持并没有修改HTTP协议,而是对HTTP的巧妙利用
- 异步请求的核心原理主要分为两大类:
- 轮询:定时自动发起请求检查有没有需要返回的数据,这种方式资源浪费比较大
- 长连接:客户端发起请求,服务端处理并返回后并不结束连接,后面再次返回给客户端数据
- Servlet对异步请求采用长连接方式,异步请求在原始请求返回时并没有关闭连接,关闭的只是处理请求的线程,只有在异步请求全部处理完之后才会关闭连接
1. MyAsyncServlet
listao_boot >>> spring_mvc项目
AsyncContext asyncContext = req.startAsync();
开启 Servlet 异步上下文
/**
 * Demo servlet for the Servlet 3.0 asynchronous API.
 * <p>
 * The request thread writes a first chunk, switches the request into async mode, hands a
 * list of simulated jobs to a worker started through the {@link AsyncContext} and returns.
 * The worker keeps writing progress lines to the same response and finally completes the
 * async cycle.
 */
public class MyAsyncServlet extends HttpServlet {

    /** Number of simulated jobs queued per request. */
    private static final int JOB_COUNT = 10;

    /** Timeout handed to {@link AsyncContext#setTimeout(long)} (one hour in milliseconds). */
    private static final long ASYNC_TIMEOUT_MILLIS = 60 * 60 * 1000L;

    /** Simulated duration of one job, in milliseconds. */
    private static final long JOB_DURATION_MILLIS = 300L;

    /** GET requests are handled exactly like POST requests. */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doPost(req, resp);
    }

    /**
     * Writes the initial output, starts async processing and schedules the jobs.
     *
     * @param req  the current request; switched into async mode via {@code startAsync()}
     * @param resp the current response; written to by this thread and later by the worker
     * @throws IOException if the response writer cannot be obtained
     */
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        resp.setContentType("text/plain;charset=utf-8");
        resp.setHeader("Cache-Control", "private");
        PrintWriter writer = resp.getWriter();
        writer.println("检査工作");
        writer.flush();

        // Pretend there is a batch of jobs; each entry stands for one unit of business logic.
        List<String> jobs = new ArrayList<>(JOB_COUNT);
        for (int i = 0; i < JOB_COUNT; i++) {
            jobs.add("job" + i);
        }

        // 1. Switch the request into async mode.
        AsyncContext asyncContext = req.startAsync();
        // 2. Observe the async lifecycle through MyAsyncListener.
        asyncContext.addListener(new MyAsyncListener());
        // 3. Hand the jobs to the async worker.
        doWork(asyncContext, jobs);

        // NOTE(review): the worker may already be writing to the same response writer at this
        // point; kept in the original order because the demo shows the request thread
        // finishing while the jobs are still running.
        writer.println("任务布置完成");
        writer.flush();
    }

    /**
     * Runs the given jobs on a thread provided by the async context and completes the
     * context afterwards.
     * <p>
     * The context is completed in a {@code finally} block: without that, a failed write or
     * an interrupt would leave the request open until the configured timeout expires.
     *
     * @param ac   the async context of the current request
     * @param list the job names to process, in order
     */
    public void doWork(final AsyncContext ac, final List<String> list) {
        ac.setTimeout(ASYNC_TIMEOUT_MILLIS);
        // Start a new thread (managed by the container) for the actual processing.
        ac.start(() -> {
            try {
                PrintWriter writer = ac.getResponse().getWriter();
                for (String job : list) {
                    writer.println(job + "正在执行过程中");
                    Thread.sleep(JOB_DURATION_MILLIS);
                    writer.flush();
                }
            } catch (IOException e) {
                log("Async job processing failed", e);
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
                log("Async job processing interrupted", e);
            } finally {
                // Always signal completion so the container can release the connection.
                ac.complete();
            }
        });
    }
}
1. web.xml
<async-supported>true</async-supported>
<!-- Registers MyAsyncServlet under /async. async-supported is set to true on the servlet;
     the startAsync() failure quoted in these notes is raised when a filter or servlet in
     the processing chain does not support asynchronous operations. -->
<servlet>
<servlet-name>MyAsyncServlet</servlet-name>
<servlet-class>com.listao.mvc.servlet.MyAsyncServlet</servlet-class>
<async-supported>true</async-supported>
</servlet>
<servlet-mapping>
<servlet-name>MyAsyncServlet</servlet-name>
<url-pattern>/async</url-pattern>
</servlet-mapping>
2. Filter_bug
21-Jan-2024 16:19:52.320 WARNING [http-nio-8082-exec-46] org.apache.catalina.connector.Request.startAsync Unable to start async because the following classes in the processing chain do not support async [org.springframework.web.filter.HiddenHttpMethodFilter]
java.lang.IllegalStateException: A filter or servlet of the current chain does not support asynchronous operations.
at org.apache.catalina.connector.Request.startAsync(Request.java:1668)
at org.apache.catalina.connector.Request.startAsync(Request.java:1662)
at org.apache.catalina.connector.RequestFacade.startAsync(RequestFacade.java:735)
at com.listao.mvc.servlet.AsyncServlet.doPost(AsyncServlet.java:37)
at com.listao.mvc.servlet.AsyncServlet.doGet(AsyncServlet.java:17)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:529)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:623)
web.xml
中的<filter>
都注释掉(或者保留过滤器,给每个<filter>加上<async-supported>true</async-supported>)
<!-- Character-encoding filter provided by Spring MVC.
     async-supported=true lets requests that pass through this filter call startAsync();
     without it the container rejects startAsync() with "A filter or servlet of the
     current chain does not support asynchronous operations". -->
<filter>
    <filter-name>characterEncodingFilter</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <async-supported>true</async-supported>
    <init-param>
        <param-name>encoding</param-name>
        <param-value>utf-8</param-value>
    </init-param>
</filter>
<filter-mapping>
    <filter-name>characterEncodingFilter</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Converts POST requests into PUT / DELETE; also declared async-capable for the same reason. -->
<filter>
    <filter-name>hiddenHttpMethodFilter</filter-name>
    <filter-class>org.springframework.web.filter.HiddenHttpMethodFilter</filter-class>
    <async-supported>true</async-supported>
</filter>
<filter-mapping>
    <filter-name>hiddenHttpMethodFilter</filter-name>
    <url-pattern>/*</url-pattern>
</filter-mapping>
3. MyAsyncListener
/**
 * {@link AsyncListener} that prints the name of each lifecycle callback to standard output.
 */
public class MyAsyncListener implements AsyncListener {

    /** Traces the start-async callback. */
    @Override
    public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
        System.out.println("onStartAsync()");
    }

    /** Traces the timeout callback. */
    @Override
    public void onTimeout(AsyncEvent asyncEvent) throws IOException {
        System.out.println("onTimeout()");
    }

    /** Traces the error callback. */
    @Override
    public void onError(AsyncEvent asyncEvent) throws IOException {
        System.out.println("onError()");
    }

    /**
     * Traces the completion callback and shows which objects the event exposes;
     * the returned values are intentionally not used.
     */
    @Override
    public void onComplete(AsyncEvent asyncEvent) throws IOException {
        System.out.println("onComplete()");
        asyncEvent.getSuppliedRequest();  // request carried by the event
        asyncEvent.getSuppliedResponse(); // response carried by the event
        asyncEvent.getThrowable();        // throwable carried by the event
    }
}
4. AsyncContext
package javax.servlet;
/**
 * Handle for one asynchronous request cycle, as obtained from {@code startAsync()}.
 * <p>
 * Members are declared without the implicit interface modifiers ({@code public},
 * {@code static final}); the resulting contract is the same.
 */
public interface AsyncContext {

    // Attribute-name constants for the original request details of an async dispatch.
    String ASYNC_REQUEST_URI = "javax.servlet.async.request_uri";
    String ASYNC_CONTEXT_PATH = "javax.servlet.async.context_path";
    String ASYNC_MAPPING = "javax.servlet.async.mapping";
    String ASYNC_PATH_INFO = "javax.servlet.async.path_info";
    String ASYNC_SERVLET_PATH = "javax.servlet.async.servlet_path";
    String ASYNC_QUERY_STRING = "javax.servlet.async.query_string";

    // --- access to the request/response pair ---

    ServletRequest getRequest();

    ServletResponse getResponse();

    boolean hasOriginalRequestAndResponse();

    // --- handing the request back to the container ---

    void dispatch();

    void dispatch(String path);

    void dispatch(ServletContext context, String path);

    /** Signals that async processing has finished. */
    void complete();

    /** Runs the given task on a thread supplied by the container. */
    void start(Runnable run);

    // --- listeners ---

    void addListener(AsyncListener listener);

    void addListener(AsyncListener listener,
            ServletRequest servletRequest,
            ServletResponse servletResponse);

    <T extends AsyncListener> T createListener(Class<T> clazz)
            throws ServletException;

    // --- timeout ---

    void setTimeout(long timeout);

    long getTimeout();
}
2. mvc_async
- Mvc 的异步是对 Servlet 异步的封装和扩展
1. AsyncWebRequest
WebAsyncManager
异步处理管理器
/**
* Extends {@link NativeWebRequest} with methods for asynchronous request processing.
*
* @author Rossen Stoyanchev
* @since 3.2
*/
public interface AsyncWebRequest extends NativeWebRequest {
/**
* Set the time required for concurrent handling to complete.
* This property should not be set when concurrent handling is in progress,
* i.e. when {@link #isAsyncStarted()} is {@code true}.
*
* @param timeout amount of time in milliseconds; {@code null} means no
* timeout, i.e. rely on the default timeout of the container.
*/
void setTimeout(@Nullable Long timeout);
/**
* Registers a request-timeout handler; the counterpart of {@code onTimeout}.
* <p>
* Add a handler to invoke when concurrent handling has timed out.
*/
void addTimeoutHandler(Runnable runnable);
/**
* Add a handler to invoke when an error occurred while concurrent
* handling of a request.
*
* @since 5.0
*/
void addErrorHandler(Consumer<Throwable> exceptionHandler);
/**
* Registers a request-completion handler; the counterpart of {@code onComplete}.
* <p>
* Add a handler to invoke when request processing completes.
*/
void addCompletionHandler(Runnable runnable);
/**
* Mark the start of asynchronous request processing so that when the main
* processing thread exits, the response remains open for further processing
* in another thread.
*
* @throws IllegalStateException if async processing has completed or is not supported
*/
void startAsync();
/**
* Tells whether asynchronous processing has been started.
* <p>
* Whether the request is in async mode following a call to {@link #startAsync()}.
* Returns "false" if asynchronous processing never started, has completed,
* or the request was dispatched for further processing.
*/
boolean isAsyncStarted();
/**
* Dispatch the request to the container in order to resume processing after
* concurrent execution in an application thread.
*/
void dispatch();
/**
* Tells whether asynchronous processing has finished.
* <p>
* Whether asynchronous processing has completed.
*/
boolean isAsyncComplete();
}
2. AsyncCtl
http://localhost:8080/spring_mymvc/asyncCallable
- 当 handle 方法返回
WebAsyncTask
或Callable
时,将自动启用异步处理- 使用
WebAsyncTask
类型返回值的处理器AsyncTaskMethodReturnValueHandler
- 返回值为
null
,就会给mavContainer
设置为请求已处理,然后返回 - 返回值不为空,调用
WebAsyncManager.startCallableProcessing()
处理请求
- 使用
/**
 * Demo controller whose handler methods return {@link WebAsyncTask} / {@link Callable},
 * so that the actual work runs after the request thread has left the handler.
 */
@Controller
public class AsyncCtl {

    /**
     * Returns a {@link WebAsyncTask} that produces a {@code User} after a five second delay.
     * <p>
     * Console output order recorded in the original notes:
     * <ol>
     * <li>1. WebAsyncTask处理器主线程进入</li>
     * <li>3. WebAsyncTask处理器主线程退出</li>
     * <li>2. WebAsyncTask执行中。。。</li>
     * </ol>
     * The mapping declares JSON as the produced media type so that it matches the
     * {@code User} body; the previous {@code text/plain} declaration contradicted the
     * content type set on the response.
     *
     * @param resp the current response; its content type is set to JSON up front
     * @return the task to be executed asynchronously
     */
    @ResponseBody
    @RequestMapping(value = "/webAsyncTask", produces = "application/json; charset=UTF-8")
    public WebAsyncTask<User> webAsyncTask(HttpServletResponse resp) {
        System.out.println("1. WebAsyncTask处理器主线程进入");
        // Retained from the original; now consistent with the declared produces type.
        resp.setContentType("application/json; charset=UTF-8");
        // No @ResponseBody on the callable itself: only the handler method is inspected.
        Callable<User> job = () -> {
            Thread.sleep(5 * 1000L);
            System.out.println("2. WebAsyncTask执行中。。。");
            return new User("ooxx", 11);
        };
        WebAsyncTask<User> task = new WebAsyncTask<>(job);
        System.out.println("3. WebAsyncTask处理器主线程退出");
        return task; // ===>>> AsyncTaskMethodReturnValueHandler
    }

    /**
     * Returns a {@link Callable} that produces a text reply after a five second delay.
     * <p>
     * Console output order recorded in the original notes:
     * <ol>
     * <li>1. Callable处理器主线程进入</li>
     * <li>3. Callable处理器主线程退出</li>
     * <li>2. Callable处理执行中。。。</li>
     * </ol>
     *
     * @return the callable to be executed asynchronously
     */
    @ResponseBody
    @RequestMapping(value = "/asyncCallable", produces = "text/plain; charset=UTF-8")
    public Callable<String> callable() {
        System.out.println("1. Callable处理器主线程进入");
        Callable<String> callable = () -> {
            Thread.sleep(5 * 1000L);
            System.out.println("2. Callable处理执行中。。。");
            return "久等了";
        };
        System.out.println("3. Callable处理器主线程退出");
        return callable;
    }
}
3. SpringMVC实现异步处理
- 在SpringMVC中为了方便使用异步请求,提供了以下三个组件:
AsyncWebRequest
:request
类型专门处理异步请求WebAsyncManager
:异步请求处理管理器,管理整个异步处理过程WebAsyncUtils
:用来获取WebAsyncManager
对象以及创建AsyncWebRequest
- 处理流程如下:
- 处理器中返回需要启动异步处理的类型,相应返回值处理器会调用
WebAsyncManager
的相关方法启动异步处理 - 然后
DispatcherServlet
将原来请求直接返回。当异步处理完成后,会重新发出一个相同的请求,在RequestMappingHandlerAdapter
中会使用特殊的ServletInvocableHandlerMethod
来处理请求 - 异步处理结果是异常类型则抛出异常,否则直接返回异步处理结果,然后使用返回值处理器处理
- 接着返回
DispatcherServlet
中按正常流程往下处理
- 处理器中返回需要启动异步处理的类型,相应返回值处理器会调用
1. FrameworkServlet
FrameworkServlet#processRequest()
给当前请求的WebAsyncManager
添加CallableInterceptor
类型的拦截器RequestBindingInterceptor
- 目的:在请求处理前,将当前请求的
LocaleContext
和ServletRequestAttributes
设置到LocaleContextHolder
和RequestContextHolder
中,并在请求处理完成后恢复
// Excerpt of FrameworkServlet: processRequest() and the interceptor it registers on the
// request's WebAsyncManager.
@SuppressWarnings("serial")
public abstract class FrameworkServlet extends HttpServletBean implements ApplicationContextAware {
/**
* Wraps doService() with context-holder setup and restoration, registers the
* RequestBindingInterceptor on the request's WebAsyncManager, and logs / publishes
* the outcome in the finally block.
*/
protected final void processRequest(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// Record the current time; used to compute how long request processing took
long startTime = System.currentTimeMillis();
// Holds the exception, if any, raised while processing the request
Throwable failureCause = null;
// 1. Fetch the LocaleContext previously stored in LocaleContextHolder (the saved locale information)
LocaleContext previousLocaleContext = LocaleContextHolder.getLocaleContext();
// 2.1. Build the LocaleContext for the current request
LocaleContext localeContext = buildLocaleContext(request);
// Fetch the RequestAttributes previously stored in RequestContextHolder (manages request and session attributes)
RequestAttributes previousAttributes = RequestContextHolder.getRequestAttributes();
// Build the ServletRequestAttributes for the current request
ServletRequestAttributes requestAttributes = buildRequestAttributes(request, response, previousAttributes);
// Obtain the async manager and register the RequestBindingInterceptor under this class's name
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.registerCallableInterceptor(FrameworkServlet.class.getName(), new RequestBindingInterceptor());
// 2.2. Put the current request's LocaleContext and ServletRequestAttributes into LocaleContextHolder and RequestContextHolder
initContextHolders(request, localeContext, requestAttributes);
try {
// 3. Run the actual processing logic
doService(request, response);
} catch (ServletException | IOException ex) {
// Record the thrown exception
failureCause = ex;
throw ex;
} catch (Throwable ex) {
// Record the thrown exception
failureCause = ex;
throw new NestedServletException("Request processing failed", ex);
} finally {
// 2.3. Restore the previous LocaleContext and RequestAttributes into LocaleContextHolder and RequestContextHolder
resetContextHolders(request, previousLocaleContext, previousAttributes);
if (requestAttributes != null) {
requestAttributes.requestCompleted();
}
// Log the request result (original note: printed when the log level is debug)
logResult(request, response, failureCause, asyncManager);
// Publish the ServletRequestHandledEvent announcing that request handling finished
publishRequestHandledEvent(request, response, startTime, failureCause);
}
}
/**
* CallableProcessingInterceptor implementation that initializes and resets
* FrameworkServlet's context holders, i.e. LocaleContextHolder and RequestContextHolder.
*/
private class RequestBindingInterceptor implements CallableProcessingInterceptor {
// Before the callable runs: initialize the context holders from the native request/response.
@Override
public <T> void preProcess(NativeWebRequest webRequest, Callable<T> task) {
HttpServletRequest request = webRequest.getNativeRequest(HttpServletRequest.class);
if (request != null) {
HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
initContextHolders(request, buildLocaleContext(request), buildRequestAttributes(request, response, null));
}
}
// After the callable ran: reset the context holders (null previous values are passed).
@Override
public <T> void postProcess(NativeWebRequest webRequest, Callable<T> task, Object concurrentResult) {
HttpServletRequest request = webRequest.getNativeRequest(HttpServletRequest.class);
if (request != null) {
resetContextHolders(request, null, null);
}
}
}
}
2. DispatcherServlet
- 当
HandlerAdapter
使用 handler 处理完请求时,会检查是否已经启动了异步处理,如果启动了则不再往下处理,直接返回 if (asyncManager.isConcurrentHandlingStarted()) {
启动异步处理,直接进行return;
// Excerpt of DispatcherServlet: doDispatch() with its two async checkpoints.
public class DispatcherServlet extends FrameworkServlet {
/**
* Resolves the handler and its adapter for the request, invokes the handler and processes
* the result. If concurrent handling was started by the handler, the method returns right
* after the handler call and the finally block notifies the handler chain instead of
* cleaning up multipart resources.
*/
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequest processedRequest = request;
HandlerExecutionChain mappedHandler = null;
boolean multipartRequestParsed = false;
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
try {
ModelAndView mv = null;
Exception dispatchException = null;
try {
processedRequest = checkMultipart(request);
multipartRequestParsed = (processedRequest != request);
// Determine handler for the current request.
mappedHandler = getHandler(processedRequest);
if (mappedHandler == null) {
noHandlerFound(processedRequest, response);
return;
}
// Determine handler adapter for the current request.
HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());
// Process last-modified header, if supported by the handler.
String method = request.getMethod();
boolean isGet = "GET".equals(method);
if (isGet || "HEAD".equals(method)) {
long lastModified = ha.getLastModified(request, mappedHandler.getHandler());
if (new ServletWebRequest(request, response).checkNotModified(lastModified) && isGet) {
return;
}
}
if (!mappedHandler.applyPreHandle(processedRequest, response)) {
return;
}
// Actually invoke the handler.
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
// Async checkpoint 1: the handler started concurrent handling, so stop here;
// view-name defaulting, postHandle and result processing below are skipped.
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}
applyDefaultViewName(processedRequest, mv);
mappedHandler.applyPostHandle(processedRequest, response, mv);
}
catch (Exception ex) {
dispatchException = ex;
}
catch (Throwable err) {
// As of 4.3, we're processing Errors thrown from handler methods as well,
// making them available for @ExceptionHandler methods and other scenarios.
dispatchException = new NestedServletException("Handler dispatch failed", err);
}
processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);
}
catch (Exception ex) {
triggerAfterCompletion(processedRequest, response, mappedHandler, ex);
}
catch (Throwable err) {
triggerAfterCompletion(processedRequest, response, mappedHandler,
new NestedServletException("Handler processing failed", err));
}
finally {
// Async checkpoint 2: when concurrent handling is in progress, notify the handler
// chain; otherwise clean up multipart resources.
if (asyncManager.isConcurrentHandlingStarted()) {
// Instead of postHandle and afterCompletion
if (mappedHandler != null) {
mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);
}
}
else {
// Clean up any resources used by a multipart request.
if (multipartRequestParsed) {
cleanupMultipart(processedRequest);
}
}
}
}
}
3. RequestMappingHandlerAdapter
RequestMappingHandlerAdapter#invokeHandlerMethod()
提供了对异步请求的核心支持,其中做了四件跟异步处理相关的事情- 创建
AsyncWebRequest
并设置超时时间 - 对当前请求的
WebAsyncManager
设置四个属性 - 如果当前请求是异步请求而且已经处理出了结果,则将异步处理结果和
ModelAndViewContainer
取出来,然后调用ServletInvocableHandlerMethod#wrapConcurrentResult()
方法创建ConcurrentResultHandlerMethod
类型的ServletInvocableHandlerMethod
来替换自己 invokeAndHandle()
执行完成后,检查到当前请求已经启动了异步处理,则会直接返回null
- 创建
// Excerpt of RequestMappingHandlerAdapter: invokeHandlerMethod() and its async-related steps.
public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter
implements BeanFactoryAware, InitializingBean {
/**
* Prepares the invocable handler method, the model container and the async infrastructure,
* then invokes the handler. Returns null when concurrent handling was started by the
* invocation; otherwise returns the result of getModelAndView().
*/
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
// Wrap request and response in a ServletWebRequest (acts as a utility wrapper)
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
// @InitBinder handling
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
// Model handling: 1 - initialize the model before the handler runs, 2 - update model attributes after the request was handled
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
// Object responsible for argument binding, handler invocation and return value handling
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
// Argument resolvers
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
// Return value handlers
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
// Data binder factory used for argument binding
invocableMethod.setDataBinderFactory(binderFactory);
// Parameter name discoverer
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
// Container holding the model and the view
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
// Copy the input flash map attributes into the container
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
// Use modelFactory to put session attributes and the values of @ModelAttribute methods into the model
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
// Apply the configured ignoreDefaultModelOnRedirect setting
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
// Create the AsyncWebRequest for this request and apply the configured timeout
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
// Configure the WebAsyncManager of this request: executor, async request and both interceptor sets
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
// If async processing has already produced a result: read the result, take the mavContainer
// saved in the concurrent result context, clear the stored result, and replace invocableMethod
// with the wrapper returned by wrapConcurrentResult(result) before invoking it below
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
// Swap in the invocable object that wraps the concurrent result
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
// Perform the invocation
invocableMethod.invokeAndHandle(webRequest, mavContainer);
// Concurrent handling was started by the invocation: nothing to render on this pass
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
// Post-processing after the request was handled; per the original notes it does three things:
// 1. calls ModelFactory.updateModel to update the model, including session attributes and binding results
// 2. creates the ModelAndView from mavContainer
// 3. if the model in mavContainer is a RedirectAttributes, stores it into the FlashMap
return getModelAndView(mavContainer, modelFactory, webRequest);
} finally {
// Mark the request as completed
webRequest.requestCompleted();
}
}
}
4. async返回值处理器
四个异步请求的返回值处理器
AsyncTaskMethodReturnValueHandler
CallableMethodReturnValueHandler
DeferredResultMethodReturnValueHandler
ListenableFutureReturnValueHandler
每一种对应一种类型的返回值,主要作用是使用WebAsyncManager
启动异步处理
// Excerpt of ServletInvocableHandlerMethod: invokeAndHandle() and the handler composite it uses.
public class ServletInvocableHandlerMethod extends InvocableHandlerMethod {
/**
* Composite of the configured return value handlers.
*/
@Nullable
private HandlerMethodReturnValueHandlerComposite returnValueHandlers;
/**
* Invoke the method and handle the return value through one of the
* configured {@link HandlerMethodReturnValueHandler HandlerMethodReturnValueHandlers}.
*
* @param webRequest the current request
* @param mavContainer the ModelAndViewContainer for this request
* @param providedArgs "given" arguments matched by type (not resolved)
*/
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer,
Object... providedArgs) throws Exception {
// 1. Invoke the handler method through the inherited invokeForRequest
Object returnValue = invokeForRequest(webRequest, mavContainer, providedArgs);
// 2. Apply the @ResponseStatus annotation
setResponseStatus(webRequest);
// Handle the return value, starting with the null case
if (returnValue == null) { // skip
// If the request is "not modified", or a response status is set, or the request is already
// marked handled (any one of the three), mark the request as handled and return
if (isRequestNotModified(webRequest) || getResponseStatus() != null || mavContainer.isRequestHandled()) {
disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
}
// Return value is non-null and a response status reason is present: mark the request as handled and return
else if (StringUtils.hasText(getResponseStatusReason())) { // skip
mavContainer.setRequestHandled(true);
return;
}
// None of the shortcuts above applied: set requestHandled to false before delegating
// to the return value handlers
mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
// Let the configured returnValueHandlers process the return value
this.returnValueHandlers.handleReturnValue(
returnValue, getReturnValueType(returnValue), mavContainer, webRequest);
} catch (Exception ex) {
if (logger.isTraceEnabled()) {
logger.trace(formatErrorForReturnValue(returnValue), ex);
}
throw ex;
}
}
}
1. AsyncTaskMtdRtnValHandler
/**
 * Return value handler for handler methods that return a {@link WebAsyncTask}.
 */
public class AsyncTaskMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

    /**
     * @return {@code true} when the declared return type is {@link WebAsyncTask} or a subtype
     */
    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> declaredType = returnType.getParameterType();
        return WebAsyncTask.class.isAssignableFrom(declaredType);
    }

    /**
     * Starts callable processing for the returned task; a {@code null} return value
     * simply marks the request as handled.
     */
    @Override
    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

        if (returnValue != null) {
            WebAsyncTask<?> task = (WebAsyncTask<?>) returnValue;
            if (this.beanFactory != null) {
                task.setBeanFactory(this.beanFactory);
            }
            // 1... WebAsyncUtils, WebAsyncManager
            WebAsyncManager manager = WebAsyncUtils.getAsyncManager(webRequest);
            manager.startCallableProcessing(task, mavContainer);
        } else {
            mavContainer.setRequestHandled(true);
        }
    }
}
1. WebAsyncUtils
/**
 * Static helpers for locating the {@link WebAsyncManager} of a request and for creating
 * {@link AsyncWebRequest} instances.
 */
public abstract class WebAsyncUtils {

    /**
     * The name attribute containing the {@link WebAsyncManager}.
     */
    public static final String WEB_ASYNC_MANAGER_ATTRIBUTE =
            WebAsyncManager.class.getName() + ".WEB_ASYNC_MANAGER";

    /**
     * Looks up the {@link WebAsyncManager} stored on the servlet request; when the attribute
     * is absent (or not a {@code WebAsyncManager}) a new manager is created and stored.
     */
    public static WebAsyncManager getAsyncManager(ServletRequest servletRequest) {
        Object existing = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
        if (existing instanceof WebAsyncManager) {
            return (WebAsyncManager) existing;
        }
        WebAsyncManager created = new WebAsyncManager();
        servletRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, created);
        return created;
    }

    /**
     * Obtain the {@link WebAsyncManager} for the current request, or if not
     * found, create and associate it with the request (request scope).
     */
    public static WebAsyncManager getAsyncManager(WebRequest webRequest) {
        Object existing = webRequest.getAttribute(
                WEB_ASYNC_MANAGER_ATTRIBUTE, RequestAttributes.SCOPE_REQUEST);
        if (existing instanceof WebAsyncManager) {
            return (WebAsyncManager) existing;
        }
        WebAsyncManager created = new WebAsyncManager();
        webRequest.setAttribute(
                WEB_ASYNC_MANAGER_ATTRIBUTE, created, RequestAttributes.SCOPE_REQUEST);
        return created;
    }

    /**
     * Create an AsyncWebRequest instance. By default, an instance of
     * {@link StandardServletAsyncWebRequest} gets created.
     *
     * @param request  the current request
     * @param response the current response
     * @return an AsyncWebRequest instance (never {@code null})
     */
    public static AsyncWebRequest createAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
        return new StandardServletAsyncWebRequest(request, response);
    }
}
2. WebAsyncManager
// Excerpt of WebAsyncManager: starting callable processing, storing the concurrent result
// and dispatching back to the container. Fields such as taskExecutor, callableInterceptors,
// concurrentResult, concurrentResultContext and errorHandlingInProgress are declared outside
// this excerpt.
public final class WebAsyncManager {
// CallableProcessingInterceptor dedicated to handling timeouts of Callable and WebAsyncTask processing
private static final CallableProcessingInterceptor timeoutCallableInterceptor =
new TimeoutCallableProcessingInterceptor();
// DeferredResultProcessingInterceptor dedicated to handling timeouts of DeferredResult processing
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor =
new TimeoutDeferredResultProcessingInterceptor();
// The request wrapper that adds async support
private AsyncWebRequest asyncWebRequest;
/**
* Handles an asynchronous request of type WebAsyncTask.
* <p>
* Per the original notes, every start method does the following: 1. starts async processing,
* 2. sets the corresponding properties on the request, 3. invokes the matching interceptors.
* <p>
* Use the given {@link WebAsyncTask} to configure the task executor as well as
* the timeout value of the {@code AsyncWebRequest} before delegating to
* {@link #startCallableProcessing(Callable, Object...)}.
*
* @param webAsyncTask a WebAsyncTask containing the target {@code Callable}
* @param processingContext additional context to save that can be accessed
* via {@link #getConcurrentResultContext()}
* @throws Exception if concurrent processing failed to start
*/
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
throws Exception {
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
// If the webAsyncTask carries its own timeout, apply it to the request
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) { // skip
this.asyncWebRequest.setTimeout(timeout);
}
// If the webAsyncTask defines its own executor, use it as the taskExecutor
AsyncTaskExecutor executor = webAsyncTask.getExecutor();
if (executor != null) { // skip
this.taskExecutor = executor;
} else {
logExecutorWarning();
}
// Assemble the interceptor list from three sources: 1. the interceptor carried by the webAsyncTask,
// 2. all interceptors registered in callableInterceptors, 3. the timeout interceptor
List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(webAsyncTask.getInterceptor());
interceptors.addAll(this.callableInterceptors.values());
interceptors.add(timeoutCallableInterceptor);
// Take the Callable that actually does the work out of the webAsyncTask
final Callable<?> callable = webAsyncTask.getCallable();
final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
// Register the timeout handler on the request
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Async request timeout for " + formatRequestUri());
}
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
setConcurrentResultAndDispatch(result);
}
});
// Register the error handler on the request
this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
if (logger.isDebugEnabled()) {
logger.debug("Async request error for " + formatRequestUri() + ": " + ex);
}
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
setConcurrentResultAndDispatch(result);
}
});
// Register the completion handler on the request
this.asyncWebRequest.addCompletionHandler(() ->
interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));
// Run applyBeforeConcurrentHandling of the interceptor chain
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
// 1. Start async processing
startAsyncProcessing(processingContext);
// Execute the callable on the taskExecutor
try {
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
result = callable.call();
} catch (Throwable ex) {
result = ex;
} finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
// Store the processing result and dispatch the request
setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
} catch (RejectedExecutionException ex) {
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
setConcurrentResultAndDispatch(result);
throw ex;
}
}
// Stores the result once (later calls return early while a result is already set) and
// dispatches the request, unless the async request is already complete.
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
// Keep the async processing result
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}
// Check whether the current request has already been marked as async-complete
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}
if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
}
this.asyncWebRequest.dispatch();
}
// Resets the concurrent-result state, saves the processing context and starts async
// processing on the AsyncWebRequest.
private void startAsyncProcessing(Object[] processingContext) {
synchronized (WebAsyncManager.this) {
// Clear the result of any previous concurrent processing
this.concurrentResult = RESULT_NONE;
// Store processingContext in the concurrentResultContext field
this.concurrentResultContext = processingContext;
this.errorHandlingInProgress = false;
}
// 1. Call startAsync on the asyncWebRequest (StandardServletAsyncWebRequest per the original note) to start async processing
this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
logger.debug("Started async request");
}
}
}