15 changes: 15 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -85,6 +85,21 @@ abstract class EdgeRDD[ED](
(other: EdgeRDD[ED2])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

/**
* Unions this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
* [[PartitionStrategy]].
*
* @param other the EdgeRDD to union with
* @param f the merge function applied to edges present in both `this` and `other`
* @param map the function that lifts an edge present only in `this` into the result edge type
* @param rmap the function that lifts an edge present only in `other` into the result edge type
* @return a new EdgeRDD containing all edges that appear in either `this` or `other`,
* with values supplied by `f`, `map`, or `rmap`
*/
def union[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2]) (f: (VertexId, VertexId, ED, ED2) => ED3)
(map: (ED) => ED3, rmap: (ED2) => ED3) : EdgeRDD[ED3]
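
For illustration, a minimal sketch of how the three curried argument lists interact, assuming `edges1` and `edges2` are hypothetical, co-partitioned EdgeRDD[Int] values:

    // `f` merges edges present in both RDDs; `map` and `rmap` lift edges that appear
    // in only one of the two RDDs into the result attribute type.
    val combined: EdgeRDD[Int] = edges1.union[Int, Int](edges2)(
      (src, dst, a, b) => a + b)(  // edge in both: merge the attributes
      (a: Int) => a,               // edge only in `this`
      (b: Int) => b)               // edge only in `other`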

/**
* Changes the target storage level while preserving all other properties of the
* EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
21 changes: 21 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -340,6 +340,27 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

/**
* Merges two graphs into a single graph. For correct results, both graphs must have
* been partitioned with the same strategy using [[partitionBy]].
*
* @param other the graph to merge this graph with
* @param mergeEdges the user-supplied commutative, associative function used to
* merge edge attributes for duplicate edges
* @param mergeVertices the user-supplied commutative, associative function used to
* merge vertex attributes for duplicate vertices
* @tparam VD2 the vertex attribute type of the other graph
* @tparam ED2 the edge attribute type of the other graph
* @tparam VD3 the vertex attribute type of the merged graph
* @tparam ED3 the edge attribute type of the merged graph
* @return the resulting graph, containing the union of the vertices of both graphs
* and a single edge for each (source, dest) vertex pair present in either graph
*/
def union[VD2: ClassTag, VD3: ClassTag, ED2: ClassTag, ED3: ClassTag]
(other: Graph[VD2, ED2],
mergeEdges: (VertexId, VertexId, ED, ED2) => ED3,
mergeVertices: (VertexId, VertexId, VD, VD2) => VD3): Graph[VD3, ED3]
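
A short usage sketch under that precondition, assuming `graph1` and `graph2` are hypothetical Graph[Int, Int] values:

    // Partition both graphs with the same strategy before taking the union.
    val g = graph1.partitionBy(PartitionStrategy.EdgePartition2D)
    val h = graph2.partitionBy(PartitionStrategy.EdgePartition2D)
    val merged: Graph[Int, Int] = g.union[Int, Int, Int, Int](h,
      (src, dst, e1, e2) => e1 + e2,  // duplicate edge: sum the attributes
      (id1, id2, v1, v2) => v1 + v2)  // duplicate vertex: sum the attributes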

/**
* Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
* `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
24 changes: 24 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -214,6 +214,30 @@ abstract class VertexRDD[VD](
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

/**
* Efficiently unions this VertexRDD with another VertexRDD sharing the same index. See
* [[unionJoin]] for the behavior of the union.
*/
def unionZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (VertexId, VertexId, VD, U) => VD2): VertexRDD[VD2]

/**
* Unions this VertexRDD with an RDD containing vertex attribute pairs. If the other RDD
* is backed by a VertexRDD with the same index then the efficient [[unionZipJoin]]
* implementation is used.
*
* @param other an RDD containing vertices to union with. If there are multiple entries
* for the same vertex, one is picked arbitrarily.
* Use [[aggregateUsingIndex]] to merge multiple entries.
* @param f the merge function applied to corresponding values of `this` and `other`
* @return a VertexRDD co-indexed with `this`, containing all vertices that appear in
* either `this` or `other`, with values merged by `f` where a vertex appears in both
*/
def unionJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VertexId, VD, U) => VD2) : VertexRDD[VD2]
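
A brief sketch of unionJoin against a plain pair RDD, assuming `vertexRdd` is a hypothetical VertexRDD[Int] and `sc` a SparkContext:

    // Vertices present on both sides are merged by `f`; vertices present on only one
    // side keep their existing attribute.
    val updates: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 10), (9L, 90)))
    val merged: VertexRDD[Int] = vertexRdd.unionJoin[Int, Int](updates)((id1, id2, a, b) => a + b)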

/**
* Aggregates vertices in `messages` that have the same ids using `reduceFunc`, returning a
* VertexRDD co-indexed with `this`.
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -17,6 +17,10 @@

package org.apache.spark.graphx.impl

import scala.collection.mutable.HashMap
import scala.reflect.{classTag, ClassTag}

import org.apache.spark.graphx._
@@ -107,7 +111,7 @@ class EdgePartition[

@inline private def dstIds(pos: Int): VertexId = local2global(localDstIds(pos))

@inline private def attrs(pos: Int): ED = data(pos)
@inline def attrs(pos: Int): ED = data(pos)

/** Look up vid in activeSet, throwing an exception if it is None. */
def isActive(vid: VertexId): Boolean = {
@@ -306,6 +310,73 @@ class EdgePartition[
builder.toEdgePartition
}

/**
* Merge `this` with `other` into a new `EdgePartition`: edges present in both partitions
* are combined with `f`, edges present only in `this` are lifted into the result type
* with `map`, and edges present only in `other` are lifted with `rmap`.
*
* If a partition contains multiple edges with the same src and dst, only one of them is
* retained: edges are deduplicated by their (src, dst) pair before merging.
*/
def union[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3)
(map: (ED) => ED3, rmap: (ED2) => ED3) : EdgePartition[ED3, VD] = {
val builder = new EdgePartitionBuilder[ED3, VD]()

// Index the edges of each partition by their (src, dst) pair. Keying on the pair itself
// avoids the collisions that a combined hash such as 31 * src + dst would produce.
val thisEdges = new HashMap[(VertexId, VertexId), ED]
val otherEdges = new HashMap[(VertexId, VertexId), ED2]

var i = 0
while (i < this.size) {
thisEdges.put((this.srcIds(i), this.dstIds(i)), this.data(i))
i += 1
}
var j = 0
while (j < other.size) {
otherEdges.put((other.srcIds(j), other.dstIds(j)), other.attrs(j))
j += 1
}

// Edges in `this`: merge with the matching edge in `other` if one exists, removing the
// match so it is not emitted twice; otherwise lift the attribute with `map`.
thisEdges.foreach { case ((srcId, dstId), attr) =>
otherEdges.remove((srcId, dstId)) match {
case Some(otherAttr) => builder.add(srcId, dstId, f(srcId, dstId, attr, otherAttr))
case None => builder.add(srcId, dstId, map(attr))
}
}
// Edges present only in `other`: everything matched above has already been removed.
otherEdges.foreach { case ((srcId, dstId), attr) =>
builder.add(srcId, dstId, rmap(attr))
}

builder.toEdgePartition
}
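
Design note: keying the temporary maps on the (src, dst) pair itself, rather than a combined hash like 31 * src + dst, avoids key collisions between distinct edges, and removing matched entries from the second map lets the leftover pass emit each `other`-only edge exactly once.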

/**
* The number of edges in this partition
*
graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -113,6 +113,19 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
})
}

override def union[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2]) (f: (VertexId, VertexId, ED, ED2) => ED3)
(map: (ED) => ED3, rmap: (ED2) => ED3) : EdgeRDDImpl[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true) { (thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator((pid, thisEPart.union(otherEPart)(f)(map, rmap)(ed2Tag, ed3Tag)))
})
}
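
Note that zipPartitions requires both EdgeRDDs to have the same number of partitions; this is why union only gives correct results when both sides were built with the same [[PartitionStrategy]] and partition count.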

def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDDImpl[ED2, VD2] = {
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.storage.StorageLevel
@@ -184,6 +184,19 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}

override def union[VD2: ClassTag, VD3: ClassTag, ED2: ClassTag, ED3: ClassTag]
(other: Graph[VD2, ED2],
mergeEdges: (VertexId, VertexId, ED, ED2) => ED3,
mergeVertices: (VertexId, VertexId, VD, VD2) => VD3): Graph[VD3, ED3] = {
val newVertices: VertexRDD[VD3] = vertices.unionJoin(other.vertices)(mergeVertices)
// Reuse the replicated vertex view, updated with the merged vertices. The casts below
// assume the input attribute types are representation-compatible with VD3/ED3 for
// elements that exist in only one of the two graphs.
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD3, ED]]
.updateVertices(newVertices)
val newEdges: EdgeRDDImpl[ED3, VD3] =
newReplicatedVertexView.edges.union(other.edges)(mergeEdges)(
(edge: ED) => edge.asInstanceOf[ED3], (edge: ED2) => edge.asInstanceOf[ED3])

new GraphImpl[VD3, ED3](newVertices, newReplicatedVertexView.withEdges(newEdges))
}

// ///////////////////////////////////////////////////////////////////////////////////////////////
// Lower level transformation methods
// ///////////////////////////////////////////////////////////////////////////////////////////////
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -51,6 +51,25 @@ private[graphx] object VertexPartitionBase {
}
(map.keySet, map._values, map.keySet.getBitSet)
}

/**
* Construct the constituents of a VertexPartitionBase from two iterators of vertices,
* merging duplicate entries using `mergeFunc`.
*/
def initFrom[VD: ClassTag](iter1: Iterator[(VertexId, VD)],
iter2: Iterator[(VertexId, VD)], mergeFunc : (VD, VD) => VD)
: (VertexIdToIndexMap, Array[VD], BitSet) = {
val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
iter1.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
iter2.foreach { pair =>
map.setMerge(pair._1, pair._2, mergeFunc)
}
(map.keySet, map._values, map.keySet.getBitSet)
}
}
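
A small illustration of initFrom's merge behavior, assuming Int attributes (the values are hypothetical):

    // Vertex 2L appears in both iterators, so its attributes are merged: 2 + 20 = 22.
    val (index, values, mask) = VertexPartitionBase.initFrom(
      Iterator((1L, 1), (2L, 2)),
      Iterator((2L, 20), (3L, 3)),
      (a: Int, b: Int) => a + b)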

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -179,6 +179,47 @@ private[graphx] abstract class VertexPartitionBaseOps
innerJoin(createUsingIndex(iter))(f)
}

/** Union this VertexPartition with another, merging values for vertices present in both. */
def union[U: ClassTag, VD2: ClassTag]
(other: Self[U])
(f: (VertexId, VertexId, VD, U) => VD2): Self[VD2] = {
if (self.index != other.index) {
logWarning("Unioning two VertexPartitions with different indexes is slow.")
union(createUsingIndex(other.iterator))(f)
} else {
// The indexes are identical, so slot i refers to the same vertex in both partitions
// and the existing index can be reused for the result; only the mask widens to cover
// vertices present in either partition.
val newMask = self.mask | other.mask
val newValues = new Array[VD2](self.capacity)
var i = newMask.nextSetBit(0)
while (i >= 0) {
if (self.mask.get(i) && other.mask.get(i)) {
newValues(i) = f(self.index.getValue(i), other.index.getValue(i),
self.values(i), other.values(i))
} else if (self.mask.get(i)) {
// Present only in `this`; assumes VD is representation-compatible with VD2.
newValues(i) = self.values(i).asInstanceOf[VD2]
} else {
// Present only in `other`; assumes U is representation-compatible with VD2.
newValues(i) = other.values(i).asInstanceOf[VD2]
}
i = newMask.nextSetBit(i + 1)
}
this.withValues(newValues).withMask(newMask)
}
}
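
Because the fast path reuses `self.index`, the positions in `newValues` stay aligned with the shared index; rebuilding a fresh index (for example from a hash map keyed by vertex id) would scramble that alignment, so the result replaces only the values and the mask.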

/**
* Union an iterator of messages.
*/
def union[U: ClassTag, VD2: ClassTag]
(iter: Iterator[Product2[VertexId, U]])
(f: (VertexId, VertexId, VD, U) => VD2): Self[VD2] = {
union(createUsingIndex(iter))(f)
}

/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -209,6 +209,36 @@ class VertexRDDImpl[VD] private[graphx] (
}
}

override def unionZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (VertexId, VertexId, VD, U) => VD2): VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(thisPart.union(otherPart)(f))
}
this.withPartitionsRDD(newPartitionsRDD)
}

override def unionJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VertexId, VD, U) => VD2) : VertexRDD[VD2] = {
// Test whether `other` is a VertexRDD to choose the optimal join strategy.
// If it is a co-partitioned VertexRDD, use the much more efficient unionZipJoin.
other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
unionZipJoin(other)(f)
case _ =>
this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.union(msgs)(f))
}
)
}
}

override def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = messages.partitionBy(this.partitioner.get)
31 changes: 31 additions & 0 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -267,6 +267,37 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("summation") {
withSpark { sc =>
val n = 8
val verticesG = sc.parallelize((1 to n).map(x => (x: VertexId, x)))
val edgesG = sc.parallelize(Seq(
Edge(1, 2, 1), Edge(1, 3, 1), Edge(1, 4, 1), Edge(2, 3, 2), Edge(2, 4, 2), Edge(3, 4, 3),
Edge(4, 5, 4), Edge(4, 6, 4), Edge(5, 6, 5)))
val graphG: Graph[Int, Int] = Graph(verticesG, edgesG).cache()

val verticesH = sc.parallelize((5 to 8).map(x => (x: VertexId, x)))
val edgesH = sc.parallelize(Seq(
Edge(5, 6, 5), Edge(5, 7, 5), Edge(5, 8, 5), Edge(6, 7, 6), Edge(6, 8, 6), Edge(7, 8, 7)))
val graphH: Graph[Int, Int] = Graph(verticesH, edgesH).cache()

val unionGraph = graphG.union[Int, Int, Int, Int](graphH,
(src, dst, a, b) => a - b, // mergeEdges: difference of duplicate edge attributes
(src, dst, a, b) => a + b) // mergeVertices: sum of duplicate vertex attributes

val v = unionGraph.vertices.collect().toSet
assert(v === (1 to 8).map(e => (if (e < 5) (e, e) else (e, 2 * e))).toSet)

// the map is necessary because of object-reuse in the edge iterator
val e = unionGraph.edges.map(e => Edge(e.srcId, e.dstId, e.attr)).collect()
assert(e.toSet === Set(Edge(1, 3, 1), Edge(6, 8, 6), Edge(3, 4, 3),
Edge(4, 6, 4), Edge(2, 3, 2), Edge(2, 4, 2),
Edge(7, 8, 7), Edge(5, 7, 5), Edge(1, 2, 1),
Edge(6, 7, 6), Edge(5, 8, 5), Edge(4, 5, 4),
Edge(1, 4, 1), Edge(5, 6, 0)))
}
}

test("groupEdges") {
withSpark { sc =>
val n = 5