Prayer

在一般中寻求卓越
posts - 1256, comments - 190, trackbacks - 0, articles - 0
  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

在vb.net中运用多线程实现远程数据收集

Posted on 2008-12-11 16:09 Prayer 阅读(2896) 评论(1)  编辑 收藏 引用 所属分类: SOCKET

作者:杨泉波

引言

    在笔者参与的四川省重点污染源企业环境远程监控系统中,有一项非常重要的工作:将多达80台的远程DVS(视频服务器)的监测数据通过因特网传输,由上位机收集上来,写入SQL Server 2005数据库中。远程数据每隔一分钟发送一次实时数据。如果数据在一分钟内传送不成功,那么DVS将认为网络已经断开,又要不断的发起新的连接。因此,上位机能不能及时的准确的收集、写入,是系统成败的关键。

项目分析

    80多台远程DVS正在不间断的采集数据,在网络正常的情况下,会不间断的向上位机发送数据。如果采用传统的单线程结构,上位机接受连接请求,接收处理数据,将数据写入数据库,然后再接受新的连接请求,接收处理数据,……,这样,上位机程序异常繁忙,CPU利用率几乎将达100%。由于服务器不能迅速处理请求,DVS只好等待。
    更为重要的是,为了减少上位机发送响应连接的次数,设备采用的是长连接,即发送一次连接请求并得到响应后,发送数据时不再发送连接请求。因此,要求上位机能够保存客户端的Socket。
为了避免这种情形发生。笔者采用了异步、多线程来处理。所谓异步,是程序调用一个方法后立即返回,总体而言,主线程与方法线程并行执行。而同步即程序执行一个方法,等该方法返回之后,继续往下走,本系统从功能上分成3个模块,即3个前后关联的线程:主线程、数据接收线程、存入数据库线程,它们异步执行。

主线程

    主线程工作流程如图一所示。其主要功能是:初始化参数,如连接端口号、IP地址等,侦听连接请求,将传入的连接保留到TcpClient对象数组sockets,而这个数组sockets恰恰是我们后面线程中要用到的全局变量。 为了不使线程间争用这个数组变量,这里用到了VB.net提供的Monitor类,它提供同步对象的访问的机制。
    当主线程侦听到远程DVS有连接请求时,立即执行AcceptTcpClient方法,创建一个TcpClient实例,并将它放入sockets数组。同时创建线程对象serverthread。
声明创建线程时,使用 ThreadStart 委托作为其唯一参数的构造函数创建 Thread 类的新实例,创建线程时需要传递处理连接的过程或函数的地址以被线程调用。创建线程委托,传递需要操作的过程的地址,这部分的代码如下所示:
Public Sub WaitData()
        Try
            Dim ipHostInfo As IPHostEntry = Dns.Resolve(Dns.GetHostName())
            Dim localAddr As IPAddress = ipHostInfo.AddressList(0)
            s = New TcpListener(localAddr, ListenPort)
            s.Start()’开始侦听连接请求
            Dim Recdatathread As New Thread(New ThreadStart(AddressOf RecDataProc)) ’创建数据接收线程
            Recdatathread.IsBackground = True
            Recdatathread.Start()’启动线程
            While True
                Dim client As TcpClient = s.AcceptTcpClient()
                Monitor.Enter(sockets) '在指定对象上获取排他锁
                sockets(socketcount) = client
                socketcount = socketcount + 1
                Monitor.Exit(sockets) '释放指定对象上的排他锁                 
            End While
        Catch e As SocketException
            s.Stop()
            saveErrLog(Date.Now, CType(s.AcceptTcpClient.Client.RemoteEndPoint, IPEndPoint).Address.ToString(), e.Message)’写入错误日志
        Catch e As ThreadAbortException
            t.Abort()
            saveErrLog(Date.Now, CType(s.AcceptTcpClient.Client.RemoteEndPoint, IPEndPoint).Address.ToString(), e.Message) ’写入错误日志
        Finally
            t.Abort()
            End
        End Try
End Sub

数据接收线程

    数据接收线程的工作流程如图二所示。主要功能是:将挂起连接的DVS上传数据从流中读取出来,创建数据写入线程,并在listbox中显示。
从保存的socket数组中读取字节流时,必须考虑以下问题:
一、有些DVS可能会在工作一段时间后发生设备故障或者网络中断,但服务器保存的是其历史socket,因此,必须判断其connect属性,即设备是否在线。
二、为了减少服务器的空等时间,必须判断流对象(stream)的DataAvailable属性。
三、创建线程saveToDb时,必须考虑传入参数的问题。通常的线程创建是不可提供参数的。我们将线程saveToDb的执行体封装到一个类中,通过初始化类的成员变量的方法,来达到传送参数的目的。
四、由于本线程是长驻内存并循环执行的。因此,应当在适当的地方阻止,否则,CPU的利用率将达几乎100%。
这部分的代码如下:
Public Sub RecDataProc()
        Dim i As Integer
        Dim c As TcpClient
        While (True)
            Try
                For i = 0 To socketList.Count - 1
          If socketList.Item(i).client.connected Then '如果该连接在线
             Dim dh1 As DelegateHandler = New DelegateHandler(AddressOf displayStatusBarPanel2)
'New 出一个委托并指定委托方法
                      Me.Invoke(dh1, New Object() {CStr(i)})  '调用invoke方法
                      c = socketList.Item(i)
                      Dim stream As NetworkStream = c.GetStream()
                      If stream.DataAvailable Then
                      Dim dh As DelegateHandler1 = New DelegateHandler1(AddressOf ShowInBox)
                      Dim readbuff As New ReadBuffClass(c, stream, Connection, dh) '由构造函数来初始化成员变量
                      ThreadPool.QueueUserWorkItem(New WaitCallback(AddressOf readbuff.ReadBuff), readbuff)’把具体从流中读取数据的工作交给线程池的线程来进行
                       Dim workerThreads, portThreads As Integer
                       ThreadPool.GetAvailableThreads(workerThreads, portThreads)
                       Dim dh2 As DelegateHandler = New DelegateHandler(AddressOf displayStatusBarPanel4)
'New 出一个委托并指定委托方法
                       Me.Invoke(dh2, New Object() {workerThreads.ToString})  '调用invoke方法
                    End If
                       Thread.Sleep(20) '如果不阻止,则CPU利用率将为100%
                    End If
                Next
             Catch ex As System.ArgumentOutOfRangeException
             Catch ex As System.InvalidOperationException
             Catch ex As ObjectDisposedException 'TcpClient 已关闭
             Catch ex As SocketException
             Catch ex As ThreadAbortException
             Catch ex As System.IO.IOException              
             Catch ex As System.AccessViolationException
             Finally
            End Try
        End While
End Sub

数据处理线程

    这部份线程每个都由线程池来调度运行。由于要接收线程参数,因此,线程本身被封装到一个类中,限于篇幅的原因,只描述类的结构。
Public Class ReadBuffClass
    Private sck As TcpClient
    Private ns As NetworkStream
    Private sqlcnn As SqlConnection
    Private delg As frmServerMain.DelegateHandler1
 
    Dim sqlcmd As SqlCommand
Dim sqlda As SqlDataAdapter
 
Public Sub New(ByVal sc As TcpClient, ByVal n As NetworkStream, ByVal cn As SqlConnection, ByVal dh As frmServerMain.DelegateHandler1) '由构造函数来初始化成员变量
        Me.sck = sc
        Me.ns = n
        Me.sqlcnn = cn
       Me.delg = dh
End Sub
 
Public Sub ReadBuff(ByVal state As Object) ' 线程的入口函数
        Dim datastring As String = ""
        ns.ReadTimeout = 100 '读取失败前经历的毫秒数
        Try
            While (True)
                Dim bytes(2048) As Byte
                ns.Read(bytes, 0, 2048)
                datastring = datastring + Encoding.ASCII.GetString(bytes)
                If datastring.IndexOf(vbCrLf) > 0 Then
                    Exit While
                End If
            End While
            delg.Invoke(datastring, sck) '通过委托的方式,将参数传给UI
            Dim tmparr() As String = datastring.Split("##")
            Dim i As Integer
            For i = 0 To tmparr.Length - 1
                If tmparr(i) <> "" Then
                    ProcessInfo(tmparr(i))
                End If
            Next
        Catch ex As System.AccessViolationException
        Catch ex As NotSupportedException
       Catch ex As ArgumentNullException
       Catch ex As ArgumentOutOfRangeException
       Catch ex As ObjectDisposedException
Catch ex As IO.IOException '
       Catch ex As SocketException
       Catch ex As ThreadAbortException
      Finally
   End Try
End Sub
 
Private Sub ProcessInfo(ByVal tmpString As String) '对收到的数据进行解析、处理
……
End Sub
……
End Class

结束语

    本文着重论述的是在VB2005的环境下,运用多线程异步实现远程DVS数据收集的原理,重点考虑的是怎样提高程序的反应速度,特别讨论了程序开发中的一些细节问题,对有志于从事远程临控系统开发的软件人员有一定的参考意义。
文中代码在windows2003+VB2005+SqlServer2005的环境下调试通过,现在正在使用。

Feedback

# re: 在vb.net中运用多线程实现远程数据收集  回复  更多评论   

2009-05-11 17:11 by anndy
看来博主是socket方面的编程的专家了,请问下“在vb.net中运用多线程实现远程数据收集”,是你的原创吗

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理