资源描述
推送系统实践(一)
最近由于项目需要做一个推送系统,借此机会对nodejs和pomelo有了一次实践,在此将整个实践过程记录并分享。
什么是pomelo
Pomelo 是基于 Node.js 的高性能、分布式游戏服务器框架。它包括基础的开发框架和相关的扩展组件(库和工具包),可以帮助你省去游戏开发枯燥中的重复劳动和底层逻辑的开发。Pomelo 不但适用于游戏服务器开发, 也可用于开发高实时 Web 应用,它的分布式架构可以使 Pomelo 比普通的实时 Web 框架扩展性更好。 更多详情参见
为什么选pomelo
初次接触pomelo也是项目关系,当时我们是要做一个实时在线对战游戏,当时选的就是pomelo,效果不错。所以在做推送系统的时候,也就把pomelo放到技术选型方案中,评估后觉得也比较适合,于是继续使用pomelo,下面进入正题。
pomelo安装
Ubuntu
a、安装nodejs 要求版本>0.8
b、到http://nodejs.org/获取源码包的下载地址,下载nodejs的gz包
wget http://nodejs.org/dist/v0.10.28/node-v0.10.28.tar.gz
c、解压编译安装
tar zxvf node-v0.10.28.tar.gz
cd node-v0.10.28
./configure
make
sudo make install
d、检查是否安装成功
#node -v
2. 安装g++ gcc等
apt-get install build-essential
3. 安装npm并设置成http连接
apt-get update
apt-get install npm
npm config set strict-ssl false
npm config set registry "http://registry.npmjs.org/"
npm install pomelo -g
详细安装过程可以参见
项目功能介绍
主要功能是实现针对移动端的广告实时推送系统,详细需求如下:
1. 用户状态维护
2. 在线用户实时推送
3. 对用户进行分类并打tag。
4. 挖掘用户特征及tag
5. 根据tag区分用户并制定推送策略,推送相应的广告。
举个例子解释一下,比如有如下场景:
两个用户A和B,其中A为男性,B为女性。- AB为用户, 性别为tag
广告库里面有两个广告C和D,C是汽车类的,D是化妆品类广告。- 广告
C是包段定向投放给男性,D是全网投放(无论男女都投放) - 推广策略
Tag – 从任意维度给用户打的一个标签。比如性别,年龄,地域等等
策略 – 广告检索的方法。 说白了就是如何从一堆的广告到当中根据什么样的算法去选取广告并进行推送。
那这里要实现的就是这样一个系统: 能够通过离线的数据挖掘对用户进行分类,从而打上不同的tag, 再将广告通过不同的推广策略推广给用户一个平台。
其中第4点,挖掘用户特征不在本文范围内,本系统着重在于实现一个可扩展的实时在线推送平台。
整体架构
关于如何实现可扩展的分布式系统及系统架构方面,比如在线用户登录及状态维护,如何实现分布式系统方面属于pomelo范畴,本文也不在重述,有兴趣的童鞋可以查看pomelo的官方文档,本文着重分享的是如何设计和实现业务系统架构。
从以上的例子分析其实可以看出,这里面其实有四个方面:用户,tag,策略和广告。
用户与tag是多对一的关系,一个用户可以有多个tag。
tag与策略是多对多的关系,一个策略可以应用在多个tag;一个tag也可以存在多个策略
tag通过pomelo的channel来实现,一个channel代表一个tag。
策略通过component来实现,一个component代表一个策略。
代码结构如下:
app
components
dao
mysql
domain
monitor
servers
area
connector
gate
其中,
components目录下面存放的是推广策略代码,一个推广策略是一个文件
dao目录存放数据存取,比如mysql
domain目录存放各种抽象类
monitor目录存放监控代码
servers目录存放的是三种类型的server:
a. area负责逻辑应用
b. connector负责用户连接维护
c. gate负责负载均衡
下面直接上代码。
负载均衡代码,gate/handler/gateHandler.js
var Code = require('../../../../../shared/code');
var dispatcher = require('../../../util/dispatcher');
var utils= require('../../../util/utils');
/**
* Gate handler that dispatch user to connectors.
*/
module.exports = function(app) {
return new Handler(app);
};
var Handler = function(app) {
this.app = app;
};
Handler.prototype.queryEntry = function(msg, session, next) {
var uid = msg.uid;
if(!uid) {
next(null, {code: Code.FAIL});
return;
}
var connectors = this.app.getServersByType('connector');
if(!connectors || connectors.length === 0) {
next(null, {code: Code.GATE.FA_NO_SERVER_AVAILABLE});
return;
}
var res = dispatcher.dispatch(uid, connectors);
next(null, {code: Code.OK, host: res.pubHost, port: res.clientPort});
};
其中dispatcher采用随机分流,
var crc = require('crc');
var utils = require('./utils')
module.exports.dispatch = function(uid, connectors) {
//var index = Number(uid) % connectors.length;
var index = Math.abs(crc.crc32(uid)) % connectors.length;
return connectors[index];
};
connector验证用户登录
connector/handler/entryHandler.js
var Code = require('../../../../../shared/code');
var logger = require('pomelo-logger').getLogger(__filename);
var https = require('https');
var async = require('async');
module.exports = function(app) {
return new Handler(app);
};
var Handler = function(app) {
this.app = app;
var httpConfig = app.get('httpConf');
this.option = {
hostname: httpConfig.avsHost,
port: httpConfig.avsPort,
path: httpConfig.playerValidate,
method: "POST",
headers: httpConfig.avsHeaders
};
//console.error(JSON.stringify(this.option));
};
/**
* New client entry.
*
* @param {Object} msg request message
* @param {Object} session current session object
* @param {Function} next next step callback
* @return {Void}
*/
Handler.prototype.playerValidate = function(uid, cb) {
var content = JSON.stringify({uid:uid});
var req = https.request(this.option, function(res){
logger.debug("playerValidate return code:" + res.statusCode);
cb(null, res.statusCode);
});
req.write(content);
req.end();
return;
}
Handler.prototype.entry = function(msg, session, next) {
var self = this;
var uid = msg.uid;
async.waterfall([function(cb) {
self.playerValidate(uid, cb);
},function(resCode, cb){
if(resCode !== Code.OK) {
next(null,{
code: Code.FAIL,
msg: "invalid user"
});
return;
}
var sessionService = self.app.get('sessionService');
//duplicate log in
if( !! sessionService.getByUid(uid)) {
next(null, {
code: Code.FAIL,
msg: 'duplicate login'
});
return;
}
session.bind(uid);
/**
session.set('gid', gid);
session.push('gid', function(err) {
if(err) {
logger.error('set gid for session service failed! error is : %j', err.stack);
next(null, {
code: Code.FAIL,
msg: 'set gid failed'
});
return;
}
});
**/
session.on('closed', onUserLeave.bind(null, self.app));
self.app.rpc.area.areaRemote.add(session, uid, self.app.get('serverId'),function(data){
next(null, {code: data.code, msg: data.msg});
return;
//cb(null, data.code);
});
return;
}], function(err) {
if (err) {
logger.error(err.stack);
}
});
};
/**
* User log out handler
*
* @param {Object} app current application
* @param {Object} session current session object
*
*/
var onUserLeave = function(app, session) {
var channelName = session.get('channelName');
if(!session || !session.uid || !channelName) {
logger.warn("invalid user kicked");
return;
}
app.rpc.area.areaRemote.kick(session, channelName, session.uid, function(data) {
if(data.code === Code.FAIL) {
logger.error('user leave error')
}
});
};
area/remote/areaRemote.js
/**
* Module dependencies
*/
var logger = require('pomelo-logger').getLogger(__filename);
var pomelo = require('pomelo');
var Queue = require('pomelo-collection').queue;
var Code = require('../../../../../shared/code');
module.exports = function(app) {
return new areaRemote(app, app.get('adPushService'));
};
var areaRemote = function(app, adPushService) {
this.app = app;
//this.self = this;
this.adPushService = adPushService;
};
areaRemote.prototype.add = function(uid, sid, cb){
this.adPushService.initService();
this.adPushService.add(uid, sid, cb);
return;
};
/**
areaRemote.prototype.restoreChannel = function(channelName, cb) {
this.gameService.restoreChannel(channelName, cb);
return;
};
**/
areaRemote.prototype.kick = function(channelName, uid, cb) {
this.adPushService.kick(channelName, uid, cb);
return;
};
areaRemote.js主要负责用户登录时将用户添加到相应通道里。
area/remote/adPushService.js
/**
* Module dependencies
*/
var logger = require('pomelo-logger').getLogger(__filename);
var pomelo = require('pomelo');
var Code = require('../../../../../shared/code');
var AdPushChannel = require('../../../domain/adPushGlobalChannel');
var Ssp = require('../../../domain/ssp');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Const = require('../../../../../shared/const');
var adPushService = function(app) {
EventEmitter.call(this);
this.app = app;
this.initFlag = false;
this.channelService = null;
this.channelOnDict = {};
};
util.inherits(adPushService, EventEmitter);
module.exports = adPushService;
adPushService.prototype.initService = function() {
if (! this.initFlag) {
this.channelService = this.app.get("channelService");
this.backeSessionService = this.app.get('backendSessionService');
for (var channelName in Const.PUSH_CHANNEL) {
var opts = {
channelName: Const.PUSH_CHANNEL[channelName],
app: this.app
};
var pushChannel = new AdPushChannel(opts);
this.channelOnDict[channelName] = pushChannel;
}
this.initFlag = true;
}
}
/**
adPushService.prototype.pushAd = function(adPushChannel, ads){
//logger.debug("Schedule job:"+data.name+"execute");
//var ads = [{url:' ad_id:1}];
this.channelOnDict[adPushChannel].pushAd(ads);
return;
};
**/
adPushService.prototype.add = function(uid, sid, cb){
var selGame = null;
var opts = {
uid: uid,
sid: sid
}
var ssp = new Ssp(opts);
//TODO get ssp strategy here
/**每个用户都会有一个或者多个特征,从而属于一个或者多个push通道,这里需要扩展的是:从离线挖掘系统中找出ssp属于哪些push策略
从而把这些通道名称赋予该ssp,然后会把ssp以此加到所属通道中,再由各个通道策略负责推送广告
add set ssp strategy here
**/
if (ssp.adPushStrategy in this.channelOnDict) {
this.channelOnDict[adPushStrategy].addSsp(ssp, cb);
} else {
this.channelOnDict[Const.PUSH_CHANNEL.DEFAULT].addSsp(ssp, cb);
}
return;
};
adPushService.prototype.kick = function(channelName, uid, cb) {
var game = null;
if (channelName in this.channelOnDict) {
game = this.channelOnDict[channelName];
}
if (!game) {
cb({code: Code.FAIL, msg: 'channelName does not existed'});
logger.warn("["+channelName+"] does not existed");
return;
}
game.kickSsp(uid, cb);
return;
};
domain/adPushGlobalChannel.js
主要是负责将提供真实的能力将用户添加到channel里面和向用户推送具体广告。
/**
* Module dependencies
*/
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Code = require('../../../shared/code');
var logger = require('pomelo-logger').getLogger(__filename);
var self= null;
var sType = 'connector';
//var playerPerGame = 2;
//var id = 1;
//var playerAdded = "playerAdded";
/**
* Initialize a new 'Game' with the given 'opts'.
* Game inherits EventEmitter
*
* @param {Object} opts
* @api public
*/
var adPushGlobalChannel = function(opts) {
EventEmitter.call(this);
this.channelName = opts.channelName;
this.app = opts.app;
this.ssps = {};
this.sspNames = [];
this.sspNum = 0;
//this.channel = this.app.get("globalChannelService").getChannel(this.channelName, true);
this.globalChannelService = this.app.get('globalChannelService');
self = this;
//this.initListener();
};
util.inherits(adPushGlobalChannel, EventEmitter);
/**
* Expose 'Game' constructor
*/
module.exports = adPushGlobalChannel;
/**
* Get entityId
*
* @return {Number}
* @api public
*/
adPushGlobalChannel.prototype.addSsp = function(ssp, cb) {
this.globalChannelService.add(this.channelName, ssp.uid, ssp.sid, function(err) {
if (err) {
cb({code: Code.FAIL, msg: err.stack});
return;
}
self.ssps[ssp.uid] = ssp;
self.sspNames.push(ssp.uid);
self.sspNum++;
self.app.get('backendSessionService').getByUid(ssp.sid, ssp.uid, function(err, backSessions){
if (err) {
logger.error("["+self.channelName+"] ssp "+ssp.uid + "put channelName in seesion failed");
cb({code: Code.FAIL, msg: err.stack});
return;
}
var backSession = backSessions[0];
backSession.set('channelName', self.channelName);
backSession.pushAll(null);
logger.debug("channelName has been put into session: " + backSession.get('channelName'));
});
cb({code: Code.OK});
logger.debug("["+self.channelName+"] add new ssp "+ssp.uid);
return;
});
};
adPushGlobalChannel.prototype.pushAd = function(ads) {
var param = {
ads: ads
};
//this.channel.pushMessage(param);
this.globalChannelService.pushMessage(sType, 'onNewAd', param, this.channelName, null, function(err){
if(err) {
logger.error("push NewAd to channel:["+self.channelName+'] failed: '+e.stack);
return;
}
logger.info("push NewAd to channel:["+self.channelName+'] success');
return;
});
};
adPushGlobalChannel.prototype.kickSsp = function(uid, cb) {
if (! uid in this.ssps) {
var msg = "unknow ssp:"+uid+"in adPushChannel:["+this.channelName+"]";
logger.warn(msg);
cb({code: Code.FAIL, msg: msg});
return;
}
//debugger;
var ssp = this.ssps[uid];
this.globalChannelService.leave(this.channelName, ssp.uid, ssp.sid, function(err){
if(err) {
logger.warn("user leave from ["+self.channelName+'] failed:'+e.stack);
cb({code: Code.FAIL, msg: e.stack});
return;
}
cb({code: Code.OK, msg: 'user leave success'});
return;
});
/**
if( !! this.channel) {
this.channel.leave(ssp.uid, ssp.sid);
var param = {
route: 'onLeave',
user: uid
};
this.channel.pushMessage(param);
}
**/
};
而ssp是对用户的抽象
/**
* Module dependencies
*/
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Const = require('../../../shared/const');
var logger = require('pomelo-logger').getLogger(__filename);
/**
* Initialize a new 'Ssp' with the given 'opts'.
* Ssp inherits EventEmitter
*
* @param {Object} opts
* @api public
*/
var Ssp = function(opts) {
EventEmitter.call(this);
this.uid = opts.uid;
this.sid = opts.sid;
this.adPushStrategy = opts.adPushStrategy || Const.AD_PUSH_STRATEGY.SCHEDULE.NAME;
};
util.inherits(Ssp, EventEmitter);
/**
* Expose 'Entity' constructor
*/
module.exports = Ssp;
推送策略。
/**
* Module dependencies
*/
//var EventEmitter = require('events').EventEmitter;
//var adPushChannel = require('./adPushGlobalChannel');
//var util = require('util');
var pomelo = require('pomelo');
var Code = require('../../../shared/code');
var Const = require('../../../shared/const');
var logger = require('pomelo-logger').getLogger(__filename);
//var searchSql = 'select * from pushhistory where status = ?';
var searchSql = 'select pushhistory.id,pushhistory.ad_id,pushhistory.url,pushhistory.crc,ubmc.targetUrl from pushhistory inner join ubmc on pushhistory.ad_id=ubmc.id where pushhistory.status = 0;';
var updateSql = 'update pushhistory set status=1 where id = ?';
var searchArgs = [0];
var self =null;
var sType = 'connector';
//var playerPerGame = 2;
//var id
展开阅读全文