当前位置:首页 > 数码 > 基于PySpark-SQL的媒体阅读日志ETL作业 (基于pyspark的天气分析)

基于PySpark-SQL的媒体阅读日志ETL作业 (基于pyspark的天气分析)

admin7个月前 (05-07)数码14

pyspark除了官网的文档,网上的教程资料不时很少,但基于调度平台下,经常使用pyspark编写代码十分高效,程序自身是提交到spark集群中,性能上也是毫无疑问的,在本文中,咱们将深化讨论基于Spark的媒体阅读日志ETL(提取、转换、加载)流水线的详细成功,在展现如何经常使用PySparkSQL处置大规模的媒体阅读日志数据,包含IP地址转换、数据荡涤、期间维度补充、码表关联等关键步骤。

一、环境性能

首先,咱们须要创立一个SparkSession并导入必要的库和设置自动参数,包含与IP-to-Location数据库的交互以及其余关系的性能。

假设pyspark仅仅是本地运转而不是提交加群时,可以经常使用findspark库,它能够协助咱们极速初始化Spark环境。在开局之前,确保您曾经成功装置了findspark库,并曾经下载并解压了Spark二进制文件。将Spark的装置门路和/target=_blankclass=infotextkey>Python解释器门路指定为变量。

importfindspark#指定Spark的装置门路spark_home="/usr/local/spark"#指定用于Spark的Python解释器门路python_path="/home/hadoop/.conda/envs/sparkbox/bin/python3.6"#经常使用findspark.init方法初始化Spark环境findspark.init(spark_home,python_path)

findspark.init方法将协助设置PYSPARK_PYTHON和SPARK_HOME环境变量,确保正确的Spark库和性能文件被加载。其简化了Spark环境的初始化环节,防止手动性能环境变量。

二、数据处置

1.数据读取

首先,咱们经常使用PySpark的read方法从HDFS中读取媒体阅读日志数据。咱们指定了数据的schema,以确保正确地解析每一列。

SQL的媒体阅读日志ETL作业df=spark.read.schema(schema).parquet("hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter("dtin({})".format(",".join(["'{}'".format(partition)forpartitioninlatest_partitions])))

2.IP地址转换

接上去,咱们经过iptranslate函数将IP地址转换为天文位置消息。这经常使用了XdbSearcher类,该类担任读取xdb文件并口头IP地址的二分查找。

#依据IP地址失掉地点消息from_ip_get_place_udf=udf(action.iptranslate,struct_schema)df=df.withColumn('country',from_ip_get_place_udf(col('ip'),lit('country')))df=df.withColumn("place",from_ip_get_place_udf(col('ip')))df=df.withColumn("country",df["place"]["country"])df=df.withColumn("city",df["place"]["city"])df=df.withColumn("province",df["place"]["province"])df=df.drop('place')

3.期间维度减少

咱们生成以后期间的期间戳,并减少各种期间格局的列,包含年、季度、月、日、小时等。

#生成以后期间的期间戳df=df.withColumn("current_timestamp",from_unixtime(df["operation_time"]/1000))#减少各种期间格局的列df=df.withColumn("year",date_format("current_timestamp","yyyy"))df=df.withColumn("quarter",date_format("current_timestamp","yyyy-MM"))df=df.withColumn("month",date_format("current_timestamp","yyyy-MM"))df=df.withColumn("day",date_format("current_timestamp","dd"))df=df.withColumn("hour_time",date_format("current_timestamp","yyyy-MM-ddHH"))df=df.withColumn("dt",date_format("current_timestamp","yyyy-MM-dd"))df=df.withColumn("hour",date_format("current_timestamp","HH"))df=df.drop('current_timestamp')

4.数据荡涤

最后,咱们对数据启动荡涤,包含将空值交流为自动值、字符串去除空格、数据类型转换等。

#数据荡涤newdf=newdf.withColumn("media_type",when(col("media_type").isNull(),0).otherwise(col("media_type")))newdf=newdf.withColumn("news_type",when(col("news_type").isNull(),99).otherwise(col("news_type")))newdf=newdf.withColumn("original_type",when(col("original_type").isNull(),99).otherwise(col("original_type")))#...

5.最终写入HDFS

最终,咱们将处置后的数据写入HDFS,驳回分区形式存储,以便更高效地治理和查问。

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")newdf.write.partitionBy("dt","hour").mode("overwrite").option('user','hive').parquet("hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")

三、论断

经过详细的成功步骤,深化解析了基于Spark的媒体阅读日志ETL义务的构建环节。这个义务可以依据详细需求启动调整和裁减,为大规模数据处置义务提供了一种高效而灵敏的处置打算。


如何让更多人知道自己的空间?

1,向自己的朋友推广;这种作法是大部分人都会作的事情,因为他们是自己最重要的读者。 毋庸敷言。 2,参与社区活动;积极参与社区活动是提高自己在社区中知名度的有效手段,大家熟悉了,自然更关注你的网页;3,日志转贴转贴是网络传播的一种非常重要的手段,自己的精彩日志可以到人气大的社区进行转贴,可以让更多的人留意到自己的文章,当然,应该留下自己网页的地址。 4,日志推荐好文章不一定是你写的,怎么办?此次敏思改版,增加了一个日志推荐的功能。 大家可以把自己认为精彩的文章推荐给自己的朋友,既帮了朋友,又可以留下自己网页的地址,也是一种借助别人力量的方式。 5,搜索引擎登录你的是网络空间,应该免这步了。 6,短信工具短信工具如:QQ、MSN、ICQ等在线短信工具也是推广自己网站的一个很好的平台,不过请注意,不要采用让大家厌恶的方式,否则也是反效果。 7,媒体转载博客日志数以万计,每天都有大量的原创日志出炉,而媒体是非常需要原创内容的,如果有条件,可以将这些精彩日志推荐给媒体使用,可以极大地提高网页的浏览量,成效不错,值得大家借鉴。 8,交换链接博客的世界是交换链接的世界,利用交换链接,也是推广自己网站的有效方式。 在这个方式中特别要注意的是,尽量和人气比较高的网站交换链接,这样你的网站的重要性会提高,因此产生的效益也比较高。 当然,作为媒体博客,我们更希望大家和媒体网站或者是个人主页交换链接,这样我们的网站特征就更明显。

博客是什么?

“博客”一词是从英文单词Blog翻译而来。 Blog是Weblog的简称,而Weblog则是由Web和Log两个英文单词组合而成。 log有以下几种解释: 1. A record of a ships speed, its progress, and any shipboard events of navigational importance. 航海记录:对船速、船程以及船上发生的所有对航海有意义的事件的记载。 2. The book in which this record is kept. 航海日志:保有这种记载的本子。 3. A record of a vehicles performance, as the flight record of an aircraft. 飞行日志:对交通工具工作情况的记载,如飞机的飞行记录。 4. A record, as of the performance of a machine or the progress of an undertaking: 日志:对某种机器工作情况或某项任务进展情况的记载。 Weblog就是在网络上发布和阅读的流水记录,通常称为“网络日志”,简称为“网志”。 博客(BLOGGER)概念解释为网络出版(Web Publishing)、发表和张贴(Post-这个字当名词用时就是指张贴的文章)文章,是个急速成长的网络活动,现在甚至出现了一个用来指称这种网络出版和发表文章的专有名词——Weblog,或Blog。 Blogger即指撰写Blog的人。 Blogger在很多时候也被翻译成为“博客”一词,而撰写Blog这种行为,有时候也被翻译成“博客”。 因而,中文“博客”一词,既可作为名词,分别指代两种意思Blog(网志)和Blogger(撰写网志的人),也可作为动词,意思为撰写网志这种行为,只是在不同的场合分别表示不同的意思罢了。 Blog是一个网页,通常由简短且经常更新的帖子(Post,作为动词,表示张贴的意思,作为名字,指张贴的文章)构成,这些帖子一般是按照年份和日期倒序排列的。 而作为Blog的内容,它可以是你纯粹个人的想法和心得,包括你对时事新闻、国家大事的个人看法,或者你对一日三餐、服饰打扮的精心料理等,也可以是在基于某一主题的情况下或是在某一共同领域内由一群人集体创作的内容。 它并不等同于“网络日记”。 作为网络日记是带有很明显的私人性质的,而Blog则是私人性和公共性的有效结合,它绝不仅仅是纯粹个人思想的表达和日常琐事的记录,它所提供的内容可以用来进行交流和为他人提供帮助,是可以包容整个互联网的,具有极高的共享精神和价值。 一个Blog就是一个网页,它通常是由简短且经常更新的Post所构成;这些张贴的文章都按照年份和日期排列。 Blog的内容和目的有很大的不同,从对其他网站的超级链接和评论,有关公司、个人、构想的新闻到日记、照片、诗歌、散文,甚至科幻小说的发表或张贴都有。 许多Blogs是个人心中所想之事情的发表,其他Blogs则是一群人基于某个特定主题或共同利益领域的集体创作。 Blog好象是对网络传达的实时讯息。 撰写这些Weblog或Blog的人就叫做Blogger或Blog writer。 简言之,Blog就是以网络作为载体,简易迅速便捷地发布自己的心得,及时有效轻松地与他人进行交流,再集丰富多彩的个性化展示于一体的综合性平台。 不同的博客可能使用不同的编码,所以相互之间也不一定兼容。 例如,网络空间使用的是GB2312编码,新浪博客使用的是UTF-8编码。 而且,目前很多博客都提供丰富多彩的模板等功能,这使得不同的博客各具特色

免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。

标签: PySpark