Hadoop简介

  • Hadoop是一个由Apache基金会所开发的分布式系统基础架构
  • 主要解决,海量数据的存储和海量数据的分析计算问题
  • 广义上来说,Hadoop通常是指一个更广泛的概念-Hadoop生态圈

分布式存储

Hadoop 的分布式存储主要基于 HDFS(分布式文件系统):
HDFS将数据分割成多个数据块(block),这些数据块分散存储在集群中的不同节点上。每个数据块会有多个副本,通常默认是 3 个副本.采用分布式存储在不同的节点上,提高了数据的可靠性和容错性。

image-20250701140958694

Hadoop的分布式核心组件是MapReduce编程模型:
在MapReduce任务中,数据被切分为多个任务,每个任务由或多个节点并行。每个节点负责将输入数据映射为键-值对生成中间结果。最后,中间结果按照键的排序进行合并和归并。

image-20250701141052158

image-20250701141243480

Hadoop组件(面试重点)

hadoop问答小测验

HDFS 架构概述

HDFS组件用于存储数据,主要由NameNode,DataNode,SecondaryNameNode 组成

  • NameNode (nn): 存储文件的元数据,如文件名,文件目录结构,文件属性 (生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等。
  • DataNode(dn): 在本地文件系统存储文件块数据,以及块数据的校验。
  • SecondaryNameNode(2nn): 每隔一段时间对NameNode元数据进行备份

Yarn 架构概述

Yet Another Resource Negotiator 简称YARN ,另一种资源协调者,是Hadoop 的资源管理器。

Yarn资源调度负责硬件资源管理,主要由:ResourceManager,NodeManager,ApplicationMaster组成

  • ResourceManager (资源管理器):.YARN集群中的中心调度器和资源管理器。负责整个集群的资源分配和调度 监控集群中的计算资源任务的运行状态
  • NodeManager (节点管理器):单个节点服务器资源的管理者。每个计算节点上运行的代理程序负责管理和监控节点上的资源和任务。接收来自RM的任务调度请求;启动、停止和监控任务的执行;发送节点的状态和可用资源报告
  • ApplicationMaster (应用程序管理器):单个任务运行的管理者。每个应用程序在YARN中都有一个对应的AM.AppMaster负责协调和管理应用程序的执行。它与RM交互申请资源并监任务的执行。它还负责任务的划分和调度、容错和恢复、进度跟踪等。
  • Container:容器,相当于一台独立的服务器,里面封装了任务运行所需要的资源,如内存、CPU、磁盘、网络等。

说明:
(1)客户端可以有多个
(2)集群上可以运行多个ApplicationMaster
(3)每个NodeManager上可以有多个Container

MapReduce 架构概述

MapReduce 将计算过程分为两个阶段:Map 和Reduce

  1. Map 阶段并行处理输入数据
  2. Reduce 阶段对Map 结果进行汇总

HDFS、YARN、MapReduce 三者关系

大数据技术生态体系


图中涉及的技术名词解释如下:

  1. Sqoop:Sqoop 是一款开源的工具,主要用于在Hadoop、Hive 与传统的数据库(MySQL)间进行数据的传递,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到Hadoop 的HDFS 中,也可以将HDFS 的数据导进到关系型数据库中。
  2. Flume:Flume 是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume 支持在日志系统中定制各类数据发送方,用于收集数据。
  3. Kafka:Kafka 是一种高吞吐量的分布式发布订阅消息系统。
  4. Spark:Spark 是当前最流行的开源大数据内存计算框架。可以基于Hadoop 上存储的大数据进行计算。
  5. Flink:Flink 是当前最流行的开源大数据内存计算框架。用于实时计算的场景较多。
  6. Oozie:Oozie 是一个管理Hadoop 作业(job)的工作流程调度管理系统。
  7. Hbase:HBase 是一个分布式的、面向列的开源数据库。HBase 不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。
  8. Hive:Hive 是基于Hadoop 的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL 查询功能,可以将SQL 语句转换为MapReduce 任务进行运行。其优点是学习成本低,可以通过类SQL 语句快速实现简单的MapReduce 统计,不必开发专门的MapReduce 应用,十分适合数据仓库的统计分析。
  9. ZooKeeper:它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、分布式同步、组服务等。

推荐系统框架图

推荐系统项目框架

三个虚拟机配置分布式(环境搭建:开发重点)

image-20250626092139322

安装步骤

  1. 操作系统环境

    1. 操作系统以及软件系统环境搭建
    2. ssh免密操作
  2. hadoop软件

    1. 软件安装

    2. 集群配置

环境配置

  1. 打开虚拟机->2.虚拟网络编辑->3.更改配置->

  2. image-20250626093107076

  3. 本机网络更改适配器选项,找到

  4. image-20250626093422694

  5. 新建虚拟机->稍后安装操作系统->虚拟机名称改为hadoop100,位置改为D:vmware/hadoop100,image-20250626094736868

image-20250626094807272image-20250626094854252image-20250626094926569

启动虚拟机

  1. 最小化安装

  2. 网络配置

    image-20250626095142289

image-20250626095226047

开始安装->用户和密码都改为root

软件安装

软件准备/上传文件

  1. jdk-8u212-linux-x64.tar.gz

  2. hadoop-3.1.3.tar.gz

  3. CentOS-Base.repo

hadoop安装过程

系统环境配置

  1. 下载源的调整

    1
    2
    3
    4
    5
    6
    #1.拷贝一份新的阿里云的 下载源 到 /etc/yum.repos.d/下 
    mv /etc/yum.repos.d/CentOS-Base.repo
    #2.清空原下载池
    sudo yum clean all
    #3. 加载新源
    sudo yum makecache
  2. 安装epel-release(软件仓库)

    1
    yum install -y epel-release
  3. 安装必要工具

    1
    yum install -y net-tools rsync vim wget ntp
  4. 关闭防火墙

    1
    2
    systemctl stop firewalld
    systemctl disable firewalld.service
  5. 关闭selinux

    1
    2
    3
    4
    vim /etc/selinux/config
    # 将SELINUX=enforcing改为
    SELINUX=disabled
    #wq保存

软件安装

1
2
3
#配置主机名映射
vim /etc/hosts
ip hostname
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
# 配置环境变量
vim /etc/profile.d/my_env.sh

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

vim /etc/profile.d/my_env.sh

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

#更新环境变量
source /etc/profile
#测试环境变量配置
hadoop version

主机克隆操作

1
2
3
4
5
vim /etc/sysconfig/network-scripts/ifcfg-ens33

IPADDR="192.168.200.102"

hostnamectl set-hostname hadoop101

SSH协议免密配置

1
2
3
4
#生成密钥对
ssh-keygen -t rsa
#传输密钥
ssh-copy-id hadoop100

image-20250701141804317

配置集群文件

core-site.xml

1
vim core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定NameNode的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop100:8020</value>
</property>

<!-- 指定hadoop数据的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>

<!-- 配置HDFS网页登录使用的静态用户为root -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>root</value>
</property>
</configuration>

hdfs-site.xml

1
vim hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- nn web端访问地址-->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop100:9870</value>
</property>
<!-- 2nn web端访问地址-->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop102:9868</value>
</property>
</configuration>

yarn-site.xml

1
vim yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定MR走shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 指定ResourceManager的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop101</value>
</property>

<!-- 环境变量的继承 -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>

mapred-site.xml

1
vim mapred-site.xml
1
2
3
4
5
6
7
8
9
10
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<!-- 指定MapReduce程序运行在Yarn上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

workers

1
vim /opt/module/hadoop-3.1.3/etc/hadoop/workers
1
2
3
hadoop100
hadoop101
hadoop102

格式化hdfs文件系统(谨慎使用)

1
2
3
4
5
hdfs namenode -format
#hdfs 安全模式 关闭
hdfs dfsadmin -safemode leave
#hdfs 安全模式 强制关闭
hdfs dfsadmin -safemode forceExit

启动集群

start-dfs.sh stop-dfs.sh

1
2
3
4
5
6
7
8
vim sbin/start-dfs.sh 
vim sbin/stop-dfs.sh


HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root

start-yarn.sh stop-yarn.sh

1
2
3
4
5
6
vim sbin/start-yarn.sh
vim sbin/stop-yarn.sh

YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

配置历史服务器

mapred-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vim mapred-site.xml
mapred --daemon start historyserver
<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop100:10020</value>
</property>


<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop100:19888</value>
</property>

配置历史聚集

yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
vim yarn-site.xml
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop101:19888/jobhistory/logs</value>
</property>
<!-- 设置日志保留时间为7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>

启停脚本

myhadoop.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi


case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="


echo " --------------- 启动 hdfs ---------------"
ssh hadoop100 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh hadoop101 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh hadoop100 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;


"stop")
echo " =================== 关闭 hadoop集群 ==================="


echo " --------------- 关闭 historyserver ---------------"
ssh hadoop100 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh hadoop101 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh hadoop100 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
"status")
echo " =================== hadoop集群状态 ==================="
for host in hadoop100 hadoop101 hadoop102
do
echo =============== $host ===============
ssh $host jps
done
;;
*)


;;
esac

同步脚本

/root/bin目录下创建xsync文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo "请输入文件目录的路径"
exit;
fi


#2. 遍历集群所有机器
for host in hadoop100 hadoop101 hadoop102
do


echo ==================== $host ====================
#3. 遍历所有⽬录,挨个发送
for file in $@
do
#4. 判断⽂件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前⽂件的名称
fname=$(basename $file)
rsync -av $pdir/$fname $host:$pdir
else
echo "文件不存在 $file"
fi
done
done
xsync abc

修改脚本xsync具有执行权限

1
chmod +x xsync

结果截图

image-20250701144157828

image-20250701144252469

image-20250701144309728

hadoop大数据平台-hive组件部署介绍

image-20250701143412258

Hadoop平台-进程启停命令

1
2
3
4
5
6
#1.日志服务启停命令
mapred --daemon start historyserver
#2.HDFS文件系统服务启停命令
hdfs --daemon start namenode/datanode/secondarynamenode
#4.Yarn服务启停命令
yarn --daemon start/stop resourcemanager/nodemanager

Hadoop平台-HDFS文件系统命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]:  #查看指定路径的文件或目录列表。
-mkdir [-p] <path> ...: #创建新的文件夹。
-moveFromLocal <localsrc> ... <dst>: #从本地移动文件到HDFS。
-moveToLocal <src> <localdst>: #将文件从HDFS移动到本地。
-mv <src> ... <dst>: #在HDFS文件系统内移动文件。
-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>: #上传文件到HDFS。
-renameSnapshot <snapshotDir> <oldName> <newName>: #重命名指定目录下的快照。
-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...: #删除文件或目录(只能删除空文件夹)。
-rmdir [--ignore-fail-on-non-empty] <dir> ...: #删除空文件夹,可以使用`--ignore-fail-on-non-empty`选项删除非空文件夹。
-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]: #设置文件或目录的ACL(访问控制列表)。
-setfattr {-n name [-v value] | -x name} <path>: #设置文件或目录的扩展属性。
-setrep [-R] [-w] <rep> <path> ...: #设置文件的副本数量。
-stat [format] <path> ...: #显示文件或目录的状态信息。
-tail [-f] [-s <sleep interval>] <file>: #查看文件的末尾部分内容。
-test -[defsz] <path>: #测试文件的存在性、目录的空或非空等属性。
-text [-ignoreCrc] <src> ...: #以文本形式查看文件的内容。
-touch [-a] [-m] [-t TIMESTAMP ] [-c] <path> ...: #创建一个空文件或者更新已有文件的时间戳。
-touchz <path> ...: #创建一个空文件。
-truncate [-w] <length> <path> ...: #清空文件内容或者将文件截断到指定的长度。
-usage [cmd ...]: #显示HDFS命令的用法信息。

-appendToFile <localsrc> ... <dst>: #将本地目录追加到HDFS下的某个文件中。
-cat [-ignoreCrc] <src> ...: #查看某个文件的内容。
-checksum <src> ...: #计算并确认文件的校验和。
-chgrp [-R] GROUP PATH...: #修改文件或目录的所属用户组。
-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...: #修改文件或目录的权限。
-chown [-R] [OWNER][:[GROUP]] PATH...: #修改文件或目录的所属用户。
-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>: #从本地复制文件到HDFS。
-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>: #从HDFS复制文件到本地。
-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] <path> ...: #统计文件或目录的数量。
-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>: #在HDFS文件系统中拷贝文件。
-createSnapshot <snapshotDir> [<snapshotName>]: #创建指定目录的快照。
-deleteSnapshot <snapshotDir> <snapshotName>: #删除指定目录下的快照。
-df [-h] [<path> ...]: #查看文件系统剩余空间。
-du [-s] [-h] [-v] [-x] <path> ...: #计算文件或目录的磁盘使用情况。
-expunge: #清空HDFS垃圾箱。
-find <path> ... <expression> ...: #在指定路径下查找符合条件的文件。
-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>: #从HDFS下载文件到本地。
-getfacl [-R] <path>: #获取文件或目录的ACL(访问控制列表)信息。
-getfattr [-R] {-n name | -d} [-e en] <path>: #获取文件或目录的扩展属性信息。
-getmerge [-nl] [-skip-empty-file] <src> <localdst>: #将多个文件合并为一个文件并下载到本地。
-head <file>: #查看文件的开头部分内容。
-help [cmd ...]: #获取HDFS命令的帮助。


Hadoop平台-HDFS优缺点

HDFS组件用于存储数据,主要由*NameNode,DataNode,SecondaryNameNode 组成

image-20250701144959041

Hadoop平台-HDFS读取流程

image-20250701145056181

Hadoop平台-NameNode更新流程

image-20250701145144059

Hadoop平台-yarn工作流程

image-20250701145303204

Hadoop平台-mapreduce工作流程

image-20250701145353344

什么是HIVE

Hive 是基于 Hadoop 的一个数据仓库工具。以下是具体介绍:

  • 功能特点:Hive 可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,能将 SQL 语句转换为 MapReduce 任务进行运行。它允许熟悉 SQL 的用户方便地查询数据,也支持熟悉 MapReduce 的开发者自定义 mapper 和 reducer,以处理复杂的分析工作。
  • 优势:学习成本低,通过类 SQL 语句可快速实现简单的 MapReduce 统计,无需开发专门的 MapReduce 应用,十分适合数据仓库的统计分析。
  • 应用场景:常用于对时效性要求不高的数据分析场景。由于 Hive 底层依赖 Hadoop 的 HDFS 存储数据,利用 MapReduce 进行计算,因此能够处理大规模的数据,在处理海量结构化日志的数据统计等方面应用广泛。
  • 与数据库的区别
    1. 数据库一般用于在线应用,支持对某一行或某些行数据的更新、删除等操作,采用 “写时模式”,数据加载慢但查询快。
    2. 而 Hive 不支持对具体行的操作,也不支持事务和索引,采用 “读时模式”,适合处理非结构化或存储模式未知的数据,更侧重于对海量数据的批量处理和分析。

HIVE安装

配置mysql安装源

(在线安装方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#下载安装源
wget https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

# 安装 mysql 源
yum localinstall mysql57-community-release-el7-11.noarch.rpm
# 导入key
rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
# 修改国内源
vim /etc/yum.repos.d/mysql-community.repo
修改 baseurl 为 https://mirrors.cloud.tencent.com/mysql/yum/mysql-5.7-community-el7-x86_64/


#安装mysql
yum install -y mysql-community-server
  1. 安装mysql (本地安装方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99


00#rpm包安装
#tar -zvxf mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz -C /usr/local/mysql


#2.解压:
cd /usr/local
tar -zxvf mysql-5.7.22-linux-glibc2.12-x86_64.tar
mv mysql-5.7.22-linux-glibc2.12-x86_64 mysql-5.7.22
ln -s mysql-5.7.22 mysql


#3.添加用户组和用户
#添加用户组
groupadd mysql
#添加用户mysql 到用户组mysql
useradd -g mysql mysql
#4.安装
cd /usr/local/mysql
mkdir data
chown -R mysql:mysql ./
./bin/mysqld --initialize --user=mysql --basedir=/usr/local/mysql/m'y's'q --datadir=/usr/local/mysql/data


#将mysql/目录下除了data/目录的所有文件,改回root用户所有
chown -R root .
#mysql用户只需作为mysql-5.7.22/data/目录下所有文件的所有者
chown -R mysql data


#5.复制启动文件
cp support-files/mysql.server /etc/init.d/mysqld
chmod 755 /etc/init.d/mysqld
cp bin/my_print_defaults /usr/bin/


#6.修改启动脚本
vi /etc/init.d/mysqld
#修改项:
basedir=/usr/local/mysql-5.7.22/
datadir=/usr/local/mysql-5.7.22/data
port=3306
#加入环境变量,编辑 /etc/profile,这样可以在任何地方用mysql命令了
vi ~/.bash_profile
#添加mysql路径,加入下面内容,按ESC-->:wq保存
export PATH=$PATH:/usr/local/mysql-5.7.22/bin
#刷新立即生效
source ~/.bash_profile


#7.修改mysql配置项


vi /etc/my.cnf
#配置如下:


[mysqld]
basedir = /usr/local/mysql
datadir = /usr/local/mysql/data
socket = /tmp/mysql.sock
user = mysql
tmpdir = /tmp
symbolic-links=0


[mysqld_safe]


log-error = /usr/local/mysql/data/error.log
pid-file = /usr/local/mysql/data/mysql.pid
#!includedir /etc/my.cnf.d


#8.启动mysql
service mysqld start


#如启动失败,删除 /usr/local/mysql-5.7.22/data下所有文件,重新执行./bin/mysqld --initialize --user=mysql --basedir=/usr/local/mysql --datadir=/usr/local/mysql/data,再启动


#9.进入mysql修改初始密码,修改远程连接的用户权限问题
mysql -uroot -p


ALTER USER 'root'@'localhost' IDENTIFIED BY 'root';


use mysql;
UPDATE user SET host='%' WHERE user='root';
flush privileges;


#开机自启动
chkconfig --add mysqld
chkconfig mysqld on
chkconfig --list
mysqld 0:关 1:关 2:开 3:开 4:开 5:开 6:关
  1. 配置mysql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#启动mysql 服务
systemctl start mysqld
#设置开机启动
systemctl enable mysqld
# 重载所有修改过的配置文件
systemctl daemon-reload
# 关闭密码验证
vim /etc/my.cnf
# mysqld选项下加入
validate_password=OFF
#重启mysql服务
systemctl restart mysqld
# 得到临时密码
grep 'temporary password' /var/log/mysqld.log
#登录mysql
mysql -uroot -p
# 开发环境 修改密码为root
ALTER USER 'root'@'localhost' IDENTIFIED BY 'root';
#修改root账号登录限制
use mysql;
UPDATE user SET host='%' WHERE user='root';
flush privileges;
#创建metastore元数据库
create database metastore
DEFAULT CHARACTER SET utf8
DEFAULT COLLATE utf8_general_ci;

安装HIVE

1
2
3
4
5
6
7
8
9
10
#解压apache-hive-3.1.2-bin.tar.gz到/opt/module/目录下面
tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /opt/module/
#修改`/etc/profile.d/my_env.sh`,添加环境变量
vim /etc/profile.d/my_env.sh
#添加内容
#HIVE_HOME
export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin
export PATH=$PATH:$HIVE_HOME/bin
#刷新环境变量
source /etc/profile
  1. hive基础配置
1
2
3
4
5
#解决日志Jar包冲突
mv $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.bak

#拷贝mysql-connector.jar到lib库
cp /opt/software/mysql-connector-java-5.1.27-bin.jar $HIVE_HOME/lib
  1. 配置hive-site.xml

    挑转到/opt/module/apache-hive-3.1.2-bin/conf/目录新建文件 hive-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<?xml version="1.0" encoding="utf-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


<configuration>
<!--jdbc连接的URL-->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop100:3306/metastore?useSSL=false</value>
</property>
<!--jdbc连接的Driver-->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<!--jdbc连接的username-->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<!--jdbc连接的password-->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>
<!--Hive元数据存储版本的验证-->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<!--元数据存储授权-->
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<!--Hive默认在HDFS的工作目录-->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>false</value>
</property>
<property>
<!--取消本地模式改为false-->
<name>hive.exec.mode.local.auto</name>
<value>true</value>
</property>
<property>
<name>mapred.map.child.java.opts</name>
<value>-Xmx2048m</value>
</property>
</configuration>
  1. 初始化Hive元数据库
1
schematool -initSchema -dbType mysql -verbose

优化mapreduce

1
vim $HADOOP_HOME/etc/hadoop/mapred-site.xml

增加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>mapreduce.map.memory.mb</name>
<value>1536</value>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx1024M</value>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>3072</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>-Xmx2560M</value>
</property>

配置beeline

配置core-site.xml 使其任意节点都可以访问hadoop

1
2
3
4
5
6
7
8
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
  1. 启动 hiveserver2
1
2
3
4
#前端启动
hiveserver2
#后台启动
nohup hiveserver2 &
  1. 登录命令
1
2
beeline -u jdbc:hive2://localhost:10000 -n root -p 123456 [密码随意]
beeline -u jdbc:hive2://localhost:10000 -n root -p 123456 -e 'show tables;'
  1. dbeaver登录
  • 获取文件 hadoop-common-3.1.3.jar
  • 获取文件 hive-jdbc-3.1.2-standalone.jar
  • 添加hive数据库链接

hive 数据操作语句

元数据查看语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
--查看数据库
show database db_hive
--过滤查看数据库
show databases like 'db_hive*';
--查看详情
desc database db_hive
desc database extended db_hive;
--查看表
show tables;
--查看表列详情
desc dept;
--查看表所有详细信息
desc extended emp;
show formatted emp;
--查看分区信息
show partitions emp;

建库操作

1
2
3
4
5
-- 创建数据库
CREATE DATABASE [IF NOT EXISTS] database_name
[COMMENT database_comment]
[LOCATION hdfs_path]

建表操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name --内部表
[(col_name data_type [COMMENT col_comment], ...)] --数据类型
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --分区表
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] --分桶表
[ROW FORMAT DELIMITED --数据格式
[FIELDS TERMINATED BY char] --列分隔
[COLLECTION ITEMS TERMINATED BY char] --复合数据item分隔
[MAP KEYS TERMINATED BY char] --复合数据key分隔
[LINES TERMINATED BY char]] --行分隔
[STORED AS file_format] --压缩格式
[LOCATION hdfs_path] --表数据文件存储路径
[TBLPROPERTIES (property_name=property_value, ...)] --内外部表转换
[AS select_statement]

上传数据

1
2
3
4
5
6
7
8
9
10
11
--load
load data [local] inpath '数据的path' [overwrite] into table student [partition (partcol1=val1,…)];

--上传hdfs
dfs -put /opt/module/hive/datas/student.txt /user/atguigu/hive;

--插入数据
insert into table student_par values(1,'wangwu'),(2,'zhaoliu');

--覆盖插入并且使用结果集进行插入
insert overwrite table student_par select id, name from student ;

下载数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
insert overwrite local directory '数据的path'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' dql_command;

--hadoop 导出
dfs -get /user/hive/warehouse/student/student.txt
/opt/module/datas/export/student3.txt;

--hive shell导出
bin/hive -e 'select * from default.student;' >
/opt/module/hive/datas/export/student4.txt;

--export导出
export table default.student to
'/user/hive/warehouse/export/student';

select语句

1
2
3
4
5
6
7
8
9
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
  FROM table_reference
  [WHERE where_condition]
  [GROUP BY col_list]
  [ORDER BY col_list]
  [CLUSTER BY col_list
    | [DISTRIBUTE BY col_list] [SORT BY col_list]
  ]
 [LIMIT number]

Hive复合数据类型

  • 数组array: array<value数据类型>

    1. 相同数据类型
    2. 有序的排列
    3. 下标为数字
    1
    2
    3
    4
    5
    --查询复合数据
    select a_score[0] from student2

    --构造复合数据-array
    select array(值,值) from student
  • 集合struct: struct<key值:value数据类型,key值:value数据类型>

    1. 预定义个数

    2. 预定义顺序

    3. key预定义

    4. 数据类型可不同

    1
    2
    3
    4
    5
    6
    --查询复合数据
    select s_score.chinese from student2

    --构造复合数据
    select named_struct(key,value,key,value)
    from student
  • 字典map: map<key数据类型,value数据类型>

    1. 标准字典类型

    2. key自定义

    3. 数据类型可不同

    4. 个数不限

    1
    2
    3
    4
    5
    6
    --查询复合数据
    select m_score['语文'] from student2

    --构造复合数据
    select map(key,value,key,value)
    from student

    hive 内置函数

    1
    2
    3
    4
    5
    6
    -- 查看系统自带的函数
    show functions;
    -- 显示自带的函数的用法
    desc function upper;
    -- 详细显示自带的函数的用法
    desc function extended upper;

python连接hive

linux环境安装python

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载安装包 或上传安装包
wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz

# 下载
wget https://www.python.org/ftp/python/3.9.9/Python-3.9.9.tgz
# 解压
tar -zxvf Python-3.9.10.tgz -C /opt/module/
# 支撑包
yum install openssl-devel libffi-devel bzip2-devel gcc gcc-c++ wget -y
# 配置
./configure --enable-optimizations
# 安装
make altinstall

配置环境变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 配置环境变量
vim /etc/profile.d/my_env.sh


PATH=$PATH:/opt/python39/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin

#生效 环境变量
source /etc/profile


#或者进行软连接 配置
cp libpython3.9.a /usr/lib64/


ln -s /root/software/python3.9/Python-3.9.10/python /usr/bin/python3
ln -s /opt/module/Python-3.9.10/python /usr/bin/python3

安装pyhive库

1
2
3
4
5
6
7
8
9
10
11
pip3 install  -i https://mirrors.aliyun.com/pypi/simple/  thrift
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ thrift-sasl
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ PyHive
pip3 install -i https://mirrors.aliyun.com/pypi/simple/ PyM

from pyhive import hive
conn=hive.connect(host='localhost',port=10000,username='root',database='db_hive')
cursor=conn.cursor()
sql='show tables'
cursor.execute(sql)
print(cursor.fetchall())

分区表

  • 分区是将一个表或索引物理地分解为多个更小、更可管理的部分。

  • 分区对应用透明,即对访问数据库的应用而言,逻辑上讲只有一个表或一个索引(相当于应用“看到”的只是一个表或索引),但在物理上这个表或索引可能由数十个物理分区组成。

分区应用场景

image-20250701152059542

oracle分区表种类

  1. 范围分区(range)
  2. 列表分区(list)
  3. 散列分区(hash)
  4. 组合组合分区(subpartition)

oracle分区-范围分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE ORDER_ACTIVITIES   
( ORDER_ID NUMBER(7) NOT NULL,
ORDER_DATE DATE,
TOTAL_AMOUNT NUMBER,
CUSTOTMER_ID NUMBER(7),
PAID CHAR(1)
) -- 建表语句不变
PARTITION BY RANGE (ORDER_DATE) -- 范围分区创建 语句 关键字 range(列名)
(
PARTITION ORD_ACT_PART01 VALUES LESS THAN (TO_DATE('01- MAY -2003','DD-MON-YYYY')) TABLESPACE ORD_TS01,
-- 分区名 (时间值)表空间
PARTITION ORD_ACT_PART02 VALUES LESS THAN (TO_DATE('01-JUN-2003','DD-MON-YYYY')) TABLESPACE ORD_TS02,
PARTITION ORD_ACT_PART02 VALUES LESS THAN (MAXVALUE) TABLESPACE ORD_TS03 --使用maxvalue 将其他不符合上述范围的值放入其中
);

oracle分区-列表分区

1
2
3
4
5
6
7
8
CREATE TABLE ORDER_ACTIVITIES  
( PROBLEM_ID NUMBER(7) NOT NULL PRIMARY KEY,
CUSTOMER_ID NUMBER(7) NOT NULL,
STATUS VARCHAR2(20))
PARTITION BY LIST (STATUS) -- 范围分区创建 语句 关键字 LIST(列名)
( PARTITION PROB_ACTIVE VALUES ('ACTIVE') TABLESPACE PROB_TS01, --
PARTITION PROB_INACTIVE VALUES ('INACTIVE','unknow') TABLESPACE PROB_TS02
);

oracle分区-散列分区

1
2
3
4
5
6
7
8
9
10
CREATE TABLE HASH_TABLE   
( COL NUMBER(8),
INF VARCHAR2(100)
)
PARTITION BY HASH (COL) -- 范围分区创建 语句 关键字 HASH(列名)
(
PARTITION PART01 TABLESPACE HASH_TS01,
PARTITION PART02 TABLESPACE HASH_TS02,
PARTITION PART03 TABLESPACE HASH_TS03
)

oracle分区-组合分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE SALES   
(
PRODUCT_ID VARCHAR2(5),
SALES_DATE DATE,
SALES_COST NUMBER(10),
STATUS VARCHAR2(20)
)
PARTITION BY RANGE(SALES_DATE) SUBPARTITION BY LIST (STATUS)
(
PARTITION P1 VALUES LESS THAN(TO_DATE('2003-01-01','YYYY-MM-DD'))TABLESPACE rptfact2009
(
SUBPARTITION P1SUB1 VALUES ('ACTIVE') TABLESPACE rptfact2009,
SUBPARTITION P1SUB2 VALUES ('INACTIVE') TABLESPACE rptfact2009
),
PARTITION P2 VALUES LESS THAN (TO_DATE('2003-03-01','YYYY-MM-DD')) TABLESPACE rptfact2009
(
SUBPARTITION P2SUB1 VALUES ('ACTIVE') TABLESPACE rptfact2009,
SUBPARTITION P2SUB2 VALUES ('INACTIVE') TABLESPACE rptfact2009
)
)

oracle分区-分区表操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 添加分区
ALTER TABLE SALES ADD PARTITION P3 VALUES LESS THAN(TO_DATE('2003-06-01','YYYY-MM-DD'));
--注意:以上添加的分区界限应该高于最后一个 分区界限。
-- 添加了一个P3SUB1子分区
ALTER TABLE SALES MODIFY PARTITION P3 ADD SUBPARTITION P3SUB1 VALUES('COMPLETE');

-- 删除分区
ALTER TABLE SALES DROP PARTITION P3;
ALTER TABLE SALES DROP SUBPARTITION P4SUB1;
-- 注意:如果删除的分区是表中唯一的分区,那么此分区将不能被删除,要想删除此分区,必须删除表。

-- 交换分区
ALTER TABLE table_name EXCHANGE PARTITION partition_name WITH TABLE nonpartition_name;

--将一个分区(子分区)和非分区表进行数据交换,oracle交换的方法是其实是对逻辑存储段进行交换。使用INCLUDEING INDEXES子句可以同步将本地索引也进行交换,使用WITH VALIDATATION子句还可以实现行数据的验证。

--交换分区时如果不带UPDATE INDEXES子句,则全局索引或全局索引基于的分区将变为不可用。

hive分区-创建分区表

在 Hadoop 中,Hive 分区表通常以特定的目录结构来存储。

每个分区对应一个独立的目录,目录名通常包含分区列的值。数据文件会存储在相应的分区目录下。

1
2
3
4
5
6
7
-- 创建分区表
create table dept_partition(
deptno int, dname string, loc string
)
partitioned by (day string)
row format delimited fields terminated by '\t';

hive分区-分区表操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- 分区表数据导入
load data local inpath '/opt/module/hive/datas/dept_20200401.log' into table dept_partition partition(day='20200401');

--select分区表插入数据
insert into table log_list_6 partition(dat='20221231') select * from log_list_tmp
--多表分区插入
from student
insert overwrite table student partition(month='201707')select id, name where month='201707'
insert overwrite table student partition(month='201706')select id, name where month='201706';
-- 查看分区
show partitions tab_name;

--添加分区
alter table dept_partition add partition(day='20200404') ;
--添加多分区
alter table dept_partition add partition(day='20200405') partition(day='20200406');
--删除分区
alter table dept_partition drop partition (day='20200406');
--查看分区表信息
show partitions dept_partition;
--查看分区表结构
desc formatted dept_partition;
--修改分区表
ALTER TABLE table_name PARTITION (dt='2008-08-08') SET LOCATION "new location";
ALTER TABLE table_name PARTITION (dt='2008-08-08') RENAME TO PARTITION (dt='20080808');

超市分区表示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
create table supermarket_p (
id string, -- 行 ID
ord_id string comment '订单 ID',
ord_date string comment '订单日期',
exch_date string comment '发货日期',
exch_type string comment '邮寄方式',
cust_id string comment '客户 ID ',
cust_name string comment '客户名称',
d_type string comment '细分',
city string comment '城市',
prov string comment '省/自治区',
country string comment'国家',
area string comment '地区',
pro_id string comment '产品 ID',
type1 string comment '类别',
type2 string comment '子类别',
pro_name string comment '产品名称',
sales float comment '销售额',
count1 int comment '数量 ',
discount float comment '折扣 ',
profit float comment '利润'
)
partitioned by (c_type1 string)
row format delimited fields terminated by '\t'

动态分区配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

--开启动态分区(默认开启)
set hive.exec.dynamic.partition=true
--指定非严格模式 nonstrict模式表示允许所有的分区字段都可以使用动态分区
set hive.exec.dynamic.partition.mode=nonstrict
--在所有执行MR的节点上,最大一共可以创建多少个动态分区。默认1000
set hive.exec.max.dynamic.partitions=1000
--在每个执行MR的节点上,最大可以创建多少个动态分区(分区字段有多少种设多少个)
set hive.exec.max.dynamic.partitions.pernode=100
--整个MR Job中,最大可以创建多少个HDFS文件。默认100000
set hive.exec.max.created.files=100000
--当有空分区生成时,是否抛出异常
set hive.error.on.empty.partition=false
--打开正则查询模式`(dt|hr)?+.+`
set hive.support.quoted.identifiers=none

分桶表

  • 分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围划分。

  • 分桶是将数据集分解成更容易管理的若干部分的另一个技术。

  • 分区针对的是数据的存储路径;分桶针对的是数据文件

分桶表注意事项

  • 分桶策略

  • Hive的分桶采用对分桶字段的值进行哈希,然后除以桶的个数求余的方 式决定该条记录存放在哪个桶当中

    ==reduce的个数设置为-1,让Job自行决定需要用多少个reduce或者将reduce的个数设置为大于等于分桶表的桶数==

    ==从hdfs中load数据到分桶表中,避免本地文件找不到问题==

    ==不要使用本地模式==

hive分桶表-创建分桶表

1
2
3
4
5
6
7
8
9
10
11
12
--创建4个分桶的分桶表
create table stu_bucket(id int, name string)
clustered by(id)
into 4 buckets
row format delimited fields terminated by '\t';

--设置mapreduce数量(二选一)
set mapreduce.job.reduces=3
set mapred.reduce.tasks=3
--向分桶表导入数据
load data inpath '/student.txt'
into table stu_bucket;

hive排序关键字

image-20250701153814791


##hive****排序语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
--使用order by 排序
select * from student2 order by id
--使用sort by 排序
select * from student2 sort by class_name desc
--使用distribute by 分组
set mapreduce.job.reduces=15;
select * from student2 distribute by class_name sort by id desc

insert overwrite local directory '/root/student2/'
row format delimited fields terminated by '\t'
select * from student2_b
distribute by sex
sort by chinese desc
--使用cluster by 分组并排序
select * from student2 cluster by class_name

使用awk 清洗 log

1
2
3
4
5
6
7
8
9
10
11
cat 2021-05-20.log | awk -F "\"-\"" '{split($1, arr, " ");\
split(substr(arr[4],2),dd,":");\
split(dd[1],ee,"/");\
print arr[1]"\t"ee[1]"-
"ee[2]"-"ee[3]" "dd[2]":"dd[3]":"dd[4]"\t"arr[7]"\t"$2}' | \
awk -F "\t" '{"date -d \""$2"\" +%Y%m%d%H%M%S" | getline d;print
$1"\t"d"\t"$3"\t"$4 }' | \
awk -F "\t" '{print $1"\t"$2"\t"$3"\t"(index($4,"Windows")?"Windows":
(index($4,"Linux")?"Linux":"Mac"))"\t"(index($4,"Chrome")?"Chrome":
(index($4,"Version")?"Safari":(index($4,"Firefox")?"Firefox":"Opera")))}' >
new_2021-05-20.log

sqoop

  1. 安装sqoop

  2. sqoop介绍

    1. Sqoop 是 Apache 旗下一款专为 Hadoop 设计的数据同步工具,全称为 “SQL to Hadoop”,主要用于在关系型数据库(如 MySQL、Oracle)和Hadoop 生态系统(如 HDFS、Hive、HBase)之间高效传输数据。它通过 MapReduce 任务并行处理数据,支持大规模数据的批量导入导出,是 Hadoop 生态中连接结构化数据和非结构化数据的重要桥梁。
    2. 核心功能与特点

      1. 数据导入(Import)
        将关系型数据库中的数据抽取到 Hadoop 中(如 HDFS 存储为文件,或直接导入 Hive 表、HBase 表)。
        • 支持增量导入:可基于时间戳或主键增量同步变化的数据。
        • 并行处理:通过 MapReduce 并行读取数据库分片,提升传输效率。
      2. 数据导出(Export)
        将 Hadoop 中的数据(如 HDFS 文件、Hive 表)写回到关系型数据库。
        • 事务支持:确保导出操作的原子性,失败时可回滚。
      3. 多种数据库支持
        支持主流关系型数据库,如 MySQL、Oracle、PostgreSQL、SQL Server 等,也可通过 JDBC 连接其他数据库。
      4. 元数据映射
        自动将数据库表结构映射为 Hive 表或 HDFS 文件格式(如 Avro、Parquet),简化数据建模。
      5. 与 Hadoop 生态集成
        无缝集成 Hive、HBase、Spark 等组件,可作为 ETL(抽取 - 转换 - 加载)工具链的核心环节。

总结
Sqoop 是 Hadoop 生态中连接结构化数据源(如数据库)与分布式计算平台的关键工具,尤其适合批量数据迁移和周期性 ETL 任务。通过简单的命令行接口,它让数据工程师能够高效地在 Hadoop 与传统 IT 系统间交换数据,降低了大数据应用的集成门槛。

  1. sqoop使用

安装

  1. 配置mysql
1
2
3
4
5
6
7
8
9
10
11
create database test
DEFAULT CHARACTER SET utf8
DEFAULT COLLATE utf8_general_ci;
--创建数据库
show databases;
--创建账号
Create user 'test'@'%' identified by 'test';
Grant all privileges on test.* to test@'%'
identified by 'test' with grant option;

flush privileges;
1
2
3
4
5
6
7
8
9
10
11
12
13
# 1.拷贝安装包以及mysql的jar到/root目录 并解压
tar -zvxf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/module/

# 2.配置环境变量

vim /etc/profile.d/my_env.sh
# 加入 sqoop 路径
#SQOOP_HOME
export SQOOP_HOME=/opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha
export PATH=$PATH:$SQOOP_HOME/bin


source /etc/profile

安装sqoop

1
2
3
4
5
6
7
8
9
10
11
# 3.配置sqoop
cd $SQOOP_HOME/conf
cp sqoop-env-template.sh sqoop-env.sh

export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin
#export ZOOKEEPER_HOME=/opt/module/zookeeper-3.4.10

#拷贝 jdbc 驱动到 sqoop 的 lib 目录下
cp mysql-connector-java-5.1.27-bin.jar /opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/lib/

4.测试连接

1
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password root

分区重构

1
2
beeline -u "jdbc:hive2://hadoop100:10000/db_hive" \
--outputformat=csv2 --showHeader=false -n root -p 123456 -e "msck repair table log_sqoop"

DataX

项目地址:https://github.com/alibaba/DataX
官方文档:https://github.com/alibaba/DataX/blob/master/introduction.md

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

安装 datax

确保hadoop集群没有问题

datax.tar.gz 上传到 hadoop100 的 /root目录下, 解压安装

1
2
3
tar -zxvf datax.tar.gz -C /opt/moudule
# 同步到其他机器
xsync /opt/moudule/datax

编写配置文件

开发目录

1
2
3
4
mkdir -p /zhiyun/lijinquan
cd /zhiyun/lijinquan
# 创建几个目录
mkdir jobs sql python shell data

datax的配置文件需要放在 jobs 目录下

1
vim /zhiyun/lijinquan/jobs/c_org_busi.json

加入内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "zhiyun",
"password": "zhiyun",
"column": [
"*"
],
"connection": [
{
"table": [
"c_org_busi"
],
"jdbcUrl": [
"jdbc:mysql://192.168.50.179:3306/his?useSSL=false"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop100:8020",
"fileType": "orc",
"path": "/zhiyun/lijinquan/ods/c_org_busi",
"fileName": "c_org_busi.data",
"column": [
{"name": "col1", "type": "TINYINT" },
..
],
"writeMode": "truncate`",
"fieldDelimiter": "\t"
}
}
}
]
}
}


运行

1
python /opt/datax/bin/datax.py /zhiyun/lijinquan/jobs/c_org_busi.json

列名的处理

1
2
1. 一个列一个列的处理
2. vscode多行编辑 alt+shift+鼠标拖动

抽取 c_org_busi

1
2
3
4
5
您配置的path: [/zhiyun/lijinquan/ods/c_org_busi] 不存在, 请先在hive端创建对应的数据库和表.


datax不会自动创建HDFS上的路径, 需要手动创建
hadoop fs -mkdir -p /zhiyun/lijinquan/ods/c_org_busi

确保抽取成功, 没有报错

Hive建表

1
2
3
4
5
6
7
8
9
10
-- 确保数据库存在
create database if not exists ods_lijinquan location "/zhiyun/lijinquan/ods";
-- 建表
-- ods层的表都应该是外部表
create external table if not exists ods_lijinquan.c_org_busi(
...
) row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/lijinquan/ods/c_org_busi";

验证数据

1
2
select * from ods_lijinquan.c_org_busi limit 1;
select count(1) from ods_lijinquan.c_org_busi;

调度平台的安装

将目录 \07_医药\xxl-job-student-20221220上传到 hadoop100的 /opt目录下

1
2
3
4
5
6
7
8
# 改名
mv /opt/xxl-job-student-20221220 /opt/xxljob
# 导入到mysql
# 确保mysql的密码为root
mysql -uroot -proot
# 在mysql里执行:
source /opt/xxljob/tables_xxl_job.sql;
quit;

编写启停脚本

1
vim /root/bin/xxl

内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/bin/bash
act=$1


start(){
echo "starting xxl-job"
ssh root@hadoop100 "cd /opt/xxljob; nohup java -jar xxl-job-admin-2.3.0.jar > xxl-job.log 2>&1 &"
ssh root@hadoop100 "cd /opt/xxljob; nohup java -jar xxl-job-executor-sample-springboot-2.3.0.jar > xxl-job-executor.log 2>&1 &"
ssh root@hadoop101 "cd /opt/xxljob; nohup java -jar xxl-job-executor-sample-springboot-2.3.0.jar > xxl-job-executor.log 2>&1 &"
ssh root@hadoop102 "cd /opt/xxljob; nohup java -jar xxl-job-executor-sample-springboot-2.3.0.jar > xxl-job-executor.log 2>&1 &"
}


stop(){
echo "stopping xxl-job"
ssh root@hadoop100 "ps -aux | grep xxl-job-admin | grep -v grep | awk '{print \$2}' | xargs kill -9"
ssh root@hadoop100 "ps -aux | grep xxl-job-executor-sample | grep -v grep | awk '{print \$2}' | xargs kill -9"
ssh root@hadoop101 "ps -aux | grep xxl-job-executor-sample | grep -v grep | awk '{print \$2}' | xargs kill -9"
ssh root@hadoop102 "ps -aux | grep xxl-job-executor-sample | grep -v grep | awk '{print \$2}' | xargs kill -9"
}


status(){
echo "=============== hadoop102 ================="
ssh root@hadoop102 "ps -aux | grep xxl-job-executor-sample | grep -v grep"
echo "=============== hadoop101 ================="
ssh root@hadoop101 "ps -aux | grep xxl-job-executor-sample | grep -v grep"
echo "=============== hadoop100 ================="
ssh root@hadoop100 "ps -aux | grep xxl-job | grep -v grep"
}


case $act in
start)
start
status
;;
stop)
stop
status
;;
restart)
stop
start
status
;;
status)
status
;;
esac

加权限

1
chmod +x /root/bin/xxl

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 同步
xsync /opt/xxljob

# 启动
xxl start

=============== hadoop102 =================
root 2340 0.0 1.2 2679420 22952 ? Sl 15:40 0:00 java -jar xxl-job-executor-sample-springboot-2.3.0.jar
=============== hadoop101 =================
root 2293 134 2.2 2679420 42324 ? Sl 15:40 0:01 java -jar xxl-job-executor-sample-springboot-2.3.0.jar
=============== hadoop100 =================
root 20685 146 2.0 2679452 38340 ? Sl 15:40 0:01 java -jar xxl-job-admin-2.3.0.jar
root 20710 150 2.1 2679424 40340 ? Sl 15:40 0:01 java -jar xxl-job-executor-sample-springboot-2.3.0.jar


# 状态
xxl status

等到 8080 端口启动成功后, 可以访问:

http://192.168.200.100:8080/xxl-job-admin

登录 admin / 123456

执行器: 调度平台会随机使用任一执行器去执行任务

Hive 表导出到 MySQL 数据库

test_read_hdfs.json

将 HDFS 中的数据同步到 MySQL 数据库

  1. 从 HDFS 路径 /user/hive/warehouse/db_hive.db/sqoop_emp 读取数据
  2. 按制表符分隔解析文本行,提取 8 个字段
  3. 通过 3 个并发通道将数据传输到 MySQL
  4. 将数据插入到 MySQL 的 test.emp 表中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/db_hive.db/sqoop_emp",
"defaultFS": "hdfs://hadoop100:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 7,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "test",
"password": "test",
"column": [
"empno",
"ename",
"job",
"mgr",
"hiredate",
"sal",
"comm",
"deptno"
],
"session": [
"set session sql_mode='ANSI'"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8",
"table": [
"emp"
]
}
]
}
}
}
]
}
}

image-20250707111745049

exp_log.sh

  1. 生成配置文件:根据传入的日期分区参数 $1,动态生成 DataX 配置文件 exp_log.json,配置从 HDFS 读取指定日期分区的数据,并写入 MySQL 表。
  2. 创建目标表:在 MySQL 中创建 log 表(如果不存在),定义五个 VARCHAR 类型的字段用于存储日志数据。
  3. 执行数据同步:调用 DataX 工具执行数据导出任务,将 HDFS 上的文本格式数据(/user/hive/warehouse/db_hive.db/log/dt=$part)按字段映射关系写入 MySQL 的 log 表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/bin/bash
part=$1
# 创建一个json,用来给datax调用
echo '{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/db_hive.db/log/dt='$part'",
"defaultFS": "hdfs://hadoop100:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "test",
"password": "test",
"column": [
"ip",
"date_l",
"url",
"osinfo",
"bowser"
],
"session": [
"set session sql_mode='"'ANSI'"'"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8",
"table": [
"log"
]
}
]
}
}
}
]
}
}' > /root/datax/json/exp_log.json

# 写一个建表语句,再mysql建表
mysql -utest -ptest --database=test -e \
'create table if not exists log (
ip varchar(500),
date_l varchar(500),
url varchar(500),
osinfo varchar(500),
bowser varchar(500)
)'

# 启动datax程序,完成操作
/usr/local/bin/python3.9 /opt/module/datax/bin/datax.py /root/datax/json/exp_log.json

image-20250707111856490

xxl-job-任务调度中心-新增任务管理

image-20250707111920017

GLUE IDE

1
2
3
4
5
6
7
8
9
10
#!/bin/bash
echo "xxl-job: hello shell"

echo "脚本位置:$0"
echo "任务参数:$1"
echo "分片序号 = $2"
echo "分片总数 = $3"
/root/datax/shell/exp_log.sh $1
echo "Good bye!"
exit 0

分别手动执行一次\

image-20250707112014506

mysql.test中查询log数据量

image-20250707112033577

动态分区表导入导出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "test",
"password": "test",
"column": [
"*"
],
"splitPk": "db_id",
"connection": [
{
"querySql": [
"select * from log where osinfo = 'Windows';"
],
"jdbcUrl": [
"jjdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop100:8020",
"fileType": "orc",
"path": "/user/hive/warehouse/db_hive.db/log_tmp",
"fileName": "log_p",
"column": [
{
"name": "ip",
"type": "string"
},
{
"name": "date_l",
"type": "string"
},
{
"name": "url",
"type": "string"
},
{
"name": "osinfo",
"type": "string"
},
{
"name": "browser",
"type": "string"
}
],
"writeMode": "truncate",
"fieldDelimiter": "\t",
"compress": "NONE"
}
}
}
]
}
}

在db_hive中建log_temp表

1
2
3
4
5
6
7
8
9
10
11
12
create table log_tmp(
ip string,
date_l string,
url string,
osinfo string,
browser string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orc

运行脚本
/usr/local/bin/python3.9 /opt/module/datax/bin/datax.py /root/datax/json/imp_log.json

image-20250707112144219

imp_log.sh

  1. JSON 配置文件生成:创建 DataX 任务配置文件 imp_log.json,配置从 MySQL 读取数据并写入 HDFS 的 ORC 文件格式。
  2. Hive 表创建:使用 Beeline 连接 Hive,创建两个 ORC 格式的表:
    • log_tmp:临时表,用于存储从 MySQL 导入的原始数据
    • log_p:分区表,按操作系统类型 (os_tp) 分区
  3. 数据抽取与导入:调用 DataX 工具执行数据同步任务,将 MySQL 的 log 表数据导入到 Hive 的 log_tmp 表。
  4. 动态分区插入:配置 Hive 动态分区参数,将 log_tmp 表的数据按 osinfo 字段的值自动分配到 log_tmp2 表的不同分区中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/bin/bash
#创建json文件
echo '
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "test",
"password": "test",
"column": [
"*"
],
//"splitPk": "db_id",
"connection": [
{
"table": [
"log;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop100:8020",
"fileType": "orc",
"path": "/user/hive/warehouse/db_hive.db/log_tmp",
"fileName": "log_tmp",
"column": [
{
"name": "ip",
"type": "string"
},
{
"name": "date_l",
"type": "string"
},
{
"name": "url",
"type": "string"
},
{
"name": "osinfo",
"type": "string"
},
{
"name": "browser",
"type": "string"
}
],
"writeMode": "truncate",
"fieldDelimiter": "\t",
"compress": "NONE"
}
}
}
]
}
}' > /root/datax/json/imp_log.json

# 建临时表和分区表
beeline -u jdbc:hive2://hadoop100:10000/db_hive -u root -p 545456 -e \
"CREATE TABLE if not exists log_tmp(
ip string,
date_l string,
url string,
osinfo string,
browser string
)
row format delimited
fields terminated by '\t'
lines TERMINATED by '\n'
STORED AS orc;
CREATE TABLE if not exists log_p(
ip string,
date_l string,
url string,
osinfo string,
browser string
)
partitioned by (os_tp string)
row format delimited
fields terminated by '\t'
lines TERMINATED by '\n'
STORED AS orc "
echo '*******************建表完成--***********************'
#导入数据
/bin/python3 /opt/module/datax/bin/datax.py /root/datax/json/imp_log.json
echo '*****************数据导入成功************************'
#动态分区
beeline -u jdbc:hive2://hadoop100:10000/db_hive -u root -p 545456 -e \
"
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=200;
set hive.exec.max.dynamic.partitions.pernode=50;
set hive.exec.max.created.files=1000;
set hive.error.on.empty.partition=false;
set hive.support.quoted.identifiers=none;
insert into log_p partition (os_tp) select l.*,osinfo from log_tmp l;
"
echo '------------------完成------------------'

image-20250707112233843