NutzCN Logo
分享 在Nutz内支持异步处理的分析和一些实践。
发布于 3129天前 作者 whitemay 3066 次浏览 复制 上一个帖子 下一个帖子
标签:

Servlet 3.0开始起,已经支持异步HTTP请求处理。但是Nutz版本到现在依然没有对异步处理进行支持。我这里给出了一些分析和建议。
首先,在Nutz框架内,通常我们是通过NutFilter来接管用户的所有请求。这是各框架的一种常见做法。但是,它与Servlet中关于异步处理的实现理念有一点点冲突。
根据Nutz框架的设计,显然不认为长时间的工作由NutFilter来完成有任何问题。但在架构上来看,我感觉Servlet的设计者认为,Filter应该是完成一些快速的处理工作,而可能长时间处理的任务应放到Servlet中间完成。因此,Servlet 3.0中,Filter虽然包括了异步支持这个选项,但实际它本身并不支持内部有异步的操作。这就让扩展NutFilter来支持异步处理存在天然的难度。

下面提供了我使用Nutz支持异步处理的一些实践。
为了支持异步请求,首先,需要将拦截器设到NutServlet上,而不是NutFilter上。web.xml修改如下:

nutzmvc
org.nutz.mvc.NutServlet

modules
com.example.MainModule

true


nutzmvc
/*

其次,修改可能触发异步操作的MethodInvokeProcessor,自定义一个如下:
public class AsyncMethodInvokeProcessor extends AbstractProcessor {
@Override
public void process(ActionContext ac) throws Throwable {
Object module = ac.getModule();
Method method = ac.getMethod();
Object[] args = ac.getMethodArgs();
try {
Object invokeResult = null;
// 原有过程有可能被打断,准备接受一个异步Future对象。
if (Mvcs.disableFastClassInvoker)
invokeResult = method.invoke(module, args);
else
invokeResult = FastClassFactory.invoke(module, method, args);

        // 根据是否返回一个异步对象,来判断是否该启动异步过程
        if (invokeResult instanceof CompletableFuture) {
           AsyncContext async = Mvcs.getReq().startAsync(Mvcs.getReq(), Mvcs.getResp());
           ac.set("AsyncContext", async);
           CompletableFuture<?> future = (CompletableFuture<?>) invokeResult;
           future.thenAccept(result->{
             // 异步模式下,收到结果再继续进行下一步处理。
             ac.setMethodReturn(result);
             try {
              doNext(ac);
          } catch (Throwable e) {
              // 如何将异步过程的错误进行处理?没有好的想法。
              e.printStackTrace();
          }
           });
        } else {
         ac.setMethodReturn(invokeResult);
         doNext(ac);
        }
    } 
    catch (IllegalAccessException e) {
        throw Lang.unwrapThrow(e);
    }
    catch (IllegalArgumentException e) {
        throw Lang.unwrapThrow(e);
    }
    catch (InvocationTargetException e) {
        throw e.getCause();
    }
}

}
这里假设异步操作返回的是一个CompletableFuture对象,这也是Java8中间对异步操作支持非常好的一个系统对象,与Promise对象很类似。
最后,需要在全部IO操作完成后,结束异步过程。
public class AsyncEndProcessor extends AbstractProcessor {
@Override
public void process(ActionContext ac) throws Throwable {
AsyncContext async = (AsyncContext) ac.get("AsyncContext");
if (async!=null) {
async.complete();
}
}
}
自定义Nutz的chain文件,注意一下里面改了两个Processor:
{
"async" : {
"ps" : [
"org.nutz.mvc.impl.processor.UpdateRequestAttributesProcessor",
"org.nutz.mvc.impl.processor.EncodingProcessor",
"org.nutz.mvc.impl.processor.ModuleProcessor",
"!org.nutz.integration.shiro.NutShiroProcessor",
"org.nutz.mvc.impl.processor.ActionFiltersProcessor",
"org.nutz.mvc.impl.processor.AdaptorProcessor",
"!org.nutz.plugins.validation.ValidationProcessor",
"com.example.async.AsyncMethodInvokeProcessor",
"org.nutz.mvc.impl.processor.ViewProcessor",
"com.example.async.AsyncEndProcessor",
],
"error" : 'org.nutz.mvc.impl.processor.FailProcessor'
}
}
来个支持该方式的样例方法:
@At("/HelloAsync")
@Chain("async")
public CompletableFuture Hello(HttpServletRequest request, HttpServletResponse response) throws IOException{
User user = new User();
user.name = "abc";

    return CompletableFuture.supplyAsync(()->{
       try {
         Thread.sleep(10000);
       } catch (Exception e) {
       }
       user.surname = "ttyt";
       return user;
    });
}

到此为止,一个能够支持异步的Nutz框架就大功告成了。
希望有兴趣的朋友踊跃测试和提意见,也欢迎各种讨论。本人QQ:356386。
也希望对异步处理的支持能够尽快进入到Nutz的主版本内。

10 回复

重发里面的一些代码:
web.xml

  <servlet>
  	<servlet-name>nutzmvc</servlet-name>
  	<servlet-class>org.nutz.mvc.NutServlet</servlet-class>
    <init-param>
      <param-name>modules</param-name>
      <param-value>com.foperate.MainModule</param-value>
    </init-param>
    <async-supported>true</async-supported>
  </servlet>
  <servlet-mapping>
  	<servlet-name>nutzmvc</servlet-name>
  	<url-pattern>/*</url-pattern>
  </servlet-mapping>

这写法跟我之前测试的有点类似

http://wendal.net/2013/09/06.html

public class AsyncMethodInvokeProcessor extends AbstractProcessor {
	
	@Override
	public void process(ActionContext ac) throws Throwable {
        Object module = ac.getModule();
        Method method = ac.getMethod();
        Object[] args = ac.getMethodArgs();
        try {
        	Object invokeResult = null;
        	// 原有过程有可能被打断,准备接受一个异步Future对象。
        	if (Mvcs.disableFastClassInvoker)
        		invokeResult = method.invoke(module, args);
        	else
        		invokeResult = FastClassFactory.invoke(module, method, args);
        	
        	// 根据是否返回一个异步对象,来判断是否该启动异步过程
        	if (invokeResult instanceof CompletableFuture) {
        		AsyncContext async = Mvcs.getReq().startAsync(Mvcs.getReq(), Mvcs.getResp());
        		ac.set("AsyncContext", async);
        		CompletableFuture<?> future = (CompletableFuture<?>) invokeResult;
        		future.thenAccept(result->{
        			// 异步模式下,收到结果再继续进行下一步处理。
        			ac.setMethodReturn(result);
        			try {
						doNext(ac);
					} catch (Throwable e) {
						// 如何将异步过程的错误进行处理?没有好的想法。
						e.printStackTrace();
					}
        		});
        	} else {
    			ac.setMethodReturn(invokeResult);
    			doNext(ac);
        	}
        } 
        catch (IllegalAccessException e) {
            throw Lang.unwrapThrow(e);
        }
        catch (IllegalArgumentException e) {
            throw Lang.unwrapThrow(e);
        }
        catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }
}

{
	"async" : {
		"ps" : [
		      "org.nutz.mvc.impl.processor.UpdateRequestAttributesProcessor",
		      "org.nutz.mvc.impl.processor.EncodingProcessor",
		      "org.nutz.mvc.impl.processor.ModuleProcessor",
		      "!org.nutz.integration.shiro.NutShiroProcessor",
		      "org.nutz.mvc.impl.processor.ActionFiltersProcessor",
		      "org.nutz.mvc.impl.processor.AdaptorProcessor",
		      "!org.nutz.plugins.validation.ValidationProcessor",
		      "com.foperate.async.AsyncMethodInvokeProcessor",
		      "org.nutz.mvc.impl.processor.ViewProcessor",
		      "com.foperate.async.AsyncEndProcessor",
		      ],
		"error" : 'org.nutz.mvc.impl.processor.FailProcessor'
	}
}

@wendal 嗯,这边的思路是直接针对Servlet 3.0的,不依赖于任何具体的WebServer。
所依赖的类只在Java8或者Servlet 3.0 API里面。

@wendal
好像不一样。Jetty的思路是试图在长时间的任务完成后重新启动请求及处理过程。
而我这个实现的思路是直接中断当前Http处理线程内的流程,把长时间任务及之后的过程直接交给其它的线程池来完成,最后再通过servlet 3.0的机制完成结果输出。
关于其它线程内的异常该怎么处理和输出,还希望你多提建议。

再来继续改这个例子;
在MVC下定义一个新的Annotation,叫做Async:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface Async {}

定义一个新的Process:

public class AsyncBeginProcessor extends AbstractProcessor  {
	@Override
	public void process(ActionContext ac) throws Throwable {
        Method method = ac.getMethod();
        if (method.getAnnotation(Async.class)!=null) {
    		AsyncContext async = Mvcs.getReq().startAsync(Mvcs.getReq(), Mvcs.getResp());
    		ac.set("AsyncContext", async);
        }
		doNext(ac);
	}
}

修改后的chain文件:

{
	"async" : {
		"ps" : [
		      "org.nutz.mvc.impl.processor.UpdateRequestAttributesProcessor",
		      "org.nutz.mvc.impl.processor.EncodingProcessor",
		      "org.nutz.mvc.impl.processor.ModuleProcessor",
		      "!org.nutz.integration.shiro.NutShiroProcessor",
		      "org.nutz.mvc.impl.processor.ActionFiltersProcessor",
		      "org.nutz.mvc.impl.processor.AdaptorProcessor",
		      "!org.nutz.plugins.validation.ValidationProcessor",
		      "com.example.async.AsyncBeginProcessor",
		      "com.example.async.AsyncMethodInvokeProcessor",
		      "org.nutz.mvc.impl.processor.ViewProcessor",
		      "com.example.async.AsyncEndProcessor",
		      ],
		"error" : 'org.nutz.mvc.impl.processor.FailProcessor'
	}
}

使用新标记的例子,换了一种使用Future的方法来举例。

	@At("/HelloAsync")
	@Chain("async")
	@Async
	public CompletableFuture<?> Hello(HttpServletRequest request, HttpServletResponse response) throws IOException{
		User user = new User();
		user.name = "abc";
		CompletableFuture<User> future = new CompletableFuture<User>();
		CompletableFuture.runAsync(()->{
			try {
				Thread.sleep(1000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).thenRun(()->{
			user.surname = "ttyt";
			future.complete(user);
		});
		return future;
	}

AsyncMethodInvokeProcessor中间相应的话要去掉:

        	// 根据是否返回一个异步对象,来判断是否该启动异步过程
        	if (invokeResult instanceof CompletableFuture) {
        		//AsyncContext async = Mvcs.getReq().startAsync(Mvcs.getReq(), Mvcs.getResp()); 去掉
        		//ac.set("AsyncContext", async); 去掉
        		CompletableFuture<?> future = (CompletableFuture<?>) invokeResult;

PS: 已经内置了一个叫Async的注解....

@wendal
嗯,我知道。那个是AOP模块下的注解。当然,注解改个名字并不是问题。
关键是我觉得现在的方案还是一个讨论稿,抛砖引玉。也想看看大家有没有什么新的思路,相互启发下。

顺便说一下引入异步模式的作用,不一定只是要应付需要长时间处理的状况,还包括:
1、便于和异步Dao接口,或其它异步接口对接;
2、将处理时间不确定的操作从Http处理线程中解放出来,用专门的线程池进行处理,有效保证Http处理线程无阻塞和高效;
3、和Jetty这类框架设计上倾向于异步模式的WebServer更容易配合工作。

再修改:使用NutServlet的一个问题是,它不像NutFilter一样拦截了所有请求后,还可以正常将请求转给容器进行处理。因此,上面的配置在处理静态文件或者其它文件时就会出问题。
需要修改web.xml中的servlet-mapping部分如下:

<!-- 需要通过下面的mapping避免静态文件送到nutz解析 -->
<servlet-mapping>
  <servlet-name>default</servlet-name>
  <url-pattern>/assets/*</url-pattern>
  <url-pattern>*.html</url-pattern>
  <url-pattern>*.htm</url-pattern>
</servlet-mapping>
<!-- 同时还需要将pattern设成/表示nutz是缺省处理器,而非匹配处理器 -->
<servlet-mapping>
  <servlet-name>nutz</servlet-name>
  <url-pattern>/</url-pattern>
</servlet-mapping>

添加回复
请先登陆
回到顶部