stable_sort,SQL之外部数据源如何成为在企业开发中的一把利器?
一、简介#
1.1 多数据源支持#
Spark 支持以下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,能够满足绝大部分使用场景。
CSV
JSON
Parquet
ORC
JDBC/ODBC connections
Plain-text files
注:以下所有测试文件均可从本仓库的resources 目录进行下载
1.2 读数据格式#
所有读取 API 遵循以下调用格式:
Copy
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()
// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 读取模式
.option("inferSchema", "true") // 是否自动推断 schema
.option("path", "path/to/file(s)") // 文件路径
.schema(someSchema) // 使用预定义的 schema
.load()
读取模式有以下三种可选项:
读模式 描述
permissive 当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中
dropMalformed 删除格式不正确的行
failFast 遇到格式不正确的数据时立即失败
1.3 写数据格式#
Copy
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //写模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
写数据模式有以下四种可选项:
Scala/Java 描述
SaveMode.ErrorIfExists 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式
SaveMode.Append 数据以追加的方式写入
SaveMode.Overwrite 数据以覆盖的方式写入
SaveMode.Ignore 如果给定的路径已经存在文件,则不做任何操作
二、CSV#
CSV 是一种常见的文本文件格式,其中每一行表示一条记录,记录中的每个字段用逗号分隔。
2.1 读取CSV文件#
自动推断类型读取读取示例:
Copy
spark.read.format("csv")
.option("header", "false") // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST") // 是否快速失败
.option("inferSchema", "true") // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()
使用预定义类型:
Copy
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//预定义数据格式
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
2.2 写入CSV文件#
Copy
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
也可以指定具体的分隔符:
Copy
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
2.3 可选配置#
为节省主文篇幅,所有读写配置项见文末 9.1 小节。三、JSON#
3.1 读取JSON文件#
Copy
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
需要注意的是:默认不支持一条数据记录跨越多行 (如下),可以通过配置 multiLine 为 true 来进行更改,其默认值为 false。
Copy
// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//默认不支持多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
3.2 写入JSON文件#
Copy
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
3.3 可选配置#
为节省主文篇幅,所有读写配置项见文末 9.2 小节。
四、Parquet#
Parquet 是一个开源的面向列的数据存储,它提供了多种存储优化,允许读取单独的列非整个文件,这不仅节省了存储空间而且提升了读取效率,它是 Spark 是默认的文件格式。
4.1 读取Parquet文件#
Copy
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
2.2 写入Parquet文件#
Copy
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
2.3 可选配置#
Parquet 文件有着自己的存储规则,因此其可选配置项比较少,常用的有如下两个:
读写操作 配置项 可选值 默认值 描述
Write compression or codec None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy None 压缩文件格式
Read mergeSchema true, false 取决于配置项 spark.sql.parquet.mergeSchema
五、ORC#
ORC 是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中常用的文件格式。
5.1 读取ORC文件#
Copy
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
4.2 写入ORC文件#
Copy
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
六、SQL Databases#
Spark 同样支持与传统的关系型数据库进行数据读写。但是 Spark 程序默认是没有提供数据库驱动的,所以在使用前需要将对应的数据库驱动上传到安装目录下的 jars 目录中。下面示例使用的是 Mysql 数据库,使用前需要将对应的 mysql-connector-java-x.x.x.jar 上传到 jars 目录下。
6.1 读取数据#
读取全表数据示例如下,这里的 help_keyword 是 mysql 内置的字典表,只有 help_keyword_id 和 name 两个字段。
Copy
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)
从查询结果读取数据:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//输出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 1
七、Text#
Text 文件在读写性能方面并没有任何优势,且不能表达明确的数据结构,所以其使用的比较少,读写操作如下:
7.1 读取Text数据#
Copy
spark.read.textFile("/usr/file/txt/dept.txt").show()
7.2 写入Text数据#
Copy
df.write.text("/tmp/spark/txt/dept")
八、数据读写高级特性#
8.1 并行读#
多个 Executors 不能同时读取同一个文件,但它们可以同时读取不同的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每一个都将成为 DataFrame 中的一个分区,并由可用的 Executors 并行读取。
8.2 并行写#
写入的文件或数据的数量取决于写入数据时 DataFrame 拥有的分区数量。默认情况下,每个数据分区写一个文件。
8.3 分区写入#
分区和分桶这两个概念和 Hive 中分区表和分桶表是一致的。都是将数据按照一定规则进行拆分存储。需要注意的是 partitionBy 指定的分区和 RDD 中分区不是一个概念:这里的分区表现为输出目录的子目录,数据分别存储在对应的子目录中。
Copy
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
输出结果如下:可以看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。
8.3 分桶写入#
分桶写入就是将数据按照指定的列和桶数进行散列,目前分桶写入只支持保存为表,实际上这就是 Hive 的分桶表。
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
.......
具体介绍来源于https://www.cnblogs.com/heibaiying/p/11347390.html
简短的英文原版故事?
ass and manA man wanted to buy an ass. He went to the market, and saw a likely one. But he wanted to
test him first. So he took the ass home, and put him into the stable with the other asses.
The new ass looked around, and immediately went to choose a place next to the laziest ass in
the stable. When the man saw this he put a halter on the ass at once, and gave him back to
his owner. The owner felt quite surprised. He asked the man, "Why are you back so soon? Have
you tested him already?" "I don't want to test him any more," replied the man, "From the
companion he chose for himself, I could see what sort of animal he is."
如何入门Python数据分析库Pandas?
关于作者:Python King,Python高手大师
在使用Pandas之前,需要导入Pandas包。惯例是将pandas简写为pd,命令如下:
import pandas as pd
Pandas包含两个主要的数据结构:Series和DataFrame。其中最常用的是DataFrame,下面我们先来学习一下DataFrame。
01 DataFrame入门
DataFrame是一个表格型的数据结构。每列都可以是不同的数据类型(数值、字符串、布尔值等)。
DataFrame既有行索引也有列索引,这两种索引在DataFrame的实现上,本质上是一样的。但在使用的时候,往往是将列索引作为区分不同数据的标签。DataFrame的数据结构与SQL数据表或者Excel工作表的结构非常类似,可以很方便地互相转换。
下面先来创建一个DataFrame,一种常用的方式是使用字典,这个字典是由等长的list或者ndarray组成的,示例代码如下:
data={'A':['x','y','z'],'B':[1000,2000,3000],'C':[10,20,30]}df=pd.DataFrame(data,index=['a','b','c'])df
运行结果如图3-2所示。
▲图3-2
我们可以看到,DataFrame主要由如下三个部分组成。
数据,位于表格正中间的9个数据就是DataFrame的数据部分。索引,最左边的a、b、c是索引,代表每一行数据的标识。这里的索引是显式指定的。如果没有指定,会自动生成从0开始的数字索引。列标签,表头的A、B、C就是标签部分,代表了每一列的名称。下文列出了DataFrame函数常用的参数。其中,“类似列表”代表类似列表的形式,比如列表、元组、ndarray等。一般来说,data、index、columns这三个参数的使用频率是最高的。
data:ndarray/字典/类似列表 | DataFrame数据;数据类型可以是ndarray、嵌套列表、字典等index:索引/类似列表 | 使用的索引;默认值为range(n)columns:索引/类似列表 | 使用的列标签;默认值为range(n)dtype:dtype | 使用(强制)的数据类型;否则通过推导得出;默认值为Nonecopy:布尔值 | 从输入复制数据;默认值为False其中data的数据类型有很多种。
下文列举了可以作为data传给DataFrame函数的数据类型。
可以传给DataFrame构造器的数据:
二维ndarray:可以自行指定索引和列标签嵌套列表或者元组:类似于二维ndarray数据、列表或元组组成的字典:每个序列变成一列。所有序列长度必须相同由Series组成的字典:每个Series会成为一列。如果没有指定索引,各Series的索引会被合并另一个DataFrame:该DataFrame的索引将会被沿用前面生成了一个DataFrame,变量名为df。下面我们来查看一下df的各个属性值。
获取df数据的示例代码如下:
df.values
输出结果如下:
array([['x', 1000, 10], ['y', 2000, 20], ['z', 3000, 30]], dtype=object)
获取df行索引的示例代码如下:
df.index
输出结果如下:
Index(['a', 'b', 'c'], dtype='object')
获取df列索引(列标签)的示例代码如下:
df.columns
输出结果如下:
Index(['A', 'B', 'C'], dtype='object')
可以看到,行索引和列标签都是Index数据类型。
创建的时候,如果指定了列标签,那么DataFrame的列也会按照指定的顺序进行排列,示例代码如下:
df=pd.DataFrame(data,columns=['C','B','A'],index=['a','b','c'])df
运行结果如图3-3所示。
▲图3-3
如果某列不存在,为其赋值,会创建一个新列。我们可以用这种方法来添加一个新的列:
df['D']=10df
运行结果如图3-4所示。
▲图3-4
使用del命令可以删除列,示例代码如下:
del df['D']df
运行结果如图3-5所示。
▲图3-5
添加行的一种方法是先创建一个DataFrame,然后再使用append方法,代码如下:
new_df=pd.DataFrame({'A':'new','B':4000,'C':40},index=['d'])df=df.append(new_df)df
运行结果如图3-6所示。
▲图3-6
或者也可以使用loc方法来添加行,示例代码如下:
df.loc['e']=['new2',5000,50]df
运行结果如图3-7所示。
▲图3-7
loc方法将在后面的内容中详细介绍。
索引的存在,使得Pandas在处理缺漏信息的时候非常灵活。下面的示例代码会新建一个DataFrame数据df2。
df2=pd.DataFrame([1,2,3,4,5],index=['a','b','c','d','z'],columns=['E'])df2
运行结果如图3-8所示。
▲图3-8
如果现在想要合并df和df2,使得df有一个新的列E,那么可以使用join方法,代码如下:
df.join(df2)
运行结果如图3-9所示。
▲图3-9
可以看到,df只接受索引已经存在的值。由于df2中没有索引e,所以是NaN值,而且df2索引为z的值已经丢失了。为了保留df2中索引为z的值,我们可以提供一个参数,告诉Pandas如何连接。示例代码如下:
df.join(df2,how='outer')
运行结果如图3-10所示。
▲图3-10
在上述代码中,how='outer'表示使用两个索引中所有值的并集。连接操作的其他选项还有inner(索引的交集)、left(默认值,调用方法的对象的索引值)、right(被连接对象的索引值)等。
在金融数据分析中,我们要分析的往往是时间序列数据。下面介绍一下如何基于时间序列生成DataFrame。为了创建时间序列数据,我们需要一个时间索引。这里先生成一个DatetimeIndex对象的日期序列,代码如下:
dates=pd.date_range('20160101',periods=8)dates
输出结果如下:
DatetimeIndex(['2016-01-01', '2016-01-02', '2016-01-03', '2016-01-04', '2016-01-05', '2016-01-06', '2016-01-07', '2016-01-08'],dtype='da tetime64[ns]', freq='D')
可以看到,使用Pandas的date_range函数生成的是一个DatetimeIndex对象。date_range函数的参数及说明如下所示:
start:字符串/日期时间 | 开始日期;默认为Noneend:字符串/日期时间 | 结束日期;默认为Noneperiods:整数/None | 如果start或者end空缺,就必须指定;从start开始,生成periods日期数据;默认为Nonefreq:dtype | 周期;默认是D,即周期为一天。也可以写成类似5H的形式,即5小时。其他的频率参数见下文tz:字符串/None | 本地化索引的时区名称normalize:布尔值 | 将start和end规范化为午夜;默认为Falsename:字符串 | 生成的索引名称date_range函数频率的参数及说明如下所示:
B:交易日C:自定义交易日(试验中)D:日历日W:每周M:每月底SM:半个月频率(15号和月底)BM:每个月份最后一个交易日CBM:自定义每个交易月MS:日历月初SMS:月初开始的半月频率(1号,15号)BMS:交易月初CBMS:自定义交易月初Q:季度末BQ:交易季度末QS:季度初BQS:交易季度初A:年末BA:交易年度末AS:年初BAS:交易年度初BH:交易小时H:小时T,min:分钟S:秒L,ms:毫秒U,us:微秒N:纳秒接下来,我们再基于dates来创建DataFrame,代码如下:
df=pd.DataFrame(np.random.randn(8,4),index=dates,columns=list('ABCD'))df
运行结果如图3-11所示。
▲图3-11
有了df,我们就可以使用多个基于DataFrame的内建方法了,下面来看看相关的示例。
按列求总和,代码如下:
df.sum()
输出结果如下:
A 0.241727B -0.785350C -0.547433D -1.449231dtype: float64
按列求均值,代码如下:
df.mean()
输出结果如下:
A 0.030216B -0.098169C -0.068429D -0.181154dtype: float64
按列求累计总和,代码如下:
df.cumsum()
运行结果如图3-12所示。
▲图3-12
使用describe一键生成多种统计数据,代码如下:
df.describe()
运行结果如图3-13所示。
▲图3-13
可以根据某一列的值进行排序,代码如下:
df.sort_values('A')
运行结果如图3-14所示。
▲图3-14
根据索引(日期)排序(这里是倒序),代码如下:
df.sort_index(ascending=False)
运行结果如图3-15所示。
▲图3-15
选取某一列,返回的是Series对象,可以使用df.A,代码如下:
df['A']
输出结果如下:
2016-01-01 -1.1423502016-01-02 -0.8161782016-01-03 0.0302062016-01-04 1.9301752016-01-05 0.5715122016-01-06 0.2204452016-01-07 0.2921762016-01-08 -0.844260Freq: D, Name: A, dtype: float64
使用[]选取某几行,代码如下:
df[0:5]
运行结果如图3-16所示。
▲图3-16
根据标签(Label)选取数据,使用的是loc方法,代码如下:
df.loc[dates[0]]
输出结果如下:
A -1.142350B -1.999351C 0.772343D -0.851840Name: 2016-01-01 00:00:00, dtype: float64
再来看两个示例代码。
df.loc[:,['A','C']]
运行结果如图3-17所示。
▲图3-17
df.loc['20160102':'20160106',['A','C']]
运行结果如图3-18所示。
▲图3-18
需要注意的是,如果只有一个时间点,那么返回的值是Series对象,代码如下:
df.loc['20160102',['A','C']]
输出结果如下:
A -0.816178C -0.595195Name: 2016-01-02 00:00:00, dtype: float64
如果想要获取DataFrame对象,需要使用如下命令:
df.loc['20160102':'20160102',['A','C']]
运行结果如图3-19所示。
▲图3-19
上面介绍的是loc方法,是按标签(索引)来选取数据的。有时候,我们会希望按照DataFrame的绝对位置来获取数据,比如,如果想要获取第3行第2列的数据,但不想按标签(索引)获取,那么这时候就可以使用iloc方法。
根据位置选取数据,代码如下:
df.iloc[2]
输出结果如下:
A 0.030206B 0.759953C -1.446549D -0.874364Name: 2016-01-03 00:00:00, dtype: float64
再来看一个示例:
df.iloc[3:6,1:3]
运行结果如图3-20所示。
▲图3-20
注意:对于DataFrame数据类型,可以使用[]运算符来进行选取,这也是最符合习惯的。但是,对于工业代码,推荐使用loc、iloc等方法。因为这些方法是经过优化的,拥有更好的性能。
有时,我们需要选取满足一定条件的数据。这个时候可以使用条件表达式来选取数据。这时传给df的既不是标签,也不是绝对位置,而是布尔数组(Boolean Array)。下面来看一下示例。
例如,寻找A列中值大于0的行。首先,生成一个布尔数组,代码如下:
df.A>0
输出结果如下:
2016-01-01 False2016-01-02 False2016-01-03 True2016-01-04 True2016-01-05 True2016-01-06 True2016-01-07 True2016-01-08 FalseFreq: D, Name: A, dtype: bool
可以看到,这里生成了一个Series类型的布尔数组。可以通过这个数组来选取对应的行,代码如下:
df[df.A>0]
运行结果如图3-21所示。
▲图3-21
从结果可以看到,A列中值大于0的所有行都被选择出来了,同时也包括了BCD列。
现在我们要寻找df中所有大于0的数据,先生成一个全数组的布尔值,代码如下:
df>0
运行结果如图3-22所示。
▲图3-22
下面来看一下使用df>0选取出来的数据效果。由图3-23可以看到,大于0的数据都能显示,其他数据显示为NaN值。
df[df>0]
运行结果如图3-23所示。
▲图3-23
再来看一下如何改变df的值。首先我们为df添加新的一列E,代码如下:
df['E']=0df
运行结果如图3-24所示。
▲图3-24
使用loc改变一列值,代码如下:
df.loc[:,'E']=1df
运行结果如图3-25所示。
▲图3-25
使用loc改变单个值,代码如下:
df.loc['2016-01-01','E'] = 2df
运行结果如图3-26所示。
▲图3-26
使用loc改变一列值,代码如下:
df.loc[:,'D'] = np.array([2] * len(df))df
运行结果如图3-27所示。
▲图3-27
可以看到,使用loc的时候,x索引和y索引都必须是标签值。对于这个例子,使用日期索引明显不方便,需要输入较长的字符串,所以使用绝对位置会更好。这里可以使用混合方法,DataFrame可以使用ix来进行混合索引。比如,行索引使用绝对位置,列索引使用标签,代码如下:
df.ix[1,'E'] = 3df
运行结果如图3-28所示。
▲图3-28
ix的处理方式是,对于整数,先假设为标签索引,并进行寻找;如果找不到,就作为绝对位置索引进行寻找。所以运行效率上会稍差一些,但好处是这样操作比较方便。
对于ix的用法,需要注意如下两点。
假如索引本身就是整数类型,那么ix只会使用标签索引,而不会使用位置索引,即使没能在索引中找到相应的值(这个时候会报错)。如果索引既有整数类型,也有其他类型(比如字符串),那么ix对于整数会直接使用位置索引,但对于其他类型(比如字符串)则会使用标签索引。总的来说,除非想用混合索引,否则建议只使用loc或者iloc来进行索引,这样可以避免很多问题。
02 Series
Series类似于一维数组,由一组数据以及相关的数据标签(索引)组成。示例代码如下:
import pandas as pds=pd.Series([1,4,6,2,3])s
Out:
0 11 42 63 24 3
在这段代码中,我们首先导入pandas并命名为pd,然后向Series函数传入一个列表,生成一个Series对象。在输出Series对象的时候,左边一列是索引,右边一列是值。由于没有指定索引,因此会自动创建0到(N-1)的整数索引。也可以通过Series的values和index属性获取其值和索引。示例代码如下:
s.values
Out:
array([1, 4, 6, 2, 3], dtype=int64)
s.index
Out:
Int64Index([0, 1, 2, 3, 4], dtype='int64')
当然,我们也可以对索引进行定义,代码如下:
s=pd.Series([1,2,3,4],index=['a','b','c','d'])s
Out:
a 1b 2c 3d 4
在这里,我们将索引定义为a、b、c、d。这时也可以用索引来选取Series的数据,代码如下:
s['a']
Out:
1
s[['b','c']]
Out:
b 2c 3
对Series进行数据运算的时候也会保留索引。示例代码如下:
s[s>1]
Out:
b 2c 3d 4
s*3
Out:
a 3b 6c 9d 12
Series最重要的功能之一是在不同索引中对齐数据。示例代码如下:
s1=pd.Series([1,2,3],index=['a','b','c'])s2=pd.Series([4,5,6],index=['b','c','d'])s1+s2
Out:
a NaNb 6c 8d NaN
Series的索引可以通过赋值的方式直接修改,示例代码如下:
s.index
Out:
Index([u'a', u'b', u'c', u'd'], dtype='object')
s.index=['w','x','y','z']s.index
Out:
Index([u'w', u'x', u'y', u'z'], dtype='object')
s
Out:
w 1x 2y 3z 4
还没有评论,来说两句吧...