Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,7 @@ private static final class OutstandingBatch {
this.batchSizeBytes = batchSizeBytes;
}

public int getAttempt() {
return attempt;
}

public int size() {
int size() {
return outstandingPublishes.size();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private Subscriber(Builder builder) {
closeables.add(
new AutoCloseable() {
@Override
public void close() throws IOException {
public void close() {
alarmsExecutor.shutdown();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static void setupClass() throws Exception {
}

@AfterClass
public static void tearDownClass() throws Exception {
public static void tearDownClass() {
topicAdminClient.close();
subscriptionAdminClient.close();
}
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testTopicPolicy() {
}

@Test
public void testVPCPushSubscriber() throws Exception {
public void testVPCPushSubscriber() {
assumeTrue(IS_VPC_TEST);
ProjectTopicName topicName =
ProjectTopicName.of(projectId, formatForTest("testing-vpc-push-subscriber-topic"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,6 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
PendingCallableType.FIXED_DELAY));
}

/**
* This allows for adding expectations on future work to be scheduled ( {@link
* FakeScheduledExecutorService#schedule} or {@link
* FakeScheduledExecutorService#scheduleAtFixedRate} or {@link
* FakeScheduledExecutorService#scheduleWithFixedDelay}) based on its delay.
*/
public void setupScheduleExpectation(Duration delay) {
synchronized (expectedWorkQueue) {
expectedWorkQueue.add(delay);
}
}

/**
* Blocks the current thread until all the work {@link
* FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been scheduled in
* the executor.
*/
public void waitForExpectedWork() {
synchronized (expectedWorkQueue) {
while (!expectedWorkQueue.isEmpty()) {
try {
expectedWorkQueue.wait();
} catch (InterruptedException e) {
// Wait uninterruptibly
}
}
}
}

/**
* This will advance the reference time of the executor and execute (in the same thread) any
* outstanding callable which execution time has passed.
Expand Down Expand Up @@ -232,7 +203,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
return callable.getScheduledFuture();
}

static enum PendingCallableType {
enum PendingCallableType {
NORMAL,
FIXED_RATE,
FIXED_DELAY
Expand All @@ -252,7 +223,7 @@ class PendingCallable<T> implements Comparable<PendingCallable<T>> {
pendingCallable =
new Callable<T>() {
@Override
public T call() throws Exception {
public T call() {
runnable.run();
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package com.google.cloud.pubsub.v1;

import com.google.api.client.util.Preconditions;
import com.google.api.core.InternalApi;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
Expand Down Expand Up @@ -61,7 +59,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
private final BlockingQueue<PullResponse> pullResponses = new LinkedBlockingDeque<>();
private int currentStream;

public static enum CloseSide {
public enum CloseSide {
SERVER,
CLIENT
}
Expand All @@ -76,10 +74,6 @@ public ModifyAckDeadline(String ackId, long seconds) {
this.seconds = seconds;
}

public String getAckId() {
return ackId;
}

public long getSeconds() {
return seconds;
}
Expand Down Expand Up @@ -207,23 +201,6 @@ public StreamObserver<StreamingPullRequest> streamingPull(
return stream.requestObserver;
}

public void sendStreamingResponse(StreamingPullResponse pullResponse)
throws InterruptedException {
waitForRegistedSubscription();
synchronized (openedStreams) {
waitForOpenedStreams(1);
openedStreams.get(getAndAdvanceCurrentStream()).responseObserver.onNext(pullResponse);
}
}

public void setMessageAckDeadlineSeconds(int ackDeadline) {
messageAckDeadline.set(ackDeadline);
}

public void enqueuePullResponse(PullResponse response) {
pullResponses.add(response);
}

@Override
public void getSubscription(
GetSubscriptionRequest request, StreamObserver<Subscription> responseObserver) {
Expand All @@ -237,12 +214,6 @@ public void getSubscription(
responseObserver.onCompleted();
}

/** Returns the number of times getSubscription is called. */
@InternalApi
int getSubscriptionCalledCount() {
return getSubscriptionCalled.get();
}

@Override
public void pull(PullRequest request, StreamObserver<PullResponse> responseObserver) {
synchronized (receivedPullRequest) {
Expand Down Expand Up @@ -293,26 +264,6 @@ public String waitForRegistedSubscription() throws InterruptedException {
return subscription;
}

public List<String> waitAndConsumeReceivedAcks(int expectedCount) throws InterruptedException {
synchronized (acks) {
waitAtLeast(acks, expectedCount);
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount));
acks.subList(0, expectedCount).clear();
return receivedAcksCopy;
}
}

public List<ModifyAckDeadline> waitAndConsumeModifyAckDeadlines(int expectedCount)
throws InterruptedException {
synchronized (modAckDeadlines) {
waitAtLeast(modAckDeadlines, expectedCount);
List<ModifyAckDeadline> modAckDeadlinesCopy =
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount));
modAckDeadlines.subList(0, expectedCount).clear();
return modAckDeadlinesCopy;
}
}

public int waitForClosedStreams(int expectedCount) throws InterruptedException {
synchronized (closedStreams) {
waitAtLeast(closedStreams, expectedCount);
Expand Down Expand Up @@ -341,50 +292,6 @@ private static void waitAtLeast(Collection<?> collection, int target)
}
}

public void waitForStreamAckDeadline(int expectedValue) throws InterruptedException {
synchronized (messageAckDeadline) {
while (messageAckDeadline.get() != expectedValue) {
messageAckDeadline.wait();
}
}
}

public int getOpenedStreamsCount() {
return openedStreams.size();
}

public int getClosedStreamsCount() {
return closedStreams.size();
}

public List<String> getAcks() {
return acks;
}

public List<ModifyAckDeadline> getModifyAckDeadlines() {
return modAckDeadlines;
}

public void reset() {
synchronized (subscriptionInitialized) {
synchronized (openedStreams) {
synchronized (acks) {
synchronized (modAckDeadlines) {
openedStreams.clear();
closedStreams.clear();
acks.clear();
modAckDeadlines.clear();
subscriptionInitialized.set(false);
subscription = "";
pullResponses.clear();
receivedPullRequest.clear();
currentStream = 0;
}
}
}
}
}

private void addOpenedStream(Stream stream) {
synchronized (openedStreams) {
openedStreams.add(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void sendAckOperations(
}

@Test
public void testReceipt() throws Exception {
public void testReceipt() {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.processOutstandingAckOperations();
assertThat(sentModAcks)
Expand Down Expand Up @@ -160,7 +160,7 @@ public void testExtension() throws Exception {
}

@Test
public void testExtension_Close() throws Exception {
public void testExtension_Close() {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class OpenCensusUtilTest {
// Verifies that trace contexts propagated as an attribute are set as the parent link in the
// message receiver and that the tag context is not change (for now).
@Test
public void testOpenCensusMessageReceiver() throws Exception {
public void testOpenCensusMessageReceiver() {
PubsubMessage message;
SpanContext publisherContext;
try (Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME);
Expand Down Expand Up @@ -143,7 +143,7 @@ private void assertSpanCount(int expected) {
Stopwatch watch = Stopwatch.createStarted();
while (true) {
Collection<SpanData> spanDatas = runningSpanStore.getRunningSpans(RECEIVER_FILTER);
if (spanDatas.size() == 1) {
if (spanDatas.size() == expected) {
break;
}
Thread.yield();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class PublisherImplTest {

private Server testServer;

class FakeException extends Exception {}

@Before
public void setUp() throws Exception {
testPublisherServiceImpl = new FakePublisherServiceImpl();
Expand Down Expand Up @@ -454,7 +452,7 @@ public void testBuilderInvalidArguments() {
builder.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setRequestByteThreshold((Long) null)
.setRequestByteThreshold(null)
.build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
Expand Down Expand Up @@ -513,7 +511,7 @@ public void testBuilderInvalidArguments() {
builder.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold((Long) null)
.setElementCountThreshold(null)
.build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
}
}

private Subscriber startSubscriber(Builder testSubscriberBuilder) throws Exception {
private Subscriber startSubscriber(Builder testSubscriberBuilder) {
Subscriber subscriber = testSubscriberBuilder.build();
subscriber.startAsync().awaitRunning();
return subscriber;
Expand Down