创建DataFrameReader对象,进行数据读取任务。
通过format函数设置格式,并调用load函数加载数据。
调用DataSource.lookupDataSource方法获取source(参数为format传入的字符串)对应的Class对象。
如果该Class对象继承自DataSourceV2,且实现了ReadSupport接口,则调用DataSourceV2Relation.create方法创建DataSourceV2Relation逻辑计划,传入Dataset.ofRows方法,生成DataSet<Row>返回。
否则,调用loadV1Source方法,返回DataSet<Row>。
从以上步骤我们可以看出自定义DataSource的方法,则有以下两种方式:
DataSourceProvider一般分为两类:继承FileFormat或RelationProvider接口.
DataSource API扩展借鉴:
创建DataSource对象,并调用resolveRelation方法返回BaseRelation对象,传入sparkSession.baseRelationToDataFrame方法,返回RDD<Row>
providingClass即是调用DataSource.lookupDataSource返回的DataSource Class。
对于创建的BaseRelation对象进行checkColumnNameDuplication,然后返回。
利用BaseRelation创建LogicalRelation逻辑计划。
SparkSQL执行过程中利用Strategy会将逻辑计算转换为物理计划。
FileSourceStrategy会传入HadoopFsRelation创建FileSourceScanExec物理计划。
doExecute()判断inputRDD<InternalRow>是否需要进行unsafeRow的转换。
inputRDD中利用relation.fileFormat.buildReaderWithPartitionValues作为readFile的方法,创建BucketedReadRDD或者NonBucketedReadRDD。
buildReader方法实现在FileFormat子类当中,比如TextFileFormat:
TextFileFormat使用HadoopFileLinesReader或者HadoopFileWholeTextReader读取PartitionedFile文件中的数据,返回Iterator[UnsafeRow] 。
createNonBucketedReadRDD和createBucketedReadRDD会创建FileScanRDD。FileScanRDD的compute方法会利用readFile方法读取文件数据。
DataSourceStrategy和InMemoryScans策略最后都会生成RowDataSourceScanExec,最终会调用CatalystScan\PrunedScan\TableScan的buildScan方法生成RDD[Row],再调用toCatalystRDD将RDD[Row]转换为RDD[InternalRow]。
format方法传入source字符串
DataSource.lookupDataSource会找到source对应的DataSource类(一般包括FileFormat和 RelationProvider两类)
DataSource.resolveRelation会根据DataSource类型创建BaseRelation(一般包括HadoopFsRelation和继承BaseRelation且实现以下接口的类:TableScan、PrunedScan、PrunedFilteredScan、InsertableRelation、CatalystScan )。
SparkSession.baseRelationToDataFrame将BaseRelation传入创建LogicalRelation逻辑计划,并利用LogicalRelation创建DataSet。
FileSourceScanExec\DataSourceStrategy\InMemoryScans将LogicalRelation逻辑计划转换为物理计划,生成具体的DataSourceRDD,compute函数实现真正的读取逻辑。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- hids.cn 版权所有 赣ICP备2024042780号-1
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务