博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm入门demo
阅读量:6078 次
发布时间:2019-06-20

本文共 4197 字,大约阅读时间需要 13 分钟。

一.storm入门demo的介绍

       storm的入门helloworld有2种方式,一种是本地的,另一种是远程。

  本地实现:

    本地写好demo之后,不用搭建storm集群,下载storm的相关jar包即可实现storm的相关操作

  远程实现:

    本地写好demo之后,需要将其打成jar包,然后通过nimbus将jar包运行即可

    本地打包注意事项:

      由于打好的jar包会将其放到storm的集群上,因此storm上已经包含了运行的相关环境,但是在通过maven打包时需要storm-core设置成provided范围,不需要将storm-core的相关类打进jar包,以避免引起冲突

 

二.本地demo的实现

  下载storm 所需jar包

1.spout数据源的实现

 

public class RandomStringSpout extends BaseRichSpout{        private final static Map
map = new HashMap
(); private SpoutOutputCollector collector; public RandomStringSpout(){ map.put(0, "kafka"); map.put(1, "nifi"); map.put(2, "flink"); map.put(3, "storm"); map.put(4, "spark"); } //在Spout组件初始化时被调用 public void open(Map arg0, TopologyContext topologyContextrg1, SpoutOutputCollector spoutOutputCollector) { System.err.println(" ============== open"); this.collector = spoutOutputCollector; } //nextTuple()方法是Spout实现的核心。 //也就是主要执行方法,用于输出信息,通过collector.emit方法发射 public void nextTuple() { //发送数据 collector.emit(new Values(map.get(ThreadLocalRandom.current().nextInt(4)))); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } //用于声明数据格式,即输出的一个Tuple中,包含几个字段 public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("stream")); }}

 

2.Bolt数据过滤

public class WrapStarBolt extends BaseBasicBolt{    public void execute(Tuple tuple, BasicOutputCollector Collector) {        String value = tuple.getStringByField("stream");        System.err.println("******"+value);    }    public void declareOutputFields(OutputFieldsDeclarer declarer) {         //nothing to do     }}
public class WrapWellBolt extends BaseBasicBolt{    public void execute(Tuple tuple, BasicOutputCollector collector) {        String value = tuple.getStringByField("stream");        System.err.println("#######"+value);    }    public void declareOutputFields(OutputFieldsDeclarer arg0) {        //nothing to do     }}

3.创建topology

//所有的spout bolt 会组成一个topology public class RadomStringTopologyLocal {    public static void main(String[] args) throws InterruptedException {                TopologyBuilder builder = new TopologyBuilder();        builder.setSpout("RandomStringSpout", new RandomStringSpout());                builder.setBolt("wrapStarBolt", new WrapStarBolt()).shuffleGrouping("RandomStringSpout");        builder.setBolt("wrapWellBolt", new WrapWellBolt()).shuffleGrouping("RandomStringSpout");                Config config = new Config();        config.setDebug(true);                LocalCluster cluster = new LocalCluster();        cluster.submitTopology("RadomStringTopologyLocal", config, builder.createTopology());                System.err.println("the first topology is start running at local");                TimeUnit.SECONDS.sleep(30);        cluster.killTopology("RadomStringTopologyLocal");        cluster.shutdown();    }} //从运行的结果中可以看出写的demo已经运行

三.远程demo的实现

  1.使用上一个demo中的RandomStringSpout  WrapStarBolt  WrapWellBolt  这三个类,然后编写新的RandomStringTopologyRemote类

 

public class RandomStringTopologyRemote {        public static void main(String[] args) {        final TopologyBuilder builder = new TopologyBuilder();        builder.setSpout("RandomStringSpout", new RandomStringSpout());        builder.setBolt("WrapStarBolt", new WrapStarBolt(),4).shuffleGrouping("RandomStringSpout");        builder.setBolt("WrapWellBolt", new WrapWellBolt(),4).shuffleGrouping("RandomStringSpout");                final Config config = new Config();        config.setNumWorkers(3);        try {            StormSubmitter.submitTopology("RandomStringTopologyRemote", config, builder.createTopology());        } catch (Exception e) {            e.printStackTrace();        }     }}

 

2.把上面4个类通过maven工具打成jar包,并上传至nimbus所在的服务器上,上传完毕后通过以下命令远程启动storm

  storm jar storm_test-0.0.1-SNAPSHOT.jar com.zpb.RandomStringTopologyRemote

  远程启动storm的命令是:

    storm jar + *.jar  main函数的全路径名

3.通过UI工具查看

  

 4.关闭提交的topology

  storm kill 提交的topology名  

       

 

 

 

      

转载于:https://www.cnblogs.com/MrRightZhao/p/11005174.html

你可能感兴趣的文章
python——序列 & 集合 & 映射
查看>>
搞全闪存阵列的各执一词 宏杉说别吵了,就用我哒
查看>>
玩转搭载眼球追踪的FOVE 0,需要多高的配置呢?
查看>>
vue-router 源码:前端路由
查看>>
Flask下载文件
查看>>
java基础学习_基础语法(上)02_day03总结
查看>>
乐视印度公司裁员80%,全球化扩张遭遇滑铁卢,它还能撑多久?
查看>>
weex sdk集成到Android工程二. weex sdk集成到Android工程
查看>>
Git工程实践(二)多账号配置
查看>>
鱼鹰软件签约老牌传播机构思艾传播集团
查看>>
线程(杂)
查看>>
未来杯高校AI挑战赛激战正酣 金山云全程提供云资源
查看>>
【资讯】福布斯:旅行积分计划是区块链主要目标,对旅行者来说是好消息
查看>>
高桥智隆:未来机器人将取代智能手机,并成为人类的朋友
查看>>
工信部表示:建立网络数据安全管理体系 强化用户个人信息保护
查看>>
感受真实的华为-记山东CIO智库会员华为之行
查看>>
Spring的依赖注入概述
查看>>
为什么我的联想打印机M7450F换完墨粉之后打印机显示请更换墨粉盒?这是我的墨盒第一次灌粉·、...
查看>>
命运多舛、前途未卜,共享经济年终盘点之网约车
查看>>
研究人员研制出可有效抑制艾滋病病毒的新药,可让病毒几乎检测不出来
查看>>