Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.util.Collection;
import java.util.LinkedList;


/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-wrote this slightly:

"A {@link Metric} that stores a collection of values. By default, a {@link ListGauge} retains the {@code size} most recent values"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed these comments in new, separate PR: #541

* A ListGauge is a collection of {@link org.apache.samza.metrics.Gauge}. ListGauges are useful for maintaining, recording,
* or collecting specific Gauge values over time. It is implemented as a {@link org.apache.samza.metrics.Metric}.
* For example, the set of recent errors that have occurred.
*
* This current implementation uses a size-bound-policy and holds the N most-recent Gauge objects added to the list.
* This bound N is configurable at instantiation time.
* TODO: Support a time-based and size-and-time-based hybrid policy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this PR is only adding gauges, maybe move the TODOs to another JIRA and assign them to yourself. You can remove these from code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* TODO: Add a derived class to do compaction for errors using hash-based errorIDs and adding timestamp for errors to dedup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be a new metric type then? would be good to think what the end-user API is so that we build the right thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, agree

* on the read path.
*
* All public methods are thread-safe.
*
*/
public class ListGauge implements Metric {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about this as a list of values? You don't have to expose the fact that it uses a Gauge underneath. ListGauge merely takes in an add(T val)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to parameterize this class? ListGauge<T> since Gauges are parameterized already

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final String name;
private final Collection<Gauge> metricList;
private int nItems;

/**
* Create a new ListGauge.
* @param name Name to be assigned
* @param nItems The number of items to hold in the list
*/
public ListGauge(String name, int nItems) {
this.name = name;
this.metricList = new LinkedList<>();
this.nItems = nItems;
}

/**
* Get the name assigned to the ListGauge
* @return the assigned name
*/
public String getName() {
return this.name;
}

/**
* Add a gauge to the list
* @param value The Gauge value to be added
*/
public synchronized void add(Gauge value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just take in a T value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (this.metricList.size() == nItems) {
((LinkedList<Gauge>) this.metricList).removeFirst();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably don't need this cast here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

this.metricList.add(value);
}

/**
* Get the Collection of Gauge values currently in the list
* @return the collection of gauge values
*/
public synchronized Collection<Gauge> getValue() {
return this.metricList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return a copy or wrap it in an immutable collection so that modifications don't alter the original?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

/**
* {@inheritDoc}
*/
@Override
public synchronized void visit(MetricsVisitor visitor) {
visitor.listGauge(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ public interface MetricsRegistry {
*/
<T> Gauge<T> newGauge(String group, Gauge<T> value);

/**
* Register a {@link org.apache.samza.metrics.ListGauge}
* @param group Group for this ListGauge
* @param listGauge the ListGauge to register
* @param <T> Type of the ListGauge
* @return ListGauge registered
*/
<T> ListGauge newListGauge(String group, ListGauge listGauge);

/**
* Create and Register a new {@link org.apache.samza.metrics.Timer}
* @param group Group for this Timer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ public abstract class MetricsVisitor {

public abstract void timer(Timer timer);

public abstract <T> void listGauge(ListGauge listGauge);

public void visit(Metric metric) {
if (metric instanceof Counter) {
// Cast for metrics of type ListGauge
if (metric instanceof ListGauge) {
listGauge((ListGauge) metric);
} else if (metric instanceof Counter) {
counter((Counter) metric);
} else if (metric instanceof Gauge<?>) {
gauge((Gauge<?>) metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener {

void onGauge(String group, Gauge<?> gauge);

void onListGauge(String group, ListGauge listGauge);

void onTimer(String group, Timer timer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;


/**
* {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be
* recorded but a registry is still required.
Expand All @@ -49,6 +51,11 @@ public <T> Gauge<T> newGauge(String group, Gauge<T> gauge) {
return gauge;
}

@Override
public <T> ListGauge newListGauge(String group, ListGauge listGauge) {
return listGauge;
}

@Override
public Timer newTimer(String group, String name) {
return new Timer(name);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import org.junit.Assert;
import org.junit.Test;


/**
* Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge}
*/
public class TestListGauge {

private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10);

@Test
public void basicTest() {
ListGauge listGauge = new ListGauge("listGauge", 10);
Gauge<String> sampleGauge = new Gauge<String>("key", "value");
listGauge.add(sampleGauge);
Assert.assertEquals("Names should be the same", listGauge.getName(), "listGauge");
Assert.assertEquals("List sizes should match", listGauge.getValue().size(), 1);
Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValue().contains(sampleGauge), true);
}

@Test
public void testSizeEnforcement() {
ListGauge listGauge = new ListGauge("listGauge", 10);
for (int i = 15; i > 0; i--) {
Gauge<String> sampleGauge = new Gauge<String>("key", "v" + i);
listGauge.add(sampleGauge);
}
Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValue().size(), 10);

int valueIndex = 10;
Collection<Gauge> currentList = listGauge.getValue();
Iterator iterator = currentList.iterator();
while (iterator.hasNext()) {
Gauge<String> gauge = (Gauge<String>) iterator.next();
Assert.assertTrue(gauge.getName().equals("key"));
Assert.assertTrue(gauge.getValue().equals("v" + valueIndex));
valueIndex--;
}
}

@Test
public void testThreadSafety() throws InterruptedException {
ListGauge listGauge = new ListGauge("listGauge", 20);

Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
listGauge.add(new Gauge<Integer>("thread1", i));
}
}
});

Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= 100; i++) {
listGauge.add(new Gauge<Integer>("key", i));
}
}
});

thread1.start();
thread2.start();

thread1.join(THREAD_TEST_TIMEOUT.toMillis());
thread2.join(THREAD_TEST_TIMEOUT.toMillis());

Assert.assertTrue("ListGauge should have the last 20 values", listGauge.getValue().size() == 20);
for (Gauge gauge : listGauge.getValue()) {
Assert.assertTrue("Values should have the last 20 range",
((Integer) gauge.getValue()) <= 100 && ((Integer) gauge.getValue()) > 80);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testTimerWithDifferentWindowSize() {
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
assertEquals(3, snapshot.getValues().size());

// The time is 500 for update(4L) because getSnapshot calls clock once + 3
// The time is 500 for update(4L) because getValue calls clock once + 3
// updates that call clock 3 times
timer.update(4L);
Snapshot snapshot2 = timer.getSnapshot();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.system.eventhub;

import org.apache.commons.collections4.map.HashedMap;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class TestMetricsRegistry implements MetricsRegistry {

private Map<String, List<Counter>> counters = new HashedMap<>();
private Map<String, List<Gauge<?>>> gauges = new HashedMap<>();
private Map<String, List<ListGauge>> listGauges = new HashedMap<>();

public List<Counter> getCounters(String groupName) {
return counters.get(groupName);
Expand Down Expand Up @@ -78,6 +81,14 @@ public <T> Gauge<T> newGauge(String group, Gauge<T> value) {
return value;
}

@Override
public ListGauge newListGauge(String group, ListGauge listGauge) {
listGauges.putIfAbsent(group, new ArrayList());
ListGauge value = new ListGauge(group, 1000);
listGauges.get(group).add(value);
return value;
}

@Override
public Timer newTimer(String group, String name) {
return null;
Expand Down
38 changes: 21 additions & 17 deletions samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.metrics;

Expand Down Expand Up @@ -48,6 +48,10 @@ public <T> Gauge<T> newGauge(String name, T value) {
return registry.newGauge(groupName, new Gauge<T>((prefix + name).toLowerCase(), value));
}

public <T> ListGauge newListGauge(String name, int nItems) {
return registry.newListGauge(groupName, new ListGauge(name, nItems));
}

/*
* Specify a dynamic gauge that always returns the latest value when polled.
* The value closure/object must be thread safe, since metrics reporters may access
Expand Down
Loading