[Flink] Hands-on example: implementing a simple word count with the DataSet API



1. Create a Maven project and import the dependencies

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.2</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <hadoop.version>2.6.0</hadoop.version>
    <flink.version>1.7.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <iheart.version>1.4.3</iheart.version>
    <fastjson.version>1.2.60</fastjson.version>
</properties>

<dependencies>
    <!-- Scala library dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Flink streaming Scala API dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink batch Scala API dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink client dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Hadoop client dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <!-- When writing to HDFS, xml-apis must be excluded because it conflicts with dom4j -->
        <exclusions>
            <exclusion>
                <groupId>xml-apis</groupId>
                <artifactId>xml-apis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- MySQL connector dependency -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <!-- fastjson dependency -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- Flink Kafka connector dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.5.1</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>

        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!--<arg>-make:transitive</arg>-->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>

                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!--
                                    zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF
                                    -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>cn.itcast.batch.BatchWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
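
The build section above points sourceDirectory and testSourceDirectory at the Scala source roots, so the project should roughly follow the layout sketched below; the project name flink-wordcount is an assumption, and the package path matches the mainClass configured for the shade plugin.

flink-wordcount/
├── pom.xml
└── src/
    ├── main/scala/cn/itcast/batch/BatchWordCount.scala
    └── test/scala/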

2. Write the code

package cn.itcast.batch

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment


    // 2. Create the data source; in the batch API the source object is a DataSet
    // If the calls below fail to compile with (...) errors, the implicit conversions are missing: import them as shown
    import org.apache.flink.api.scala._
    val textDataSet: DataSet[String] = env.fromCollection(List("hadoop spark hive", "hadoop hadoop spark"))

    // 3. Process the data
    // 3.1 Split each line on spaces and flatten the result
    val wordDataSet: DataSet[String] = textDataSet.flatMap(x => x.split(" "))
    // 3.2 The data is now (hadoop, spark, hive, hadoop, hadoop, spark); tag each word with a count of 1
    val wordAndOneDataSet: DataSet[(String, Int)] = wordDataSet.map(x => (x, 1))
    // 3.3 The data is now (hadoop,1), (spark,1), (hive,1), ...; group it by the word (tuple field 0)
    val groupedDataSet: GroupedDataSet[(String, Int)] = wordAndOneDataSet.groupBy(0)
    // 3.4 Sum the counts (tuple field 1) within each group
    val sumDataSet: AggregateDataSet[(String, Int)] = groupedDataSet.sum(1)

    // 4. Output the result
    // sumDataSet.print()
    sumDataSet.writeAsText("hdfs://node01:8020/test/flink/output/05051338001")
    // print() triggers execution itself, so env.execute() is not needed when printing; a file sink like writeAsText() does need it
    env.execute("BatchWordCount")
  }

}
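
For the collection input above, the job produces (hadoop,3), (spark,2) and (hive,1). The collection source is convenient for a quick test; in practice the input usually comes from a file, so here is a minimal sketch of the same pipeline reading its input from HDFS. The input path and the object name BatchWordCountFromFile are illustrative assumptions, not part of the original project.

package cn.itcast.batch

import org.apache.flink.api.scala._

object BatchWordCountFromFile {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // Read every line of the text file as one element of the DataSet
    // (the path is an example; any text file reachable from the cluster works)
    val textDataSet: DataSet[String] = env.readTextFile("hdfs://node01:8020/test/flink/input/words.txt")

    // Same pipeline as above: split, flatten, tag each word with 1, group by word, sum the counts
    val sumDataSet: AggregateDataSet[(String, Int)] = textDataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    sumDataSet.writeAsText("hdfs://node01:8020/test/flink/output/wordcount")
    env.execute("BatchWordCountFromFile")
  }
}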

3. Package the jar and submit it through the web UI
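
A typical flow is to build the fat jar with the shade plugin and then submit it, either through the Flink web UI (Submit new Job, then upload the jar) or from the command line. A sketch of the command-line variant follows; the jar name is a placeholder, since it depends on your artifactId and version.

# Build the fat jar; the shade plugin runs in the package phase
mvn clean package

# Command-line alternative to the web UI; the entry class matches the shade plugin's mainClass
flink run -c cn.itcast.batch.BatchWordCount target/<your-artifact>.jar

# Inspect the result (depending on parallelism, the sink writes a single file or a directory of part files)
hdfs dfs -ls /test/flink/output/05051338001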

