Geohash NEO4j Graph with Spark
Question
I am using Neo4j/Cypher and my data is about 200 GB, so I thought of a scalable solution: Spark.
Two solutions are available for building Neo4j graphs with Spark:
1) Cypher for Apache Spark (CAPS)
2) Neo4j-Spark-Connector
I used the first one, CAPS. The pre-processed CSV has two geohash values per row: one for the pickup and one for the drop-off. What I want is to build a connected graph of geohash nodes.
CAPS only allows building a graph by mapping nodes: if the node with id 0 is to be connected to the node with id 1, you need a relationship with start id 0 and end id 1.
A very simple layout would be:
Nodes: (just id, no properties)
id
0
1
2
Relationships: (just the mandatory fields)
id | start | end
0 | 0 | 1
1 | 0 | 2
Based on that, I loaded my CSV into a Spark DataFrame and then split it into:
- a pickup dataframe
- a drop-off dataframe
- a trip dataframe
I generated an id for the first two dataframes and created the mapping by adding columns to the third dataframe. The result: a pair of nodes (pickup-[TRIP]->drop-off) was generated for each mapped row.
The problems I ran into:
1) The geohash of a pickup or a drop-off can be repeated across different trips, so I want to merge the creation of those nodes.
2) The drop-off of one trip could be the pickup of another trip, so I need to merge those two nodes into one.
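To make these two requirements concrete, the merge can be illustrated with plain Scala collections over hypothetical trip data: assigning one id per distinct geohash string makes repeated pickups collapse into a single node, and a drop-off that equals another trip's pickup resolves to the same node id. (A sketch only; the geohash strings below are made up.)

```scala
// Hypothetical trips as (pickupGeohash, dropoffGeohash) pairs.
// "u0m1" is the drop-off of trip 1 and the pickup of trip 2,
// so it must become a single node.
val trips = Seq(("u09t", "u0m1"), ("u0m1", "u09w"), ("u09t", "u09w"))

// One id per distinct geohash: this is the merge we want.
val nodeIds: Map[String, Long] =
  trips.flatMap { case (p, d) => Seq(p, d) }
    .distinct
    .zipWithIndex
    .map { case (g, i) => (g, i.toLong) }
    .toMap

// Relationships reference the merged node ids.
val rels: Seq[(Long, Long)] =
  trips.map { case (p, d) => (nodeIds(p), nodeIds(d)) }

println(nodeIds.size) // 3 distinct geohashes -> 3 nodes, not 6
```

With merged ids, the drop-off node of trip 1 and the pickup node of trip 2 are the same node, which is exactly what the CAPS table mapping in the code below does not give you out of the box.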
I tried to modify the graph, but I was surprised to find that Spark graphs are immutable: you cannot apply Cypher queries to change them.
So, is there a way to build a connected, directed and merged geohash graph with Spark?
This is my code:
package org.opencypher.spark.examples
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.util.ConsoleApp
import java.net.URI
import org.opencypher.okapi.api.io.conversion.NodeMapping
import org.opencypher.okapi.api.io.conversion.RelationshipMapping
import org.opencypher.spark.api.io.neo4j.Neo4jPropertyGraphDataSource
import org.opencypher.spark.api.io.neo4j.Neo4jConfig
import org.apache.spark.sql.functions._
import org.opencypher.okapi.api.graph.GraphName
object GreenCabsInputDataFrames extends ConsoleApp {
//1) Create CAPS session and retrieve Spark session
implicit val session: CAPSSession = CAPSSession.local()
val spark = session.sparkSession
//2) Load a csv into dataframe
val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\green_result\\green_data.csv").select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21","_c22","_c23")
//3) cache the dataframe
val df1=df.cache()
//4) subset the dataframe
val pickup_dataframe=df1.select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21")
val dropoff_dataframe=df1.select("_c22","_c23")
//5) uncache the dataframe
df1.unpersist()
//6) add id columns to the pickup, dropoff and trip dataframes
val pickup_dataframe2 = pickup_dataframe.withColumn("id1", monotonically_increasing_id() + pickup_dataframe.count()).select("id1", pickup_dataframe.columns:_*)
val dropoff_dataframe2 = dropoff_dataframe.withColumn("id2", monotonically_increasing_id() + pickup_dataframe2.count() + pickup_dataframe.count()).select("id2", dropoff_dataframe.columns:_*)
//7) create the relationship "TRIP" as a dataframe
val trip_data_dataframe2 = pickup_dataframe2.withColumn("idj", monotonically_increasing_id()).join(dropoff_dataframe2.withColumn("idj", monotonically_increasing_id()), "idj")
//drop unnecessary columns
val pickup_dataframe3=pickup_dataframe2.drop("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
val trip_data_dataframe3=trip_data_dataframe2.drop("_c20","_c21","_c22","_c23")
//8) reordering the columns of trip dataframe
val trip_data_dataframe4=trip_data_dataframe3.select("idj", "id1", "id2", "_c0", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c3", "_c4","_c9")
//8.1)displaying dataframes in console
pickup_dataframe3.show()
dropoff_dataframe2.show()
trip_data_dataframe4.show()
//9) mapping the columns
val Pickup_mapping=NodeMapping.withSourceIdKey("id1").withImpliedLabel("HashNode").withPropertyKeys("_c21","_c20")
val Dropoff_mapping=NodeMapping.withSourceIdKey("id2").withImpliedLabel("HashNode").withPropertyKeys("_c23","_c22")
val Trip_mapping=RelationshipMapping.withSourceIdKey("idj").withSourceStartNodeKey("id1").withSourceEndNodeKey("id2").withRelType("TRIP").withPropertyKeys("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
//10) create tables
val Pickup_Table2 = CAPSNodeTable(Pickup_mapping, pickup_dataframe3)
val Dropoff_Table = CAPSNodeTable(Dropoff_mapping, dropoff_dataframe2)
val Trip_Table = CAPSRelationshipTable(Trip_mapping,trip_data_dataframe4)
//11) Create graph
val graph = session.readFrom(Pickup_Table2,Dropoff_Table, Trip_Table)
//12) Connect to Neo4j
val boltWriteURI: URI = new URI("bolt://localhost:7687")
val neo4jWriteConfig: Neo4jConfig = new Neo4jConfig(boltWriteURI, "neo4j", Some("wakarimashta"), true)
val neo4jResult: Neo4jPropertyGraphDataSource = new Neo4jPropertyGraphDataSource(neo4jWriteConfig)(session)
//13) Store graph in neo4j
val neo4jResultName: GraphName = new GraphName("neo4jgraphs151")
neo4jResult.store(neo4jResultName, graph)
}
Answer
You are right: CAPS, just like Spark, is an immutable system. However, with CAPS you can create new graphs from within a Cypher statement: https://github.com/opencypher/cypher-for-apache-spark/blob/master/spark-cypher-examples/src/main/scala/org/opencypher/spark/examples/MultipleGraphExample.scala
At the moment the CONSTRUCT clause has limited support for MERGE. It only allows adding already-bound nodes to the newly created graph, and each bound node is added exactly once, independent of how many times it occurs in the binding table.
Consider this query:
MATCH (n), (m)
CONSTRUCT
CREATE (n), (m)
RETURN GRAPH
The resulting graph will have as many nodes as the input graph.
To solve your problem you could take two approaches: (a) deduplicate before creating the graph, or (b) use Cypher queries. Approach (b) would look like this:
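Approach (a) can be sketched in Spark before the CAPS tables are built. This is a sketch only, not the code from the question: the column names `pickup_geohash`/`dropoff_geohash` are placeholders for the `_c20`.._c23` columns above, and the tiny inline dataset stands in for the CSV. The idea is to union the geohashes from both columns, deduplicate, assign ids, and join the ids back to the trips to obtain the start/end ids.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("dedup-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Placeholder for the CSV-derived dataframe from the question.
val trips = Seq(("u09t", "u0m1"), ("u0m1", "u09w"))
  .toDF("pickup_geohash", "dropoff_geohash")

// One row per distinct geohash, regardless of which column it came from.
val nodes = trips.select(col("pickup_geohash").as("geohash"))
  .union(trips.select(col("dropoff_geohash").as("geohash")))
  .distinct()
  .withColumn("id", monotonically_increasing_id())

// Join the deduplicated node ids back to the trips as start/end ids.
val rels = trips
  .join(nodes.withColumnRenamed("geohash", "pickup_geohash")
             .withColumnRenamed("id", "start"), "pickup_geohash")
  .join(nodes.withColumnRenamed("geohash", "dropoff_geohash")
             .withColumnRenamed("id", "end"), "dropoff_geohash")
  .withColumn("id", monotonically_increasing_id())
```

`nodes` and `rels` can then be fed to `CAPSNodeTable`/`CAPSRelationshipTable` as in step 10 of the question; a geohash shared by a pickup and a drop-off now maps to a single node id.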
// assuming that graph is the graph created at step 11
session.catalog.store("inputGraph", graph)
session.cypher("""
CATALOG CREATE GRAPH temp {
FROM GRAPH session.inputGraph
MATCH (n)
WITH DISTINCT n.a AS a, n.b as b
CONSTRUCT
CREATE (:HashNode {a: a, b: b})
RETURN GRAPH
}
""".stripMargin)
val mergeGraph = session.cypher("""
FROM GRAPH inputGraph
MATCH (from)-[via]->(to)
FROM GRAPH temp
MATCH (n), (m)
WHERE from.a = n.a AND from.b = n.b AND to.a = m.a AND to.b = m.b
CONSTRUCT
CREATE (n)-[COPY OF via]->(m)
RETURN GRAPH
""".stripMargin).graph
Note: use the same property names (e.g. a and b) for both the pickup and drop-off nodes.