001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs.s3;
019
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.io.UnsupportedEncodingException;
023 import java.net.URI;
024 import java.net.URLDecoder;
025 import java.net.URLEncoder;
026 import java.util.Set;
027 import java.util.TreeSet;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.conf.Configured;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.util.Tool;
034 import org.apache.hadoop.util.ToolRunner;
035 import org.jets3t.service.S3Service;
036 import org.jets3t.service.S3ServiceException;
037 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
038 import org.jets3t.service.model.S3Bucket;
039 import org.jets3t.service.model.S3Object;
040 import org.jets3t.service.security.AWSCredentials;
041
042 /**
043 * <p>
044 * This class is a tool for migrating data from an older to a newer version
045 * of an S3 filesystem.
046 * </p>
047 * <p>
048 * All files in the filesystem are migrated by re-writing the block metadata
049 * - no datafiles are touched.
050 * </p>
051 */
052 @InterfaceAudience.Public
053 @InterfaceStability.Unstable
054 public class MigrationTool extends Configured implements Tool {
055
056 private S3Service s3Service;
057 private S3Bucket bucket;
058
059 public static void main(String[] args) throws Exception {
060 int res = ToolRunner.run(new MigrationTool(), args);
061 System.exit(res);
062 }
063
064 @Override
065 public int run(String[] args) throws Exception {
066
067 if (args.length == 0) {
068 System.err.println("Usage: MigrationTool <S3 file system URI>");
069 System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
070 ToolRunner.printGenericCommandUsage(System.err);
071 return -1;
072 }
073
074 URI uri = URI.create(args[0]);
075
076 initialize(uri);
077
078 FileSystemStore newStore = new Jets3tFileSystemStore();
079 newStore.initialize(uri, getConf());
080
081 if (get("%2F") != null) {
082 System.err.println("Current version number is [unversioned].");
083 System.err.println("Target version number is " +
084 newStore.getVersion() + ".");
085 Store oldStore = new UnversionedStore();
086 migrate(oldStore, newStore);
087 return 0;
088 } else {
089 S3Object root = get("/");
090 if (root != null) {
091 String version = (String) root.getMetadata("fs-version");
092 if (version == null) {
093 System.err.println("Can't detect version - exiting.");
094 } else {
095 String newVersion = newStore.getVersion();
096 System.err.println("Current version number is " + version + ".");
097 System.err.println("Target version number is " + newVersion + ".");
098 if (version.equals(newStore.getVersion())) {
099 System.err.println("No migration required.");
100 return 0;
101 }
102 // use version number to create Store
103 //Store oldStore = ...
104 //migrate(oldStore, newStore);
105 System.err.println("Not currently implemented.");
106 return 0;
107 }
108 }
109 System.err.println("Can't detect version - exiting.");
110 return 0;
111 }
112
113 }
114
115 public void initialize(URI uri) throws IOException {
116
117
118
119 try {
120 String accessKey = null;
121 String secretAccessKey = null;
122 String userInfo = uri.getUserInfo();
123 if (userInfo != null) {
124 int index = userInfo.indexOf(':');
125 if (index != -1) {
126 accessKey = userInfo.substring(0, index);
127 secretAccessKey = userInfo.substring(index + 1);
128 } else {
129 accessKey = userInfo;
130 }
131 }
132 if (accessKey == null) {
133 accessKey = getConf().get("fs.s3.awsAccessKeyId");
134 }
135 if (secretAccessKey == null) {
136 secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
137 }
138 if (accessKey == null && secretAccessKey == null) {
139 throw new IllegalArgumentException("AWS " +
140 "Access Key ID and Secret Access Key " +
141 "must be specified as the username " +
142 "or password (respectively) of a s3 URL, " +
143 "or by setting the " +
144 "fs.s3.awsAccessKeyId or " +
145 "fs.s3.awsSecretAccessKey properties (respectively).");
146 } else if (accessKey == null) {
147 throw new IllegalArgumentException("AWS " +
148 "Access Key ID must be specified " +
149 "as the username of a s3 URL, or by setting the " +
150 "fs.s3.awsAccessKeyId property.");
151 } else if (secretAccessKey == null) {
152 throw new IllegalArgumentException("AWS " +
153 "Secret Access Key must be specified " +
154 "as the password of a s3 URL, or by setting the " +
155 "fs.s3.awsSecretAccessKey property.");
156 }
157 AWSCredentials awsCredentials =
158 new AWSCredentials(accessKey, secretAccessKey);
159 this.s3Service = new RestS3Service(awsCredentials);
160 } catch (S3ServiceException e) {
161 if (e.getCause() instanceof IOException) {
162 throw (IOException) e.getCause();
163 }
164 throw new S3Exception(e);
165 }
166 bucket = new S3Bucket(uri.getHost());
167 }
168
169 private void migrate(Store oldStore, FileSystemStore newStore)
170 throws IOException {
171 for (Path path : oldStore.listAllPaths()) {
172 INode inode = oldStore.retrieveINode(path);
173 oldStore.deleteINode(path);
174 newStore.storeINode(path, inode);
175 }
176 }
177
178 private S3Object get(String key) {
179 try {
180 return s3Service.getObject(bucket, key);
181 } catch (S3ServiceException e) {
182 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
183 return null;
184 }
185 }
186 return null;
187 }
188
189 interface Store {
190
191 Set<Path> listAllPaths() throws IOException;
192 INode retrieveINode(Path path) throws IOException;
193 void deleteINode(Path path) throws IOException;
194
195 }
196
197 class UnversionedStore implements Store {
198
199 @Override
200 public Set<Path> listAllPaths() throws IOException {
201 try {
202 String prefix = urlEncode(Path.SEPARATOR);
203 S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
204 Set<Path> prefixes = new TreeSet<Path>();
205 for (int i = 0; i < objects.length; i++) {
206 prefixes.add(keyToPath(objects[i].getKey()));
207 }
208 return prefixes;
209 } catch (S3ServiceException e) {
210 if (e.getCause() instanceof IOException) {
211 throw (IOException) e.getCause();
212 }
213 throw new S3Exception(e);
214 }
215 }
216
217 @Override
218 public void deleteINode(Path path) throws IOException {
219 delete(pathToKey(path));
220 }
221
222 private void delete(String key) throws IOException {
223 try {
224 s3Service.deleteObject(bucket, key);
225 } catch (S3ServiceException e) {
226 if (e.getCause() instanceof IOException) {
227 throw (IOException) e.getCause();
228 }
229 throw new S3Exception(e);
230 }
231 }
232
233 @Override
234 public INode retrieveINode(Path path) throws IOException {
235 return INode.deserialize(get(pathToKey(path)));
236 }
237
238 private InputStream get(String key) throws IOException {
239 try {
240 S3Object object = s3Service.getObject(bucket, key);
241 return object.getDataInputStream();
242 } catch (S3ServiceException e) {
243 if ("NoSuchKey".equals(e.getS3ErrorCode())) {
244 return null;
245 }
246 if (e.getCause() instanceof IOException) {
247 throw (IOException) e.getCause();
248 }
249 throw new S3Exception(e);
250 }
251 }
252
253 private String pathToKey(Path path) {
254 if (!path.isAbsolute()) {
255 throw new IllegalArgumentException("Path must be absolute: " + path);
256 }
257 return urlEncode(path.toUri().getPath());
258 }
259
260 private Path keyToPath(String key) {
261 return new Path(urlDecode(key));
262 }
263
264 private String urlEncode(String s) {
265 try {
266 return URLEncoder.encode(s, "UTF-8");
267 } catch (UnsupportedEncodingException e) {
268 // Should never happen since every implementation of the Java Platform
269 // is required to support UTF-8.
270 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
271 throw new IllegalStateException(e);
272 }
273 }
274
275 private String urlDecode(String s) {
276 try {
277 return URLDecoder.decode(s, "UTF-8");
278 } catch (UnsupportedEncodingException e) {
279 // Should never happen since every implementation of the Java Platform
280 // is required to support UTF-8.
281 // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
282 throw new IllegalStateException(e);
283 }
284 }
285
286 }
287
288 }