Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.baidu.hugegraph.backend.cache;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.slf4j.Logger;
Expand All @@ -45,6 +46,9 @@ public abstract class AbstractCache<K, V> implements Cache<K, V> {
private final long capacity;
private final long halfCapacity;

// For user attachment
private final AtomicReference<Object> attachment;

public AbstractCache() {
this(DEFAULT_SIZE);
}
Expand All @@ -55,6 +59,7 @@ public AbstractCache(long capacity) {
}
this.capacity = capacity;
this.halfCapacity = this.capacity >> 1;
this.attachment = new AtomicReference<>();
}

@Watched(prefix = "cache")
Expand Down Expand Up @@ -204,6 +209,19 @@ public final long capacity() {
return this.capacity;
}

@Override
public <T> T attachment(T object) {
this.attachment.compareAndSet(null, object);
return this.attachment();
}

@Override
public <T> T attachment() {
@SuppressWarnings("unchecked")
T attachment = (T) this.attachment.get();
return attachment;
}

protected final long halfCapacity() {
return this.halfCapacity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public interface Cache<K, V> {
public long hits();

public long miss();

public <T> T attachment(T object);

public <T> T attachment();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.ram.IntObjectMap;
import com.baidu.hugegraph.backend.tx.SchemaTransaction;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.schema.SchemaElement;
Expand All @@ -44,17 +43,25 @@ public final class CachedSchemaTransaction extends SchemaTransaction {
private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;

private final SchemaCaches<SchemaElement> arrayCaches;

private EventListener storeEventListener;
private EventListener cacheEventListener;

private static final Map<String, CachedTypes> CACHED_TYPES =
new ConcurrentHashMap<>();

public CachedSchemaTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);

this.idCache = this.cache("schema-id");
this.nameCache = this.cache("schema-name");
final long capacity = graph.configuration()
.get(CoreOptions.SCHEMA_CACHE_CAPACITY);
this.idCache = this.cache("schema-id", capacity);
this.nameCache = this.cache("schema-name", capacity);

SchemaCaches<SchemaElement> attachment = this.idCache.attachment();
if (attachment == null) {
int acSize = (int) (capacity >> 3);
attachment = this.idCache.attachment(new SchemaCaches<>(acSize));
}
this.arrayCaches = attachment;

this.listenChanges();
}
Expand All @@ -69,23 +76,12 @@ public void close() {
}
}

private Cache<Id, Object> cache(String prefix) {
HugeConfig conf = super.params().configuration();

private Cache<Id, Object> cache(String prefix, long capacity) {
final String name = prefix + "-" + this.graphName();
final long capacity = conf.get(CoreOptions.SCHEMA_CACHE_CAPACITY);
// NOTE: must disable schema cache-expire due to getAllSchema()
return CacheManager.instance().cache(name, capacity);
}

private CachedTypes cachedTypes() {
String graph = this.params().name();
if (!CACHED_TYPES.containsKey(graph)) {
CACHED_TYPES.putIfAbsent(graph, new CachedTypes());
}
return CACHED_TYPES.get(graph);
}

private void listenChanges() {
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Expand All @@ -111,6 +107,8 @@ private void listenChanges() {
if ("invalid".equals(args[0])) {
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);

id = generateId(type, id);
Object value = this.idCache.get(id);
if (value != null) {
Expand Down Expand Up @@ -139,7 +137,7 @@ private void listenChanges() {
private void clearCache() {
this.idCache.clear();
this.nameCache.clear();
this.cachedTypes().clear();
this.arrayCaches.clear();
}

private void unlistenChanges() {
Expand All @@ -159,6 +157,10 @@ private void resetCachedAllIfReachedCapacity() {
}
}

private CachedTypes cachedTypes() {
return this.arrayCaches.cachedTypes();
}

private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
Expand All @@ -175,16 +177,26 @@ protected void addSchema(SchemaElement schema) {

this.resetCachedAllIfReachedCapacity();

// update id cache
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

// update name cache
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
// try get from optimized array cache
if (id.number() && id.asLong() > 0) {
SchemaElement value = this.arrayCaches.get(type, id);
if (value != null) {
return (T) value;
}
}

Id prefixedId = generateId(type, id);
Object value = this.idCache.get(prefixedId);
if (value == null) {
Expand All @@ -199,6 +211,12 @@ protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
this.nameCache.update(prefixedName, schema);
}
}

// update optimized array cache
if (value != null && id.number() && id.asLong() > 0) {
this.arrayCaches.set(type, id, (SchemaElement) value);
}

return (T) value;
}

Expand Down Expand Up @@ -236,6 +254,9 @@ protected void removeSchema(SchemaElement schema) {
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.invalidate(prefixedName);
}

// remove from optimized array cache
this.arrayCaches.remove(schema.type(), schema.id());
}

@Override
Expand Down Expand Up @@ -270,6 +291,114 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
}
}

private static final class SchemaCaches<V extends SchemaElement> {

private final int size;

private final IntObjectMap<V> pks;
private final IntObjectMap<V> vls;
private final IntObjectMap<V> els;
private final IntObjectMap<V> ils;

private final CachedTypes cachedTypes;

public SchemaCaches(int size) {
// TODO: improve size of each type for optimized array cache
this.size = size;

this.pks = new IntObjectMap<>(size);
this.vls = new IntObjectMap<>(size);
this.els = new IntObjectMap<>(size);
this.ils = new IntObjectMap<>(size);

this.cachedTypes = new CachedTypes();
}

public V get(HugeType type, Id id) {
assert id.number() && id.asLong() > 0 : id;
int key = (int) id.asLong();
if (key >= this.size) {
return null;
}
switch (type) {
case PROPERTY_KEY:
return this.pks.get(key);
case VERTEX_LABEL:
return this.vls.get(key);
case EDGE_LABEL:
return this.els.get(key);
case INDEX_LABEL:
return this.ils.get(key);
default:
return null;
}
}

public void set(HugeType type, Id id, V value) {
assert id.number() && id.asLong() > 0 : id;
int key = (int) id.asLong();
if (key >= this.size) {
return;
}
switch (type) {
case PROPERTY_KEY:
this.pks.set(key, value);
break;
case VERTEX_LABEL:
this.vls.set(key, value);
break;
case EDGE_LABEL:
this.els.set(key, value);
break;
case INDEX_LABEL:
this.ils.set(key, value);
break;
default:
// pass
break;
}
}

public void remove(HugeType type, Id id) {
assert id.number() && id.asLong() > 0 : id;
int key = (int) id.asLong();
V value = null;
if (key >= this.size) {
return;
}
switch (type) {
case PROPERTY_KEY:
this.pks.set(key, value);
break;
case VERTEX_LABEL:
this.vls.set(key, value);
break;
case EDGE_LABEL:
this.els.set(key, value);
break;
case INDEX_LABEL:
this.ils.set(key, value);
break;
default:
// pass
break;
}
}

public void clear() {
this.pks.clear();
this.vls.clear();
this.els.clear();
this.ils.clear();

this.cachedTypes.clear();
}

public CachedTypes cachedTypes() {
return this.cachedTypes;
}
}

private static class CachedTypes
extends ConcurrentHashMap<HugeType, Boolean> {

Expand Down
5 changes: 5 additions & 0 deletions hugegraph-test/src/main/resources/hugegraph.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ query.batch_size=4
query.page_size=2
query.index_intersect_threshold=2

#schema.cache_capacity=1000000
#query.ramtable_enable=true
#query.ramtable_vertices_capacity=1800
#query.ramtable_edges_capacity=1200

# cassandra backend config
cassandra.host=127.0.0.1
cassandra.port=9042
Expand Down