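Our company recently decided to bring Drools into our Spark Streaming real-time computation engine. I ran a test, and it works quite well.
The following code runs on the JavaPairDStream<String, Float> aggregateRecords. For each record it creates a Drools session, loads the rule file, and executes the rules; a rule that matches then calls the hbaseSave() method of ChannAmount.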
- aggregateRecords.foreachRDD(new VoidFunction<JavaPairRDD<String, Float>>() {
- @Override
- public void call(JavaPairRDD<String, Float> stringFloatJavaPairRDD) throws Exception {
- stringFloatJavaPairRDD.foreach(new VoidFunction<Tuple2<String, Float>>() {
- @Override
- public void call(Tuple2<String, Float> stringFloatTuple2) throws Exception {
- // Create the Drools session and the ChannAmount fact
- KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
- KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
- kbuilder.add( ResourceFactory.newFileResource( "riskMonitor.drl"),
- ResourceType.DRL );
- Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
- kbase.addKnowledgePackages( pkgs );
- StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
- ChannAmount objectChannel = new ChannAmount();
- objectChannel.setAmount(stringFloatTuple2._2());
- objectChannel.setChannel(stringFloatTuple2._1());
- ksession.execute(objectChannel);
- }
- });
-
-
- }
- });
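The listing above rebuilds the knowledge base from riskMonitor.drl for every record, which is likely to be the most expensive step in the loop. One possible way to avoid that is to build it once per executor JVM and reuse it. The following is a minimal, library-free sketch of that idea; `LazyHolder` is a hypothetical helper name, not part of Drools or Spark, and the supplier stands in for the KnowledgeBuilder/KnowledgeBase code.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Drools or Spark class): builds an expensive
 * object on first use and hands back the same instance afterwards.
 */
final class LazyHolder<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the cached instance, building it on the first call only. */
    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

In the streaming job, a static `LazyHolder` field could wrap the code that builds the KnowledgeBase, so that the inner `call` only creates a new stateless session from the cached knowledge base and executes it. This assumes the knowledge base can safely be shared between tasks in one JVM, which is worth confirming for the Drools version in use.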
The ChannAmount class is as follows:
- public static class ChannAmount implements Serializable{
- private String channel;
- private Float amount;
- public String getChannel() {
- return channel;
- }
-
- public void setChannel(String channel) {
- this.channel = channel;
- }
-
- public Float getAmount() {
- return amount;
- }
-
- public void setAmount(Float amount) {
- this.amount = amount;
- }
- public void hbaseSave(){
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
- Date date = new Date();
- HashMap<String,String> map = new HashMap<String, String>();
-
- map.put("DEAL_CHANNEL",channel);
- map.put("RISK_TYPE","1");
- map.put("RISK_LEVEL","5");
- map.put("WARN_TIME",sdf.format(date));
- map.put("SUGGESTION","Drools rule triggered: this channel's transaction amount over the last 5 minutes is "+amount+", exceeding 10000 yuan; please monitor closely");
- HbaseUtil.insertRow("operateMonitor","amount_monitor_"+channel,"cf",map);
- }
- }
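Because hbaseSave() both builds the row and writes it, it cannot be exercised without an HBase connection. As a sketch only, the row construction could be pulled into a pure function and checked on its own. `WarningRow` and `build` are hypothetical names, and the SUGGESTION text here is a paraphrase of the message used in hbaseSave(); the column names and date pattern are taken from the class above.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds the column map that hbaseSave() would write. */
final class WarningRow {
    static Map<String, String> build(String channel, Float amount, Date now) {
        // SimpleDateFormat is not thread-safe, so create one per call.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
        Map<String, String> map = new HashMap<>();
        map.put("DEAL_CHANNEL", channel);
        map.put("RISK_TYPE", "1");
        map.put("RISK_LEVEL", "5");
        map.put("WARN_TIME", sdf.format(now));
        // Paraphrased message; the original wording lives in hbaseSave().
        map.put("SUGGESTION", "Channel amount over the last 5 minutes is "
                + amount + ", above 10000; please monitor closely");
        return map;
    }
}
```

With this split, hbaseSave() would reduce to a single call that passes `WarningRow.build(channel, amount, new Date())` to `HbaseUtil.insertRow`, keeping the table, row key, and column family arguments unchanged.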
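The rule configuration in riskMonitor.drl: assume the rule is that a record is written to HBase when its amount is greater than 10000. Here m is a reference to the objectChannel object.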
- package com.runMatch
-
- import main.java.RiskMonitor.ChannAmount;
-
- rule "channel"
- when
- m : ChannAmount( amount > 10000.00 )
- then
- m.hbaseSave();
- end
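To make the intent of the rule concrete, here is a rough plain-Java equivalent of its condition. It is only an illustration of the logic, not how Drools evaluates the rule; `AmountRule` and `matches` are hypothetical names, and the explicit null check is an assumption of this sketch.

```java
/** Hypothetical plain-Java rendering of the "channel" rule's condition. */
final class AmountRule {
    // Threshold taken from the rule: amount greater than 10000.
    static final float THRESHOLD = 10000.00f;

    /** True when the amount is present and strictly above the threshold. */
    static boolean matches(Float amount) {
        return amount != null && amount > THRESHOLD;
    }
}
```

The comparison is strict, so an amount of exactly 10000 does not trigger hbaseSave(). Keeping the threshold in the .drl file rather than in code like this is the reason for using Drools: the limit can be changed by editing the rule file without touching the job's Java code.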