spark处理超大文件方法 spark读取hdfs文件规则( 四 )


List<StructField> fields = new ArrayList<>();
String schemaString = “name,age”;
fields.add(DataTypes.createStructField(“name”,
DataTypes.StringType, true));
fields.add(DataTypes.createStructField(“age”,
DataTypes.IntegerType, true));
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD rowRDD = distData.map( record ->{
RandomStringField randomStringField = new
RandomStringField();
randomStringField.setLength(10); BinaryIntLabelField
binaryIntLabelField = new
BinaryIntLabelField();
return RowFactory.create(randomStringField.gen(),
binaryIntLabelField.gen());
});
Dataset dataset =spark.createDataFrame(rowRDD, schema);
dataset.persist();
dataset.show();
DataFrameWriter writer = new DataFrameWriter(dataset);
writer.mode(SaveMode.Overwrite).partitionBy(“age”).
parquet(“/Users/sungaofei/gaofei”);
dataframe中每一个数据都是一行 , 也就是一个Row对象 , 而且dataframe对于每一列也就是每个schema有着严格的要求 。因为它是一个表么 。所以跟数据库的表或者pandas中的表是一样的 。要规定好每一列的schema以及每一行的数据 。所以首先我们先定义好schema ,  定义每个schema的列名和数据类型 。然后通过DataTypes的API创建schema 。这样我们的列信息就有了 。然后是关键的我们如何把一个RDD转换成dataframe需要的Row并且填充好每一行的数据 。这里我们使用RDD的map方法 ,  其实dataframe也是一个特殊的RDD ,  这个RDD里的每一行都是一个ROW对象而已 。所以我们使用RDD的map方法来填充我们每一行的数据并把这一行数据转换成Row对象 。
JavaRDD rowRDD = distData.map( record ->{
RandomStringField randomStringField = new RandomStringField();
randomStringField.setLength(10);
BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();
return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());
});
因为之前定义schema的时候只定义了两列 ,  分别是name和age 。所以在这里我分别用一个随机生成String类型的类和随机生成int类型的类来填充数据 。最后使用RowFactory.create方法来把这两个数据生成一个Row 。map方法其实就是让使用者处理每一行数据的方法 ,  record这个参数就是把行数据作为参数给我们使用 。当然这个例子里原始RDD的每一行都是当初生成List的时候初始化的index序号 。而我们现在不需要它 ,  所以也就没有使用 。直接返回随机字符串和int类型的数 。然后我们有了这个每一行数据都是Row对象的RDD后 。就可以通过调用下面的API来生成dataframe 。
Dataset dataset =spark.createDataFrame(rowRDD, schema);
分别把row和schema传递进去 , 生成dataframe的表 。最后利用DataFrameWriter保存数据 。
好了 ,  这就是造数的基本原理了 ,  其实也是蛮简单的 。当然要做到严格控制数据分布 , 数据类型 , 特征维度等等就需要做很多特殊的处理 。这里就不展开细节了 。
测试ETL处理的正确性
输入一份数据 , 然后判断输出的数据是否是正确的 。只不过我们这是在大数据量下的处理和测试 , 输入的数据是大数据 , ELT输出的也是大数据 ,  所以就需要一些新的测试手段 。其实这个测试手段也没什么新奇的了 ,  是我们刚才一直在讲的技术 , 也就是spark这种分布式计算框架 。我们以spark任务来测试这些ETL程序 , 这同样也是为了测试自身的效率和性能 。如果单纯使用hdfs client来读取文件的话 ,  扫描那么大的数据量是很耗时的 , 这是我们不能接受的 。所以我们利用大数据技术来测试大数据功能就成为了必然 。当然也许有些同学会认为我只是测试功能么 , 又不是测试算法的处理性能 , 没必要使用那么大的数据量 。我们用小一点的数据 , 比如一百行的数据就可以了 。但其实这也是不对的 ,  因为在分布式计算中 ,  大数量和小数据量的处理结果可能不是完全一致的 ,  比如随机拆分数据这种场景在大数据量下可能才能测试出bug 。而且大数据测试还有另外一种场景就是数据监控 ,  定期的扫描线上数据 , 验证线上数据是否出现异常 。这也是一种测试场景 , 而且线上的数据一定是海量的 。


以上关于本文的内容,仅作参考!温馨提示:如遇健康、疾病相关的问题,请您及时就医或请专业人士给予相关指导!

「四川龙网」www.sichuanlong.com小编还为您精选了以下内容,希望对您有所帮助: