百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

湖仓一体电商项目(四):项目数据种类与采集

lipiwang 2024-11-02 13:39 12 浏览 0 评论

#头条创作挑战赛#

项目数据种类与采集

实时数仓项目中的数据分为两类,一类是业务系统产生的业务数据,这部分数据存储在MySQL数据库中,另一类是实时用户日志行为数据,这部分数据是用户登录系统产生的日志数据。

针对MySQL日志数据我们采用maxwell全量或者增量实时采集到大数据平台中,针对用户日志数据,通过log4j日志将数据采集到目录中,再通过Flume实时同步到大数据平台,总体数据采集思路如下图所示:

;

针对MySQL业务数据和用户日志数据构建离线+实时湖仓一体数据分析平台,我们暂时划分为会员主题和商品主题。下面了解下主题各类表情况。

一、MySQL业务数据

1、配置MySQL支持UTF8编码

在node2节点上配“/etc/my.cnf”文件,在对应的标签下加入如下配置,更改mysql数据库编码格式为utf-8:

[mysqld]
character-set-server=utf8

[client]
default-character-set = utf8

修改完成之后重启mysql即可。

2、MySQL数据表

MySQL业务数据存储在库“lakehousedb”中,此数据库中的业务数据表如下:

2.1、会员基本信息表 : mc_member_info

;

2.2、 会员收货地址表 : mc_member_address

;

2.3、用户登录数据表 : mc_user_login

;

2.4、商品分类表 : pc_product_category

;

2.5、商品基本信息表 : pc_product

;

3、MySQL业务数据采集

我们通过maxwell数据同步工具监控MySQL binlog日志将MySQL日志数据同步到Kafka topic “KAFKA-DB-BUSSINESS-DATA”中,详细步骤如下:

3.1、配置maxwell config.properties文件

进入node3“/software/maxwell-1.28.2”目录,配置config.properties文件,主要是配置监控mysql日志数据对应的Kafka topic,配置详细内容如下:

producer=kafka kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092 kafka_topic=KAFKA-DB-BUSSINESS-DATA #设置根据表将binlog写入Kafka不同分区,还可指定:database, table, primary_key, transaction_id, thread_id, column producer_partition_by=table #mysql 节点 host=node2 #连接mysql用户名和密码 user=maxwell password=maxwell #指定maxwell 当前连接mysql的实例id,这里用于全量同步表数据使用 client_id=maxwell_first

3.2、启动kafka,创建Kafka topic,并监控Kafka topic

启动Zookeeper集群、Kafka 集群,创建topic“KAFKA-DB-BUSSINESS-DATA” topic:

#进入Kafka路径,创建对应topic
[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3

#监控Kafak topic 中的数据
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-DB-BUSSINESS-DATA

3.3、启动maxwell

#在node3节点上启动maxwell
[root@node3 ~]# cd /software/maxwell-1.28.2/bin/
[root@node3 bin]#  maxwell --config ../config.properties

3.4、在mysql中创建“lakehousedb”并导入数据

#进入mysql ,创建数据库lakehousedb
[root@node2 ~]# mysql -u root -p123456
mysql> create database lakehousedb;

打开“Navicat”工具,将资料中的“lakehousedb.sql”文件导入到MySQL数据库“lakehousedb”中,我们可以看到在对应的kafka topic “KAFKA-DB-BUSSINESS-DATA”中会有数据被采集过来。

;

二、用户日志数据

1、用户日志数据

目前用户日志数据只有“会员浏览商品日志数据”,其详细信息如下:

  • 接口地址:/collector/common/browselog
  • 请求方式:post
  • 请求数据类型:application/json
  • 接口描述:用户登录系统后,会有当前登录时间信息及当前用户登录后浏览商品,跳转链接、浏览所获积分等信息
  • 请求示例:
{
    "logTime": 1646393162044,
    "userId": "uid53439497",
    "userIp": "216.36.11.233",
    "frontProductUrl": "https://fo0z7oZj/rInrtrb/ui",
    "browseProductUrl": "https://2/5Rwwx/SqqwwwOUsK4",
    "browseProductTpCode": "202",
    "browseProductCode": "q6HCcpwfdgfgfxd2I",
    "obtainPoints": 16,
}
  • 请求参数解释如下:

参数名称

参数说明

logTime

浏览日志时间

userId

用户编号

userIp

浏览Ip地址

frontProductUrl

跳转前URL地址,有为null,有的不为null

browseProductUrl

浏览商品URL

browseProductTpCode

浏览商品二级分类

browseProductCode

浏览商品编号

obtainPoints

浏览商品所获积分

2、用户日志数据采集

日志数据采集是通过log4j日志配置来将用户的日志数据集中获取,这里我们编写日志采集接口项目“LogCollector”来采集用户日志数据。

当用户浏览网站触发对应的接口时,日志采集接口根据配合的log4j将用户浏览信息写入对应的目录中,然后通过Flume监控对应的日志目录,将用户日志数据采集到Kafka topic “KAFKA-USER-LOG-DATA”中。

这里我们自己模拟用户浏览日志数据,将用户浏览日志数据采集到Kafka中,详细步骤如下:

2.1、将日志采集接口项目打包,上传到node5节点

将日志采集接口项目“LogCollector”项目配置成生产环境prod,打包,上传到node5节点目录/software下。

2.2、编写Flume 配置文件a.properties

将a.properties存放在node5节点/software目录下,文件配置内容如下:

#设置source名称
a.sources = r1
#设置channel的名称
a.channels = c1
#设置sink的名称
a.sinks = k1

# For each one of the sources, the type is defined
#设置source类型为TAILDIR,监控目录下的文件
#Taildir Source可实时监控目录一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题
a.sources.r1.type = TAILDIR
#文件的组,可以定义多种
a.sources.r1.filegroups = f1
#第一组监控的是对应文件夹中的什么文件:.log文件
a.sources.r1.filegroups.f1 = /software/lakehouselogs/userbrowse/.*log

# The channel can be defined as follows.
#设置source的channel名称
a.sources.r1.channels = c1
a.sources.r1.max-line-length = 1000000
#a.sources.r1.eventSize = 512000000

# Each channel's type is defined.
#设置channel的类型
a.channels.c1.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
#设置channel道中最大可以存储的event数量
a.channels.c1.capacity = 1000
#每次最大从source获取或者发送到sink中的数据量
a.channels.c1.transcationCapacity=100

# Each sink's type must be defined
#设置Kafka接收器
a.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a.sinks.k1.brokerList=node1:9092,node2:9092,node3:9092
#设置Kafka的Topic
a.sinks.k1.topic=KAFKA-USER-LOG-DATA
#设置序列化方式
a.sinks.k1.serializer.class=kafka.serializer.StringEncoder 
#Specify the channel the sink should use
#设置sink的channel名称
a.sinks.k1.channel = c1

2.3、在Kafka中创建对应的topic并监控

#进入Kafka路径,创建对应topic
[root@node1 ~]# cd /software/kafka_2.11-0.11.0.3/bin/
[root@node1 bin]# ./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3

#监控Kafak topic 中的数据
[root@node1 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic KAFKA-USER-LOG-DATA

2.4、启动日志采集接口

在node5节点上启动日志采集接口,启动命令如下:

[root@node5 ~]# cd /software/
[root@node5 software]# java -jar ./logcollector-0.0.1-SNAPSHOT.jar 

启动之后,根据日志采集接口配置会在“/software/lakehouselogs/userbrowse”目录中汇集用户浏览商品日志数据。

2.5、 启动Flume,监控用户日志数据到Kafka

在node5节点上启动Flume,监控用户浏览日志数据到Kafka “KAFKA-USER-LOG-DATA” topic。

[root@node5 ~]# cd /software/
[root@node5 software]# flume-ng agent --name a -f /software/a.properties -Dflume.root.logger=INFO,console

2.6、启动模拟用户浏览日志代码,向日志采集接口生产数据

在window本地启动“LakeHouseMockData”项目下的“RTMockUserLogData”代码,向日志采集接口中生产用户浏览商品日志数据。

启动代码后,我们会在Kafka “KAFKA-USER-LOG-DATA” topic 中看到监控到的用户日志数据。

;

三、错误解决

如果在向mysql中创建库及表时有如下错误:

Err 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains nonaggregated column 'information_schema.PROFILING.SEQ' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by

以上错误是由于MySQL sql_mode引起,对于group by聚合操作,如果在select中的列没有在group by中出现,那么这个SQL是不合法的。按照以下步骤来处理。

1、首先停止mysql,然后在mysql节点配置my.ini文件

[root@node2 ~]# service mysqld stop

打开/etc/my.cnf文件,在mysqld标签下配置如下内容:

mysqld sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

2、重启mysql即可解决

[root@node2 ~]# service mysqld start

相关推荐

如何在 Linux 中压缩文件和目录?(linux压缩文件夹到指定目录)

在Linux系统中,文件和目录的压缩是一项常见且重要的操作。无论是为了节省存储空间、便于文件传输,还是进行备份管理,掌握压缩技术都能极大地提升工作效率。Linux中常用的压缩工具1.tar:打...

什么是LIM模具?与普通硅胶模具有何本质区别?

要深入理解LIM模具及其与普通硅胶模具的本质区别,需从成型逻辑、技术架构、应用价值三个层面拆解,以下是系统性解析:一、LIM模具:定义与核心技术1.定义LIM模具(LiquidInj...

前后端安全机制(前后端分离安全的token)

一、密钥安全管理方案1.动态密钥分发机制密钥与会话绑定后端为每个用户会话生成临时密钥(如AES-256密钥),通过HTTPS加密传输给前端,会话结束后自动失效。例如:javascript//...

Switch 2芯片细节曝光,英伟达专门定制支持DLSS,网友:掌机模式相当于PS4

Switch2处理器,细节被实锤!数毛社(DigitalFoundry)消息,已经确定Switch2采用的是英伟达真·定制版芯片,包含8核CPU和12GBLPDDR5X内存。GPU则基于Amp...

独立站的PageSpeed Insights 指标在seo中的作用?

这是一个非常关键的问题,关于独立站(如Shopify、WordPress、自建FastAPI/Vue等网站)的PageSpeedInsights指标(Google的网页性能评分工具)在...

前端工程化-webpack 分包的方式有哪些?

Webpack的分包(CodeSplitting)是优化应用性能的重要手段,主要通过合理拆分代码减少首次加载体积、提升缓存利用率。以下是常见的分包方式及生产/开发环境配置建议:一、Webpack...

液态硅胶(LSR)套啤注塑件的关键技术难点与解决方案?

液态硅胶(LSR)套啤注塑件(即二次注塑成型,一次成型基材+二次LSR包胶)在医疗、电子、汽车等领域应用广泛,但其关键技术难点需从材料、模具、工艺等多维度突破。以下是核心难点及解决方案:一、关...

spa首屏加载慢怎样解决(spa首屏优化)

SPA(SinglePageApplication,单页应用)首屏加载慢是一个常见问题,主要原因通常是首次加载需要拉取体积较大的JavaScript文件、样式表、初始化数据等。以下是一些常见的...

揭秘|为什么新华三(H3C)要自主研发运维管理软件?

1概述1.1产生背景随着互联网技术的快速发展,企业对计算、网络的需求也越来越大。为了保证整个数据系统可靠、稳定地运行,相关企业对运维系统的要求越来越高,运维成本也在随之逐步增加。H3C公司自主研发的运...

动态主机配置协议——DHCP详解(dhcp动态主机配置协议的功能是?)

一、DHCP简介DHCP(DynamicHostConfigurationProtocol),动态主机配置协议,是一个应用层协议。当我们将客户主机ip地址设置为动态获取方式时,DHCP服务器就会...

OGG同步到Kafka(oggforbigdata到kafka)

目的:测试使用OGG将数据单向同步到Kafka上。简要说明:Kafka使用单节点单Broker部署;单独部署简单ZooKeeper;需要使用到JAVA1.8;OGG需要2个版本,一个fororacl...

Zabbix入门操作指南(zabbix4.0使用手册)

上篇:安装与配置一.概述在开始之前,一些概念和定义需要我们提前了解一下(以下内容摘自官方网站)。1.1几个概念架构Zabbix由几个主要的功能组件组成,其职责如下所示。ServerZabbixs...

绝对干货!升级MySQL5.7到MySQL8.0的最佳实践分享

一、前言事出必有因,在这个月的某个项目中,我们面临了一项重要任务,即每年一次的等保测评整改。这次测评的重点是Mysql的一些高危漏洞,客户要求我们无论如何必须解决这些漏洞。尽管我们感到无奈,但为了满足...

pytorch v2.7.0震撼发布!Blackwell GPU支持+编译性能狂飙,AI开发

重点内容测试版(Beta):oTorch.Compile支持Torch函数模式oMega缓存原型(Prototype):o支持NVIDIABlackwell架构oPyTorch...

kubernetes1.31.3集群搭建(上)(kubectl连接集群)

1集群规划1.1物理机环境电脑操作系统CPU内存硬盘网卡IP地址(静态)虚拟机软件服务器操作系统联想Windows11Intel12900K24核128GB4TBPcIE4.0无线网卡192...

取消回复欢迎 发表评论: