
Commit 278ecd8

Merge branch 'master' into SPARK-29713

2 parents fc5843f + 0a03839

203 files changed: 7837 additions & 2653 deletions


.github/workflows/master.yml

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ jobs:
       run: |
         export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
         export MAVEN_CLI_OPTS="--no-transfer-progress"
-        ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -P${{ matrix.hadoop }} -Phadoop-cloud -Djava.version=${{ matrix.java }} package
+        ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -P${{ matrix.hadoop }} -Phadoop-cloud -Djava.version=${{ matrix.java }} install


   lint:
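
Note: switching the Maven goal from package to install keeps the same build, but install additionally copies each module's built artifacts into the local Maven repository (~/.m2/repository), so subsequent steps in the workflow can resolve them without rebuilding.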
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/DateTimeConstants.java

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util;
+
+public class DateTimeConstants {
+
+  public static final int YEARS_PER_DECADE = 10;
+  public static final int YEARS_PER_CENTURY = 100;
+  public static final int YEARS_PER_MILLENNIUM = 1000;
+
+  public static final byte MONTHS_PER_QUARTER = 3;
+  public static final int MONTHS_PER_YEAR = 12;
+
+  public static final byte DAYS_PER_WEEK = 7;
+  public static final long DAYS_PER_MONTH = 30L;
+
+  public static final long HOURS_PER_DAY = 24L;
+
+  public static final long MINUTES_PER_HOUR = 60L;
+
+  public static final long SECONDS_PER_MINUTE = 60L;
+  public static final long SECONDS_PER_HOUR = MINUTES_PER_HOUR * SECONDS_PER_MINUTE;
+  public static final long SECONDS_PER_DAY = HOURS_PER_DAY * SECONDS_PER_HOUR;
+
+  public static final long MILLIS_PER_SECOND = 1000L;
+  public static final long MILLIS_PER_MINUTE = SECONDS_PER_MINUTE * MILLIS_PER_SECOND;
+  public static final long MILLIS_PER_HOUR = MINUTES_PER_HOUR * MILLIS_PER_MINUTE;
+  public static final long MILLIS_PER_DAY = HOURS_PER_DAY * MILLIS_PER_HOUR;
+
+  public static final long MICROS_PER_MILLIS = 1000L;
+  public static final long MICROS_PER_SECOND = MILLIS_PER_SECOND * MICROS_PER_MILLIS;
+  public static final long MICROS_PER_MINUTE = SECONDS_PER_MINUTE * MICROS_PER_SECOND;
+  public static final long MICROS_PER_HOUR = MINUTES_PER_HOUR * MICROS_PER_MINUTE;
+  public static final long MICROS_PER_DAY = HOURS_PER_DAY * MICROS_PER_HOUR;
+  public static final long MICROS_PER_MONTH = DAYS_PER_MONTH * MICROS_PER_DAY;
+  /* 365.25 days per year assumes leap year every four years */
+  public static final long MICROS_PER_YEAR = (36525L * MICROS_PER_DAY) / 100;
+
+  public static final long NANOS_PER_MICROS = 1000L;
+  public static final long NANOS_PER_MILLIS = MICROS_PER_MILLIS * NANOS_PER_MICROS;
+  public static final long NANOS_PER_SECOND = MILLIS_PER_SECOND * NANOS_PER_MILLIS;
+}
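
As a quick illustration of how the derived constants compose (a hypothetical snippet, not part of the commit; the demo class name is invented):

import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;

public class DateTimeConstantsDemo {
  public static void main(String[] args) {
    // 1 day = 24 * 60 * 60 * 1_000_000 microseconds.
    assert MICROS_PER_DAY == 86_400_000_000L;
    // MICROS_PER_YEAR assumes 365.25 days per year: (36525 * MICROS_PER_DAY) / 100.
    assert MICROS_PER_YEAR == 31_557_600_000_000L;
    // Converting 90 minutes to whole hours via microseconds:
    long micros = 90 * MICROS_PER_MINUTE;          // 5_400_000_000
    System.out.println(micros / MICROS_PER_HOUR);  // prints 1 (integer division)
  }
}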

common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java

Lines changed: 33 additions & 39 deletions
@@ -19,51 +19,27 @@

 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.Period;
+import java.time.temporal.ChronoUnit;
 import java.util.Objects;

+import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;
+
 /**
  * The internal representation of interval type.
  */
 public final class CalendarInterval implements Serializable {
-  public static final long MICROS_PER_MILLI = 1000L;
-  public static final long MICROS_PER_SECOND = MICROS_PER_MILLI * 1000;
-  public static final long MICROS_PER_MINUTE = MICROS_PER_SECOND * 60;
-  public static final long MICROS_PER_HOUR = MICROS_PER_MINUTE * 60;
-  public static final long MICROS_PER_DAY = MICROS_PER_HOUR * 24;
-  public static final long MICROS_PER_WEEK = MICROS_PER_DAY * 7;
-
   public final int months;
   public final int days;
   public final long microseconds;

-  public long milliseconds() {
-    return this.microseconds / MICROS_PER_MILLI;
-  }
-
   public CalendarInterval(int months, int days, long microseconds) {
     this.months = months;
     this.days = days;
     this.microseconds = microseconds;
   }

-  public CalendarInterval add(CalendarInterval that) {
-    int months = this.months + that.months;
-    int days = this.days + that.days;
-    long microseconds = this.microseconds + that.microseconds;
-    return new CalendarInterval(months, days, microseconds);
-  }
-
-  public CalendarInterval subtract(CalendarInterval that) {
-    int months = this.months - that.months;
-    int days = this.days - that.days;
-    long microseconds = this.microseconds - that.microseconds;
-    return new CalendarInterval(months, days, microseconds);
-  }
-
-  public CalendarInterval negate() {
-    return new CalendarInterval(-this.months, -this.days, -this.microseconds);
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -81,35 +57,53 @@ public int hashCode() {

   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder("interval");
+    if (months == 0 && days == 0 && microseconds == 0) {
+      return "0 seconds";
+    }
+
+    StringBuilder sb = new StringBuilder();

     if (months != 0) {
-      appendUnit(sb, months / 12, "year");
-      appendUnit(sb, months % 12, "month");
+      appendUnit(sb, months / 12, "years");
+      appendUnit(sb, months % 12, "months");
     }

-    appendUnit(sb, days, "day");
+    appendUnit(sb, days, "days");

     if (microseconds != 0) {
       long rest = microseconds;
-      appendUnit(sb, rest / MICROS_PER_HOUR, "hour");
+      appendUnit(sb, rest / MICROS_PER_HOUR, "hours");
       rest %= MICROS_PER_HOUR;
-      appendUnit(sb, rest / MICROS_PER_MINUTE, "minute");
+      appendUnit(sb, rest / MICROS_PER_MINUTE, "minutes");
       rest %= MICROS_PER_MINUTE;
       if (rest != 0) {
         String s = BigDecimal.valueOf(rest, 6).stripTrailingZeros().toPlainString();
-        sb.append(' ').append(s).append(" seconds");
+        sb.append(s).append(" seconds ");
       }
-    } else if (months == 0 && days == 0) {
-      sb.append(" 0 microseconds");
     }

+    sb.setLength(sb.length() - 1);
     return sb.toString();
   }

   private void appendUnit(StringBuilder sb, long value, String unit) {
     if (value != 0) {
-      sb.append(' ').append(value).append(' ').append(unit).append('s');
+      sb.append(value).append(' ').append(unit).append(' ');
     }
   }
+
+  /**
+   * Extracts the date part of the interval.
+   * @return an instance of {@code java.time.Period} based on the months and days fields
+   *         of the given interval, not null.
+   */
+  public Period extractAsPeriod() { return Period.of(0, months, days); }
+
+  /**
+   * Extracts the time part of the interval.
+   * @return an instance of {@code java.time.Duration} based on the microseconds field
+   *         of the given interval, not null.
+   * @throws ArithmeticException if a numeric overflow occurs
+   */
+  public Duration extractAsDuration() { return Duration.of(microseconds, ChronoUnit.MICROS); }
 }
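
For illustration, a small hypothetical driver program (not part of the commit; the demo class name is invented) exercising the reworked toString and the new java.time bridges:

import org.apache.spark.unsafe.types.CalendarInterval;

public class CalendarIntervalDemo {
  public static void main(String[] args) {
    // 14 months, 3 days, and 1 hour expressed in microseconds.
    CalendarInterval i = new CalendarInterval(14, 3, 3_600_000_000L);
    // The old "interval" prefix is gone and unit names are always plural:
    System.out.println(i);                      // 1 years 2 months 3 days 1 hours
    System.out.println(i.extractAsPeriod());    // P14M3D (months and days only)
    System.out.println(i.extractAsDuration());  // PT1H (microseconds only)
  }
}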

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 1 addition & 1 deletion
@@ -370,7 +370,7 @@ private byte getByte(int i) {
     return Platform.getByte(base, offset + i);
   }

-  private boolean matchAt(final UTF8String s, int pos) {
+  public boolean matchAt(final UTF8String s, int pos) {
     if (s.numBytes + pos > numBytes || pos < 0) {
       return false;
     }
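
Since matchAt is now public, callers outside UTF8String can test for a match at an arbitrary byte offset. A hypothetical usage sketch (the demo class is invented; fromString and matchAt are the real API):

import org.apache.spark.unsafe.types.UTF8String;

public class MatchAtDemo {
  public static void main(String[] args) {
    UTF8String hello = UTF8String.fromString("hello");
    UTF8String ell = UTF8String.fromString("ell");
    // matchAt compares s against this string's bytes starting at byte offset pos.
    System.out.println(hello.matchAt(ell, 1));  // true: "ell" occurs at offset 1
    System.out.println(hello.matchAt(ell, 3));  // false: would run past the end
  }
}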

common/unsafe/src/test/java/org/apache/spark/unsafe/types/CalendarIntervalSuite.java

Lines changed: 16 additions & 28 deletions
@@ -19,8 +19,11 @@

 import org.junit.Test;

+import java.time.Duration;
+import java.time.Period;
+
 import static org.junit.Assert.*;
-import static org.apache.spark.unsafe.types.CalendarInterval.*;
+import static org.apache.spark.sql.catalyst.util.DateTimeConstants.*;

 public class CalendarIntervalSuite {

@@ -48,50 +51,35 @@ public void toStringTest() {
     CalendarInterval i;

     i = new CalendarInterval(0, 0, 0);
-    assertEquals("interval 0 microseconds", i.toString());
+    assertEquals("0 seconds", i.toString());

     i = new CalendarInterval(34, 0, 0);
-    assertEquals("interval 2 years 10 months", i.toString());
+    assertEquals("2 years 10 months", i.toString());

     i = new CalendarInterval(-34, 0, 0);
-    assertEquals("interval -2 years -10 months", i.toString());
+    assertEquals("-2 years -10 months", i.toString());

     i = new CalendarInterval(0, 31, 0);
-    assertEquals("interval 31 days", i.toString());
+    assertEquals("31 days", i.toString());

     i = new CalendarInterval(0, -31, 0);
-    assertEquals("interval -31 days", i.toString());
+    assertEquals("-31 days", i.toString());

     i = new CalendarInterval(0, 0, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
-    assertEquals("interval 3 hours 13 minutes 0.000123 seconds", i.toString());
+    assertEquals("3 hours 13 minutes 0.000123 seconds", i.toString());

     i = new CalendarInterval(0, 0, -3 * MICROS_PER_HOUR - 13 * MICROS_PER_MINUTE - 123);
-    assertEquals("interval -3 hours -13 minutes -0.000123 seconds", i.toString());
+    assertEquals("-3 hours -13 minutes -0.000123 seconds", i.toString());

     i = new CalendarInterval(34, 31, 3 * MICROS_PER_HOUR + 13 * MICROS_PER_MINUTE + 123);
-    assertEquals("interval 2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds",
+    assertEquals("2 years 10 months 31 days 3 hours 13 minutes 0.000123 seconds",
       i.toString());
   }

   @Test
-  public void addTest() {
-    CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
-    CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(5, 5, 101 * MICROS_PER_HOUR), input1.add(input2));
-
-    input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
-    input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(65, 120, 119 * MICROS_PER_HOUR), input1.add(input2));
-  }
-
-  @Test
-  public void subtractTest() {
-    CalendarInterval input1 = new CalendarInterval(3, 1, 1 * MICROS_PER_HOUR);
-    CalendarInterval input2 = new CalendarInterval(2, 4, 100 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(1, -3, -99 * MICROS_PER_HOUR), input1.subtract(input2));
-
-    input1 = new CalendarInterval(-10, -30, -81 * MICROS_PER_HOUR);
-    input2 = new CalendarInterval(75, 150, 200 * MICROS_PER_HOUR);
-    assertEquals(new CalendarInterval(-85, -180, -281 * MICROS_PER_HOUR), input1.subtract(input2));
+  public void periodAndDurationTest() {
+    CalendarInterval interval = new CalendarInterval(120, -40, 123456);
+    assertEquals(Period.of(0, 120, -40), interval.extractAsPeriod());
+    assertEquals(Duration.ofNanos(123456000), interval.extractAsDuration());
   }
 }
core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java

Lines changed: 111 additions & 0 deletions

@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * Driver component of a {@link SparkPlugin}.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface DriverPlugin {
+
+  /**
+   * Initialize the plugin.
+   * <p>
+   * This method is called early in the initialization of the Spark driver. Explicitly, it is
+   * called before the Spark driver's task scheduler is initialized. This means that a lot
+   * of other Spark subsystems may not yet have been initialized. This call also blocks driver
+   * initialization.
+   * <p>
+   * It's recommended that plugins be careful about what operations are performed in this call,
+   * preferably performing expensive operations in a separate thread, or postponing them until
+   * the application has fully started.
+   *
+   * @param sc The SparkContext loading the plugin.
+   * @param pluginContext Additional plugin-specific information about the Spark application
+   *                      where the plugin is running.
+   * @return A map that will be provided to the {@link ExecutorPlugin#init(PluginContext,Map)}
+   *         method.
+   */
+  default Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Register metrics published by the plugin with Spark's metrics system.
+   * <p>
+   * This method is called later in the initialization of the Spark application, after most
+   * subsystems are up and the application ID is known. If there are metrics registered in
+   * the registry ({@link PluginContext#metricRegistry()}), then a metrics source with the
+   * plugin name will be created.
+   * <p>
+   * Note that even though the metric registry is still accessible after this method is called,
+   * registering new metrics after this method is called may result in the metrics not being
+   * available.
+   *
+   * @param appId The application ID from the cluster manager.
+   * @param pluginContext Additional plugin-specific information about the Spark application
+   *                      where the plugin is running.
+   */
+  default void registerMetrics(String appId, PluginContext pluginContext) {}
+
+  /**
+   * RPC message handler.
+   * <p>
+   * Plugins can use Spark's RPC system to send messages from executors to the driver (but not
+   * the other way around, currently). Messages sent by the executor component of the plugin will
+   * be delivered to this method, and the returned value will be sent back to the executor as
+   * the reply, if the executor has requested one.
+   * <p>
+   * Any exception thrown will be sent back to the executor as an error, in case it is expecting
+   * a reply. In case a reply is not expected, a log message will be written to the driver log.
+   * <p>
+   * The implementation of this handler should be thread-safe.
+   * <p>
+   * Note that all plugins share RPC dispatch threads, and this method is called synchronously.
+   * So performing expensive operations in this handler may affect the operation of other active
+   * plugins. Internal Spark endpoints are not directly affected, though, since they use
+   * different threads.
+   * <p>
+   * Spark guarantees that the driver component will be ready to receive messages through this
+   * handler when executors are started.
+   *
+   * @param message The incoming message.
+   * @return Value to be returned to the caller. Ignored if the caller does not expect a reply.
+   */
+  default Object receive(Object message) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Informs the plugin that the Spark application is shutting down.
+   * <p>
+   * This method is called during the driver shutdown phase. It is recommended that plugins
+   * not use any Spark functions (e.g. send RPC messages) during this call.
+   */
+  default void shutdown() {}
+
+}
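
To make the contract concrete, here is a minimal hypothetical plugin sketch (the class name and the returned map key are invented; it assumes SparkPlugin exposes driverPlugin()/executorPlugin() factory methods, which the Javadoc above references but this diff does not show):

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;
import org.apache.spark.api.plugin.SparkPlugin;

public class AuditSparkPlugin implements SparkPlugin {

  @Override
  public DriverPlugin driverPlugin() {
    return new DriverPlugin() {
      @Override
      public Map<String, String> init(SparkContext sc, PluginContext ctx) {
        // Keep this cheap: driver initialization blocks until init() returns.
        // The returned map is handed to the executor side's init().
        return Collections.singletonMap("audit.appName", sc.appName());
      }

      @Override
      public Object receive(Object message) {
        // Executor-to-driver RPC messages land here; the return value is the reply.
        return "ack: " + message;
      }

      @Override
      public void shutdown() {
        // Called during driver shutdown; avoid Spark functionality (e.g. RPC) here.
      }
    };
  }

  @Override
  public ExecutorPlugin executorPlugin() {
    return null; // driver-only plugin
  }
}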
