Geohash NEO4j Graph with Spark


Question

I am using Neo4j/Cypher and my data is about 200 GB, so I thought of a scalable solution: Spark.

Two solutions are available to build Neo4j graphs with Spark:

1) Cypher for Apache Spark (CAPS)

2) Neo4j-Spark-Connector

I used the first one, CAPS. The pre-processed CSV has two "geohash" values per row: one for the pickup and one for the drop-off. What I want is to build a connected graph of geohash nodes.

CAPS only allows building a graph by mapping nodes: if the node with id 0 is to be connected to the node with id 1, you need a relationship with start id 0 and end id 1.

A very simple layout would be:

Nodes: (just id, no properties)

id
0
1
2

Relationships: (just the mandatory fields)

id | start | end
0  | 0     | 1
1  | 0     | 2
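
As an illustration, this layout could be wired into CAPS roughly like the following sketch (it uses the same CAPSNodeTable/CAPSRelationshipTable API as my code below; the "Node" label and "REL" type are placeholder names):

import org.opencypher.okapi.api.io.conversion.{NodeMapping, RelationshipMapping}
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}

implicit val session: CAPSSession = CAPSSession.local()
val spark = session.sparkSession
import spark.implicits._

// The layout above as DataFrames: three nodes, two relationships
val nodesDf = Seq(0L, 1L, 2L).toDF("id")
val relsDf = Seq((0L, 0L, 1L), (1L, 0L, 2L)).toDF("id", "start", "end")

// Map the id columns onto CAPS entity tables
val nodeTable = CAPSNodeTable(NodeMapping.withSourceIdKey("id").withImpliedLabel("Node"), nodesDf)
val relTable = CAPSRelationshipTable(
  RelationshipMapping.withSourceIdKey("id")
    .withSourceStartNodeKey("start")
    .withSourceEndNodeKey("end")
    .withRelType("REL"),
  relsDf)

val graph = session.readFrom(nodeTable, relTable)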

Based on that, I loaded my CSV into a Spark DataFrame and then split it into:

  • a pickup DataFrame,

  • a drop-off DataFrame, and

  • a trip DataFrame.

I generated an id for the first two DataFrames and created the mapping by adding columns to the third DataFrame. The result: a pair of nodes (pickup-[Trip]->drop-off) is generated for each mapped row.

The problems I am facing are:

1) The geohash of a pickup or a drop-off can be repeated across different trips => I want to merge the creation of those nodes.

2) The drop-off of one trip can be the pickup of another trip, so I need to merge those two nodes into one.

I tried to change the graph, but I was surprised to find that Spark graphs are immutable => you cannot apply Cypher queries to change them.

So, is there a way to build a connected, oriented, and merged geohash graph with Spark?

This is my code:

package org.opencypher.spark.examples

import java.net.URI

import org.apache.spark.sql.functions._
import org.opencypher.okapi.api.graph.GraphName
import org.opencypher.okapi.api.io.conversion.{NodeMapping, RelationshipMapping}
import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.api.io.neo4j.{Neo4jConfig, Neo4jPropertyGraphDataSource}
import org.opencypher.spark.util.ConsoleApp

object GreenCabsInputDataFrames extends ConsoleApp {

  // 1) Create the CAPS session and retrieve the Spark session
  implicit val session: CAPSSession = CAPSSession.local()
  val spark = session.sparkSession

  // 2) Load the CSV into a DataFrame
  val df = spark.read.csv("C:\\Users\\Ahmed\\Desktop\\green_result\\green_data.csv")
    .select("_c0", "_c3", "_c4", "_c9", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c20", "_c21", "_c22", "_c23")

  // 3) Cache the DataFrame
  val df1 = df.cache()

  // 4) Subset the DataFrame
  val pickup_dataframe = df1.select("_c0", "_c3", "_c4", "_c9", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c20", "_c21")
  val dropoff_dataframe = df1.select("_c22", "_c23")

  // 5) Uncache the DataFrame
  df1.unpersist()

  // 6) Add id columns to the pickup, drop-off and trip DataFrames
  val pickup_dataframe2 = pickup_dataframe
    .withColumn("id1", monotonically_increasing_id() + pickup_dataframe.count())
    .select("id1", pickup_dataframe.columns: _*)
  val dropoff_dataframe2 = dropoff_dataframe
    .withColumn("id2", monotonically_increasing_id() + pickup_dataframe2.count() + pickup_dataframe.count())
    .select("id2", dropoff_dataframe.columns: _*)

  // 7) Create the "trip" relationship DataFrame by joining pickup and drop-off rows
  val trip_data_dataframe2 = pickup_dataframe2
    .withColumn("idj", monotonically_increasing_id())
    .join(dropoff_dataframe2.withColumn("idj", monotonically_increasing_id()), "idj")

  // Drop unnecessary columns
  val pickup_dataframe3 = pickup_dataframe2.drop("_c0", "_c3", "_c4", "_c9", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19")
  val trip_data_dataframe3 = trip_data_dataframe2.drop("_c20", "_c21", "_c22", "_c23")

  // 8) Reorder the columns of the trip DataFrame
  val trip_data_dataframe4 = trip_data_dataframe3.select("idj", "id1", "id2", "_c0", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c3", "_c4", "_c9")

  // 8.1) Display the DataFrames in the console
  pickup_dataframe3.show()
  dropoff_dataframe2.show()
  trip_data_dataframe4.show()

  // 9) Map the columns
  val Pickup_mapping = NodeMapping.withSourceIdKey("id1").withImpliedLabel("HashNode").withPropertyKeys("_c21", "_c20")
  val Dropoff_mapping = NodeMapping.withSourceIdKey("id2").withImpliedLabel("HashNode").withPropertyKeys("_c23", "_c22")
  val Trip_mapping = RelationshipMapping.withSourceIdKey("idj").withSourceStartNodeKey("id1").withSourceEndNodeKey("id2").withRelType("TRIP").withPropertyKeys("_c0", "_c3", "_c4", "_c9", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19")

  // 10) Create the tables
  val Pickup_Table2 = CAPSNodeTable(Pickup_mapping, pickup_dataframe3)
  val Dropoff_Table = CAPSNodeTable(Dropoff_mapping, dropoff_dataframe2)
  val Trip_Table = CAPSRelationshipTable(Trip_mapping, trip_data_dataframe4)

  // 11) Create the graph
  val graph = session.readFrom(Pickup_Table2, Dropoff_Table, Trip_Table)

  // 12) Connect to Neo4j
  val boltWriteURI: URI = new URI("bolt://localhost:7687")
  val neo4jWriteConfig: Neo4jConfig = new Neo4jConfig(boltWriteURI, "neo4j", Some("wakarimashta"), true)
  val neo4jResult: Neo4jPropertyGraphDataSource = new Neo4jPropertyGraphDataSource(neo4jWriteConfig)(session)

  // 13) Store the graph in Neo4j
  val neo4jResultName: GraphName = new GraphName("neo4jgraphs151")
  neo4jResult.store(neo4jResultName, graph)
}

Answer

You are right: CAPS, just like Spark, is an immutable system. However, with CAPS you can create new graphs from within a Cypher statement: https://github.com/opencypher/cypher-for-apache-spark/blob/master/spark-cypher-examples/src/main/scala/org/opencypher/spark/examples/MultipleGraphExample.scala

At the moment the CONSTRUCT clause has limited support for MERGE: it only allows adding already bound nodes to the newly created graph, and each bound node is added exactly once, independent of how many times it occurs in the binding table.

Consider the following query:

MATCH (n), (m)
CONSTRUCT
  CREATE (n), (m)
RETURN GRAPH

The resulting graph will have as many nodes as the input graph.

To solve your problem you could take one of two approaches: a) deduplicate the nodes before creating the graph, or b) use Cypher queries. Approach b) would look like this:

// assuming that graph is the graph created at step 11
session.catalog.store("inputGraph", graph)

session.cypher("""
  CATALOG CREATE GRAPH temp {
    FROM GRAPH session.inputGraph
    MATCH (n)
    WITH DISTINCT n.a AS a, n.b AS b
    CONSTRUCT
      CREATE (:HashNode {a: a, b: b})
    RETURN GRAPH
  }
""".stripMargin)

val mergeGraph = session.cypher("""
  FROM GRAPH inputGraph
  MATCH (from)-[via]->(to)
  FROM GRAPH temp
  MATCH (n), (m)
  WHERE from.a = n.a AND from.b = n.b AND to.a = m.a AND to.b = m.b
  CONSTRUCT 
    CREATE (n)-[COPY OF via]->(m)
  RETURN GRAPH
""".stripMargin).graph

Note: use the same property names (e.g. a and b) for both the pickup and drop-off nodes.
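
For approach a), a minimal sketch of the deduplication in plain Spark could look like the following (the trips DataFrame and its pickupGeohash/dropoffGeohash column names are illustrative assumptions, not part of the original code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Build one node per distinct geohash and one relationship row per trip.
// Assumed input: a "trips" DataFrame with pickupGeohash and dropoffGeohash columns.
def buildDedupTables(trips: DataFrame): (DataFrame, DataFrame) = {
  // Collect every geohash, whether it appeared as a pickup or a drop-off,
  // and keep each one exactly once with a generated node id
  val nodes = trips.select(col("pickupGeohash").as("geohash"))
    .union(trips.select(col("dropoffGeohash").as("geohash")))
    .distinct()
    .withColumn("id", monotonically_increasing_id())

  // Resolve start/end node ids by joining the trips back on the geohash value
  val rels = trips
    .join(nodes.select(col("geohash").as("pickupGeohash"), col("id").as("start")), Seq("pickupGeohash"))
    .join(nodes.select(col("geohash").as("dropoffGeohash"), col("id").as("end")), Seq("dropoffGeohash"))
    .withColumn("relId", monotonically_increasing_id())

  (nodes, rels)
}

The resulting nodes and rels DataFrames can then be mapped with NodeMapping/RelationshipMapping and read into a graph exactly as in steps 9-11 of the question, so a geohash that is repeated across trips, or that is a drop-off in one trip and a pickup in another, becomes a single node.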
