Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ public class KafkaExtractionNamespace implements ExtractionNamespace

@JsonCreator
public KafkaExtractionNamespace(
@NotNull @JsonProperty(value = "kafkaTopic", required = true)
final String kafkaTopic,
@NotNull @JsonProperty(value = "namespace", required = true)
final String namespace
@NotNull @JsonProperty(value = "kafkaTopic", required = true) final String kafkaTopic,
@NotNull @JsonProperty(value = "namespace", required = true) final String namespace
)
{
Preconditions.checkNotNull(kafkaTopic, "kafkaTopic required");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -76,20 +75,20 @@
public class TestKafkaExtractionCluster
{
private static final Logger log = new Logger(TestKafkaExtractionCluster.class);
private static KafkaServer kafkaServer;
private static Properties kafkaProperties = new Properties();
private static KafkaConfig kafkaConfig;
private static final String topicName = "testTopic";
private static final String namespace = "testNamespace";
private static TestingServer zkTestServer;
private static KafkaExtractionManager renameManager;

private static final Lifecycle lifecycle = new Lifecycle();
private static NamespaceExtractionCacheManager extractionCacheManager;
private static ZkClient zkClient = null;
private static File tmpDir = Files.createTempDir();
private static Injector injector;
private static final File tmpDir = Files.createTempDir();
private static final String topicName = "testTopic";
private static final String namespace = "testNamespace";
private static final Properties kafkaProperties = new Properties();

private KafkaServer kafkaServer;
private KafkaConfig kafkaConfig;
private TestingServer zkTestServer;
private ZkClient zkClient;
private KafkaExtractionManager renameManager;
private NamespaceExtractionCacheManager extractionCacheManager;
private Injector injector;

public static class KafkaFactoryProvider implements Provider<ExtractionNamespaceFunctionFactory<?>>
{
Expand All @@ -110,10 +109,12 @@ public ExtractionNamespaceFunctionFactory<?> get()
}
}

@BeforeClass
public static void setupStatic() throws Exception
@Before
public void setUp() throws Exception
{
zkTestServer = new TestingServer(-1, new File(tmpDir.getAbsolutePath() + "/zk"), true);
zkTestServer.start();

zkClient = new ZkClient(
zkTestServer.getConnectString(),
10000,
Expand Down Expand Up @@ -142,38 +143,41 @@ public static void setupStatic() throws Exception

final long time = DateTime.parse("2015-01-01").getMillis();
kafkaServer = new KafkaServer(
kafkaConfig, new Time()
{

@Override
public long milliseconds()
{
return time;
}

@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
}

@Override
public void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
kafkaConfig,
new Time()
{

@Override
public long milliseconds()
{
return time;
}

@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
}

@Override
public void sleep(long ms)
{
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
}
}
);
kafkaServer.startup();

int sleepCount = 0;

while (!kafkaServer.kafkaController().isActive()) {
Thread.sleep(10);
if (++sleepCount > 100) {
Thread.sleep(100);
if (++sleepCount > 10) {
throw new InterruptedException("Controller took to long to awaken");
}
}
Expand All @@ -184,6 +188,7 @@ public void sleep(long ms)
zkTestServer.getConnectString() + "/kafka", 10000, 10000,
ZKStringSerializer$.MODULE$
);

try {
final Properties topicProperties = new Properties();
topicProperties.put("cleanup.policy", "compact");
Expand All @@ -198,11 +203,13 @@ public void sleep(long ms)
finally {
zkClient.close();
}

final Properties kafkaProducerProperties = makeProducerProperties();
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(kafkaProducerProperties));
Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));

try {
producer.send(
new KeyedMessage<byte[], byte[]>(
new KeyedMessage<>(
topicName,
StringUtils.toUtf8("abcdefg"),
StringUtils.toUtf8("abcdefg")
Expand All @@ -221,7 +228,8 @@ public void sleep(long ms)
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of()
), ImmutableList.of(
),
ImmutableList.<Module>of(
new Module()
{
@Override
Expand All @@ -230,7 +238,8 @@ public void configure(Binder binder)
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
}
}, new NamespacedExtractionModule(),
},
new NamespacedExtractionModule(),
new KafkaExtractionNamespaceModule()
{
@Override
Expand All @@ -255,19 +264,21 @@ public Properties getProperties(
extractionCacheManager.schedule(
new KafkaExtractionNamespace(topicName, namespace)
);

long start = System.currentTimeMillis();
while (renameManager.getBackgroundTaskCount() < 1) {
Thread.sleep(10); // wait for map populator to start up
Thread.sleep(100); // wait for map populator to start up
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("renameManager took too long to start");
}
}
log.info("--------------------------- started rename manager ---------------------------");
}

@AfterClass
public static void closeStatic() throws IOException
@After
public void tearDown() throws Exception
{

lifecycle.stop();
if (null != renameManager) {
renameManager.stop();
Expand Down Expand Up @@ -297,7 +308,7 @@ public static void closeStatic() throws IOException
}
}

private static final Properties makeProducerProperties()
private final Properties makeProducerProperties()
{
final Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.putAll(kafkaProperties);
Expand All @@ -309,55 +320,70 @@ private static final Properties makeProducerProperties()
return kafkaProducerProperties;
}

private static void checkServer()
private void checkServer()
{
if (!kafkaServer.apis().controller().isActive()) {
throw new ISE("server is not active!");
}
}

//@Test(timeout = 5_000)
@Test
@Test(timeout = 60_000L)
public void testSimpleRename() throws InterruptedException
{
final Properties kafkaProducerProperties = makeProducerProperties();
final Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(kafkaProducerProperties));
final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(kafkaProducerProperties));

try {
checkServer();
final ConcurrentMap<String, Function<String, String>> fnFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
{
}, Names.named("namespaceExtractionFunctionCache")));
final ConcurrentMap<String, Function<String, List<String>>> reverseFn = injector.getInstance(Key.get(new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
{
}, Names.named("namespaceReverseExtractionFunctionCache")));

final ConcurrentMap<String, Function<String, String>> fnFn =
injector.getInstance(
Key.get(
new TypeLiteral<ConcurrentMap<String, Function<String, String>>>()
{
},
Names.named("namespaceExtractionFunctionCache")
)
);

final ConcurrentMap<String, Function<String, List<String>>> reverseFn =
injector.getInstance(
Key.get(
new TypeLiteral<ConcurrentMap<String, Function<String, List<String>>>>()
{
},
Names.named("namespaceReverseExtractionFunctionCache")
)
);

KafkaExtractionNamespace extractionNamespace = new KafkaExtractionNamespace(topicName, namespace);

Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("foo"));
Assert.assertEquals(Collections.EMPTY_LIST, reverseFn.get(extractionNamespace.getNamespace()).apply("foo"));
assertUpdated(null, extractionNamespace.getNamespace(), "foo", fnFn);
assertReverseUpdated(Collections.EMPTY_LIST, extractionNamespace.getNamespace(), "foo", reverseFn);

long events = renameManager.getNumEvents(namespace);

log.info("------------------------- Sending foo bar -------------------------------");
producer.send(new KeyedMessage<byte[], byte[]>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("foo"), StringUtils.toUtf8("bar")));

long start = System.currentTimeMillis();
while (events == renameManager.getNumEvents(namespace)) {
Thread.sleep(10);
Thread.sleep(100);
if (System.currentTimeMillis() > start + 60_000) {
throw new ISE("Took too long to update event");
}
}

log.info("------------------------- Checking foo bar -------------------------------");
Assert.assertEquals("bar", fnFn.get(extractionNamespace.getNamespace()).apply("foo"));
Assert.assertEquals(Arrays.asList("foo"), reverseFn.get(extractionNamespace.getNamespace()).apply("bar"));
Assert.assertEquals(null, fnFn.get(extractionNamespace.getNamespace()).apply("baz"));
assertUpdated("bar", extractionNamespace.getNamespace(), "foo", fnFn);
assertReverseUpdated(Arrays.asList("foo"), extractionNamespace.getNamespace(), "bar", reverseFn);
assertUpdated(null, extractionNamespace.getNamespace(), "baz", fnFn);

checkServer();
events = renameManager.getNumEvents(namespace);

log.info("------------------------- Sending baz bat -------------------------------");
producer.send(new KeyedMessage<byte[], byte[]>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
producer.send(new KeyedMessage<>(topicName, StringUtils.toUtf8("baz"), StringUtils.toUtf8("bat")));
while (events == renameManager.getNumEvents(namespace)) {
Thread.sleep(10);
if (System.currentTimeMillis() > start + 60_000) {
Expand All @@ -373,4 +399,44 @@ public void testSimpleRename() throws InterruptedException
producer.close();
}
}

private void assertUpdated(
String expected,
String namespace,
String key,
ConcurrentMap<String, Function<String, String>> lookup
)
throws InterruptedException
{
final Function<String, String> extractionFn = lookup.get(namespace);

if (expected == null) {
while (extractionFn.apply(key) != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having a timeout here, how about having this never timeout and instead applying a timeout to the test? @Test(timeout = blah). Seems like it would be more robust to have one big timeout than a lot of little timeouts that would be more sensitive to noise.

Thread.sleep(100);
}
} else {
while (!expected.equals(extractionFn.apply(key))) {
Thread.sleep(100);
}
}

Assert.assertEquals("update check", expected, extractionFn.apply(key));
}

private void assertReverseUpdated(
List<String> expected,
String namespace,
String key,
ConcurrentMap<String, Function<String, List<String>>> lookup
)
throws InterruptedException
{
final Function<String, List<String>> extractionFn = lookup.get(namespace);

while (!extractionFn.apply(key).equals(expected)) {
Thread.sleep(100);
}

Assert.assertEquals("update check", expected, extractionFn.apply(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
@JsonTypeName("jdbc")
public class JDBCExtractionNamespace implements ExtractionNamespace
{

@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public void testSkipOld()
assertUpdated(extractionNamespace.getNamespace(), "foo", "bar");
}

@Test(timeout = 10_000L)
@Test(timeout = 60_000L)
public void testFindNew()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you comment this test out intentionally? If so why?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i forgot to uncomment this

throws NoSuchFieldException, IllegalAccessException, ExecutionException, InterruptedException
{
Expand Down Expand Up @@ -482,6 +482,12 @@ private void assertUpdated(String namespace, String key, String expected) throws
waitForUpdates(1_000L, 2L);

Function<String, String> extractionFn = fnCache.get(namespace);

// rely on test timeout to break out of this loop
while (!extractionFn.apply(key).equals(expected)) {
Thread.sleep(100);
}

Assert.assertEquals(
"update check",
expected,
Expand Down
Loading