【数仓查漏补缺(一)】HiveSql回顾
基础语法Create语句
完整的Hive的建表语句
分区, 分桶, 切割, 存储方式, 存储位置, 表属性
1234567891011create [external] table 表名( 字段名 字段类型 Comment'字段的描述信息', 字段名 字段类型 Comment'字段的描述信息', 字段名 字段类型 Comment'字段的描述信息' -- 最后一行没有逗号)Comment '表的描述信息'partitioned by 中不存在的字段 字段类型) -- 分区clustered by (表中已有字段) sorted by (表中已有字段 desc/asc) into 桶的个数 buckets -- 分桶row format delimited fields terminated by ',' -- 切割stored as orc -- (stored as TestFile 行存储方式) orc是列存储方式location 'HD ...
【SQL刷题本(一)】SQL连接查询及子查询
组合两个表
编写一个SQL查询来报告 Person 表中每个人的姓、名、城市和州。如果 personId 的地址不在 Address 表中,则报告为空 null 。以 任意顺序 返回结果表。
案例12345678910111213141516171819202122232425输入: Person表:+----------+----------+-----------+| personId | lastName | firstName |+----------+----------+-----------+| 1 | Wang | Allen || 2 | Alice | Bob |+----------+----------+-----------+Address表:+-----------+----------+---------------+------------+| addressId | personId | city | state |+-----------+--------- ...
【SparkStreaming】SparkStreaming和Kafka整合
将从Kafka中读取的binary二进制数据(DataFrame结构)转为String类型的数据, 方便后续的处理
这里有两种方式
DSL风格-传统方式
select(col("value").cast(StringType()))
DSL风格使用selec表达式替换
selectExpr("cast(value as string)")
SparkStreaming和Kafka整合
无非就是Kafka中的几个组件(生产者, 消费者, topic, partition)和Streaming进行整合
这里的重点就是理解Kafka中的原理
从Kafka中读取数据
订阅一个主题
123456df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ .option("subscribe", ...
【SparkStreaming】初识Spark实时框架
从socket套接字中读取数据(被称为Socket Source)
12345input_df = spark \.readStream \.format("数据来源类型") \.option("host", "地址") \.load()
实时的从文件中读取数据(被称为File Source)
123456input_df = spark\ .readStream\ .format('csv')\ .option('sep', ';') \ .schema('naem string, age int, hobby string')\ .load('file:///root/streaming')
实时的将数据写出到文件中(被称为File Sink)
1234567rs_df\ .writeStream\ .outputMode('append& ...
【Spark笔耕不辍(二)】Kafka
置顶URL: https://blog.csdn.net/weixin_46244703?type=blog
消息队列什么是消息队列
消息队列MQ(Message Queue)用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存
消息队列的作用
消息队列可以实现两个模块之间的异步通信,并降低模块之间的耦合性,并可以限流削峰
由于消息队列是连接两个模块的桥梁,至关重要,因此消息队列都是采用分布式架构来保证数据的安全性和可靠性
常见的消息队列:
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ、Pulsar
发布和订阅介绍
1、发布者又称生成者,多个生成者都可以向消息队列生成数据,为了区分不同的消息,每个消息都有主题
2、订阅者又称消费者,每个消费者可以消费或者订阅多个主题的消息
3、消费者消费完消息之后,队列中的消息并不会立刻消失
Kafka什么是KafkaKafka是基于分布式的流式处理框架,主要用于实时分析
Kafka是基于发布订阅模式的消息队列,有生产者 消费者 主题 队列等
Kafka的核心组件
Broker1231、搭建Ka ...
【Spark笔耕不辍(三)】Kafka生产者消费者API及核心原理
RetrospectKafka常用命令
创建主题:指定分区数和副本数
topic bigdata01 主题的名字
partitions 3 分区的个数
replication-factor 2 副本个数
bootstrap-server kafka内部服务器通信地址,端口默认是9092
创建主题:不指定分区数和副本数,默认是1个分区,1个副本
查看所有主题
查看某一个主题的详情(关键字describe)
12345678910111213141516171819202122# 创建主题:指定分区数和副本数kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092# 创建主题:不指定分区数和副本数,默认是1个分区,1个副本kafka-topics.sh --create --topic test1 --bootstrap-server node1:9092, ...
【SparkSQL笔耕不辍(一)】UDF及新零售案例
Spark连接Hive
Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
本质上:SparkSQL访问了Metastore服务获取了Hive元数据,基于元数据提供的地址进行计算
命令行集成
step1:第一台机器启动HDFS和Hive的Metastore服务
1234567# 不管你其他服务什么模式运行 只要你是以hive 是使用hdfs 就需要把3台虚拟机都启动# 启动HDFS服务:NameNode和DataNodesstart-dfs.sh # 启动HiveMetaStore 服务start-metastore.sh
step2:在Spark中构建配置文件指定metastore地址【集群模式所有节点都必须配】
==spark local模式==:只需要在node1配置hive metastore服务地址即可
spark集群模式:standalone、yarn 、mesos 需要在集群中每个节点上都添加hive metastore服务地址
123456789 ...
【SparkSQL】Spark读取外部文件及写出数据(附开窗函数)
这里写一些SQL中的难点
炸裂函数
spark.sql(
"""
with t2 as(
select explode(split(value,' ')) as word from t1 where length (value) > 0
)
select word, count(*) cnt from t2 group by word order by cnt
"""
).show() # Explode函数里面的值必须为map类型或者array类型, 也就是列表
12345678910111213141516171819202122232425- <font size=4 color=red face='华文楷体'>开窗函数</font> - SQL风格- ```python # SQL风格 - 创建临时视图, 写SQL emp_df.createO ...
【SparkSQL】Spark自定义函数
自定义Spark函数函数的分类UDF:一对一的函数【User Defined Functions】
substr、length
UDAF:多对一的函数【User Defined Aggregation Functions】
count、sum、max、min、avg
UDTF:一对多的函数【User Defined Tabular Functions】
explode
函数的定义方式
1、register方式定义的函数既可以用于SQL风格,也可以用于DSL风格
2、udf和pandas_df定义的函数只能用于DSL风格
需求
原始数据:datas/udf/music.tsv
12301 周杰伦 150/17502 周杰 130/18503 周华健 148/178
目标结果
12301 周杰伦 150斤/175cm02 周杰 130斤/185cm03 周华健 148斤/178cm
udf注册方式定义UDF函数1234# 导包:DSL函数库import pyspark.sql.functions as F# 定义UDF变量 ...
【SparkSQL】SparkSQL词频统计Demo
SparkSQL的概念
1、SparkSQL是Spark发展后期产生的,是为了使用SQL风格来替换之前SparkCore的RDD风格
2、SparkSQL既可以做离线,也可以做实时
3、SparkSQL的编程有两种风格:SQL风格、DSL分格
SparkSQL和SparkCore区别
1、SparkCore的核心数据类型是RDD,SparkSQL核心数据类型是DataFrame
2、SparkCore的核心入口类是SparkContext、SparkSQL的核心入口类是:SparkSession
3、SparkSQL是基于SparkCore,SparkSQL代码底层就是rdd
4、SparkCore只侧重数据本身,没有表概念,SparkSQL要侧重:数据+表结构
SparkSQL的SQL风格词频统计
SparkSQL的入口类为SparkSession, 导包方式: from pyspark.sql import SparkSession
创建SparkSQL的入口类对象
spark = SparkSession \
.builder \
.app ...
