PythonSpark:需要从文件列执行配置单元查询 [英] PythonSpark: need to execute hive queries from file columns
问题描述
我有一个文件,其行如下所示(文件名:sample.csv
)
I have a file with rows like below (file name: sample.csv
)
Id,Query
T1012,"Select * from employee_dim limit 100"
T1212,"Select * from department_dim limit 100"
T1231,"Select dept_number,location,dept_name from locations"
我需要遍历此文件(sample.csv
)并获取第二列(查询"),在hive数据库中运行它并获取结果,然后将其保存到名为T1012_result.csv
的新文件中,并针对所有行执行类似操作.
I need to iterate through this file (sample.csv
) and take the second column("query"), run it in hive database and get the result of this, then save it to a new file named T1012_result.csv
, and similarly do it for all rows.
可以帮忙吗?
我尝试通过spark读取文件并将其转换为列表,然后使用不起作用的sparksession执行SQL查询.
I tried reading the file through spark and converting it to a list and then executing the SQL queries using sparksession which is not working .
from pyspark.sql import SparkSession,HiveContext
spark=SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql("use sample")
input=spark.read.csv("sample.csv")
#input.select('_c1').show()
import pandas as pd
a=input.toPandas().values.tolist()
for i in a :
print i[1]
spark.sql('pd.DataFrame(i)')
推荐答案
已更新:spark
file_path="file:///user/vikrant/inputfiles/multiquery.csv"
df=spark.read.format("com.databricks.spark.csv").option("header", "true").load(file_path)
+---+-------------------------------+
|id |query |
+---+-------------------------------+
|1 |select * from exampledate |
|2 |select * from test |
|3 |select * from newpartitiontable|
+---+-------------------------------+
def customFunction(row):
for row in df.rdd.collect():
item=(row[1])
filename=(row[0])
query=""
query+=str(item)
newdf=spark.sql(query)
savedataframe(newdf,filename)
def savedataframe(newdf,filename):
newdf.coalesce(1).write.csv("/user/dev/hadoop/external/files/file_" + filename + ".csv")
customFunction(df)
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_1.csv
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_2.csv
drwxr-xr-x - vikct001 hdfs 0 2019-08-02 11:49 /user/dev/hadoop/external/files/file_3.csv
更新:使用熊猫 我在sql server上有很少的测试表,并且正如您在问题中提到的,我正在将它们读取到pandas数据框,并将查询结果保存到每个不同的文件中,并将其重命名为数据框的第一列:
Update: using pandas I have few test tables on sql server and I am reading them to pandas dataframe as you mentioned in your question and will be saving the query result to each different files with renamed as first column of your dataframe:
import pandas as pd
import pyodbc
from pandas import DataFrame
connection = pyodbc.connect('Driver={ODBC Driver 13 for SQL Server};SERVER=yourservername;DATABASE=some_db;UID=username;PWD=password')
cursor = connection.cursor()
data=[['1','select * from User_Stage_Table'],['2','select * from User_temp_Table']]
df=pd.DataFrame(data,columns=['id','query'])
def get_query(df):
a=df.values.tolist()
for i in a:
query=i[1] #reading second column value as query
filename=i[0] #reading first column value as filename
write_query(query,filename) #calling write_query function
def write_query(query,filename):
df=pd.read_sql_query(query,connection)
df.to_csv(outfile_location+filename+".txt",sep=',',encoding='utf-8',index=None,mode='a')
get_query(df) #calling get_query function to build the query
out_file_location='G:\Testing\OutputFile\outfile'
您将输出文件名为:
outfile1.txt
#这将具有表User_Stage_Table
outfile2.txt
#这将具有表User_temp_Table'
让我知道这是否可以解决您的问题或进一步面临任何问题.
Let me know if this solves your problem or face any issues further.
这篇关于PythonSpark:需要从文件列执行配置单元查询的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!