نکات جلسه پنجم از دانشگاه خاتم

iterator

در کد زیر مفهوم iterator مشخص شده است:

mytuple = ("apple", "banana", "cherry")
myit = iter(mytuple)

print(next(myit))
print(next(myit))
print(next(myit))

تابعی دارم به نام getnumberofpartitions

با repartition میتوان تعداد پارتیشن ها را کم و زیاد کرد.

1- RDDرا یا با paralelize میسازیم یا با textFile یا باWholetextfile

2- یا با result of transformations

در حالت fs –> اگر فایل من کمتر از 32 مگابایت بود یک پارتیشن برای آن در نظر گرفته میشود.

در حالت hdfs –> هر بلاک یک پارتیشن میشود.

py4j به ما کمک میکند که جاوا را با پایتون و اسپارک را چیز کنیم / اصطلاحا

در یک برنامه نمیتوان چندین sparkContex داشت. میتوان با کد زیر آن را استاپ کرد:

SC.stop

با اجرای این کد خطای ظاهر شده وجود نخواهد داشت.

یکی از کاربردهای repartiotion این است که میتوان تعداد فایلهای خروجی را به تعداد دلخواه محدود کرد. قبل از دستور SaveAstextFile

repartition تابع پر هزینه ای است. به جای آن از Coalesce استفاده میشود. که تنها میتواند تعداد پارتیشن را کمتر کند. توضیح بیشتر جلسه بعد- بنار این در 99 درصد موارد ترجیح بر استفاده از coalesce است.

جلسه ششم

spark sql به این دلی بوجود آمد که بتوان با زبان سطح بالاتری با دیتا کار کرد

هم دیتا فریم داریم و هم کد sql را میتوان روی دیتا ران کرد

سمت چپ شکل بالا مربوط با داده های نامنظم است. و سمت راست ساختار یافته.

spark SQL دوتا API دارد:

مشابه RDD از معماری توزیع شده تبعیت میکند.
تعریف دیتا فریم: یک rdd و اسکیما را یک دیتافریم گویند

ساختار rdd:

, در زیر تصویر دیتا فریم :(اسکیما یعنی هم اسم ستون داریم و هم نوع type)

شکل زیر نشان میدهد که حتی زمانی که با spark sql کار میکنیم با واسطه داریم با rdd کار میکنیم. ولی تفاوت در این است که در سطح بالاتری با rdd ها کار میکنیم و اصطلاحا کارمان راحت تر است.

اولین آبجکت که میسازیم spark session است که منجر به ساخت دیتا فریم میشود. چیزی که در rddمیساختیم spark context بود.

نکته مهم: درون هر اسپارک سشن نیز یک spark context وجود دارد:

کد زیر نشان میدهد با وجود اینکه سشن ساخته ایم، spark.sparkContext نیز وجود دارد.

pip install pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('Practicec').getOrCreate()
spark.sparkContext

در تصویر زیر نشان داده شده که به طور اتوماتیک میتوان چه نوع فایلهایی را با متود read فراخوانی کرد:

با کد زیر دیتا را خواندیم:

# link of data:
# https://drive.google.com/file/d/1LgibGOp2aJGbqjqAuU3iKKWYWcB0SAZd/view?usp=share_link
df1=spark.read.csv('Book-Ratings.csv')

نتیجه:

در نتیجه بالا مشاهده میشود که دیتای ما به طور منظم شناخته نشد است. چرا؟

دلیل آن delimiter است. که با کد زیر اصلاح میکنیم:

df1=spark.read.options(sep=';').csv('Book-Ratings.csv')
df1.show()

نتیجه:

مشکل دیگر این است که هدر را در نظر نگرفته است. که با کد زیر آن را حل میکنیم:

df1=spark.read.options(sep=';',header=True).csv('Book-Ratings.csv')
df1.show()

نتیجه:

همه دیتاها در قالب string در نظر گرفته شده اند، که با inferschema میتواند آن را تغییر داد:

با کد زیر مواردی که مقدار rate آنها بیشتر از 5 باشد را فیلتر کرده و نمایش میدهیم:

df1.filter('rate>5').show()

نتیجه:

اگر بخواهیم از نتیجه بالا فقط شماره isbn را ببینیم:

df1.filter('rate>5').select('isbn').show()

میخواهیم میانگین rate داده شده به کتابها را بدست بیاوریم. از groupBy استفاده میکنیم:

df1.groupBy('isbn').avg('rate').show()

نتیجه:

میخواهیم تعداد rate هایی که هر کتاب بدست آورده را بدست بیاوریم:

df1.groupBy('isbn').count().show()

نتیجه:

سوال بیت اله:

اگر بخواهیم گروه بندی را بر اساس بیش از یک شرط اجرا کنیم. مثلا هم بر اساس جمع ریت ها و هم بر اساس تعداد ریت ها و هم بر اساس میانگین ریت ها:

df1.groupBy('isbn').agg(sum('rate'),count('rate'),avg('rate')).show()

نتیجه:

اگر بخواهیم بیشترین ریت هم نمایش داده شود:

df1.groupBy('isbn').agg(sum('rate'),count('rate'),avg('rate'),max('rate')).show()

نام الیاس

نام الیاس هم میتوان برای ستون های جدید گذاشت:

df3=df1.groupBy('isbn').agg(sum('rate').alias('sum'),count('rate').alias('rate'),avg('rate').alias('avge'),max('rate'))

حال با دستور زیر فیلتر مورد نظر خود را بر اساس نام الیاس تنظیم میکنیم:

df3.filter('avge>3').show()

نتیجه:

میخواهیم کتابهایی که تعداد ریت آنها بیشتر از 50 عدد بوده است را بازی بدهیم فقط:

df3=df1.groupBy('isbn').agg(sum('rate').alias('sum'),count('rate').alias('countee'),avg('rate').alias('avge'),max('rate'))
df3.filter('countee>50').show()

نتیجه:

انتخاب کتابهایی که بیش از 100 رای دریافت کرده اند و میانگین امتیاز آنها بیشتر از 4 باشد:

df3=df1.groupBy('isbn').agg(sum('rate').alias('sum'),count('rate').alias('countee'),avg('rate').alias('avge'),max('rate'))
df3.filter('countee>100').filter('avge>4').show()

سورت کردن- asc و desc

میخواهیم ابتدا count کنیم و بعد بر اساس بیشترین تعداد رای دریافتی به کمترین سورت کنیم:

کم به زیاد :

df1.groupBy('isbn').count().orderBy('count').show()

زیاد به کم:

df1.groupBy('isbn').count().orderBy(col('count').desc()).show()

نتیجه:

از df3 به طرز راحت تری میتوان نتیجه بالا را نمایش داد:

df3.orderBy(col('countee').desc()).show()

کوئری SQL :

میتوانیم برای دیتای خود یک ویو تعریف کنیم که بتوانیم دستورات SQL را استفاده کنیم:

با کد زیر به سشن میگوییم هر وقت کسی my_rating را صدا زد آن را به df1 ارجاع بده:

# مبحث بعدی
df1.createOrReplaceTempView('my_rating')

با دستور زیر اولین کویری SQL خود را اجرا میکنیم:

spark.sql('select * from my_rating where rate > 5').show()

نتیجه:

با دستور زیر یک کوئری پیچیده تر را اجرا میکنیم:

spark.sql('select isbn, avg(rate) as AVG from my_rating group By isbn').show()

نتیجه:

همان نتیجه قبلی و اگر بخواهیم نتیجه در حالت نزولی باشد:

spark.sql('select isbn, avg(rate) as AVG from my_rating group By isbn ORDER BY AVG DESC').show()

join:

انواع join در تصویر زیر مشخص شده است:

داده های دیگر را نیز با دستور زیر فراخوانی میکنیم:

rating_df=spark.read.options(sep=';',header=True,inferSchema=True).csv('Book-Ratings.csv')
book_df=  spark.read.options(sep=';',header=True,inferSchema=True).csv('Books.csv')
user_df=  spark.read.options(sep=';',header=True,inferSchema=True).csv('Users.csv')

با دستور زیر join میزنیم:

rating_df.join(user_df).show()
# the default is cross_join

نتیجه:

دستور جوین با شرط برابر بودن id یوزر در هر دوتا دیتا فریم:

rating_df.join(user_df,rating_df['userid']==user_df['UserID']).show()
# جوین با شرط

نتیجه:

تا 1 ساعت و 37 دقیقه

right join:

به این صورت عمل میکند که id جدول فرضی سمت راست در نظر گرفته میشود، اگر در جدول سمت چپ Id معادل وجود داشته باشد، که عدد آن مشاهده میشود، در غیر اینصورتnull قرار میگیرد.

left Anti:

دیتاهایی به ما برگردانده مبیشود که در راست نیست:

joinType='left_anti'
join_exp=rating_df['userid']==user_df['UserID']
rating_df.join(user_df,join_exp , joinType).show(truncate=True)

نتیجه:

در نتیجه بالا مواردی برگردانده شده اند که در چپ وجود ندارند. (left anti). یعنی اگر در فایلusers.csv به دنبال یکی از موارد فوق بگردیم، پیدا نخواهیم کرد.

در دستور فوق به جای show عبارت count را استفاده میکنیم تا ببینیم چه تعداد از داده ها، ناقص هستند:

joinType='left_anti'
join_exp=rating_df['userid']==user_df['UserID']
rating_df.join(user_df, join_exp , joinType).count()

که نتیجه تعداد 9578 مورد است. یعنی تعدادی که در rating.csv وجود دارند اما در users.csv وجود ندارند.

دقت در join:

کد زیر نشان میدهد که ضرب کارتزین چه دیتای بزرگی را برای ما تولید میکند. بنابراین در هنگام join کردن باید دقت داشته باشیم، زیرا ممکن است داده های بزرگی تولید شود:

print(rating_df.count())
print(user_df.count())
print(282381 *147789)
print(rating_df.join(user_df).count())

نتیجه: 41 میلیارد تولید شد

تا ساعت 1 و دقیقه 50

ادامه کوئری SQL با اسپارک

با دستور زیر سه شی میسازیم:

rating_df.createOrReplaceTempView('rating')
user_df.createOrReplaceTempView('users')
book_df.createOrReplaceTempView('books')

با کد زیر یک کوئری زده ایم که در آن از هر سه دیتا استفاده شده است:

spark.sql('SELECT * FROM rating,users,books WHERE rating.userid==users.userid AND rating.isbn== books.isbn').show()

بخشی از نتیجه:

برای نمایش زیبا تر این را به کد اضافه میکنیم:

(truncate=False,vertical=True)

تمرین:

حل تمرین قبلی با استفاده sparkSQL — رجوع به فولدر مربوطه

 working with DF without Schema

در واقع خودمان یک اسکیما میسازیم.

user_id_field=StructField('u_id',StringType(),True)
isbn_field=StructField('isbn',StringType(),True)
rate_field=StructField('rating',StringType(),True)

my_schema=StructType([user_id_field,isbn_field,rate_field])

تا ساعت 2 و دقیقه 30

سایر موارد را در فایل گیت هاب مطالعه نمایید.

کل جلسه 3 ساعت و 11 دقیقه

منابع:

1- گیت هاب strumer

منتشر شده در
دسته‌بندی شده در spark

دیدگاهی بنویسید

نشانی ایمیل شما منتشر نخواهد شد.