From f73072127e6f4d99e9d2c03a850053cecbb1e2a7 Mon Sep 17 00:00:00 2001 From: ArcherShao Date: Mon, 12 May 2014 23:03:19 +0800 Subject: [PATCH 1/2] Add a function that can build an EdgePartition faster. --- .../spark/graphx/impl/EdgePartition.scala | 4 +-- .../graphx/impl/EdgePartitionBuilder.scala | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 871e81f8d245c..86834df8cce95 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -204,7 +204,7 @@ class EdgePartition[ if (size > 0) { builder.add(currSrcId, currDstId, currAttr) } - builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet) + builder.toEdgePartitionWithoutSort.withVertices(vertices).withActiveSet(activeSet) } /** @@ -238,7 +238,7 @@ class EdgePartition[ } i += 1 } - builder.toEdgePartition.withVertices(vertices).withActiveSet(activeSet) + builder.toEdgePartitionWithoutSort.withVertices(vertices).withActiveSet(activeSet) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index ecb49bef42e45..7e7467a55b6ed 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -69,4 +69,36 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla new EdgePartition(srcIds, dstIds, data, index, vertices) } + + /** If every edge add to edges in Edge.lexicographicOrdering, we don't need to sort edgeArray.*/ + def toEdgePartitionWithoutSort: EdgePartition[ED, VD] = { + val edgeArray = edges.trim().array + val srcIds = edgeArray.map(edge => edge.srcId) + 
val dstIds = edgeArray.map(edge => edge.dstId) + val data = edgeArray.map(edge => edge.attr) + val index = new PrimitiveKeyOpenHashMap[VertexId, Int] + + val edgeNum = edgeArray.length + if (edgeNum > 0) { + index.update(srcIds(0), 0) + var currSrcId: VertexId = srcIds(0) + var i = 0 + while (i < edgeNum) { + if (srcIds(i) != currSrcId) { + currSrcId = srcIds(i) + index.update(currSrcId, i) + } + i += 1 + } + } + + // Create and populate a VertexPartition with vids from the edges, but no attributes + val vidsIter = srcIds.iterator ++ dstIds.iterator + val vertexIds = new OpenHashSet[VertexId] + vidsIter.foreach(vid => vertexIds.add(vid)) + val vertices = new VertexPartition( + vertexIds, new Array[VD](vertexIds.capacity), vertexIds.getBitSet) + + new EdgePartition(srcIds, dstIds, data, index, vertices) + } } From 9d981ec2ee3e935f50e63de665527ff3f25716f7 Mon Sep 17 00:00:00 2001 From: ArcherShao Date: Wed, 14 May 2014 21:57:04 +0800 Subject: [PATCH 2/2] Fix scalastyle error. --- .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 7e7467a55b6ed..e2cfc079f748c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -70,7 +70,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla new EdgePartition(srcIds, dstIds, data, index, vertices) } - /** If every edge add to edges in Edge.lexicographicOrdering, we don't need to sort edgeArray.*/ + /** If edges were added in Edge.lexicographicOrdering, edgeArray does not need to be sorted. */ def toEdgePartitionWithoutSort: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array val srcIds = edgeArray.map(edge => edge.srcId)