【Flink】flink的数据源有哪些

601-赵同学

发表文章数:191

热门标签

, ,
首页 » 大数据 » 正文


基于本地集合的source | Collection-based-source

最常见的三种:

  • fromElements()
  • fromCollection()
  • generateSequence()

代码实现

import java.lang

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.types.LongValue
import org.apache.flink.util.{LongValueSequenceIterator, NumberSequenceIterator}

object BatchSource {
  def main(args: Array[String]): Unit = {
    /**
     * 三种形式
     * fromElements()
     * fromCollection()
     * generateSequence()
     */
    // 1.创建执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 2.获取数据源
    // 导入隐式转换的包
    import org.apache.flink.api.scala._

    // 2.1 用 element 创建 DataSet(fromElements)
    val elementDataSet: DataSet[String] = env.fromElements("1", "2", "3")

    // 2.2 用 collection 创建 DataSet(fromCollection)
    val collectionDataSet: DataSet[String] = env.fromCollection(List("1 2 3", "4 5 6"))

    // 2.3 用 sequence 创建 DataSet(generateSequence)
    val sequenceDataSet: DataSet[Long] = env.generateSequence(1, 5)

    // ps. 对env通过进行并行度设置
    val numsDataSet1: DataSet[lang.Long] = env.fromParallelCollection(new NumberSequenceIterator(1, 5)).setParallelism(2)
    val numsDataSet2: DataSet[LongValue] = env.fromParallelCollection(new LongValueSequenceIterator(1, 5)).setParallelism(2)

    // 3.输出结果
    elementDataSet.print()
    collectionDataSet.print()
    sequenceDataSet.print()
    numsDataSet1.print()
    numsDataSet2.print()
  }
}

基于文件的source | File-based-source

最常见的五种:

  • 读取本地文本文件
  • 读取本地csv文件
  • 读取HDFS文件
  • 读取本地压缩文件
  • 读取文件目录内的所有文件

开发代码

现在项目本地创建存放测试文件的目录,导入文件
【Flink】flink的数据源有哪些

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object BatchFromFile {

  /**
   * 普通类和样例类的区别
   * 1.不需要new
   * 2.默认实现了序列化
   * 3.可以做模式匹配
   *
   * @param subInt
   * @param subName
   */
  case class Subject(subInt: Int, subName: String)

  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 2.获取数据
    // 2.1 本地文本文件数据源
    val txtDataSet: DataSet[String] = env.readTextFile("./day01/data/input/wordcount.txt")

    // 2.2 本地csv数据源
    val csvDataSet: DataSet[Subject] = env.readCsvFile[Subject]("./day01/data/input/subject.csv")

    // 2.3 hdfs文件数据源
    val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://node01:8020/test/flink/input/wordcount.txt")

    // 2.4 本地压缩包数据源
    val gzDataSet: DataSet[String] = env.readTextFile("./day01/data/input/wordcount.txt.gz")

    // 2.5 遍历目录
    // 2.5.1 创建参数对象
    val configuration = new Configuration()
    // 2.5.2 开启递归
    configuration.setBoolean("recursive.file.enumeration", true)
    // 2.5.3 遍历整个目录
    val dirDataSet: DataSet[String] = env.readTextFile("./day01/data/input").withParameters(configuration)

    // 3.输出结果
    println("======本地文本文件数据源读取结果======")
    txtDataSet.print()
    println("======本地csv数据源读取结果======")
    csvDataSet.print()
    println("======hdfs文件数据源读取结果======")
    hdfsDataSet.print()
    println("======本地压缩包数据源读取结果======")
    gzDataSet.print()
    println("======本地目录数据源读取结果======")
    dirDataSet.print()
  }

}

未经允许不得转载:作者:601-赵同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《【Flink】flink的数据源有哪些》 发布于2020-05-05

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录