Big Data Application Project Practice: HDFS Java Client Operations
Zhejiang Sci-Tech University, 2024
Hands-on document and project code
3.1 HDFS Client Environment Setup
Create a Maven project and add the following dependencies inside the <project> element of pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
In the project's src/main/resources directory, create a new file named "log4j.properties" and fill it with the following:
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
Create the HdfsClient class:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

public class HdfsClient {

    @Test
    public void testMkdirs() throws IOException, InterruptedException, URISyntaxException {
        Configuration configuration = new Configuration();
        // Connect to the NameNode at hadoop101:9000 as user root
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
        // Create the /0723 directory in HDFS
        fs.mkdirs(new Path("/0723"));
        fs.close();
    }
}
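Since FileSystem implements java.io.Closeable, the same test can also be written with try-with-resources so the connection is released even if mkdirs throws. A minimal sketch of that variant (same NameNode address and user as above; the method name is only for illustration):

@Test
public void testMkdirsTryWithResources() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    // The FileSystem handle is closed automatically when the try block exits
    try (FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root")) {
        fs.mkdirs(new Path("/0723"));
    }
}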
3.2 HDFS API Operations
3.2.1 HDFS File Upload
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    // Set the replication factor for files written by this client to 2
    configuration.set("dfs.replication", "2");
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    // Upload the local file e:/hello.txt to /hello.txt in HDFS
    fs.copyFromLocalFile(new Path("e:/hello.txt"), new Path("/hello.txt"));
    fs.close();
    System.out.println("over");
}
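To confirm that the dfs.replication value set in code actually took effect, the uploaded file's metadata can be read back. A small sketch, assuming /hello.txt from the test above still exists (getFileStatus and getReplication are standard FileSystem/FileStatus methods):

@Test
public void testCheckReplication() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    // Read the metadata of the uploaded file and print its replication factor
    FileStatus status = fs.getFileStatus(new Path("/hello.txt"));
    System.out.println("replication = " + status.getReplication());
    fs.close();
}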
3.2.2 HDFS File Download
@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    // Arguments: delSrc = false (keep the source), src, dst,
    // useRawLocalFileSystem = true (do not write a local .crc checksum file)
    fs.copyToLocalFile(false, new Path("/hello1.txt"), new Path("e:/hello1.txt"), true);
    fs.close();
}
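If the source file may not be present, the download can be guarded with fs.exists. A sketch under that assumption, using the same paths as above:

@Test
public void testCopyToLocalIfExists() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    Path src = new Path("/hello1.txt");
    // Only attempt the download when the source file actually exists in HDFS
    if (fs.exists(src)) {
        fs.copyToLocalFile(false, src, new Path("e:/hello1.txt"), true);
    } else {
        System.out.println(src + " does not exist");
    }
    fs.close();
}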
3.2.3 HDFS Directory Deletion
@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    // The second argument true means delete the directory recursively
    fs.delete(new Path("/1108/"), true);
    fs.close();
}
3.2.4 HDFS File Rename
@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    fs.rename(new Path("/hello.txt"), new Path("/hello6.txt"));
    fs.close();
}
3.2.5 Distinguishing HDFS Files from Directories
@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    FileStatus[] listStatus = fs.listStatus(new Path("/"));
    for (FileStatus fileStatus : listStatus) {
        if (fileStatus.isFile()) {
            System.out.println("f:" + fileStatus.getPath().getName());
        } else {
            System.out.println("d:" + fileStatus.getPath().getName());
        }
    }
    fs.close();
}
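listStatus only inspects one directory level. If file length, permissions, and block locations are also of interest, FileSystem.listFiles returns a recursive iterator of LocatedFileStatus. A sketch (it additionally needs the RemoteIterator, LocatedFileStatus, BlockLocation, and java.util.Arrays imports):

@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root");
    // Recursively iterate over all files under the root directory
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);
    while (files.hasNext()) {
        LocatedFileStatus status = files.next();
        System.out.println(status.getPath().getName() + " " + status.getLen() + " " + status.getPermission());
        // Print the DataNode hosts holding each block of the file
        for (BlockLocation location : status.getBlockLocations()) {
            System.out.println("  hosts: " + Arrays.toString(location.getHosts()));
        }
    }
    fs.close();
}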
3.2.6 Operating on HDFS with I/O Streams
HDFS file upload
@Test
public void putFileToHDFS() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "root");
    FileInputStream fis = new FileInputStream(new File("testio.txt"));
    FSDataOutputStream fos = fs.create(new Path("/testio.txt"));
    IOUtils.copyBytes(fis, fos, conf);
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
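org.apache.hadoop.io.IOUtils.copyBytes also has an overload with a final boolean close argument; when it is true, the two explicit closeStream calls can be dropped. A sketch of the same upload written that way:

@Test
public void putFileToHDFSAutoClose() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "root");
    FileInputStream fis = new FileInputStream(new File("testio.txt"));
    FSDataOutputStream fos = fs.create(new Path("/testio.txt"));
    // The last argument asks copyBytes to close both streams once the copy finishes
    IOUtils.copyBytes(fis, fos, conf, true);
    fs.close();
}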
Downloading an HDFS file to the local filesystem
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "root");
    FSDataInputStream fis = fs.open(new Path("/testio.txt"));
    FileOutputStream fos = new FileOutputStream(new File("testio1.txt"));
    IOUtils.copyBytes(fis, fos, conf);
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
Seek-based file read: downloading the first block
@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "root");
    FSDataInputStream fis = fs.open(new Path("/hadoop-3.3.6.tar.gz"));
    FileOutputStream fos = new FileOutputStream(new File("hadoop-3.3.6.tar.gz.part1"));
    byte[] buffer = new byte[1024];
    // Copy exactly the first 128 MB (one default HDFS block) into the local part file,
    // writing only the bytes actually read on each iteration
    long remaining = 1024L * 1024 * 128;
    while (remaining > 0) {
        int len = fis.read(buffer, 0, (int) Math.min(buffer.length, remaining));
        if (len == -1) {
            break;
        }
        fos.write(buffer, 0, len);
        remaining -= len;
    }
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
Seek-based file read: downloading the second block
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "root");
    FSDataInputStream fis = fs.open(new Path("/hadoop-3.3.6.tar.gz"));
    // Skip the first 128 MB, which was already downloaded as part1
    fis.seek(1024 * 1024 * 128);
    FileOutputStream fos = new FileOutputStream(new File("hadoop-3.3.6.tar.gz.part2"));
    IOUtils.copyBytes(fis, fos, conf);
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}
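Before merging, it is worth checking that the two local parts add up to the size of the original file in HDFS. A small sketch, assuming the part files produced by the two tests above are in the working directory (getFileStatus().getLen() is the standard way to read the HDFS file length):

@Test
public void testPartSizes() throws IOException, InterruptedException, URISyntaxException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "root");
    // Length of the original file in HDFS
    long hdfsLen = fs.getFileStatus(new Path("/hadoop-3.3.6.tar.gz")).getLen();
    // Combined length of the two downloaded parts
    long localLen = new File("hadoop-3.3.6.tar.gz.part1").length()
            + new File("hadoop-3.3.6.tar.gz.part2").length();
    System.out.println("hdfs = " + hdfsLen + ", local parts = " + localLen);
    fs.close();
}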
Merge the files:
cat hadoop-3.3.6.tar.gz.part1 hadoop-3.3.6.tar.gz.part2 > merged.tar.gz