资源描述
1.1.1. 功能概述
业务数据集成是指MDM Server与外围系统之间的数据交换时需要中间处理的模块应用层。
如图所示,外围系统主要通过三种模式与MDM Server交互(这里不考虑ETL):
l 直接的web service访问 – 主要通过http协议联机访问,一般不通过业务集成层而直接访问数据服务层
l 近实时的消息通知 – 主要通过MQ消息访问,需要在业务集成层进行必要的处理
l 批量处理 – 主要通过文件进行数据传递,需要在业务集成层进行处理
业务集成服务层的主要内容包括:
l 近实时的消息通知
n 数据标准代码的转换
n 交易xml的拆分
n 其他规则
l 批量处理
n 标准代码的转换
n 入库方式判断处理,判断是否是新增还是更改
n 交易拆分
n 其他规则
1.1.2. 联机数据集成(基于MQ/XML)
联机数据集成主要处理近实时消息通知,处理模式如下图所示。
MQ组件是所有的客户信息更新消息的载体,MsgReceiver组件负责接收MQ的消息,通过MsgParser组件进行xml格式解析,MsgReceiver转交到IntegrateFW组件,IntegrateFW组件是转发框架,负责业务集成层的基本控制,根据报文的类型调用必要的规则处理进行数据转换或者信息拆分,然后交由具体逻辑处理实现模块IntegrateImpl完成业务集成逻辑,并最终通过ServiceInvocation模块调用MDM的服务,完成交易。
数据服务层主要包括扩展服务和组合服务。
如上图,根据业务需求,一般性的顺序如下详述:
1 ejbCreate – MQ中的信息通知WAS容器,根据MsgReceiver组件的MDB情况,创建一个实例进入池态;
2 onMessage – WAS容器调用MDB的onMessage方法,并把消息作为参数传递到该方法;
2.1 deliver – onMessage方法中调用IntegrateFW组件的deliver方法,并传递消息;
2.1.1 parse – IntegrateFW组件调用MsgParser组件解析传入的消息体;
2.1.1.1 Constructor – MsgParser组件根据消息体解析,生成一个IBObj对象;
2.1.2 parse – parse方法返回生成的IBObj对象;
2.1.3 getTCRMTxType – 获取IBObj对象的服务类型;
2.1.4 getTCRMTxType – 返回字符串结果;
2.1.5 getRequestName – 获取IBObj的请求者;
2.1.6 getRequestName – 返回;
2.1.7 Constructor – IntegrateFW根据服务对象和请求者,实例化一个IntegrateImpl组件中集成服务实例;
2.1.8 execute – 调用具体实例的业务逻辑,并把IBObj作为参数传递;
2.1.8.1 callRule – 调用标准代码转换规则,进行必要规则处理;
2.1.8.2 callRule – 返回规则处理后的IBObj对象;
2.1.8.3 matchRule – 调用客户识别规则,进行识别
2.1.8.4 matchRule – 调用返回
2.1.8.5 constructIBObj – 在具体的业务逻辑处理中,根据具体需要调用MsgParser创建需要的IBObj对象;
2.1.8.5.1 Constructor – MsgParser构建新的IBObj实例;
2.1.8.6 constructIBObj – 返回具体的IBObj实例;
2.1.8.7 setAttribute – 处理具体的IBObj实例内容;
2.1.8.8 setAttribute – 处理内容返回;
2.1.8.9 toServiceXml – 处理完所有的IBObj实例内容后,调用方法转换为xml标准服务格式;
2.1.8.10 toServiceXml – 返回xml字节流;
2.1.8.11 invoke – 调用MDM服务;
2.1.8.12 invoke – 返回;
2.1.9 execute – 完成具体业务集成逻辑实例的执行;
2.2 deliver – 完成整个业务集成逻辑的执行;
注:
2.1.8到2.1.9之间可能有多次的callRule和invoke调用;
2.1.8 到2.1.9 execute的调用需要包括在事务处理中;
批量处理的业务集成层设计参见《批量型》章节。
1.1.2.1. MsgReceiver组件
MsgReceiver组件主要由IntegrateMDB组成,IntegrateMDB是一个MessageDriverBean,通过onMessage(Message)方法接收MQ的消息通知。
信息接收组件主要负责从MQ接收消息,参数Message使用BytesMessage类型,消息格式为XML形式,其具体的接口定义参见《服务接口定义》中的相关通过MQ接口部分。
接口主要包括:
l 个险 – 增加客户(addPerson / CSCMQ)
l 个险 – 保全修改客户(updatePerson / CSCMQ)
l 电商网上销售 – 增加客户(addPerson / TOLMQ)
l 电商网上销售 – 修改客户基本信息(updatePersonBase / TOLMQ)
l 电商网上销售 – 保全修改客户(updatePerson / TOLMQ)
注:以上括号中的服务名是指各个业务系统提交到MQ中的消息服务名,而不是MDM调用的标准服务名;
组件调用IntegrateFW组件进行后续处理。
public void onMessage(Message msg) {
… …
if (msg instanceof BytesMessage) {
BytesMessage bm = (BytesMessage) msg;
long length = bm.getBodyLength();
byte[] bs = new byte[length];
bm.getBytes(bs);
BytesInputStream bis = new BytesInputStream(bs);
IntegrateFW.deliver(bis);
} else {
… …
}
… …
}
1.1.2.2. IntegrateFW组件
IntegrateFW组件是框架组件,负责业务集成层的控制调度。组件负责调用MsgParser组件,解析xml报文,然后根据报文类型调用具体的控制处理逻辑,同时调用业务处理规则进行数据和业务处理,最后形成处理后的报文提交ServiceInvocation组件调用MDM Service,完成整个处理,同时如果在处理中发生异常,则交由IntegException组件处理异常结果。
IntegrateFW组件中对于多个service调用作为同一事务来管理,如果调用出错需要进行回滚处理。
IntegrateFW组件暴露一个接口IntegrateLogic供IntegrateImpl组件实现,其接口方法如下。
/**
* 执行具体的业务集成任务. 用于MQ异步通知模式
* 在IntegrateImpl组件实现该方法,处理具体某一项接口的逻辑调用,一般包括
* 代码转换、服务拆分、服务变换、DSP判断等
* @param is 输入流,是通过MQ接收到的消息内容,xml字节流
* @throws ItegrateException 例外,如果发生例外需要到例外组件处理
*/
public void execute(IBobj obj) throws ItegrateException;
组件对外调用的Façade接口调用是IntegratFW类,使用静态方法。
/**
* 传递到IntegrateFW组件执行后续任务. 用于MQ异步通知模式
* @param is 输入流,是通过MQ接收到的消息内容,xml字节流
*/
public static void deliver(InputStream is);
1.1.2.3. IntegrateImpl组件
IntegrateImpl组件是根据报文类型确定的具体的业务集成逻辑的实现。
一期需求主要包括:
l 个险增加客户
l 个险修改客户基本信息
l 个险保全修改客户信息
l 网上销售增加客户
参见个险增加客户,只是其接口内容稍有差异
l 网上销售修改客户基本信息
参见个险修改客户基本信息,只是其接口内容稍有差异
l 网上销售保全修改客户信息
参见个险保全修改客户信息,只是其接口内容稍有差异
此组件需要处理地址、电话、电子邮件、客户标识等的格式变换,是把输入的对象格式变为标准的MDM扩展服务格式,包括:
l 从粗粒度服务格式转换为updatePersonName服务格式
l 转换为changePartyAddress服务格式 – 服务中确定具体调用correctPartyAddress还是addPartyAddress还是不做处理;
l 转换为changePartyContactMethod服务格式 – 服务中定具体调用updatePartyContactMethod还是addPartyContactMethod还是不做处理;
l 从粗粒度服务格式转换为changePartyIdentifer服务格式 – 服务中定具体调用updatePartyIdentifer还是addPartyIdentifer还是不做处理;
具体接口参见《服务接口》和MDM开发文档。
服务逻辑参见相关在线服务组合服务设计;
1.1.2.4. MsgParser组件
信息解析主要是根据xml报文解析其内容。
MsgParser组件根据不同的报文类型解析不同的内容。
MsgParser组件还负责输出变更后的报文结果。
MsgParser是暴露在外的Façade调用界面,通过两个个方法接收外部调用。
/**
* 解析xml.
* @param is 输入流,是通过MQ接收到的消息内容,xml字节流
*/
public static IBObj parse(InputStream is);
/**
* 把IBObj根据类型输出为标准的MDM service报文.
* @param obj 输入的IBObj
* @return 返回一个字节数组,组成xml,使用标准的iso-8859-1格式
*/
public static byte[] toServiceXml(IBObj obj);
/**
* 根据名称构建新的IBObj对象.
* @param name 输入的IBObj名称
* @return 返回IBObj对象
*/
public static IBObj constructIBObj(String name);
IBObj是用于定义报文的bean,类似MDM中的BObj。
IBObj主要定义xml中的bobj对象,IBObj内部可以嵌套。其暴露的接口如下。
/**
* 设置IBObj的类型. 是指对象类型,如TCRMPersonBObj、
* TCRMAdminContEquivBObj、TCRMPartyAddressBObj、TCRMAddressBObj等。
*/
public void setType(String name);
public String getType();
/**
* 设置服务的类型.
*/
public void setTCRMTxType(String type);
pubic String getTCRMTxType();
/**
* 设置服务的对象类型.
*/
public void setTCRMTxObject(String obj);
public String getTCRMTxObject();
/**
* 设置IBObj对象.
*/
public void setIBObj(IBObj obj);
/**
* 根据名字获取IBObj对象. 只能获取下一级对象.
*/
public IBObj[] getIBObj(String type);
public String getAttribute(String type);
/**
* 设置属性. 自动区分处理Extension属性.
*/
public String setAttribute(String type, String value);
/**
* 设置服务头.
*/
public void setHeader(String header);
public String getHeader();
/**
* 设置请求控制部分.
*/
public void setRequestControl(String rc);
public String getRequestControl();
/**
* 设置请求控制部分的请求名称.
*/
public void setRequestName(String rn);
public String getRequestName();
/**
* 设置请求控制部分的LOB.
*/
public void setRequestLOB(String rlob);
public String getRequestLOB();
1.1.2.5. DSP Rule组件
处理DSP规则,具体参见DSP设计中在线可疑客户识别。
1.1.2.6. CD Rule组件
处理标准代码的转换,根据requestName来判断来源,并把源系统标准代码转换为MDM标准代码。
CDRule组件由CDPool从数据库中装载标准代码数据映射。映射关系是各个源系统指向MDM。
sources.properites文件是每个源系统接口中的需要转换的代码定义,其格式如下:
CSCMQ= GenderType|HighestEducationType|TCRMAdminContEquivBObj. AdminSystemType|…
TOLMQ= GenderType|…
格式以Key=Value方式存放,key为源系统提交的requestName,而值是以’|’分割的多个数据域,每个数据域都需要进行标准代码替换。如果该数据域是在接口的更底层,以’.’作为路径分割,如TCRMAdminContEquivBObj.AdminSystemType表示是在该对象中的TCRMAdminContEquivBObj对象下的AdminSystemType需要进行代码转换。
具体的实现类CDRule则根据输入IBObj对象和相应数据定义进行代码转换任务。
RuleFW是对外暴露的调用类,提供静态方法。
/**
* 规则调用处理.
* @param type 规则类型,根据此类型确定是调用哪个规则实现
* @param obj 输入输出对象,规则实现对其进行处理,并形成结果返回
*/
public static void callRule(Stirng type, IBObj obj) throws IntegrateException;
一期需要进行的代码转换为:等待标准代码标对应完毕后补充
l 个险
数据域
源系统代码
源系统值
MDM代码
MDM值
GenderType
M
男
M
男
F
女
F
女
U
未知
U
未知
TCRMPartyIdentificationBObj
.IdentificationType
0
身份证
00
其它
1
参字第
10
后字第
11
空文字第
12
北文字
13
护照
14
装字第
15
北文字第
RelationshipType
1
本人
2
丈夫
3
妻子
4
父亲
5
母亲
6
儿子
HighestEducationType
'01'
'文盲'
'02'
'小学'
'03'
'初中'
'04'
'高中'
'05'
'大专'
'06'
'本科'
'07'
'研究生及以上'
'08'
'中专'
未知
MaritalStatusType
0
未婚
0
未婚
1
已婚
1
已婚
2
离婚
2
离婚
3
鳏寡
3
鳏寡
AddressUsageType
B
单位地址
2
单位地址
P
邮递地址
3
邮递地址
R
家庭地址
1
家庭地址
l 电商网上销售
l 银保
l 养老金
l 团险
1.1.2.7. Utils组件
Utils组件是工具类组件,主要包括
l 服务调用组件,负责进行MDM服务的调用
通过IIOP方式访问MDM的EJB(DWLServiceController)来处理web service格式的请求,具体请参考MDM workbench中的com.ibm.mdm.training.testerDWLServiceControllerTester。
l 数据库访问
1.1.2.8. IntegException组件
例外处理的组件,如果处理过程中有例外,则需要记录例外的原因和状态,并把该服务请求xml保存到数据库。
例外后需要保存的内容如下:
字段
代码
类型
备注
错误流水号
ERRLOGID
BigInt
PK,自增型
时间
LOG_DT
Timestamp
错误返回消息
ERRMESSAGE
Varchar(255)
exception.getMessage()
或者业务逻辑错误说明,如找不到该客户等
错误提交消息
REQUESTXML
Xml字段
错误堆栈同样需要记录到log4j日志中。
1.1.3. 批量数据集成(基于批量/XML)
批量处理的主要内容类似业务数据集成章节的近实时处理部分,主要是FileParser模块管理各个业务系统上传的批量文件,并调用MsgParser模块解析具体的数据内容,同样,根据规则处理各个业务逻辑规则,然后形成具体的批量文件,并调用MDM的BatchProcessor进行批量处理。
具体模块关系如下图所示。
批量处理的部分将会复用较多的近实时处理部分的业务组件,包括:
l MsgParser组件
l IntegException组件
l CDRule组件
1 detectFile – 守护线程,检查各个系统相应目录下的上穿文件是否ready;
2 Constructor – 读入文件,形成文件输入流和输出流,并调用批量处理,根据源系统类型实例化具体的业务逻辑处理实例,传递输入流;
3 readOneService – 从输入流读取一个service块;
4 readOneService – 返回读取的service块;
5 parse – 调用MsgParser把servie块的xml格式解析成为一个IBObj;
6 parse – 返回IBObj;
7 callRule – 调用代码转换rule,转换客户证件类型、地址类型、联系类型为MDM标准代码;
8 callRule – 调用返回;
9 writeTmp – 把客户关键信息写入数据库,内容参见临时表定义;
10 writeTmp – 返回;
以上从3到10循环处理,直到文件输入流到尾端;
11 resetFile – 把文件输入流重新定位到开始;
12 resetFile – 返回重新定位后的文件输入流;
13 callRule – 调用批量客户识别的规则处理;
13.1 updateTmp – 根据规则处理结果更新临时表,确定客户增加、修改类型以及客户的地址、联系、证件类型更改类型;
13.2 updateTmp – 处理临时表结束;
14 callRule – 调用规则返回;
15 readOneService – 从文件输入流读入一个service xml块;
16 readOneService – 返回service块;
17 parse – 调用MsgParser解析为一个IBObj;
18 parse – 返回IBObj;
19 readTmp – 读入一条与IBObj匹配的临时表记录;
20 readTmp – 读入返回数据;
21 callRule – 调用格式转换规则处理,根据临时表类型标志,转换IBObj的内容为标准服务的IBObj;
22 callRule – 返回转换完毕的IBObj;
23 toServiceXml – 把IBObj转换为标准的xml格式;
24 toServiceXml – xml结果;
25 writeXml2File – 把xml结果写入文件输出流;
26 writeXml2File – 返回;
以上15到26循环处理,直到文件处理完毕;
27 runbatch.sh – 关闭相应资源,调用MDM的BatchProcessor进行批量处理,完成后处理相应输入输出及中间文件;
1.1.1.1. 批量处理接口
批量处理主要包括三个外部系统接口:
l 银保增加客户
l 养老金增加客户
l 团险增加客户
具体接口格式参见《服务接口》
外部系统输出符合接口规范的数据文件,并FTP到规定的目录,具体参见《服务接口》中“批量接口模式”。
1.1.1.2. FileParser组件
FileParser组件是解析文件的处理,其主要处理客户的上传数据文件,同时新建输出结果文件。
解析整体文件格式;包括输入输出,输出文件每个service xml只能是一行;
FileParser调用IntegrateFW组件以处理不同的业务集成逻辑。
FileParser组件暴露一个抽象类IntegrateBatchLogic供IntegrateImpl组件实现,其抽象方法如下。
/**
* 执行具体的业务集成任务. 用于批量模式
* 在IntegrateImpl组件实现该方法,处理具体某一项接口的逻辑调用,一般包括
* 代码转换、服务拆分、服务变换、DSP判断等
* @param is 输入流,是通过MQ接收到的消息内容,xml字节流
* @throws ItegrateException 例外,如果发生例外需要到例外组件处理
*/
public abstract void executeBatch(InputStream is,OutputStream os) throws ItegrateException;
在IntegrateBatchLogic中还实现必要方法:
² resetFile – 重置文件指针到文件头;
² callBatchSDP – 调用批量客户识别规则;
² callRule – 调用代码转换规则;
² getService – 获取文件的下一个service块;
² callMDMBatch – 调用MDM的runbatch.sh
1.1.1.3. IntegrateImpl组件
具体的业务集成逻辑实现,三个不同源系统的增加客户业务集成逻辑相同,其具体内容格式有所差异。
具体逻辑处理如下图。
1.1.1.4. 批量客户识别匹配规则
批量客户识别匹配规则是指考虑执行效率而进行的客户识别匹配程序,其匹配规则与在线完全相同,但处理方式变成批量,规则参考如下:
统一客户管理平台进行客户识别的关键数据包括:客户名称+出生日期、证件类型+证件编号以及客户性别。为了提高系统的客户识别能力和信息的准确度,加快业务处理的速度,系统针对用于客户识别的关键数据进行了有效性规定,符合下列规定的信息才可以作为客户识别的依据:
n 客户名称不允许为空
n 证件编号长度至少8位。
不满足上述有效性规则的客户信息会被加入统一客户管理平台,但不会进行客户识别。
注:规则以在线匹配规则为准,请参考相关设计文档以确认规则一致性;
客户匹配的主体是MDM数据库的contact / person / identifier表;
另外进行匹配的表是临时表:tmp_contact
Sql参考:
-- 先根据客户号和源系统id进行匹配
update tmp_contact t set
(MATCH_TP
,MATCH_PARTYID
,MATCH_PARTYUPDT
,MATCH_PERSONUPDT
,PERSONNAMEID
,PERSONNAME_UPDT
)
=
(select
1
,c.cont_id
,c.last_update_dt
,p.last_update_dt
,pn.person_name_id
,pn.last_update_dt
from
contact c
,person p
,tmp_contact t
,contequiv e
,personname pn
where
c.cont_id=p.cont_id
and t.lob_tp=e.ADMIN_SYS_TP_CD
and t.lob_custno=e.ADMIN_CLIENT_ID
and e.cont_id=p.cont_id
and pn.cont_id=c.cont_id
and pn.name_usage_tp_cd=1
and (pn.end_dt is null or pn.end_dt > current timestamp)
and (c.end_dt is null or c.end_dt > current timestamp)
and (p.end_dt is null or p.end_dt > current timestamp)
and (e.end_dt is null or e.end_dt > current timestamp)
)
where exists
(select
1
from
contact c
,person p
,tmp_contact t
,contequiv e
,personname pn
where
c.cont_id=p.cont_id
and t.lob_tp=e.ADMIN_SYS_TP_CD
and t.lob_custno=e.ADMIN_CLIENT_ID
and e.cont_id=p.cont_id
and pn.cont_id=c.cont_id
and pn.name_usage_tp_cd=1
and (pn.end_dt is null or pn.end_dt > current timestamp)
and (c.end_dt is null or c.end_dt > current timestamp)
and (p.end_dt is null or p.end_dt > current timestamp)
and (e.end_dt is null or e.end_dt > current timestamp)
)
;
-- 根据证件类型证件号码客户姓名客户生日进行匹配
update tmp_contact t set
(MATCH_TP
,MATCH_PARTYID
,MATCH_PARTYUPDT
,MATCH_PERSONUPDT
,PERSONNAMEID
,PERSONNAME_UPDT
)
=
(select
1
,c.cont_id
,c.last_update_dt
,p.last_update_dt
,pn.person_name_id
,pn.last_update_dt
from
contact c
,person p
,tmp_contact t
,identifier i
,personname pn
where
c.cont_id=p.cont_id
and i.cont_id=c.cont_id
and t.id_tp_cd=i.id_tp_cd
and t.ref_num=i.ref_num
and length(rtrim(ltrim(t.ref_num)))>7
and t.contact_name=c.contact_name
and t.person_name is not null
and t.person_name<>’’
and ((t.GENDER_TP_CODE is null and p. t.GENDER_TP_CODE is null) or t.GENDER_TP_CODE=p.GENDER_TP_CODE)
and char(t.birth_dt, ISO)=substr(p.birth_dt, 1, 10)
and t.match_tp=2
and pn.cont_id=c.cont_id
and pn.name_usage_tp_cd=1
and (pn.end_dt is null or pn.end_dt > current timestamp)
and (c.end_dt is null or c.end_dt > current timestamp)
and (p.end_dt is null or p.end_dt > current timestamp)
and (i.end_dt is null or i.end_dt > current timestamp)
)
where exists
(select c.cont_id
from
contact c
,person p
,tmp_contact t
,identifier i
,personname pn
where
c.cont_id=p.cont_id
and i.cont_id=c.cont_id
and t.id_tp_cd=i.id_tp_cd
and t.ref_num=i.ref_num
and length(rtrim(ltrim(t.ref_num)))>7
and t.contact_name=c.contact_name
and t.person_name is not null
and t.person_name<>’’
and ((t.GENDER_TP_CODE is null and p.GENDER_TP_CODE is null) or t.GENDER_TP_CODE=p.GENDER_TP_CODE)
and char(t.birth_dt, ISO)=substr(p.birth_dt, 1, 10)
and t.match_tp=2
and pn.cont_id=c.cont_id
and pn.name_usage_tp_cd=1
and (pn.end_dt is null or pn.end_dt > current timestamp)
and (c.end_dt is null or c.end_dt > current timestamp)
and (p.end_dt is null or p.end_dt > current timestamp)
and (i.end_dt is null or i.end_dt > current timestamp)
)
;
-- 根据匹配结果查找需要修改的证件号码
update tmp_contact t set
(IDENT_MATCH_TP
,IDENTIFIR_ID
, IDENTIFIR_UPDT
) =
(select
1
,IDENTIFIER_ID
,LAST_UPDATE_DT
from
tmp_contact t
,identifier i
where
t.match_partyid=i.cont_id
and t.id_tp_cd=i.id_tp_cd
and t.ref_num<>i.ref_num
and (i.end_dt is null or i.end_dt > current timestamp)
)
where exists
(select
1
from
tmp_contact t
,identifier i
where
t.match_partyid=i.cont_id
and t.id_tp_cd=i.id_tp_cd
and t.ref_num<>i.ref_num
and (i.end_dt is null or i.end_dt > current timestamp)
);
-- 根据匹配结果查找需要修改的地址
update tmp_partyaddress set
(ADDR_UP_TP
, PARTYADDRESSID
, PARTYADDRESSUPDT
, LOCATIONGROUPUPDT
) =
(select
2
,a.location_group_id
,a.last_update_dt
,l.last_update_dt
from
locationgroup l
,adderssgroup a
,tmp_contact t
,tmp_partyaddress pa
where
l.location_group_id=a.location_group_id
and l.cont_id=t.matcd_partyid
and t.lob_tp=pa.lob_tp
and t.lob_custno =pa.lob_custno
and pa.ADDR_USAGE_TP_CD=a.ADDR_USAGE_TP_CD
and (l.end_dt is null or l.end_dt > current timestamp)
and (a.end_dt is null or a.end_dt > current timestamp)
)
where exists
(select
1
from
locationgr
展开阅读全文