
1479 lines
50 KiB

Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/package.html
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/package.html (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/package.html (working copy)
@@ -1,98 +0,0 @@
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- See the License for the specific language governing permissions and
- limitations under the License.
-<h1>A client for the Kosmos filesystem (KFS)</h1>
-This pages describes how to use Kosmos Filesystem
-(<a href="http://kosmosfs.sourceforge.net"> KFS </a>) as a backing
-store with Hadoop. This page assumes that you have downloaded the
-KFS software and installed necessary binaries as outlined in the KFS
- <ul>
- <li>In the Hadoop conf directory edit core-site.xml,
- add the following:
- <pre>
- &lt;name&gt;fs.kfs.impl&lt;/name&gt;
- &lt;value&gt;org.apache.hadoop.fs.kfs.KosmosFileSystem&lt;/value&gt;
- &lt;description&gt;The FileSystem for kfs: uris.&lt;/description&gt;
- </pre>
- <li>In the Hadoop conf directory edit core-site.xml,
- adding the following (with appropriate values for
- &lt;server&gt; and &lt;port&gt;):
- <pre>
- &lt;name&gt;fs.default.name&lt;/name&gt;
- &lt;value&gt;kfs://&lt;server:port&gt;&lt;/value&gt;
- &lt;name&gt;fs.kfs.metaServerHost&lt;/name&gt;
- &lt;value&gt;&lt;server&gt;&lt;/value&gt;
- &lt;description&gt;The location of the KFS meta server.&lt;/description&gt;
- &lt;name&gt;fs.kfs.metaServerPort&lt;/name&gt;
- &lt;value&gt;&lt;port&gt;&lt;/value&gt;
- &lt;description&gt;The location of the meta server's port.&lt;/description&gt;
- </li>
- <li>Copy KFS's <i> kfs-0.1.jar </i> to Hadoop's lib directory. This step
- enables Hadoop's to load the KFS specific modules. Note
- that, kfs-0.1.jar was built when you compiled KFS source
- code. This jar file contains code that calls KFS's client
- library code via JNI; the native code is in KFS's <i>
- libkfsClient.so </i> library.
- </li>
- <li> When the Hadoop map/reduce trackers start up, those
-processes (on local as well as remote nodes) will now need to load
-KFS's <i> libkfsClient.so </i> library. To simplify this process, it is advisable to
-store libkfsClient.so in an NFS accessible directory (similar to where
-Hadoop binaries/scripts are stored); then, modify Hadoop's
-conf/hadoop-env.sh adding the following line and providing suitable
-value for &lt;path&gt;:
-export LD_LIBRARY_PATH=&lt;path&gt;
- <li>Start only the map/reduce trackers
- <br />
- example: execute Hadoop's bin/start-mapred.sh</li>
- </ul>
-If the map/reduce job trackers start up, all file-I/O is done to KFS.
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSInputStream.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSInputStream.java (working copy)
@@ -1,143 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FSInputStream interfaces to allow applications to read
- * files in Kosmos File System (KFS).
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.*;
-import java.nio.ByteBuffer;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSInputStream;
-import org.kosmix.kosmosfs.access.KfsAccess;
-import org.kosmix.kosmosfs.access.KfsInputChannel;
-class KFSInputStream extends FSInputStream {
- private KfsInputChannel kfsChannel;
- private FileSystem.Statistics statistics;
- private long fsize;
- @Deprecated
- public KFSInputStream(KfsAccess kfsAccess, String path) {
- this(kfsAccess, path, null);
- }
- public KFSInputStream(KfsAccess kfsAccess, String path,
- FileSystem.Statistics stats) {
- this.statistics = stats;
- this.kfsChannel = kfsAccess.kfs_open(path);
- if (this.kfsChannel != null)
- this.fsize = kfsAccess.kfs_filesize(path);
- else
- this.fsize = 0;
- }
- @Override
- public long getPos() throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- return kfsChannel.tell();
- }
- @Override
- public synchronized int available() throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- return (int) (this.fsize - getPos());
- }
- @Override
- public synchronized void seek(long targetPos) throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- kfsChannel.seek(targetPos);
- }
- @Override
- public synchronized boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- @Override
- public synchronized int read() throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- byte b[] = new byte[1];
- int res = read(b, 0, 1);
- if (res == 1) {
- if (statistics != null) {
- statistics.incrementBytesRead(1);
- }
- return b[0] & 0xff;
- }
- return -1;
- }
- @Override
- public synchronized int read(byte b[], int off, int len) throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- int res;
- res = kfsChannel.read(ByteBuffer.wrap(b, off, len));
- // Use -1 to signify EOF
- if (res == 0)
- return -1;
- if (statistics != null) {
- statistics.incrementBytesRead(res);
- }
- return res;
- }
- @Override
- public synchronized void close() throws IOException {
- if (kfsChannel == null) {
- return;
- }
- kfsChannel.close();
- kfsChannel = null;
- }
- @Override
- public boolean markSupported() {
- return false;
- }
- @Override
- public void mark(int readLimit) {
- // Do nothing
- }
- @Override
- public void reset() throws IOException {
- throw new IOException("Mark not supported");
- }
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java (working copy)
@@ -1,99 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FSOutputStream interfaces to allow applications to write to
- * files in Kosmos File System (KFS).
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.*;
-import java.nio.ByteBuffer;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.Progressable;
-import org.kosmix.kosmosfs.access.KfsAccess;
-import org.kosmix.kosmosfs.access.KfsOutputChannel;
-class KFSOutputStream extends OutputStream {
- private String path;
- private KfsOutputChannel kfsChannel;
- private Progressable progressReporter;
- public KFSOutputStream(KfsAccess kfsAccess, String path, short replication,
- boolean append, Progressable prog) {
- this.path = path;
- if ((append) && (kfsAccess.kfs_isFile(path)))
- this.kfsChannel = kfsAccess.kfs_append(path);
- else
- this.kfsChannel = kfsAccess.kfs_create(path, replication);
- this.progressReporter = prog;
- }
- public long getPos() throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- return kfsChannel.tell();
- }
- @Override
- public void write(int v) throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- byte[] b = new byte[1];
- b[0] = (byte) v;
- write(b, 0, 1);
- }
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- // touch the progress before going into KFS since the call can block
- progressReporter.progress();
- kfsChannel.write(ByteBuffer.wrap(b, off, len));
- }
- @Override
- public void flush() throws IOException {
- if (kfsChannel == null) {
- throw new IOException("File closed");
- }
- // touch the progress before going into KFS since the call can block
- progressReporter.progress();
- kfsChannel.sync();
- }
- @Override
- public synchronized void close() throws IOException {
- if (kfsChannel == null) {
- return;
- }
- flush();
- kfsChannel.close();
- kfsChannel = null;
- }
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (working copy)
@@ -1,352 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FS interfaces to allow applications to store
- *files in Kosmos File System (KFS).
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
- * A FileSystem backed by KFS.
- *
- */
-public class KosmosFileSystem extends FileSystem {
- private FileSystem localFs;
- private IFSImpl kfsImpl = null;
- private URI uri;
- private Path workingDir = new Path("/");
- public KosmosFileSystem() {
- }
- KosmosFileSystem(IFSImpl fsimpl) {
- this.kfsImpl = fsimpl;
- }
- /**
- * Return the protocol scheme for the FileSystem.
- * <p/>
- *
- * @return <code>kfs</code>
- */
- @Override
- public String getScheme() {
- return "kfs";
- }
- @Override
- public URI getUri() {
- return uri;
- }
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
- try {
- if (kfsImpl == null) {
- if (uri.getHost() == null) {
- kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
- conf.getInt("fs.kfs.metaServerPort", -1),
- statistics);
- } else {
- kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
- }
- }
- this.localFs = FileSystem.getLocal(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- this.workingDir = new Path("/user", System.getProperty("user.name")
- ).makeQualified(this);
- setConf(conf);
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("Unable to initialize KFS");
- System.exit(-1);
- }
- }
- @Override
- public Path getWorkingDirectory() {
- return workingDir;
- }
- @Override
- public void setWorkingDirectory(Path dir) {
- workingDir = makeAbsolute(dir);
- }
- private Path makeAbsolute(Path path) {
- if (path.isAbsolute()) {
- return path;
- }
- return new Path(workingDir, path);
- }
- @Override
- public boolean mkdirs(Path path, FsPermission permission
- ) throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- int res;
- // System.out.println("Calling mkdirs on: " + srep);
- res = kfsImpl.mkdirs(srep);
- return res == 0;
- }
- @Override
- public boolean isDirectory(Path path) throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- // System.out.println("Calling isdir on: " + srep);
- return kfsImpl.isDirectory(srep);
- }
- @Override
- public boolean isFile(Path path) throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- return kfsImpl.isFile(srep);
- }
- @Override
- public FileStatus[] listStatus(Path path) throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- if(!kfsImpl.exists(srep))
- throw new FileNotFoundException("File " + path + " does not exist.");
- if (kfsImpl.isFile(srep))
- return new FileStatus[] { getFileStatus(path) } ;
- return kfsImpl.readdirplus(absolute);
- }
- @Override
- public FileStatus getFileStatus(Path path) throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- if (!kfsImpl.exists(srep)) {
- throw new FileNotFoundException("File " + path + " does not exist.");
- }
- if (kfsImpl.isDirectory(srep)) {
- // System.out.println("Status of path: " + path + " is dir");
- return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep),
- path.makeQualified(this));
- } else {
- // System.out.println("Status of path: " + path + " is file");
- return new FileStatus(kfsImpl.filesize(srep), false,
- kfsImpl.getReplication(srep),
- getDefaultBlockSize(),
- kfsImpl.getModificationTime(srep),
- path.makeQualified(this));
- }
- }
- @Override
- public FSDataOutputStream append(Path f, int bufferSize,
- Progressable progress) throws IOException {
- Path parent = f.getParent();
- if (parent != null && !mkdirs(parent)) {
- throw new IOException("Mkdirs failed to create " + parent);
- }
- Path absolute = makeAbsolute(f);
- String srep = absolute.toUri().getPath();
- return kfsImpl.append(srep, bufferSize, progress);
- }
- @Override
- public FSDataOutputStream create(Path file, FsPermission permission,
- boolean overwrite, int bufferSize,
- short replication, long blockSize, Progressable progress)
- throws IOException {
- if (exists(file)) {
- if (overwrite) {
- delete(file, true);
- } else {
- throw new IOException("File already exists: " + file);
- }
- }
- Path parent = file.getParent();
- if (parent != null && !mkdirs(parent)) {
- throw new IOException("Mkdirs failed to create " + parent);
- }
- Path absolute = makeAbsolute(file);
- String srep = absolute.toUri().getPath();
- return kfsImpl.create(srep, replication, bufferSize, progress);
- }
- @Override
- public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- if (!exists(path))
- throw new IOException("File does not exist: " + path);
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- return kfsImpl.open(srep, bufferSize);
- }
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- Path absoluteS = makeAbsolute(src);
- String srepS = absoluteS.toUri().getPath();
- Path absoluteD = makeAbsolute(dst);
- String srepD = absoluteD.toUri().getPath();
- // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
- return kfsImpl.rename(srepS, srepD) == 0;
- }
- // recursively delete the directory and its contents
- @Override
- public boolean delete(Path path, boolean recursive) throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- if (kfsImpl.isFile(srep))
- return kfsImpl.remove(srep) == 0;
- FileStatus[] dirEntries = listStatus(absolute);
- if (!recursive && (dirEntries.length != 0)) {
- throw new IOException("Directory " + path.toString() +
- " is not empty.");
- }
- for (int i = 0; i < dirEntries.length; i++) {
- delete(new Path(absolute, dirEntries[i].getPath()), recursive);
- }
- return kfsImpl.rmdir(srep) == 0;
- }
- @Override
- public short getDefaultReplication() {
- return 3;
- }
- @Override
- public boolean setReplication(Path path, short replication)
- throws IOException {
- Path absolute = makeAbsolute(path);
- String srep = absolute.toUri().getPath();
- int res = kfsImpl.setReplication(srep, replication);
- return res >= 0;
- }
- // 64MB is the KFS block size
- @Override
- public long getDefaultBlockSize() {
- return 1 << 26;
- }
- @Deprecated
- public void lock(Path path, boolean shared) throws IOException {
- }
- @Deprecated
- public void release(Path path) throws IOException {
- }
- /**
- * Return null if the file doesn't exist; otherwise, get the
- * locations of the various chunks of the file file from KFS.
- */
- @Override
- public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
- long len) throws IOException {
- if (file == null) {
- return null;
- }
- String srep = makeAbsolute(file.getPath()).toUri().getPath();
- String[][] hints = kfsImpl.getDataLocation(srep, start, len);
- if (hints == null) {
- return null;
- }
- BlockLocation[] result = new BlockLocation[hints.length];
- long blockSize = getDefaultBlockSize();
- long length = len;
- long blockStart = start;
- for(int i=0; i < result.length; ++i) {
- result[i] = new BlockLocation(null, hints[i], blockStart,
- length < blockSize ? length : blockSize);
- blockStart += blockSize;
- length -= blockSize;
- }
- return result;
- }
- @Override
- public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
- FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
- }
- @Override
- public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
- FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
- }
- @Override
- public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
- throws IOException {
- return tmpLocalFile;
- }
- @Override
- public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
- throws IOException {
- moveFromLocalFile(tmpLocalFile, fsOutputFile);
- }
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/IFSImpl.java
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/IFSImpl.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/IFSImpl.java (working copy)
@@ -1,59 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * We need to provide the ability to the code in fs/kfs without really
- * having a KFS deployment. In particular, the glue code that wraps
- * around calls to KfsAccess object. This is accomplished by defining a
- * filesystem implementation interface:
- * -- for testing purposes, a dummy implementation of this interface
- * will suffice; as long as the dummy implementation is close enough
- * to doing what KFS does, we are good.
- * -- for deployment purposes with KFS, this interface is
- * implemented by the KfsImpl object.
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.*;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Progressable;
-interface IFSImpl {
- public boolean exists(String path) throws IOException;
- public boolean isDirectory(String path) throws IOException;
- public boolean isFile(String path) throws IOException;
- public String[] readdir(String path) throws IOException;
- public FileStatus[] readdirplus(Path path) throws IOException;
- public int mkdirs(String path) throws IOException;
- public int rename(String source, String dest) throws IOException;
- public int rmdir(String path) throws IOException;
- public int remove(String path) throws IOException;
- public long filesize(String path) throws IOException;
- public short getReplication(String path) throws IOException;
- public short setReplication(String path, short replication) throws IOException;
- public String[][] getDataLocation(String path, long start, long len) throws IOException;
- public long getModificationTime(String path) throws IOException;
- public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException;
- public FSDataInputStream open(String path, int bufferSize) throws IOException;
- public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException;
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSConfigKeys.java
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSConfigKeys.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSConfigKeys.java (working copy)
@@ -1,47 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.kfs;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
- * This class contains constants for configuration keys used
- * in the kfs file system.
- *
- */
-public class KFSConfigKeys extends CommonConfigurationKeys {
- public static final String KFS_BLOCK_SIZE_KEY = "kfs.blocksize";
- public static final long KFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;
- public static final String KFS_REPLICATION_KEY = "kfs.replication";
- public static final short KFS_REPLICATION_DEFAULT = 1;
- public static final String KFS_STREAM_BUFFER_SIZE_KEY =
- "kfs.stream-buffer-size";
- public static final int KFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
- public static final String KFS_BYTES_PER_CHECKSUM_KEY =
- "kfs.bytes-per-checksum";
- public static final int KFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
- public static final String KFS_CLIENT_WRITE_PACKET_SIZE_KEY =
- "kfs.client-write-packet-size";
- public static final int KFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
Index: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSImpl.java
--- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSImpl.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/kfs/KFSImpl.java (working copy)
@@ -1,171 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Provide the implementation of KFS which turn into calls to KfsAccess.
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.*;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.kosmix.kosmosfs.access.KfsAccess;
-import org.kosmix.kosmosfs.access.KfsFileAttr;
-import org.apache.hadoop.util.Progressable;
-class KFSImpl implements IFSImpl {
- private KfsAccess kfsAccess = null;
- private FileSystem.Statistics statistics;
- @Deprecated
- public KFSImpl(String metaServerHost, int metaServerPort
- ) throws IOException {
- this(metaServerHost, metaServerPort, null);
- }
- public KFSImpl(String metaServerHost, int metaServerPort,
- FileSystem.Statistics stats) throws IOException {
- kfsAccess = new KfsAccess(metaServerHost, metaServerPort);
- statistics = stats;
- }
- @Override
- public boolean exists(String path) throws IOException {
- return kfsAccess.kfs_exists(path);
- }
- @Override
- public boolean isDirectory(String path) throws IOException {
- return kfsAccess.kfs_isDirectory(path);
- }
- @Override
- public boolean isFile(String path) throws IOException {
- return kfsAccess.kfs_isFile(path);
- }
- @Override
- public String[] readdir(String path) throws IOException {
- return kfsAccess.kfs_readdir(path);
- }
- @Override
- public FileStatus[] readdirplus(Path path) throws IOException {
- String srep = path.toUri().getPath();
- KfsFileAttr[] fattr = kfsAccess.kfs_readdirplus(srep);
- if (fattr == null)
- return null;
- int numEntries = 0;
- for (int i = 0; i < fattr.length; i++) {
- if ((fattr[i].filename.compareTo(".") == 0) || (fattr[i].filename.compareTo("target/generated-sources") == 0))
- continue;
- numEntries++;
- }
- FileStatus[] fstatus = new FileStatus[numEntries];
- int j = 0;
- for (int i = 0; i < fattr.length; i++) {
- if ((fattr[i].filename.compareTo(".") == 0) || (fattr[i].filename.compareTo("target/generated-sources") == 0))
- continue;
- Path fn = new Path(path, fattr[i].filename);
- if (fattr[i].isDirectory)
- fstatus[j] = new FileStatus(0, true, 1, 0, fattr[i].modificationTime, fn);
- else
- fstatus[j] = new FileStatus(fattr[i].filesize, fattr[i].isDirectory,
- fattr[i].replication,
- (long)
- (1 << 26),
- fattr[i].modificationTime,
- fn);
- j++;
- }
- return fstatus;
- }
- @Override
- public int mkdirs(String path) throws IOException {
- return kfsAccess.kfs_mkdirs(path);
- }
- @Override
- public int rename(String source, String dest) throws IOException {
- return kfsAccess.kfs_rename(source, dest);
- }
- @Override
- public int rmdir(String path) throws IOException {
- return kfsAccess.kfs_rmdir(path);
- }
- @Override
- public int remove(String path) throws IOException {
- return kfsAccess.kfs_remove(path);
- }
- @Override
- public long filesize(String path) throws IOException {
- return kfsAccess.kfs_filesize(path);
- }
- @Override
- public short getReplication(String path) throws IOException {
- return kfsAccess.kfs_getReplication(path);
- }
- @Override
- public short setReplication(String path, short replication) throws IOException {
- return kfsAccess.kfs_setReplication(path, replication);
- }
- @Override
- public String[][] getDataLocation(String path, long start, long len) throws IOException {
- return kfsAccess.kfs_getDataLocation(path, start, len);
- }
- @Override
- public long getModificationTime(String path) throws IOException {
- return kfsAccess.kfs_getModificationTime(path);
- }
- @Override
- public FSDataInputStream open(String path, int bufferSize) throws IOException {
- return new FSDataInputStream(new KFSInputStream(kfsAccess, path,
- statistics));
- }
- @Override
- public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
- return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication, false, progress),
- statistics);
- }
- @Override
- public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
- // when opening for append, # of replicas is ignored
- return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, (short) 1, true, progress),
- statistics);
- }
Index: hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
--- hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem (working copy)
@@ -17,6 +17,5 @@
Index: hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
--- hadoop-common-project/hadoop-common/src/main/resources/core-default.xml (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/main/resources/core-default.xml (working copy)
@@ -774,42 +774,6 @@
<description>Replication factor</description>
-<!-- Kosmos File System -->
- <name>kfs.stream-buffer-size</name>
- <value>4096</value>
- <description>The size of buffer to stream files.
- The size of this buffer should probably be a multiple of hardware
- page size (4096 on Intel x86), and it determines how much data is
- buffered during read and write operations.</description>
- <name>kfs.bytes-per-checksum</name>
- <value>512</value>
- <description>The number of bytes per checksum. Must not be larger than
- kfs.stream-buffer-size</description>
- <name>kfs.client-write-packet-size</name>
- <value>65536</value>
- <description>Packet size for clients to write</description>
- <name>kfs.blocksize</name>
- <value>67108864</value>
- <description>Block size</description>
- <name>kfs.replication</name>
- <value>3</value>
- <description>Replication factor</description>
<!-- FTP file system -->
Index: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
--- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (working copy)
@@ -1,199 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Unit tests for testing the KosmosFileSystem API implementation.
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.IOException;
-import java.net.URI;
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-public class TestKosmosFileSystem extends TestCase {
- KosmosFileSystem kosmosFileSystem;
- KFSEmulationImpl kfsEmul;
- Path baseDir;
- @Override
- protected void setUp() throws IOException {
- Configuration conf = new Configuration();
- kfsEmul = new KFSEmulationImpl(conf);
- kosmosFileSystem = new KosmosFileSystem(kfsEmul);
- // a dummy URI; we are not connecting to any setup here
- kosmosFileSystem.initialize(URI.create("kfs:///"), conf);
- baseDir = new Path(System.getProperty("test.build.data", "/tmp" ) +
- "/kfs-test");
- }
- @Override
- protected void tearDown() throws Exception {
- }
- // @Test
- // Check all the directory API's in KFS
- public void testDirs() throws Exception {
- Path subDir1 = new Path("dir.1");
- // make the dir
- kosmosFileSystem.mkdirs(baseDir);
- assertTrue(kosmosFileSystem.isDirectory(baseDir));
- kosmosFileSystem.setWorkingDirectory(baseDir);
- kosmosFileSystem.mkdirs(subDir1);
- assertTrue(kosmosFileSystem.isDirectory(subDir1));
- assertFalse(kosmosFileSystem.exists(new Path("test1")));
- assertFalse(kosmosFileSystem.isDirectory(new Path("test/dir.2")));
- FileStatus[] p = kosmosFileSystem.listStatus(baseDir);
- assertEquals(p.length, 1);
- kosmosFileSystem.delete(baseDir, true);
- assertFalse(kosmosFileSystem.exists(baseDir));
- }
- // @Test
- // Check the file API's
- public void testFiles() throws Exception {
- Path subDir1 = new Path("dir.1");
- Path file1 = new Path("dir.1/foo.1");
- Path file2 = new Path("dir.1/foo.2");
- kosmosFileSystem.mkdirs(baseDir);
- assertTrue(kosmosFileSystem.isDirectory(baseDir));
- kosmosFileSystem.setWorkingDirectory(baseDir);
- kosmosFileSystem.mkdirs(subDir1);
- FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
- FSDataOutputStream s2 = kosmosFileSystem.create(file2, true, 4096, (short) 1, (long) 4096, null);
- s1.close();
- s2.close();
- FileStatus[] p = kosmosFileSystem.listStatus(subDir1);
- assertEquals(p.length, 2);
- kosmosFileSystem.delete(file1, true);
- p = kosmosFileSystem.listStatus(subDir1);
- assertEquals(p.length, 1);
- kosmosFileSystem.delete(file2, true);
- p = kosmosFileSystem.listStatus(subDir1);
- assertEquals(p.length, 0);
- kosmosFileSystem.delete(baseDir, true);
- assertFalse(kosmosFileSystem.exists(baseDir));
- }
- // @Test
- // Check file/read write
- public void testFileIO() throws Exception {
- Path subDir1 = new Path("dir.1");
- Path file1 = new Path("dir.1/foo.1");
- kosmosFileSystem.mkdirs(baseDir);
- assertTrue(kosmosFileSystem.isDirectory(baseDir));
- kosmosFileSystem.setWorkingDirectory(baseDir);
- kosmosFileSystem.mkdirs(subDir1);
- FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
- int bufsz = 4096;
- byte[] data = new byte[bufsz];
- for (int i = 0; i < data.length; i++)
- data[i] = (byte) (i % 16);
- // write 4 bytes and read them back; read API should return a byte per call
- s1.write(32);
- s1.write(32);
- s1.write(32);
- s1.write(32);
- // write some data
- s1.write(data, 0, data.length);
- // flush out the changes
- s1.close();
- // Read the stuff back and verify it is correct
- FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096);
- int v;
- long nread = 0;
- v = s2.read();
- assertEquals(v, 32);
- v = s2.read();
- assertEquals(v, 32);
- v = s2.read();
- assertEquals(v, 32);
- v = s2.read();
- assertEquals(v, 32);
- assertEquals(s2.available(), data.length);
- byte[] buf = new byte[bufsz];
- s2.read(buf, 0, buf.length);
- nread = s2.getPos();
- for (int i = 0; i < data.length; i++)
- assertEquals(data[i], buf[i]);
- assertEquals(s2.available(), 0);
- s2.close();
- // append some data to the file
- try {
- s1 = kosmosFileSystem.append(file1);
- for (int i = 0; i < data.length; i++)
- data[i] = (byte) (i % 17);
- // write the data
- s1.write(data, 0, data.length);
- // flush out the changes
- s1.close();
- // read it back and validate
- s2 = kosmosFileSystem.open(file1, 4096);
- s2.seek(nread);
- s2.read(buf, 0, buf.length);
- for (int i = 0; i < data.length; i++)
- assertEquals(data[i], buf[i]);
- s2.close();
- } catch (Exception e) {
- System.out.println("append isn't supported by the underlying fs");
- }
- kosmosFileSystem.delete(file1, true);
- assertFalse(kosmosFileSystem.exists(file1));
- kosmosFileSystem.delete(subDir1, true);
- assertFalse(kosmosFileSystem.exists(subDir1));
- kosmosFileSystem.delete(baseDir, true);
- assertFalse(kosmosFileSystem.exists(baseDir));
- }
Index: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
--- hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (revision 1395699)
+++ hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (working copy)
@@ -1,168 +0,0 @@
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * We need to provide the ability to the code in fs/kfs without really
- * having a KFS deployment. For this purpose, use the LocalFileSystem
- * as a way to "emulate" KFS.
- */
-package org.apache.hadoop.fs.kfs;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Progressable;
-public class KFSEmulationImpl implements IFSImpl {
- FileSystem localFS;
- public KFSEmulationImpl(Configuration conf) throws IOException {
- localFS = FileSystem.getLocal(conf);
- }
- @Override
- public boolean exists(String path) throws IOException {
- return localFS.exists(new Path(path));
- }
- @Override
- public boolean isDirectory(String path) throws IOException {
- return localFS.isDirectory(new Path(path));
- }
- @Override
- public boolean isFile(String path) throws IOException {
- return localFS.isFile(new Path(path));
- }
- @Override
- public String[] readdir(String path) throws IOException {
- FileStatus[] p = localFS.listStatus(new Path(path));
- try {
- p = localFS.listStatus(new Path(path));
- } catch ( FileNotFoundException fnfe ) {
- return null;
- }
- String[] entries = null;
- entries = new String[p.length];
- for (int i = 0; i < p.length; i++)
- entries[i] = p[i].getPath().toString();
- return entries;
- }
- @Override
- public FileStatus[] readdirplus(Path path) throws IOException {
- return localFS.listStatus(path);
- }
- @Override
- public int mkdirs(String path) throws IOException {
- if (localFS.mkdirs(new Path(path)))
- return 0;
- return -1;
- }
- @Override
- public int rename(String source, String dest) throws IOException {
- if (localFS.rename(new Path(source), new Path(dest)))
- return 0;
- return -1;
- }
- @Override
- public int rmdir(String path) throws IOException {
- if (isDirectory(path)) {
- // the directory better be empty
- String[] dirEntries = readdir(path);
- if ((dirEntries.length <= 2) && (localFS.delete(new Path(path), true)))
- return 0;
- }
- return -1;
- }
- @Override
- public int remove(String path) throws IOException {
- if (isFile(path) && (localFS.delete(new Path(path), true)))
- return 0;
- return -1;
- }
- @Override
- public long filesize(String path) throws IOException {
- return localFS.getFileStatus(new Path(path)).getLen();
- }
- @Override
- public short getReplication(String path) throws IOException {
- return 1;
- }
- @Override
- public short setReplication(String path, short replication) throws IOException {
- return 1;
- }
- @Override
- public String[][] getDataLocation(String path, long start, long len) throws IOException {
- BlockLocation[] blkLocations =
- localFS.getFileBlockLocations(localFS.getFileStatus(new Path(path)),
- start, len);
- if ((blkLocations == null) || (blkLocations.length == 0)) {
- return new String[0][];
- }
- int blkCount = blkLocations.length;
- String[][]hints = new String[blkCount][];
- for (int i=0; i < blkCount ; i++) {
- String[] hosts = blkLocations[i].getHosts();
- hints[i] = new String[hosts.length];
- hints[i] = hosts;
- }
- return hints;
- }
- @Override
- public long getModificationTime(String path) throws IOException {
- FileStatus s = localFS.getFileStatus(new Path(path));
- if (s == null)
- return 0;
- return s.getModificationTime();
- }
- @Override
- public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
- // besides path/overwrite, the other args don't matter for
- // testing purposes.
- return localFS.append(new Path(path));
- }
- @Override
- public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
- // besides path/overwrite, the other args don't matter for
- // testing purposes.
- return localFS.create(new Path(path));
- }
- @Override
- public FSDataInputStream open(String path, int bufferSize) throws IOException {
- return localFS.open(new Path(path));
- }
Index: hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
--- hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (revision 1395699)
+++ hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (working copy)
@@ -176,18 +176,6 @@
- <Class name="org.apache.hadoop.fs.kfs.KFSOutputStream" />
- <Field name="path" />
- <Bug pattern="URF_UNREAD_FIELD" />
- </Match>
- <Match>
- <Class name="org.apache.hadoop.fs.kfs.KosmosFileSystem" />
- <Method name="initialize" />
- <Bug pattern="DM_EXIT" />
- </Match>
- <Match>
<Class name="org.apache.hadoop.io.Closeable" />
Index: hadoop-common-project/hadoop-common/pom.xml
--- hadoop-common-project/hadoop-common/pom.xml (revision 1395699)
+++ hadoop-common-project/hadoop-common/pom.xml (working copy)
@@ -195,11 +195,6 @@
- <groupId>net.sf.kosmosfs</groupId>
- <artifactId>kfs</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>