您好,欢迎来到汇意旅游网。
搜索
您的当前位置:首页Spark DataSource 源码解析

Spark DataSource 源码解析

来源:汇意旅游网

SparkSession.read()

创建DataFrameReader对象,进行数据读取任务。

DataFrameReader

format

schema

option

json、csv、text…

通过format函数设置格式,并调用load函数加载数据。

load

调用DataSource.lookupDataSource方法获取source(参数为format传入的字符串)对应的Class对象。

如果该Class对象继承自DataSourceV2,且实现了ReadSupport接口,则调用DataSourceV2Relation.create方法创建DataSourceV2Relation逻辑计划,传入Dataset.ofRows方法,生成DataSet<Row>返回。

否则,调用loadV1Source方法,返回DataSet<Row>。

DataSource.lookupDataSource

从以上步骤我们可以看出自定义DataSource的方法,则有以下两种方式:

  • 用provider1正确加载DataSourceProvider类。provider即传入format方法中的字符串)= shortName,但是DataSourceProvider必须继承DataSourceRegister接口。或者provider = 自定义DataSourceProvider的全类名。
  • 用provider2正确加载DataSourceProvider类。provider = 自定义DataSourceProvider类的包名,DataSourceProvider类名为DefaultSource。

DataSourceProvider一般分为两类:继承FileFormat或RelationProvider接口.
DataSource API扩展借鉴:

DataFrameReader.loadV1Source

创建DataSource对象,并调用resolveRelation方法返回BaseRelation对象,传入sparkSession.baseRelationToDataFrame方法,返回RDD<Row>

DataSource.resolveRelation




providingClass即是调用DataSource.lookupDataSource返回的DataSource Class。

  • 如果DataSource是SchemaRelationProvider类型,且userSpecifiedSchema不为null,则调用dataSource.createRelation(传入schema)创建BaseRelation。
  • 如果DataSource是RelationProvider类型,且userSpecifiedSchema为null,则调用dataSource.createRelation(不传入schema)创建BaseRelation。
  • 如果DataSource是RelationProvider类型,且userSpecifiedSchema不为null,则调用dataSource.createRelation(不传入schema)创建BaseRelation,如果baseRelation.schema != userSpecifiedSchema则报出异常,否则返回
  • 如果DataSource是FileFormat,则创建HadoopFsRelation对象。

对于创建的BaseRelation对象进行checkColumnNameDuplication,然后返回。

SparkSession.baseRelationToDataFrame

利用BaseRelation创建LogicalRelation逻辑计划。

数据源有关的Strategy

FileSourceStrategy

SparkSQL执行过程中利用Strategy会将逻辑计算转换为物理计划。

FileSourceStrategy会传入HadoopFsRelation创建FileSourceScanExec物理计划。

FileSourceScanExec

doExecute()判断inputRDD<InternalRow>是否需要进行unsafeRow的转换。

inputRDD中利用relation.fileFormat.buildReaderWithPartitionValues作为readFile的方法,创建BucketedReadRDD或者NonBucketedReadRDD。

buildReader方法实现在FileFormat子类当中,比如TextFileFormat:

TextFileFormat使用HadoopFileLinesReader或者HadoopFileWholeTextReader读取PartitionedFile文件中的数据,返回Iterator[UnsafeRow] 。

createNonBucketedReadRDD和createBucketedReadRDD会创建FileScanRDD。FileScanRDD的compute方法会利用readFile方法读取文件数据。

DataSourceStrategy


InMemoryScans

DataSourceStrategy和InMemoryScans策略最后都会生成RowDataSourceScanExec,最终会调用CatalystScan\PrunedScan\TableScan的buildScan方法生成RDD[Row],再调用toCatalystRDD将RDD[Row]转换为RDD[InternalRow]。

总结

  1. format方法传入source字符串

  2. DataSource.lookupDataSource会找到source对应的DataSource类(一般包括FileFormat和 RelationProvider两类)

  3. DataSource.resolveRelation会根据DataSource类型创建BaseRelation(一般包括HadoopFsRelation和继承BaseRelation且实现以下接口的类:TableScan、PrunedScan、PrunedFilteredScan、InsertableRelation、CatalystScan )。

  4. SparkSession.baseRelationToDataFrame将BaseRelation传入创建LogicalRelation逻辑计划,并利用LogicalRelation创建DataSet。

  5. FileSourceScanExec\DataSourceStrategy\InMemoryScans将LogicalRelation逻辑计划转换为物理计划,生成具体的DataSourceRDD,compute函数实现真正的读取逻辑。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- hids.cn 版权所有 赣ICP备2024042780号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务