diff --git a/pom.xml b/pom.xml index cd3c1c5..4742d25 100644 --- a/pom.xml +++ b/pom.xml @@ -3,14 +3,12 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - io.cdap.plugin github-plugin GitHub plugin 1.0.0-SNAPSHOT A collection of gitHub connectors and plugins https://github.com/data-integrations/github - The Apache Software License, Version 2.0 @@ -19,7 +17,6 @@ A business-friendly OSS license - CDAP @@ -28,7 +25,6 @@ http://www.cdap.io - sonatype @@ -43,11 +39,9 @@ https://repo.eclipse.org/content/groups/releases - https://issues.cask.co/browse/CDAP - UTF-8 @@ -64,9 +58,64 @@ 3.11.1 3.9.0 1.32.1 + 2.32.0 - + + + + org.apache.beam + beam-sdks-java-bom + ${beam.version} + pom + import + + + + + + org.apache.beam + beam-sdks-java-core + + + org.apache.beam + beam-sdks-java-io-google-cloud-platform + + + junit + junit + + + + + org.apache.beam + beam-sdks-java-io-jdbc + + + org.apache.beam + beam-sdks-java-extensions-google-cloud-platform-core + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + + + org.apache.beam + beam-sdks-java-io-hadoop-file-system + + + org.apache.beam + beam-sdks-java-io-hadoop-common + + + org.apache.beam + beam-sdks-java-io-hadoop-format + + + + org.apache.beam + beam-runners-direct-java + io.cdap.cdap cdap-etl-api @@ -195,7 +244,6 @@ google-http-client-gson ${google-http-client-gson.version} - io.cdap.cdap @@ -233,18 +281,18 @@ ${cdap.version} test - - com.fasterxml.jackson.core - jackson-core - 2.9.8 - test - - - com.fasterxml.jackson.core - jackson-annotations - 2.9.0 - test - + + + + + + + + + + + + com.google.inject guice @@ -252,7 +300,6 @@ test - @@ -363,5 +410,4 @@ - - + \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/github/source/BeamAdapter.java b/src/main/java/io/cdap/plugin/github/source/BeamAdapter.java new file mode 100644 index 0000000..0c2c42e --- /dev/null +++ b/src/main/java/io/cdap/plugin/github/source/BeamAdapter.java @@ -0,0 +1,74 @@ +package io.cdap.plugin.github.source; + +import com.google.gson.Gson; +import io.cdap.plugin.github.source.batch.GithubBatchSourceConfig; +import io.cdap.plugin.github.source.batch.GithubFormatProvider; +import io.cdap.plugin.github.source.batch.GithubInputFormat; +import io.cdap.plugin.github.source.common.model.impl.Commit; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.hadoop.WritableCoder; +import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; + +public class BeamAdapter { + + + public static void main(String[] args) { + Gson gson = new Gson(); + Configuration myHadoopConfiguration = new Configuration(false); + + String authorizationToken = System.getenv("GITHUB_PAT"); + String repoOwner = "ktttnv"; + String repoName = "react-quiz"; + String datasetName = "Commits"; + String hostname = "https://api.github.com"; + + GithubBatchSourceConfig githubBatchSourceConfig = gson.fromJson( + String.format("{\"authorizationToken\":\"%s\",\"repoOwner\":\"%s\",\"repoName\":\"%s\",\"datasetName\":\"%s\",\"hostname\":\"%s\"}", + authorizationToken, repoOwner, repoName, datasetName, hostname), + GithubBatchSourceConfig.class); + GithubFormatProvider githubFormatProvider = new GithubFormatProvider(githubBatchSourceConfig); + myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", GithubInputFormat.class, + InputFormat.class); + myHadoopConfiguration.setClass("key.class", Text.class, Object.class); + myHadoopConfiguration.setClass("value.class", Commit.class, Object.class); + myHadoopConfiguration.set(GithubFormatProvider.PROPERTY_CONFIG_JSON, + githubFormatProvider.getInputFormatConfiguration() + .get(GithubFormatProvider.PROPERTY_CONFIG_JSON)); + + Pipeline p = Pipeline.create(); + + // Read data only with Hadoop configuration. + PCollection> pcol = p.apply("read", + HadoopFormatIO.read() + .withConfiguration(myHadoopConfiguration) + ).setCoder( + KvCoder.of(NullableCoder.of(WritableCoder.of(Text.class)), SerializableCoder.of(Commit.class))); + + PCollection strings = pcol.apply(MapElements + .into(TypeDescriptors.strings()) + .via( + ((SerializableFunction, String>) input -> { + Gson gson1 = new Gson(); + return gson1.toJson(input.getValue()); + }) + ) + ) + .setCoder(StringUtf8Coder.of()); + + strings.apply(TextIO.write().to("./txt.txt")); + p.run(); + } +} \ No newline at end of file diff --git a/src/main/java/io/cdap/plugin/github/source/batch/GithubRecordReader.java b/src/main/java/io/cdap/plugin/github/source/batch/GithubRecordReader.java index 0d14ca7..595edd2 100644 --- a/src/main/java/io/cdap/plugin/github/source/batch/GithubRecordReader.java +++ b/src/main/java/io/cdap/plugin/github/source/batch/GithubRecordReader.java @@ -21,6 +21,7 @@ import io.cdap.plugin.github.source.common.GitHubRequestFactory; import io.cdap.plugin.github.source.common.model.GitHubModel; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -35,7 +36,7 @@ /** * RecordReader implementation, which reads {@link GitHubModel} instances from GitHub repository API for github. */ -public class GithubRecordReader extends RecordReader { +public class GithubRecordReader extends RecordReader { private final GithubBatchSourceConfig config; private final String link; @@ -54,7 +55,7 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont HttpRequest httpRequest = GitHubRequestFactory.buildRequest(link, config.getAuthorizationToken()); HttpResponse response = httpRequest.execute(); Class datasetClass = (Class) - Array.newInstance(config.getDatasetClass(), 0).getClass(); + Array.newInstance(config.getDatasetClass(), 0).getClass(); currentPage = Arrays.stream(response.parseAs(datasetClass)).iterator(); } @@ -69,8 +70,8 @@ public boolean nextKeyValue() { } @Override - public NullWritable getCurrentKey() { - return null; + public Text getCurrentKey() { + return new Text(""); } @Override diff --git a/src/main/java/io/cdap/plugin/github/source/common/model/GitHubModel.java b/src/main/java/io/cdap/plugin/github/source/common/model/GitHubModel.java index ddba45b..48fd203 100644 --- a/src/main/java/io/cdap/plugin/github/source/common/model/GitHubModel.java +++ b/src/main/java/io/cdap/plugin/github/source/common/model/GitHubModel.java @@ -15,8 +15,10 @@ */ package io.cdap.plugin.github.source.common.model; +import java.io.Serializable; + /** * Generic GitHub model interface for github. */ -public interface GitHubModel { +public interface GitHubModel extends Serializable { } diff --git a/src/main/java/io/cdap/plugin/github/source/common/model/impl/Commit.java b/src/main/java/io/cdap/plugin/github/source/common/model/impl/Commit.java index b82373b..5f443f7 100644 --- a/src/main/java/io/cdap/plugin/github/source/common/model/impl/Commit.java +++ b/src/main/java/io/cdap/plugin/github/source/common/model/impl/Commit.java @@ -19,6 +19,7 @@ import io.cdap.plugin.github.source.common.model.GitHubModel; import io.cdap.plugin.github.source.common.model.impl.user.User; +import java.io.Serializable; import java.util.List; /** @@ -48,7 +49,7 @@ public class Commit implements GitHubModel { /** * Commit.CommitData model */ - public static class CommitData { + public static class CommitData implements Serializable { @Key private String url; @Key @@ -67,7 +68,7 @@ public static class CommitData { /** * Commit.CommitData.CommitUser model */ - public static class CommitUser { + public static class CommitUser implements Serializable { @Key private String name; @Key @@ -79,7 +80,7 @@ public static class CommitUser { /** * Commit.CommitData.Tree model */ - public static class Tree { + public static class Tree implements Serializable { @Key private String url; @Key @@ -89,7 +90,7 @@ public static class Tree { /** * Commit.CommitData.Verification model */ - public static class Verification { + public static class Verification implements Serializable { @Key private Boolean verified; @Key