spark处理超大文件方法 spark读取hdfs文件规则( 五 )


废话不多说 , 直接看下面的代码片段 。
@Features(Feature.ModelIde)
@Stories(Story.DataSplit)
@Description(“使用pyspark验证随机拆分中的分层拆分”)
@Test
public void dataRandomFiledTest(){
String script = “# coding: UTF-8n” +
“# input script according to definition of “run” interfacen” +
“from trailer import loggern” +
“from pyspark import SparkContextn” +
“from pyspark.sql import SQLContextn” +
“n” +
“n” +
“def run(t1, t2, context_string):n” +
” # t2为原始数据 ,  t1为经过数据拆分算子根据字段分层拆分后的数据n” +
” # 由于数据拆分是根据col_20这一列进行的分层拆分 ,  所以在这里分别n” +
” # 对这2份数据进行分组并统计每一个分组的计数 。由于这一列是labeln” +
” # 所以其实只有两个分组 , 分别是0和1n” +
” t2_row = t2.groupby(t2.col_20).agg({“*” : “count”}).cache()n” +
” t1_row = t1.groupby(t1.col_20).agg({“*” : “count”}).cache()n” +
” n” +
” n” +
” t2_0 = t2_row.filter(t2_row.col_20 == 1).collect()[0][“count(1)”]n” +
” t2_1 = t2_row.filter(t2_row.col_20 == 0).collect()[0][“count(1)”]n” +
” n” +
” t1_0 = t1_row.filter(t1_row.col_20 == 1).collect()[0][“count(1)”]n” +
” t1_1 = t1_row.filter(t1_row.col_20 == 0).collect()[0][“count(1)”]n” +
” n” +
” # 数据拆分算子是根据字段按照1:1的比例进行拆分的 。所以t1和t2的每一个分组n” +
” # 都应该只有原始数据量的一半n” +
” if t2_0/2 – t1_0 >1:n” +
” raise Run财路哥Error(“the 0 class is not splited correctly”)n” +
” n” +
” if t2_1/2 – t1_1 >1:n” +
” raise Run财路哥Error(“the 1 class is not splited correctly”)n” +
“n” +
” return [t1]”;
我们用来扫描数据表的API仍然是我们之前提到的dataframe 。上面的代码片段是我们嵌入spark任务的脚本 。里面t1和t2都是dataframe ,  分别代表原始数据和经过数据拆分算法拆分后的数据 。测试的功能是分层拆分 。也就是按某一列按比例抽取数据 。比如说100W行的数据 , 我按job这个字段分层拆分 ,  我要求的比例是30% 。也即是说每种职业抽取30%的数据出来 , 相当于这是一个数据采样的功能 。OK ,  所以在测试脚本中 , 我们分别先把原始表和经过采样的表按这一列进行分组操作 ,  也就是groupby(col_20) 。这里我选择的是按col_20进行分层拆分 。根据刚才讲的这样的分组操作后会触发shuffle , 把有相同职业的数据传到一个数据分片上 。然后我们做count这种操作统计每一个组的行数 。因为这个算法我是按1:1拆分的 , 也就是按50%采样 。所以最后我要验证拆分后的数据的每一组的行数都是原始数据中该组的一半 。


以上关于本文的内容,仅作参考!温馨提示:如遇健康、疾病相关的问题,请您及时就医或请专业人士给予相关指导!

「四川龙网」www.sichuanlong.com小编还为您精选了以下内容,希望对您有所帮助: