Load Spark RDD to Neo4j in Python
Problem Description
I am working on a project where I am using Spark for data processing. My data is now processed, and I need to load it into Neo4j. Once it is loaded, I will use Neo4j to showcase the results.

I want the entire implementation to be done in Python. But I couldn't find any library or example on the net. Can you please help with links, libraries, or any examples?
My RDD is a PairedRDD, and for every tuple I have to create a relationship.

PairedRDD:

    Key     Value
    Jack    [a,b,c]
For simplicity, I transformed the RDD to:

    Key     Value
    Jack    a
    Jack    b
    Jack    c
Then I have to create the relationships:
Jack->a
Jack->b
Jack->c
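The flattening step above can be sketched in plain Python; the sample data here is a hypothetical stand-in for the question's PairedRDD contents:

```python
# Hypothetical sample mirroring the question's (key, [values]) tuples.
user_hobbies = [("Jack", ["a", "b", "c"])]

# Flatten each (key, [values]) tuple into [key, value] pairs -- the
# shape that an UNWIND-based Cypher batch expects.
pairs = [[k, v] for k, values in user_hobbies for v in values]
print(pairs)  # [['Jack', 'a'], ['Jack', 'b'], ['Jack', 'c']]
```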
Based on William's answer, I am able to load a list directly, but this data is throwing a Cypher error.

I tried it like this:
    batch = []       # shared batch buffer
    processed = 0

    def writeBatch(b):
        print("writing batch of " + str(len(b)))
        session = driver.session()
        session.run('UNWIND {batch} AS elt MERGE (n:user1 {user: elt[0]})', {'batch': b})
        session.close()

    def write2neo(v):
        batch.append(v)
        for hobby in v[1]:
            batch.append([v[0], hobby])
        global processed
        processed += 1
        if len(batch) >= 500 or processed >= max:
            writeBatch(batch)
            batch[:] = []

    max = userhobbies.count()
    userhobbies.foreach(write2neo)
Here b is a list of lists. The unwound elt is a two-element list, with elt[0] and elt[1] as the key and value.
The error:
ValueError: Structure signature must be a single byte value
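One reading of this error (an assumption, based on how the 1.x Python driver maps types to Bolt): tuples are packed as Bolt *structures*, and an arbitrary tuple has no valid structure signature. Note that `write2neo` above also appends the raw Spark tuple `v` itself, whose value is a nested list. Filtering the batch down to flat [key, value] lists of primitives would sidestep both problems; `sanitize` is an illustrative helper, not part of the driver API:

```python
def sanitize(batch):
    # Keep only flat two-ish element sequences, converted to plain lists;
    # drop anything containing a nested list/tuple (not serializable as a
    # simple Cypher parameter element).
    return [list(elt) for elt in batch
            if isinstance(elt, (list, tuple))
            and not any(isinstance(x, (list, tuple)) for x in elt)]

raw = [("Jack", ["a", "b", "c"]), ["Jack", "a"], ("Jack", "b")]
print(sanitize(raw))  # [['Jack', 'a'], ['Jack', 'b']]
```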
Thanks in advance.
Solution
You can do a foreach on your RDD, for example:
    from neo4j.v1 import GraphDatabase, basic_auth
    driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("",""), encrypted=False)

    from pyspark import SparkContext
    sc = SparkContext()
    dt = sc.parallelize(range(1, 5))

    def write2neo(v):
        session = driver.session()
        session.run("CREATE (n:Node {value: {v} })", {'v': v})
        session.close()

    dt.foreach(write2neo)
I would, however, improve the function to batch the writes, but this simple snippet works for a basic implementation.
Update with an example of batching the writes:
    sc = SparkContext()
    batch = []
    max = None
    processed = 0

    def writeBatch(b):
        print("writing batch of " + str(len(b)))
        session = driver.session()
        session.run('UNWIND {batch} AS elt CREATE (n:Node {v: elt})', {'batch': b})
        session.close()

    def write2neo(v):
        batch.append(v)
        global processed
        processed += 1
        if len(batch) >= 500 or processed >= max:
            writeBatch(batch)
            batch[:] = []

    dt = sc.parallelize(range(1, 2136))
    max = dt.count()
    dt.foreach(write2neo)
Which outputs:
16/09/15 12:25:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
writing batch of 500
writing batch of 500
writing batch of 500
writing batch of 500
writing batch of 135
16/09/15 12:25:47 INFO PythonRunner: Times: total = 279, boot = -103, init = 245, finish = 137
16/09/15 12:25:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1301 bytes result sent to driver
16/09/15 12:25:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 294 ms on localhost (1/1)
16/09/15 12:25:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/09/15 12:25:47 INFO DAGScheduler: ResultStage 1 (foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36) finished in 0.295 s
16/09/15 12:25:47 INFO DAGScheduler: Job 1 finished: foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36, took 0.308263 s
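The flush logic (a write every 500 items, plus a final partial flush when the last item is processed) can be checked without Spark or Neo4j by stubbing out the writer; `batched_write` below is an illustrative name, not part of the answer's code:

```python
BATCH_SIZE = 500

def batched_write(values, total, flush):
    """Mimic write2neo's batching: flush every BATCH_SIZE items,
    and once more when the final item has been processed."""
    batch = []
    processed = 0
    for v in values:
        batch.append(v)
        processed += 1
        if len(batch) >= BATCH_SIZE or processed >= total:
            flush(list(batch))   # hand off a copy of the batch
            batch[:] = []        # clear the shared buffer in place

flushed = []
data = list(range(1, 2136))      # same range as the example above
batched_write(data, len(data), flushed.append)
print([len(b) for b in flushed])  # [500, 500, 500, 500, 135]
```

The batch sizes match the log output above: four full batches of 500 and a final batch of 135 for the 2135 elements.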