pyspark
Several useful PySpark code snippets.
pip3 install pyspark
import pyspark
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
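An equivalent way to obtain the session is the builder API (a minimal sketch; master and app name are arbitrary here):
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("snippets").getOrCreate()
sc = spark.sparkContext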
from pyspark import SparkFiles
n = "https://raw.githubusercontent.com/sehgalayush1/movie_revenue/master/ml_modules/data/"
f = "BollywoodMovieDetail.csv"
spark.sparkContext.addFile(n + f)
movies = spark.read.option("header", "true").option("inferSchema", "true").csv(SparkFiles.get(f))
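A quick sanity check on the downloaded data (plain DataFrame calls):
movies.printSchema()
movies.show(5)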
products = spark.read.csv(
    "products.csv", header=True, mode="DROPMALFORMED"
)
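Supplying an explicit schema avoids type inference and an extra pass over the file; a sketch assuming hypothetical columns product_id and price:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
product_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("price", DoubleType(), True),
])
products = spark.read.csv("products.csv", header=True, schema=product_schema, mode="DROPMALFORMED")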
sellers.write.parquet("sellers_parquet", mode="overwrite")
products = spark.read.parquet("products_parquet")
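Parquet output can also be partitioned by a column; a sketch assuming a hypothetical country column in sellers:
sellers.write.mode("overwrite").partitionBy("country").parquet("sellers_parquet")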
df = spark.read.csv(path, sep=r'\t', header=True).select('col1','col2')
# import what we will need
from pyspark.sql.functions import when, col, mean, desc, round, avg, count, sum
df_divorced = data.select(col('occupation'), when(col('marital-status') == 'Divorced',1).otherwise(0).alias('is_divorced'))
data.select('education').distinct().show()
data.select('marital-status').distinct().show()
data.select('education-num','education').distinct().show()
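df_bachelor is used below but never defined in these snippets; a plausible construction mirroring df_divorced above (treating 'Bachelors' in the education column as the flagged value is an assumption):
df_bachelor = data.select(col('education'), when(col('education') == 'Bachelors', 1).otherwise(0).alias('is_bachelor'))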
df_bachelor.filter(col('is_bachelor') == 1).groupBy('education').count().show()
df_bachelor.groupBy('education').count().show()
sales.groupBy('product_id').agg(count('*').alias('numof')).show()
sales.groupBy('product_id').agg(count('*').alias('numof')).filter(col('numof') > 1).count()
sales.groupBy('product_id').agg(count('*').alias('numof')).orderBy(col('numof').desc()).show()
sales.join(products, products.product_id == sales.product_id) \
    .withColumn('value', sales.num_pieces_sold * products.price) \
    .groupBy('order_id').agg(sum('value').alias('value')) \
    .orderBy(col('value').desc()).show()
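If products is small enough to fit in executor memory, broadcasting it avoids a shuffle in the join above; a sketch using the broadcast hint:
from pyspark.sql.functions import broadcast
sales.join(broadcast(products), products.product_id == sales.product_id) \
    .withColumn('value', sales.num_pieces_sold * products.price) \
    .groupBy('order_id').agg(sum('value').alias('value')) \
    .orderBy(col('value').desc()).show()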
sc._conf.getAll()
import os
pyspark_submit_args ='--master local[8] --executor-memory 5G --driver-memory 10G --num-executors 8 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
df.rdd.getNumPartitions()
from pyspark.sql.functions import spark_partition_id, asc, desc
df.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().orderBy(asc("count")).show()
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
import pyspark
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
servers = '192.168.122.117:9092,192.168.122.246:9092,192.168.122.176:9092'
schema = ["CAST(key AS STRING)", "CAST(value AS STRING)","CAST(timestamp as timestamp)","CAST(partition as int)"]
df = spark.read.format("kafka").option("kafka.bootstrap.servers", servers).option("subscribe", "test").option("startingOffsets", "earliest").option("endingOffsets", "latest").load().selectExpr(schema)
df.show()
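The same topic can be consumed as a stream; a minimal sketch that echoes records to the console (same servers, topic and schema as above):
sdf = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", servers) \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load().selectExpr(schema)
query = sdf.writeStream.format("console").outputMode("append").start()
query.awaitTermination()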
# alternatively, start the interactive shell with the Kafka package already on the classpath
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1
import pandas as pd
from tqdm import tqdm
import csv
import random
import string
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os
from typing import List
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
sc._conf.getAll()
spark.sql("select date_format(date '1970-01-01', 'MMM d , y') as DD").show()