Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions docs/dev/table/streaming/time_attributes.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ Moreover, the `DataStream` returned by the `getDataStream()` method must have wa
<div data-lang="java" markdown="1">
{% highlight java %}
// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

@Override
public TypeInformation<Row> getReturnType() {
Expand All @@ -284,9 +284,15 @@ public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeA
}

@Override
public String getRowtimeAttribute() {
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// Mark the "UserActionTime" attribute as event-time attribute.
return "UserActionTime";
// here we create one attribute descriptor of "UserActionTime"
RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"UserActionTime",
new ExistingField("UserActionTime"),
new AscendingTimestamps());
List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
return listRowtimeAttrDescr;
}
}

Expand All @@ -301,7 +307,7 @@ WindowedTable windowedTable = tEnv
<div data-lang="scala" markdown="1">
{% highlight scala %}
// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

override def getReturnType = {
val names = Array[String]("Username" , "Data", "UserActionTime")
Expand All @@ -317,9 +323,15 @@ class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribu
stream
}

override def getRowtimeAttribute = {
override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
// Mark the "UserActionTime" attribute as event-time attribute.
"UserActionTime"
// here we create one attribute descriptor of "UserActionTime"
val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"UserActionTime",
new ExistingField("UserActionTime"),
new AscendingTimestamps)
val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
listRowtimeAttrDescr
}
}

Expand Down