Integrating Spark Streaming with Drools

My company recently decided to bring Drools into our Spark Streaming real-time computation engine. I ran a test, and it works quite well.

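The following program runs on a JavaPairDStream<String, Float> named aggregateRecords. For each record it creates a Drools session, loads the rule file, and executes the rules. When a rule matches, it calls the hbaseSave() method of ChannAmount.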

    aggregateRecords.foreachRDD(new VoidFunction<JavaPairRDD<String, Float>>() {
        @Override
        public void call(JavaPairRDD<String, Float> stringFloatJavaPairRDD) throws Exception {
            stringFloatJavaPairRDD.foreach(new VoidFunction<Tuple2<String, Float>>() {
                @Override
                public void call(Tuple2<String, Float> stringFloatTuple2) throws Exception {
                    // Build the knowledge base from riskMonitor.drl.
                    // Note: this runs once per record, so the rule file is recompiled every time.
                    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
                    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
                    kbuilder.add(ResourceFactory.newFileResource("riskMonitor.drl"),
                            ResourceType.DRL);
                    // Fail fast if the rule file does not compile.
                    if (kbuilder.hasErrors()) {
                        throw new IllegalStateException(kbuilder.getErrors().toString());
                    }
                    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
                    kbase.addKnowledgePackages(pkgs);
                    StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();

                    // Wrap the (channel, amount) pair in a ChannAmount fact.
                    ChannAmount objectChannel = new ChannAmount();
                    objectChannel.setAmount(stringFloatTuple2._2());
                    objectChannel.setChannel(stringFloatTuple2._1());

                    // Insert the fact and fire all matching rules.
                    ksession.execute(objectChannel);
                }
            });
        }
    });
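
The listing above recompiles riskMonitor.drl for every record. One common way to avoid that is to build the expensive object lazily, once per JVM, and reuse it. The following is a minimal sketch of that pattern in plain Java, not the setup used in this post: `Lazy`, `RuleBaseDemo`, and `BUILD_COUNT` are hypothetical names, and a `String` stands in for the compiled knowledge base so the example runs without Drools or Spark on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: builds a value on first use and reuses it afterwards. */
final class Lazy<T> {
    private final Supplier<T> factory;
    private volatile T value;

    Lazy(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = value;
        if (local == null) {
            // Double-checked locking: only the first caller runs the factory.
            synchronized (this) {
                local = value;
                if (local == null) {
                    local = factory.get();
                    value = local;
                }
            }
        }
        return local;
    }
}

final class RuleBaseDemo {
    /** Counts how often the stand-in rule base is built. */
    static final AtomicInteger BUILD_COUNT = new AtomicInteger();

    // Stand-in for the compiled rule base. In a real job the supplier
    // would hold the KnowledgeBuilder / KnowledgeBase code (assumption).
    static final Lazy<String> RULE_BASE = new Lazy<>(() -> {
        BUILD_COUNT.incrementAndGet();
        return "compiled:riskMonitor.drl";
    });

    /** Simulates processing n records, each of which needs the rule base. */
    static int processAll(int n) {
        for (int i = 0; i < n; i++) {
            RULE_BASE.get();
        }
        return BUILD_COUNT.get();
    }
}
```

In a Spark job, a static holder like this would live on each executor JVM, so the rule file would be compiled once per executor rather than once per record. Only the per-record fact creation and the execute call would stay inside the inner loop.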

ChannAmount is defined as follows:

    public static class ChannAmount implements Serializable {
        private String channel;
        private Float amount;

        public String getChannel() {
            return channel;
        }

        public void setChannel(String channel) {
            this.channel = channel;
        }

        public Float getAmount() {
            return amount;
        }

        public void setAmount(Float amount) {
            this.amount = amount;
        }

        // Writes a risk warning row for this channel to HBase.
        public void hbaseSave() {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
            Date date = new Date();
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("DEAL_CHANNEL", channel);
            map.put("RISK_TYPE", "1");
            map.put("RISK_LEVEL", "5");
            map.put("WARN_TIME", sdf.format(date));
            // Message text: "Drools rule: this channel's transaction amount over the
            // last 5 minutes is <amount>, which exceeds 10000 yuan; monitor closely."
            map.put("SUGGESTION", "Drools规则实现:该渠道最近5分钟交易金额为" + amount + "超过10000元,请密切监控");
            HbaseUtil.insertRow("operateMonitor", "amount_monitor_" + channel, "cf", map);
        }
    }

The rule file riskMonitor.drl: suppose the rule is that a record is written to HBase when its amount is greater than 10000. Here m is bound to the objectChannel fact.

    package com.runMatch
    import main.java.RiskMonitor.ChannAmount;

    rule "channel"
    when
        // Match any ChannAmount fact whose amount is greater than 10000
        m : ChannAmount( amount > 10000.00 )
    then
        m.hbaseSave();
    end
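
The rule's condition is a strict greater-than check on each channel's total. A plain-Java version of the same predicate can be handy for unit-testing the threshold outside Drools. The sketch below is only an illustration: `ThresholdCheck`, `THRESHOLD`, `fires`, and `channelsToFlag` are hypothetical names, and treating a null amount as "does not fire" is an assumption of this sketch, not a statement about how Drools handles nulls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java mirror of the "amount > 10000" rule condition. */
final class ThresholdCheck {
    static final float THRESHOLD = 10000.00f;

    /** True when the amount is strictly above the threshold (null never fires here). */
    static boolean fires(Float amount) {
        return amount != null && amount > THRESHOLD;
    }

    /** Returns the channels whose total would trigger the warning, in map order. */
    static List<String> channelsToFlag(Map<String, Float> totals) {
        List<String> flagged = new ArrayList<>();
        for (Map.Entry<String, Float> entry : totals.entrySet()) {
            if (fires(entry.getValue())) {
                flagged.add(entry.getKey());
            }
        }
        return flagged;
    }
}
```

Because the comparison is strict, a channel whose total is exactly 10000 is not flagged.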