-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathHdfsUtils.java
More file actions
252 lines (224 loc) · 7.99 KB
/
HdfsUtils.java
File metadata and controls
252 lines (224 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import com.huawei.common.constants.PathCons;
import com.huawei.common.exception.ParameterException;
import com.huawei.security.LoginUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
public class HdfsUtils {
private static HdfsUtils hdfsUtils;
private final static Log LOG = LogFactory.getLog(HdfsUtils.class.getName());
private static FileSystem fSystem; /* HDFS file system */
private static Configuration conf;
private static String PRNCIPAL_NAME;
private static String PATH_TO_KEYTAB = PathCons.PATH_TO_KEYTAB;
private static String PATH_TO_KRB5_CONF = PathCons.PATH_TO_KRB5_CONF;
private static String PATH_TO_HDFS_SITE_XML = PathCons.PATH_TO_HDFS_SITE_XML;
private static String PATH_TO_CORE_SITE_XML = PathCons.PATH_TO_CORE_SITE_XML;
private HdfsUtils() {
}
/**
* 获取HDFS单例对象
* @param keberosUser keberos认证用户名
* @return
*/
public static HdfsUtils getInstance(String keberosUser) throws IOException {
PRNCIPAL_NAME = keberosUser;
if (hdfsUtils == null) {
init();
return new HdfsUtils();
}
return hdfsUtils;
}
/**
* 加载conf文件
* 认证kerberos
* 获取FileSystem实例
*/
private static void init() throws IOException {
confLoad();
authentication();
instanceBuild();
}
/**
* 加载配置文件如core-site.xml,hdfs-site.xml
*/
private static void confLoad() throws IOException {
conf = new Configuration();
// conf file
conf.addResource(new Path(PATH_TO_HDFS_SITE_XML));
conf.addResource(new Path(PATH_TO_CORE_SITE_XML));
}
/**
* kerberos认证
*/
private static void authentication() throws IOException {
// security mode
LOG.info(conf.get("hadoop.security.authentication"));
if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) {
System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF);
LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf);
}
}
/**
* 构建FileSystem实例
*/
private static void instanceBuild() throws IOException {
// get filesystem
fSystem = FileSystem.get(conf);
}
/**
* 在HDFS上创建目录
* @param filePath
* @throws IOException
*/
public void mkdir(String filePath) throws IOException {
Path destPath = new Path(filePath);
if (fSystem.exists(destPath)) {
LOG.error("failed to create destPath the path is already existed: " + filePath);
} else {
fSystem.mkdirs(destPath);
LOG.info("success to create destPath " + filePath);
}
}
/**
* 往HDFS上写数据
* @param content 内容
* @param destPath 目标目录
* @param fileName 文件名(eg test.txt)
* @throws IOException
*/
public void write(final String content, String destPath, String fileName) throws IOException, ParameterException {
InputStream in = (InputStream) new ByteArrayInputStream(content.getBytes());
try {
HdfsWriter writer = new HdfsWriter(fSystem, destPath + File.separator + fileName);
writer.doWrite(in);
LOG.info("success to write.");
} finally {
in.close();
}
}
/**
* 往指定目录下指定文件追加内容
* @param content 追加内容
* @param destPath 目标目录
* @param fileName 文件名(eg test.txt)
* @throws Exception
*/
public void append(final String content, String destPath, String fileName) throws Exception {
//校验路径是否存在
if (!fSystem.exists(new Path(destPath+"/"+fileName))) {
LOG.error("the Path or File doesn't exists " + destPath+"/"+fileName);
} else {
InputStream in = (InputStream) new ByteArrayInputStream(content.getBytes());
try {
HdfsWriter writer = new HdfsWriter(fSystem, destPath + File.separator + fileName);
writer.doAppend(in);
LOG.info("success to append.");
} finally {
close(in);
}
}
}
/**
* 读取HDFS文件
* @param destPath HDFS目录
* @param fileName 文件名(eg test.txt)
* @return
* @throws IOException
*/
public String read(String destPath, String fileName) throws IOException {
if (!fSystem.exists(new Path(destPath+"/"+fileName))) {
LOG.error("the Path or File doesn't exists " + destPath+"/"+fileName);
} else {
String strPath = destPath + File.separator + fileName;
Path path = new Path(strPath);
FSDataInputStream in = null;
BufferedReader reader = null;
StringBuffer strBuffer = new StringBuffer();
try {
in = fSystem.open(path);
reader = new BufferedReader(new InputStreamReader(in));
String sTempOneLine;
while ((sTempOneLine = reader.readLine()) != null) {
strBuffer.append(sTempOneLine + "\n");
}
LOG.info("success to read ");
return strBuffer.toString();
} finally {
close(reader);
close(in);
}
}
return null;
}
/**
* 删除文件
* @param destPath 目标目录
* @param fileName 文件名
* @throws IOException
*/
public void deleteFile(String destPath, String fileName) throws IOException {
Path beDeletedPath = new Path(destPath + File.separator + fileName);
if (fSystem.delete(beDeletedPath, true)) {
LOG.info("success to delete the file " + destPath + File.separator + fileName);
} else {
LOG.error("failed to delete the file, the file doesn't exists" + destPath + File.separator + fileName);
}
}
/**
* 删除HDFS目录及目录下文件
* @param destPath 目标目录
* @throws IOException
*/
public void rmdir(String destPath) throws IOException {
Path path = new Path(destPath);
if (fSystem.exists(path)) {
fSystem.delete(path, true);
LOG.info("success to delete path " + destPath);
} else {
LOG.error("failed to delete destPath, the directory doesn't exists " + destPath);
}
}
/**
* 上传本地单个本地文件或文件夹到HDFS
* @param localPath 本地文件或者文件目录
* @param destPath HDFS中的目录,或者文件名
*/
public void putFile(String localPath, String destPath) {
try {
// 从本地将文件拷贝到HDFS中,如果目标文件已存在则进行覆盖
fSystem.copyFromLocalFile(new Path(localPath), new Path(destPath));
LOG.info("file has been uploaded to HDFS");
} catch (IOException e) {
LOG.error("uploading failed");
e.printStackTrace();
}
}
/**
* 下载HDFS单个本地文件或文件夹到本地
*
* @param destPath HDFS中文件或目录
* @param localPath 本地目录
*/
public void getFile(String destPath, String localPath) {
try {
if (!fSystem.exists(new Path(destPath))) {
LOG.error("文件不存在!");
} else {
fSystem.copyToLocalFile(new Path(destPath), new Path(localPath));
LOG.info("download succeed");
}
} catch (IOException e) {
LOG.error("downloading failed");
e.printStackTrace();
}
}
//关闭流
private void close(Closeable stream) throws IOException {
stream.close();
}
}