阅读: 2 评论: 0 作者: Fervour 发表于 2009-12-19 17:59 原文链接

最近开始研究jetty 7。已经出到稳定版了。我相信大多JETTY的爱好者已经看过了。
这里呢。对jetty 7的continuation总结一下。

为了做一个server long push的WEB应用。我选择了jetty。对于Jetty,我只能说是一个新手,在网上搜资料的时候,发现相关资料少个可怜,中文的资料都是一个抄一个,或者就是翻译的。对于jetty我走了弯路。迫使自己看是看jetty的源文件。

大家肯定都知道jetty的continuation是建立的NIO技术的基础上。使WEB 服务器对大量的HTTP申请做阻塞不必开启过多的线程。避免了浪费。而jetty是将封装好的servlet加入一个等待队列。之后做轮询。哪个满足条件。就调用用户的方法去操作response。

刚开始作为一个第一次用jetty的人,我把suspend/complete和suspend/resunme产生了一个错误的理解。我以为这个方法只是阻塞申请。于是我建立了一个方法。里面做了一个循环去判断某个值,如果存在就反馈并complete。而最后发现jetty自带了遍历的功能。不用你去写循环。
正确的写法如下:
//判断applition中是否有msg有的话反馈,没有的话就阻塞。
protected void processRequest(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {
//得到applition对象
ServletContext applition = getServletConfig().getServletContext();
//获取continuation
Continuation continuation = ContinuationSupport.getContinuation(request);
//设定超时时间、可以不设置。默认为30秒
continuation.setTimeout(0);
//阻塞
continuation.suspend();
//如果applition中有msg这个字符。开放阻塞
if(applition.getAttribute("msg").toString() != null){
continuation.complete();
response.getServletResponse().getWriter().print(
applition.getAttribute("msg").toString()
);
}
}

其他你都不用管。服务器会做循环来调用你的serlvet。然后调用你的判断方法做处理。
要注意的一点是:
我做了一个小小的测试。我挂起了20个长连接去判断自己需求的信息,但是呢。我第一步就给与第9个长连接做判断的对象赋值,按理论上来说,它应该立即反馈的。
错!
它不会反馈给你。因为服务器端对每个客户端挂起的长连接做了限制。代码如下:
package org.eclipse.jetty.continuation;
import java.util.ArrayList;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.ServletResponseWrapper;
class FauxContinuation
implements ContinuationFilter.FilteredContinuation
{
private static final ContinuationThrowable __exception = new ContinuationThrowable();
//这里就是限制你为什么不能一台机器申请挂起多个长连接的原因。
private static final int __HANDLING = 1;
private static final int __SUSPENDING = 2;
private static final int __RESUMING = 3;
private static final int __COMPLETING = 4;
private static final int __SUSPENDED = 5;
private static final int __UNSUSPENDING = 6;
private static final int __COMPLETE = 7;
private final ServletRequest _request;
private ServletResponse _response;
private int _state = 1;
private boolean _initial = true;
private boolean _resumed = false;
private boolean _timeout = false;
private boolean _responseWrapped = false;
private long _timeoutMs = 30000L;
private ArrayList<ContinuationListener> _listeners;
FauxContinuation(ServletRequest request)
{
this._request = request;
}
public void onComplete()
{
if (this._listeners != null)
for (ContinuationListener l : this._listeners)
l.onComplete(this);
}
public void onTimeout()
{
if (this._listeners != null)
for (ContinuationListener l : this._listeners)
l.onTimeout(this);
}
public boolean isResponseWrapped()
{
return this._responseWrapped;
}
public boolean isInitial()
{
synchronized (this)
{
return this._initial;
}
}
public boolean isResumed()
{
synchronized (this)
{
return this._resumed;
}
}
public boolean isSuspended()
{
synchronized (this)
{
switch (this._state)
{
case 1:
return false;
case 2:
case 3:
case 4:
case 5:
return true;
case 6:
}
return false;
}
}
public boolean isExpired()
{
synchronized (this)
{
return this._timeout;
}
}
public void setTimeout(long timeoutMs)
{
this._timeoutMs = timeoutMs;
}
public void suspend(ServletResponse response)
{
this._response = response;
this._responseWrapped = response instanceof ServletResponseWrapper;
suspend();
}
public void suspend()
{
synchronized (this)
{
switch (this._state)
{
case 1:
this._timeout = false;
this._resumed = false;
this._state = 2;
return;
case 2:
case 3:
return;
case 4:
case 5:
case 6:
throw new IllegalStateException(getStatusString());
}
throw new IllegalStateException("" + this._state);
}
}
public void resume()
{
synchronized (this)
{
switch (this._state)
{
case 1:
this._resumed = true;
return;
case 2:
this._resumed = true;
this._state = 3;
return;
case 3:
case 4:
return;
case 5:
fauxResume();
this._resumed = true;
this._state = 6;
break;
case 6:
this._resumed = true;
return;
default:
throw new IllegalStateException(getStatusString());
}
}
}
public void complete()
{
synchronized (this)
{
switch (this._state)
{
case 1:
throw new IllegalStateException(getStatusString());
case 2:
this._state = 4;
break;
case 3:
break;
case 4:
return;
case 5:
this._state = 4;
fauxResume();
break;
case 6:
return;
default:
throw new IllegalStateException(getStatusString());
}
}
}
public boolean enter(ServletResponse response)
{
this._response = response;
return true;
}
public ServletResponse getServletResponse()
{
return this._response;
}
void handling()
{
synchronized (this)
{
this._responseWrapped = false;
switch (this._state)
{
case 1:
throw new IllegalStateException(getStatusString());
case 2:
case 3:
throw new IllegalStateException(getStatusString());
case 4:
return;
case 5:
fauxResume();
case 6:
this._state = 1;
return;
}
throw new IllegalStateException("" + this._state);
}
}
public boolean exit()
{
synchronized (this)
{
switch (this._state)
{
case 1:
this._state = 7;
onComplete();
return true;
case 2:
this._initial = false;
this._state = 5;
fauxSuspend();
if ((this._state == 5) || (this._state == 4))
{
onComplete();
return true;
}
this._initial = false;
this._state = 1;
return false;
case 3:
this._initial = false;
this._state = 1;
return false;
case 4:
this._initial = false;
this._state = 7;
onComplete();
return true;
case 5:
case 6:
}
throw new IllegalStateException(getStatusString());
}
}
protected void expire()
{
synchronized (this)
{
this._timeout = true;
}
onTimeout();
synchronized (this)
{
switch (this._state)
{
case 1:
return;
case 2:
this._timeout = true;
this._state = 3;
fauxResume();
return;
case 3:
return;
case 4:
return;
case 5:
this._timeout = true;
this._state = 6;
break;
case 6:
this._timeout = true;
return;
default:
throw new IllegalStateException(getStatusString());
}
}
}
private void fauxSuspend()
{
long expire_at = System.currentTimeMillis() + this._timeoutMs;
long wait = this._timeoutMs;
while ((this._timeoutMs > 0L) && (wait > 0L))
{
try
{
super.wait(wait);
}
catch (InterruptedException e)
{
break label51:
}
wait = expire_at - System.currentTimeMillis();
}
if ((this._timeoutMs > 0L) && (wait <= 0L))
label51: expire();
}
private void fauxResume()
{
this._timeoutMs = 0L;
super.notifyAll();
}
public String toString()
{
return getStatusString();
}
String getStatusString()
{
synchronized (this)
{
return ((this._state == 4) ? "COMPLETING" : (this._state == 6) ? "UNSUSPENDING" : (this._state == 3) ? "RESUMING" : (this._state == 5) ? "SUSPENDED" : (this._state == 2) ? "SUSPENDING" : (this._state == 1) ? "HANDLING" : new StringBuilder().append("???").append(this._state).toString()) + ((this._initial) ? ",initial" : "") + ((this._resumed) ? ",resumed" : "") + ((this._timeout) ? ",timeout" : "");
}
}
public void addContinuationListener(ContinuationListener listener)
{
if (this._listeners == null)
this._listeners = new ArrayList();
this._listeners.add(listener);
}
public Object getAttribute(String name)
{
return this._request.getAttribute(name);
}
public void removeAttribute(String name)
{
this._request.removeAttribute(name);
}
public void setAttribute(String name, Object attribute)
{
this._request.setAttribute(name, attribute);
}
public void undispatch()
{
if (isSuspended())
{
if (ContinuationFilter.__debug)
throw new ContinuationThrowable();
throw __exception;
}
throw new IllegalStateException("!suspended");
}
}

所以遇到这个问题的朋友不必担心。我使用多台机器测试过。服务器的servlet轮询队列是很长的。但对于单独用户来说只能接受6个suspend.在发送长连接挂起请求。他们加入到你所处的用户等待列表。当你挂起的6个连接中有一个被释放了。你的第7个连接将被挂起。

之前用了webbench做压测。

不管你怎么测非阻塞技术的长连接。你都会被jetty拒之门外。。。

就是因为这个做了限制。这样的限制大大降低对服务器的压力。