socketref,再见!高德

https://github.com/adoggie

  C++博客 :: 首页 :: 联系 :: 聚合  :: 管理
  246 Posts :: 4 Stories :: 312 Comments :: 0 Trackbacks

常用链接

留言簿(54)

我参与的团队

搜索

  •  

最新评论

阅读排行榜

评论排行榜

贴代码 
   1 #--coding:utf-8--
   2 
   3 
   4 import os,os.path,sys,struct,time,traceback,signal,threading,copy
   5 import base64
   6 
   7 from datetime import datetime
   8 from base import *
   9 import tcelib as tce
  10 from showbox import *
  11 import utils.misc
  12 import utils.config
  13 # import django.showbox
  14 
  15 #sys.path.insert(0,r'F:\projects\shoebox\database\showbox')
  16 os.environ.setdefault("DJANGO_SETTINGS_MODULE""database.showbox.showbox.settings")
  17 
  18 from django.db import connection
  19 from django.db.models import Sum
  20 from django.db import transaction
  21 # import sns.core.models as cm
  22 import database.showbox.core.models as  core
  23 import database.showbox.nosql.models as nosql
  24 import msgentity
  25 import utils
  26 
  27 
  28 # sys.exit()
  29 
  30 # def dumpMsg(m):
  31 #     '''
  32 #         from MimeMessage_t to mongo-json object {}
  33 #     '''
  34 #     r = {'sender_id':0,
  35 #             'target_id':0,
  36 #         'team_id':0,
  37 #         'issue_time':datetime.now(),
  38 #         'ack_time':None,
  39 #         'status':SendMsgStatus.UNACKED,
  40 #         'level':SendMsgLevel.DURABLE,
  41 #         'type':m.type,
  42 #         'text':m.text.text,
  43 #         'im_type':m.image.type,
  44 #         'im_width':m.image.width,
  45 #         'im_height':m.image.height,
  46 #         'im_data':base64.encodestring(m.image.data),
  47 #         'au_type':m.audio.type,
  48 #         'au_sample':0,
  49 #         'au_channel':0,
  50 #         'au_timelen':m.audio.timelen,
  51 #         'au_data':base64.encodestring(m.audio.data)
  52 #         }
  53 #     return r
  54 #
  55 # def loadMsg(d):
  56 #     '''
  57 #          从db对象还原为 MimeMessage_t()
  58 #          '''
  59 #     m = MimeMessage_t()
  60 #     m.type = d['type']
  61 #     m.text.type = 1
  62 #     m.text.text = d['text']
  63 #     m.timesec = time.mktime(d['issue_time'].timetuple())
  64 #     m.sender_id = d['sender_id']
  65 #
  66 #     m.image.type = d['im_type']
  67 #     m.image.width = d['im_width']
  68 #     m.image.height = d['im_height']
  69 #     data = d['im_data']
  70 #     if data:
  71 #         m.image.data = base64.decodestring(data)
  72 #
  73 #     m.audio.type = d['au_type']
  74 #     data = d['au_data']
  75 #     if data:
  76 #         m.audio.data = base64.decodestring(data)
  77 #     return m
  78 
  79 
  80 
  81 
  82 
  83 
  84 
  85 
  86 #class MessagingServiceImpl(IMessageServer,IUserTeamServer,IUserEventListener):
  87 class MessagingServiceImpl(IMessageServer,IUserEventListener):
  88     def __init__(self,app):
  89         IMessageServer.__init__(self)
  90         #IUserTeamServer.__init__(self)
  91         IUserEventListener.__init__(self)
  92         self.app = app
  93         self.serviceprxlist={}  #服务器代理对象缓存
  94         self.usergws = {}
  95 
  96 
  97     def onUserExitTeam(self,userid,teamid,ctx):
  98         '''
  99             将用户退出群的消息发送给群内其他用户
 100         '''
 101 
 102         ents = core.UserTeam.objects.get(id = int(teamid)).teamuser_set.exclude(user__id=int(userid) )
 103         for en in ents:
 104             nf = nosql.Notification()
 105             nf.sid = utils.misc.genUUID()
 106             nf.sender_id = userid
 107             nf.target_id = str(en.user.id)
 108             nf.type =NotifyMsgType.ExitTeam
 109             nf.p1 = str(teamid)
 110             nf.save()
 111             prx = self.__getTerminalProxyByUserId(nf.target_id)
 112             if prx:
 113                 prx.onTeamUserLeave_oneway(userid,teamid)
 114 
 115 
 116 
 117     def onUserIntoTeam(self,userid,teamid,ctx):
 118         '''
 119             用户加入群之后通知群内其他用户
 120         '''
 121         ents = core.UserTeam.objects.get(id = int(teamid)).teamuser_set.exclude(user__id=int(userid) )
 122         for en in ents:
 123             nf = nosql.Notification()
 124             nf.sid = utils.misc.genUUID()
 125             nf.sender_id = userid
 126             nf.target_id = str(en.user.id)
 127             nf.type =NotifyMsgType.JoinTeam
 128             nf.p1 = str(teamid)
 129             nf.save()
 130             prx = self.__getTerminalProxyByUserId(nf.target_id)
 131             if prx:
 132                 prx.onTeamUserJoinin_oneway(userid,teamid)
 133 
 134     def onUserDismissTeam(self,userid,team_id,ctx):
 135         '''
 136             用户群集散,通知群内所有用户
 137         '''
 138         ents = core.UserTeam.objects.get(id = int(team_id)).teamuser_set.exclude(user__id=int(userid) )
 139         for en in ents:
 140             nf = nosql.Notification()
 141             nf.sid = utils.misc.genUUID()
 142             nf.sender_id = userid
 143             nf.target_id = str(en.user.id)
 144             nf.type =NotifyMsgType.DestroyTeam
 145             nf.p1 = str(team_id)
 146             nf.save()
 147             prx = self.__getTerminalProxyByUserId(nf.target_id)
 148             if prx:
 149                 prx.onTeamDismiss_oneway(team_id)
 150 
 151     def onUserOnline(self,userid,gws_id,device,ctx):
 152         print 'onUserOnline..',userid,gws_id
 153         userid = int(userid)
 154         self.usergws[userid] = gws_id
 155         #保存用户状态到数据表
 156         user = core.User.objects.get(id = int(userid))
 157         user.status = UserStatus.Online
 158         user.save()
 159         #传递未发送消息到前端用户
 160         self._sendPendingMsgToUser(userid)
 161         self._sendNotifications(userid)
 162 
 163         self._sendPendingInvitationToInvitee(userid)
 164         self._sendPendingInvitationActToInviter(userid)
 165         self._notifyUserStatusChanged(userid,UserStatus.Online)
 166 
 167 
 168     def onUserOffline(self,userid,gws_id,device,ctx):
 169         print 'onUserOffline..'
 170         userid = int(userid)
 171         gws = self.usergws.get(userid)
 172         if gws != None:
 173             del self.usergws[userid]
 174         user = core.User.objects.get(id = int(userid))
 175         user.status = UserStatus.Offline
 176         user.save()
 177         #通知所有用户告知本人离线
 178         self._notifyUserStatusChanged(userid,UserStatus.Offline)
 179 
 180 
 181     def inviteUser(self,target_id,whisper,ctx):
 182         '''
 183             发送加入好友请求
 184         '''
 185         userid = USER_ID(ctx)
 186         target_id = int(target_id)
 187         res = CallReturn()
 188         try:
 189             target_id = int(target_id)
 190             user = core.User.objects.get(id=userid)
 191             if core.UserRelation.objects.filter(user__id=userid,friend__id=target_id):
 192                 return CallReturn(ErrorDefs.UserIsFriend)
 193             if core.User.objects.filter(id=target_id).count() ==0:
 194                 return CallReturn(ErrorDefs.UserIdNotExisted)
 195 
 196             # 1.write to invitation
 197             inv = nosql.Invitation()
 198             #inv.sid = utils.misc.genUUID()
 199             inv.sender_id = userid
 200             inv.target_id = target_id
 201             inv.greeting = whisper
 202             inv.sender_name = user.name
 203             inv.send_type = InviteSendType.SysInternal
 204             inv.confirm_result = InvitationResult.NO_ACK
 205 
 206             if not inv.sender_name:
 207                 inv.sender_name = user.user #替换合适的名称
 208             inv.save()
 209             # 2.send to PeerUser
 210             prx = self.__getTerminalProxyByUserId(target_id)
 211             if  prx:
 212                 prx.onInviteRequest_oneway( str(inv.id),str(userid),whisper,CALL_USER_ID(target_id))
 213         except :
 214             print traceback.print_exc()
 215             res = CallReturn(ErrorDefs.InternalException)
 216         return res
 217 
 218     def inviteReject(self,seq,reason,ctx):
 219         try:
 220             userid = USER_ID(ctx)
 221             seq = int(seq)
 222             inv = nosql.Invitation.objects.get(id=seq,target_id=userid)
 223             inv.confirm_time = datetime.now()
 224             inv.confirm_result = InvitationResult.REJECT
 225             inv.save()
 226             prx = self.__getTerminalProxyByUserId(inv.sender_id)
 227             if prx:
 228                 prx.onInviteReject_oneway(seq,inv.target_id,CALL_USER_ID(inv.sender_id))
 229         except:
 230             traceback.print_exc()
 231 
 232     def inviteAccept(self,seq,ctx):
 233         '''
 234             B 接收到邀请之后 发送接受加为好友的答复到服务器
 235         '''
 236         try:
 237             userid = USER_ID(ctx)
 238             inv = nosql.Invitation.objects.get(id=int(seq),target_id=userid)
 239             inv.confirm_time = datetime.now()
 240             inv.confirm_result = InvitationResult.ACCEPT
 241             print inv,inv.id,inv.issue_time
 242             inv.save()
 243             #接着记录两者的关系  ,表  UserRelation
 244             user1 = core.User.objects.get(id=inv.sender_id)
 245             user2 = core.User.objects.get(id=inv.target_id)
 246             rel = core.UserRelation()
 247             rel.user =user1
 248             rel.friend = user2
 249             rel.save()
 250 
 251             rel = core.UserRelation()
 252             rel.user =user2
 253             rel.friend = user1
 254             rel.save()
 255 
 256             prx = self.__getTerminalProxyByUserId(inv.sender_id)
 257             if prx:
 258                 prx.onInviteAccept_oneway(seq,inv.target_id,CALL_USER_ID(inv.sender_id))
 259         except:
 260             traceback.print_exc()
 261 
 262     # def getMessageLogList(self,target_id,type,timerange,ctx):
 263     #     return QueryMessageLogResult_t()
 264 
 265     def inviteResultConfirm(self,seq,ctx):
 266         '''
 267             A->S->B
 268             B->S->A
 269             A->S
 270         '''
 271         try:
 272             userid = USER_ID(ctx)
 273             inv = nosql.Invitation.objects.get(id=int(seq))
 274             inv.issuer_confirm_time = datetime.now()
 275             inv.save()
 276         except:
 277             traceback.print_exc()
 278 
 279     def nofityConfirm(self,seq_id,ctx):
 280         '''
 281             除了消息发送、邀请之外的处理类型,需要确认接收到了,必须走此接口
 282         '''
 283         userid = USER_ID(ctx) #当前用户编号
 284         e = nosql.Notification.objects.get(id=int(seq_id),target_id=int(userid))
 285         e.confirm_time = datetime.now()
 286         e.save()
 287 
 288     def _sendNotifications(self,userid):
 289         userid = int(userid)
 290         prx = self.__getTerminalProxyByUserId(userid)
 291         #提取未发送的通知消息
 292         ents = nosql.Notification.objects.filter(target_id=int(userid),confirm_time=None).order_by('issue_time')
 293 
 294         for e in ents:
 295             nm = NotifyMessage_t()
 296             nm.seq = str(e.id)
 297             nm.issuer = str(e.sender_id)
 298             nm.type = e.type
 299             nm.issue_time = utils.misc.maketimestamp(e.issue_time)
 300             nm.p1 = e.p1
 301             nm.p2 = e.p2
 302             prx.onNotifyMessage_oneway( nm.issuer,nm,CALL_USER_ID(userid))
 303 
 304 
 305     def _sendPendingMsgToUser(self,userid):
 306         '''
 307             发送未传送的消息到终端用户
 308         '''
 309         print '_sendPendingMsgToUser..',userid
 310         ents = nosql.SendMessage.objects.filter(target_id=userid,confirm_result=SendMsgStatus.UNACKED).order_by('issue_time')
 311         prx = self.__getTerminalProxyByUserId(userid)
 312         if not prx:
 313             return
 314 
 315         for e in ents:
 316 
 317             m = MimeText_t()
 318             m.seq =  e.id
 319             m.text = e.content
 320             m.issue_time = utils.misc.maketimestamp(e.issue_time)
 321             m.type = e.type
 322             m.entities = e.entities
 323             if e.team_id ==0:
 324                 prx.onMessageText_oneway(str(e.sender_id),m,CALL_USER_ID(userid))
 325             else#传递到用户组
 326                 prx.onTeamMessageText_oneway(str(e.sender_id),e.team_id,m,CALL_USER_ID(userid))
 327 
 328 
 329     def _sendPendingInvitationToInvitee(self,invitee_id):
 330         '''
 331             发送邀请,传送到前端好友
 332         '''
 333         print '_sendPendingInvitationToInvitee..'
 334         invitee_id = int(invitee_id)
 335         ents = nosql.Invitation.objects.filter(target_id=invitee_id,
 336                                                confirm_result = InvitationResult.NO_ACK,
 337                                                send_type = InviteSendType.SysInternal)
 338         prx = self.__getTerminalProxyByUserId(invitee_id)
 339         if not prx: return
 340         print ents
 341         for e in ents:
 342             prx.onInviteRequest_oneway( str(e.id),str(e.sender_id),str(e.greeting),CALL_USER_ID(invitee_id))
 343 
 344     def _sendPendingInvitationActToInviter(self,userid):
 345         '''
 346             被邀请者接收到邀请,并提交到服务器。并将提交结果转发给邀请发送者
 347         '''
 348         userid = int(userid)
 349         ents = nosql.Invitation.objects.filter(sender_id=userid,issuer_confirm_time = None,
 350                                                send_type = InviteSendType.SysInternal).\
 351                                             exclude(confirm_result=InvitationResult.NO_ACK)
 352         prx = self.__getTerminalProxyByUserId(userid)
 353         if not prx: return
 354         for e in ents:
 355             if e.confirm_result == InvitationResult.ACCEPT:
 356                 prx.onInviteAccept_oneway(e.id,e.target_id,CALL_USER_ID(userid))
 357             if e.confirm_result == InvitationResult.REJECT:
 358                 prx.onInviteReject_oneway(e.id,e.target_id,CALL_USER_ID(userid))
 359 
 360 
 361     def _sendPendingJoinTeamRequestToTeamOwner(self,userid):
 362         '''
 363             将请求加入隶属B的群的消息,发送给B
 364         '''
 365         userid = int(userid)
 366         rs = nosql.JoinTeam.objects.filter(owner_id = userid,issuer_confirm_time = None,
 367                                            confirm_result=JoinTeamResult.NO_ACK)
 368         
 369 
 370 
 371     def _notifyUserStatusChanged(self,userid,status):
 372         '''
 373             通知user的好友或者加入群内的用户,本人状态改变
 374             此消息不用持久化
 375         '''
 376         user = core.User.objects.get(id = userid)
 377         entries = user.userfriend_set.all()
 378         for en in entries:
 379             target_id = en.friend.id
 380             prx = self.__getTerminalProxyByUserId(target_id)
 381             if not prx:
 382                 print 'prx is None ,user_id: %s not online!'%target_id
 383                 continue
 384             nm = NotifyMessage_t()
 385             nm.seq = str(0)
 386             nm.issuer = str(userid)
 387             nm.type_ = NotifyMsgType.UserStatusChanged
 388             nm.p1 = status
 389             nm.issue_time = int(time.time())
 390             prx.onNotifyMessage_oneway(nm.issuer,nm,CALL_USER_ID(target_id))
 391 
 392     #------------- IUserTeamServer --------------------------------------
 393 
 394     def sendTeamMessageText(self,team_id,text,ctx):
 395         print 'sendTeamMessageText..',team_id
 396 
 397         userid = USER_ID(ctx)
 398         #群内非本人所有朋友记录, 群主不在 teamrelation表哦,
 399         users=[]
 400         team_id = team_id
 401 
 402         team = core.UserTeam.objects.get(id=team_id)
 403         if team.user.id != userid: #不是本人的team,把team的owner也加入群发数组
 404             users.append(team.user)
 405 
 406         rs = core.TeamRelation.objects.filter(team__id=int(team_id)).exclude(user__id=userid)
 407         for r in rs:
 408             users.append(r.user)
 409 
 410         for r in users:
 411             m = nosql.SendMessage()
 412             m.sender_id = userid
 413             m.target_id = r.id
 414             m.team_id = int(team_id)
 415             m.type = text.type_
 416             m.level = SendMsgLevel.DURABLE
 417             m.content = text.text  #to see MineText_t
 418             m.entities = text.entities
 419             m.save()
 420             text.seq = m.id
 421             self._ripMessageContent(m,text)
 422 
 423             #查询目标用户是否接入到某个gws,并将消息传送过去
 424             prx = self.__getTerminalProxyByUserId(m.target_id)
 425             print 'prx is:',prx
 426             if prx :
 427                 prx.onTeamMessageText_oneway(str(userid),team_id,text,CALL_USER_ID(m.target_id))
 428 
 429 
 430 
 431     #---------------- IMessageServer -----------------------------------
 432     def sendMessageText(self,target_id,text,ctx):
 433         print 'sendMessageText:', target_id,text.text
 434         userid = USER_ID(ctx)
 435         m = nosql.SendMessage()
 436         #m.sid = utils.misc.genUUID()
 437         m.sender_id = userid
 438         m.target_id = int(target_id)
 439         m.team_id = 0
 440         m.type = text.type_
 441         m.level = SendMsgLevel.DURABLE
 442         m.content = text.text  #to see MineText_t
 443         m.entities = text.entities
 444         m.save()
 445         self._ripMessageContent(m,text)
 446 
 447         text.seq = m.id
 448         #查询目标用户是否接入到某个gws,并将消息传送过去
 449         prx = self.__getTerminalProxyByUserId(target_id)
 450         print 'prx is:',prx
 451         if prx :
 452             prx.onMessageText_oneway(str(userid),text,CALL_USER_ID(target_id))
 453 
 454 
 455     def _ripMessageContent(self,dbobj,m):
 456         '''
 457         抽取消息文本中各个entities,并写入db
 458         包括 图像、音频 适当调整和处理
 459         图像等新记录会写到 Message的text字段,内包含指向资源编号
 460         MimeText内的image和audio数据存放在 MimeText.datas数据内,一次摆放
 461         '''
 462         enset = msgentity.MessageEntitySet.parse(m.text)
 463         if not enset:
 464             print 'MessageEntitiySet parse failed.'
 465             return
 466         idx = 0
 467         print repr(m.datas)
 468         try:
 469             for en in enset.entities:
 470                 if isinstance(en,msgentity.ImageEntity) or \
 471                     isinstance(en,msgentity.AudioEntity):
 472                     en.content = m.datas[idx]
 473 
 474                 if isinstance(en,msgentity.ImageEntity):
 475                     r = nosql.ImageEntity()
 476                     r.msg_id = dbobj.id
 477                     r.image_type = 0
 478                     r.width = en.width
 479                     r.height = en.height
 480                     r.content = utils.misc.encodeBase64( m.datas[idx] )
 481                     print r.content
 482                     r.save()
 483                     en.id = r.id #记录image记录编号
 484                     en.content=''
 485 
 486                 if isinstance(en,msgentity.AudioEntity):
 487                     r = nosql.AudioEntity()
 488                     r.msg_id = dbobj.id
 489                     r.duration = en.duration
 490                     r.content = utils.misc.encodeBase64(m.datas[idx])
 491                     r.save()
 492                     en.id = r.id
 493                     en.content = ''
 494 
 495                 if isinstance(en,msgentity.ImageEntity) or \
 496                     isinstance(en,msgentity.AudioEntity):
 497                     idx+=1
 498 
 499             #更新消息的文本内容
 500             dbobj.content = enset.toJson()
 501             dbobj.save()
 502         except:
 503             traceback.print_exc()
 504     
 505     #def sendMessageImage(self,target_id,image,ctx):
 506     #    userid = USER_ID(ctx)
 507     #    m = nosql.SendMessage()
 508     #    m.sid = utils.misc.genUUID()
 509     #    m.sender_id = userid
 510     #    m.target_id = int(target_id)
 511     #    m.type = MimeMessageType.IMAGE
 512     #    m.level = SendMsgLevel.DURABLE
 513     #    m.im_type = image.type
 514     #    m.im_width = image.width
 515     #    m.im_height = image.height
 516     #    m.im_content = base64.encodestring(image.data)
 517     #    m.save()
 518     #    #查询目标用户是否接入到某个gws,并将消息传送过去
 519     #    prx = self.__getTerminalProxyByUserId(userid)
 520     #    if prx :
 521     #        prx.onMessageImage_oneway(str(userid),image,m.sid,CALL_USER_ID(target_id))
 522     #
 523     #def sendMessageAudioClip(self,target_id,clip,ctx):
 524     #    userid = USER_ID(ctx)
 525     #    m = nosql.SendMessage()
 526     #    m.sid = utils.misc.genUUID()
 527     #    m.sender_id = userid
 528     #    m.target_id = int(target_id)
 529     #    m.type = MimeMessageType.AUDIO
 530     #    m.level = SendMsgLevel.DURABLE
 531     #    m.au_channel = clip.channel
 532     #    m.au_duration = clip.timelen
 533     #    m.au_sample = clip.samples
 534     #    m.au_type = clip.type
 535     #    m.au_content = base64.encodestring(clip.data)
 536     #    m.save()
 537     #    #查询目标用户是否接入到某个gws,并将消息传送过去
 538     #    prx = self.__getTerminalProxyByUserId(userid)
 539     #    if prx :
 540     #        prx.onMessageAudioclip_oneway(str(userid),clip,m.sid,CALL_USER_ID(target_id))
 541 
 542 
 543     def sendMessageConfirm(self,seq_id,ctx):
 544         '''
 545             B 接收到消息之后发送 确认消息,
 546             否则系统将定时重发当初的消息或者当B再次在线online时被推送到B
 547         '''
 548         print 'sendMessageConfirm..''seq_id:',seq_id
 549         userid = USER_ID(ctx)
 550         m = nosql.SendMessage.objects.get(id=int(seq_id),target_id=int(userid))
 551         m.confirm_time = datetime.now()
 552         m.confirm_result = SendMsgStatus.ACKED
 553         m.save()
 554 
 555     #def sendLocation(self,target_id,gps,ctx):
 556     #    '''
 557     #        转发gps坐标到目标用户
 558     #    '''
 559     #    userid = USER_ID(ctx)
 560     #    prx = self.__getTerminalProxyByUserId(target_id)
 561     #    if prx:
 562     #        prx.onLocation_oneway(str(userid),gps,CALL_USER_ID(target_id))
 563     #
 564 
 565 
 566     def sendFilePrepare(self,type_,target_id,info,ctx):
 567         return CallReturn_t()
 568 
 569     def putFilePrepare(self,info,path,category,ctx):
 570         return CallReturn_t()
 571 
 572     def getFilePrepare(self,file_id,ctx):
 573         return CallReturn_t()
 574 
 575     def getFileServerUri(self,seq_id,ctx):
 576         return ServiceURI_t()
 577 
 578     def retrieveFileFinished(self,seq_id,ctx):
 579         pass
 580 
 581     def retrieveFileAccept(self,seq_id,ctx):
 582         pass
 583 
 584     def retrieveFileFileReject(self,seq_id,reason,ctx):
 585         pass
 586 
 587     def queryUserMessage(self,target_id,type_,timerange,limit,ctx):
 588         return UserMessageQueryResult_t()
 589 
 590     #-------------  TEAM  operations ------------------
 591     def requestJoinTeam(self, team_id, identity, ctx):
 592         '''
 593             用户请求加入指定的群
 594         '''
 595         userid = USER_ID(ctx)
 596         cr = CallReturn()
 597         try:
 598             team_id = int(team_id)
 599             team = core.UserTeam.objects.get(id = team_id)
 600             ownerid = team.user.id
 601             if ownerid == userid:  #自己加入自己的群,此操作非法
 602                 return CallReturn(ErrorDefs.TargetInvalid,msg='user_id equals team-owner!')
 603             jt = nosql.JoinTeam()
 604             jt.sender_id = userid
 605             jt.target_id = team_id
 606             jt.owner_id = ownerid
 607             jt.greeting = identity
 608             jt.sender_name = team.user.name
 609             jt.confirm_result = JoinTeamResult.NO_ACK
 610             jt.save()
 611             #立马是发送给群拥有者
 612             prx = self.__getTerminalProxyByUserId(ownerid)
 613             if  prx:
 614                 prx.onJoinTeamRequest_oneway( str(jt.id),team_id,str(userid),identity,CALL_USER_ID(ownerid))
 615         except:
 616             traceback.print_exc()
 617             cr = CallReturn(ErrorDefs.InternalException)
 618         return cr
 619 
 620     def joinTeamResultConfirm(self, seq, ctx):
 621         '''
 622             请求加入群的用户收到 accept,reject之后发送确认信息
 623         '''
 624         try:
 625             userid = USER_ID(ctx)
 626             jt = nosql.JoinTeam.objects.get(id = int(seq))
 627             jt.issuer_confirm_time = datetime.now()
 628             jt.save()
 629         except:
 630             traceback.print_exc()
 631 
 632 
 633     def removeTeamUser(self, team_id, friend_id, ctx):
 634         '''
 635             群创建者直接删除群内用户
 636         '''
 637         cr = CallReturn()
 638         try:
 639             userid = USER_ID(ctx)
 640             team_id = int(team_id)
 641             friend_id = int(friend_id)
 642             if core.UserTeam.objects.filter(user__id=userid,id = team_id).count() ==0:
 643                 return CallReturn_Error(ErrorDefs.TargetObjectNotExisted,msg='team id  invalid!')
 644             if core.TeamRelation.objects.filter(user__id=friend_id,team__id=team_id).count() == 0:
 645                 return CallReturn_Error(ErrorDefs.TargetObjectNotExisted,msg='friend or team unmatched!')
 646 
 647             #通知群内用户 指定用户离开群
 648             users = self._getUsersInTeam(team_id)
 649             for user in users:
 650                 #写入数据库
 651                 nf = nosql.Notification()
 652                 nf.sender_id = userid
 653                 nf.target_id = user.id
 654                 nf.type = NotifyMsgType.TeamUserLeave #新成员加入
 655                 nf.p1 = team_id
 656                 nf.p2 = friend_id
 657                 nf.save()
 658                 #发送到群用户
 659                 prx = self.__getTerminalProxyByUserId(user.id)
 660                 if prx:
 661                     prx.onNotifyMessage_oneway(userid,nf,CALL_USER_ID(user.id))
 662             #删除数据库记录
 663             rs = core.TeamRelation.filter(user__id = friend_id,team__id=team_id)
 664             rs.delete()
 665         except:
 666             traceback.print_exc()
 667             cr = CallReturn_Error(ErrorDefs.InternalException)
 668         return cr
 669 
 670     def addTeamUser(self, team_id, friend_id, ctx):
 671         '''
 672             群创建者直接将好友加入指定的群
 673 
 674         '''
 675 
 676         cr = CallReturn()
 677         try:
 678             userid = USER_ID(ctx)
 679             team_id = int(team_id)
 680             friend_id = int(friend_id)
 681             rs = core.UserTeam.objects.filter(id=team_id)
 682             if not rs:
 683                 return CallReturn_Error(ErrorDefs.TargetInvalid,msg='team id do not existed!')
 684             team = rs[0]
 685             rs = core.UserRelation.objects.filter(user__id=userid,friend__id=friend_id).count()
 686             if not rs:
 687                 return CallReturn_Error(ErrorDefs.TargetInvalid,msg='team id or friend id not matched!')
 688             friend = rs[0].friend
 689             rel = core.TeamRelation()
 690             rel.user = friend
 691             rel.team = team
 692             rel.save()
 693 
 694             users = self._getUsersInTeam(team_id)
 695             #发送通知到群成员
 696             for user in users:
 697                 #写入数据库
 698                 nf = nosql.Notification()
 699                 nf.sender_id = userid
 700                 nf.target_id = user.id
 701                 nf.type = NotifyMsgType.TeamUserEnter #新成员加入
 702                 nf.p1 = team_id
 703                 nf.p2 = friend_id
 704                 nf.save()
 705                 #发送到群用户
 706                 prx = self.__getTerminalProxyByUserId(user.id)
 707                 if prx:
 708                     prx.onNotifyMessage_oneway(userid,nf,CALL_USER_ID(user.id))
 709 
 710         except:
 711             traceback.print_exc()
 712             cr = CallReturn_Error(ErrorDefs.InternalException)
 713         return cr
 714 
 715     def dismissTeam(self, team_id, ctx):
 716         '''
 717             群创建者解散群
 718         '''
 719         cr = CallReturn()
 720         userid = USER_ID(ctx)
 721         try:
 722             team_id = int(team_id)
 723             rs = core.UserTeam.objects.filter(id=team_id,user__id=userid)
 724             if not rs:
 725                 return CallReturn_Error(ErrorDefs.TargetInvalid,'team object not existed!')
 726             team = rs[0]
 727             users = self._getUsersInTeam(team_id)
 728 
 729             #发送通知到群成员
 730             for user in users:
 731                 #写入数据库
 732                 nf = nosql.Notification()
 733                 nf.sender_id = userid
 734                 nf.target_id = user.id
 735                 nf.type = NotifyMsgType.DismissTeam
 736                 nf.p1 = team_id
 737                 nf.save()
 738                 #发送到群用户
 739                 prx = self.__getTerminalProxyByUserId(user.id)
 740                 if prx:
 741                     prx.onNotifyMessage_oneway(userid,nf,CALL_USER_ID(user.id))
 742 
 743             rs.delete() #删除群内成员
 744             team.delete()   #删除群记录
 745         except:
 746             traceback.print_exc()
 747             cr = CallReturn_Error(ErrorDefs.InternalException)
 748         return cr
 749 
 750 
 751 
 752 
 753     def _getUsersInTeam(self,team_id,exclusive=0):
 754         '''
 755             获取群内所有成员用户,包含群主
 756         '''
 757         users=[]
 758         try:
 759             team  =core.UserTeam.objects.get(id=int(team_id))
 760             if exclusive != team.user.id:
 761                 users.append(team.user)
 762             rs = core.TeamRelation.objects.filter(team__id=int(team_id))
 763             for r in rs:
 764                 if exclusive != r.user.id:
 765                     users.append(r.user)
 766         except:
 767             traceback.print_exc()
 768         return users
 769 
 770     def exitTeam(self, team_id, ctx):
 771         '''
 772             群用户退出将通知群内其他用户(群创建者不允许)
 773         '''
 774         cr = CallReturn()
 775         try:
 776             userid = USER_ID(ctx)
 777             team_id = int(team_id)
 778             #群主不能退出自己创建的群
 779             if core.UserTeam.objects.filter(user__id= userid,id = team_id).count():
 780                 return CallReturn_Error(ErrorDefs.IllegalOperation,msg='the team belong to curernt user!')
 781             rs = core.TeamRelation.objects.filter(user__id=userid,team__id = team_id)
 782             if not rs:
 783                 return CallReturn_Error(ErrorDefs.IllegalOperation,msg='user not in team !')
 784             rs.delete()
 785             #通知群内其他人
 786             users = self._getUsersInTeam(team_id)
 787             for user in users:
 788                 nt = nosql.Notification()
 789                 nt.sender_id = userid
 790                 nt.target_id = user.id
 791                 nt.type = NotifyMsgType.TeamUserLeave
 792                 nt.p1 = team_id
 793                 nt.p2 = userid
 794                 nt.save() #完成通知存储
 795 
 796                 #发送通知前端客户
 797                 prx = self.__getTerminalProxyByUserId(user.id)
 798                 if prx:
 799                     prx.onNotifyMessage_oneway(userid,nt,CALL_USER_ID(user.id))
 800         except:
 801             traceback.print_exc()
 802             cr = CallReturn_Error(ErrorDefs.InternalException)
 803         return cr
 804 
 805 
 806     def joinTeamAccept(self, seq, ctx):
 807         try:
 808             userid = USER_ID(ctx)
 809             jt = nosql.JoinTeam.objects.get(id = int(seq))
 810             jt.confirm_time = datetime.now()
 811             jt.confirm_result = JoinTeamResult.ACCEPT
 812             jt.save()
 813             if core.TeamRelation.objects.filter(team__id=jt.target_id,user__id=jt.sender_id).count() ==0:
 814                 user = core.User.objects.get(id=jt.sender_id)
 815                 team = core.UserTeam.objects.get(id=jt.target_id)
 816                 rel = core.TeamRelation()
 817                 rel.user = user
 818                 rel.team = team
 819                 rel.save()
 820             #通知请求用户,加入群okay
 821             prx = self.__getTerminalProxyByUserId(jt.sender_id)
 822             if prx:
 823                 prx.onJoinTeamAccept_oneway(seq,jt.target_id,userid,CALL_USER_ID(jt.sender_id))
 824 
 825             #通知群内所有其他用户,告知新用户进入
 826             users = self._getUsersInTeam(jt.target_id)
 827 
 828             for user in users:    #群内的所有好友
 829                 #1.写入通知表
 830                 nt = nosql.Notification()
 831                 nt.sender_id = jt.owner_id
 832                 nt.target_id = user.id
 833                 nt.type = NotifyMsgType.TeamUserEnter
 834                 nt.p1 = jt.target_id
 835                 nt.p2 = jt.sender_id
 836                 nt.save() #完成通知存储
 837 
 838                 #发送通知前端客户
 839                 prx = self.__getTerminalProxyByUserId(user.id)
 840                 if prx:
 841                     prx.onJoinTeamAccept_oneway(seq,jt.target_id,jt.owner_id,CALL_USER_ID(user.id))
 842         except:
 843             traceback.print_exc()
 844 
 845 
 846     def joinTeamReject(self, seq, reason, ctx):
 847         '''
 848             用户请求加入群,群主拒绝
 849         '''
 850         try:
 851             userid = USER_ID(ctx)
 852             jt = nosql.JoinTeam.objects.get(id = int(seq))
 853             jt.confirm_time = datetime.now()
 854             jt.confirm_result = JoinTeamResult.REJECT
 855             jt.save()
 856             #通知请求用户
 857             prx = self.__getTerminalProxyByUserId(jt.sender_id)
 858             if prx:
 859                 prx.onJoinTeamReject_oneway(seq,jt.target_id,userid,CALL_USER_ID(jt.sender_id))
 860         except:
 861             traceback.print_exc()
 862 
 863 
 864     #---------------- Others -----------------------------------
 865 
 866 
 867     def __getTerminalProxyByUserId(self,user_id):
 868         '''
 869             server_eps.conf 记录gws对应的接收rpc消息的endpoint名称,
 870             获取ep名称,通过RpcCommunicator.findEndpoints()得到ep
 871             ep.impl就是对应服务器接收消息的连接
 872         '''
 873         prx = None
 874         try:
 875             user_id = int(user_id)
 876             if not self.usergws.get(user_id):
 877                 return None  #user没有上线
 878             gws = self.usergws.get(user_id)
 879             prx = self.serviceprxlist.get(gws)
 880             if not prx:
 881                 cf = utils.config.SimpleConfig()
 882                 cf.load('server_eps.conf')
 883                 epname = cf.getValue(gws)
 884                 ep = tce.RpcCommunicator.instance().currentServer().findEndPointByName(epname)
 885                 prx = ITerminalPrx(ep.impl)
 886                 self.serviceprxlist[gws] = prx
 887             return prx
 888         finally:
 889             if not prx:
 890                 print 'user: %s is not online!'%user_id
 891 
 892 
 893 #     def sendMessage(self,targets,type,msg,ctx):
 894 #         '''
 895 #             消息进入mongodb,待用户发送ack之后才删除,否则等用户再次连接进入时发送给用户
 896 #             targets - id list
 897 #             type  - MsgTargetType
 898 #             msg - MimeMessage_t
 899 #         @deal :
 900 #             delivery msg to sender
 901 #         '''
 902 #         userid = USER_ID(ctx)
 903 # #        self.app.msgsync.sendMessage(targets,type,msg,userid)
 904 #         ids = map(int,targets)
 905 #         for id in ids:
 906 #             if type == MsgTargetType.USER:
 907 #                 self.app.sendMsgToUser(id,msg,userid)
 908 #             elif type == MsgTargetType.TEAM:
 909 #                 #发送到群
 910 #                 self.app.sendMsgToTeam(id,msg,userid)
 911 #
 912 #
 913 #     def sendLocation(self,targets,type,loc,ctx):
 914 #         userid = USER_ID(ctx)
 915 #         ids = map(int,targets)
 916 #         x,y = geotools.point_g2m(loc.gps.loc.lon,loc.gps.loc.lat)
 917 #         loc.gps.loc.lon = x
 918 #         loc.gps.loc.lat = y
 919 #         for id in ids:
 920 #             if type == MsgTargetType.USER:
 921 #                 self.app.sendLocationToUser(id,loc,userid)
 922 #             elif type == MsgTargetType.TEAM:
 923 #                 #发送到群
 924 #                 self.app.sendLocationToTeam(id,loc,userid)
 925 #             elif type == MsgTargetType.SYSTEM:
 926 #                 pass
 927 #         self.app.sendLocationToReciever(loc,userid)
 928 #
 929 #     def uploadLocation(self,loc,ctx):
 930 #         userid = USER_ID(ctx)
 931 #         pass
 932 #
 933 #     def sendMsgAck(self,seqs,ctx):
 934 #         #用户发送消息确认消息
 935 #         db = self.app.getNosqlDb()
 936 #         for seq in seqs:
 937 #             try:
 938 #                 #r = db.user_sendmessage.find_one({'_id':seq})
 939 #                 #if r:
 940 #                 db.user_sendmessage.update({'_id':ObjectId(seq)},
 941 #                         {'$set':{'status':1,'ack_time':datetime.now()}},
 942 #                         multi = True
 943 #                 )
 944 # #                r['status'] = 1
 945 # #                r['ack_time']= datetime.now()
 946 # #                r.save()
 947 #             except:
 948 #                 print traceback.format_exc()
 949 #
 950 #
 951 #     def createTalking(self,type,ctx):
 952 #         '''
 953 #             talkin建立发送talking创建到talkingserver
 954 #             用户根据talkingid链接到talkingserver
 955 #         '''
 956 #         return TalkingResource_t()
 957 #
 958 #     def inviteTalking(self,target_id,talking_id,ctx):
 959 #         pass
 960 #
 961 #     def inviteTalkingAccept(self,talking_id,ctx):
 962 #         pass
 963 #
 964 #     def inviteTalkingReject(self,talking_id,reason,ctx):
 965 #         pass
 966 #
 967 #     def initOnlineUserList(self,gwaid,useridlist,ctx):
 968 #         for uid in useridlist:
 969 #             self.app.userOnline(uid,gwaid)
 970 #     #--------  above interface implements ---
 971 #     #----- local functions ----
 972 #
 973 
 974 
 975 
 976 
 977 
 978 class ServerApp:
 979     def __init__(self):
 980         pass
 981 
 982     def getConfig(self):
 983         #return self.app.getConfig()
 984         pass
 985 
 986     _handle = None
 987     @classmethod
 988     def instance(cls):
 989         if cls._handle == None:
 990             cls._handle = cls()
 991         return cls._handle
 992 
 993     def run(self):
 994         tce.RpcCommunicator.instance().init('messageserver').initMessageRoute('./services.xml')
 995         server = tce.RpcCommunicator.instance().currentServer().findEndPointByName('mq_messageserver').impl
 996         conn = tce.RpcCommunicator.instance().currentServer().findEndPointByName('mq_user_event_listener').impl
 997         adapter  = tce.RpcAdapterEasyMQ.create('server',server)
 998         adapter.addConnection(conn)
 999         #没有主动发送消息的情形
1000         servant = MessagingServiceImpl(self)
1001         adapter.addServant(servant)
1002         tce.RpcCommunicator.instance().waitForShutdown()
1003 
1004 
1005 if __name__ == '__main__':
1006     ServerApp.instance().run()
1007     # print cm.User.objects.all()
1008     # print cm.User.objects.filter(user='abc')
1009 #    print dir(tce) #.mqset_inst
1010     pass #sys.exit( MainApp().run())
1011 
posted on 2013-12-07 01:54 放屁阿狗 阅读(732) 评论(0)  编辑 收藏 引用 所属分类: IM 系统

只有注册用户登录后才能发表评论。
【推荐】超50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理