001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.net;
019    
020    import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY;
021    
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
029    
030    import org.apache.commons.lang.StringUtils;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    import org.apache.hadoop.classification.InterfaceAudience;
034    import org.apache.hadoop.classification.InterfaceStability;
035    import org.apache.hadoop.conf.Configuration;
036    import org.apache.hadoop.conf.Configured;
037    
038    /**
039     * <p>
040     * Simple {@link DNSToSwitchMapping} implementation that reads a 2 column text
041     * file. The columns are separated by whitespace. The first column is a DNS or
042     * IP address and the second column specifies the rack where the address maps.
043     * </p>
044     * <p>
045     * This class uses the configuration parameter {@code
046     * net.topology.table.file.name} to locate the mapping file.
047     * </p>
048     * <p>
049     * Calls to {@link #resolve(List)} will look up the address as defined in the
050     * mapping file. If no entry corresponding to the address is found, the value
051     * {@code /default-rack} is returned.
052     * </p>
053     */
054    @InterfaceAudience.Public
055    @InterfaceStability.Evolving
056    public class TableMapping extends CachedDNSToSwitchMapping {
057    
058      private static final Log LOG = LogFactory.getLog(TableMapping.class);
059      
060      public TableMapping() {
061        super(new RawTableMapping());
062      }
063      
064      private RawTableMapping getRawMapping() {
065        return (RawTableMapping) rawMapping;
066      }
067    
068      @Override
069      public Configuration getConf() {
070        return getRawMapping().getConf();
071      }
072    
073      @Override
074      public void setConf(Configuration conf) {
075        super.setConf(conf);
076        getRawMapping().setConf(conf);
077      }
078      
079      private static final class RawTableMapping extends Configured
080          implements DNSToSwitchMapping {
081        
082        private final Map<String, String> map = new HashMap<String, String>();
083        private boolean initialized = false;
084      
085        private synchronized void load() {
086          map.clear();
087      
088          String filename = getConf().get(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY, null);
089          if (StringUtils.isBlank(filename)) {
090            LOG.warn(NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY + " not configured. "
091                + NetworkTopology.DEFAULT_RACK + " will be returned.");
092            return;
093          }
094      
095          BufferedReader reader = null;
096          try {
097            reader = new BufferedReader(new FileReader(filename));
098            String line = reader.readLine();
099            while (line != null) {
100              line = line.trim();
101              if (line.length() != 0 && line.charAt(0) != '#') {
102                String[] columns = line.split("\\s+");
103                if (columns.length == 2) {
104                  map.put(columns[0], columns[1]);
105                } else {
106                  LOG.warn("Line does not have two columns. Ignoring. " + line);
107                }
108              }
109              line = reader.readLine();
110            }
111          } catch (Exception e) {
112            LOG.warn(filename + " cannot be read. " + NetworkTopology.DEFAULT_RACK
113                + " will be returned.", e);
114            map.clear();
115          } finally {
116            if (reader != null) {
117              try {
118                reader.close();
119              } catch (IOException e) {
120                LOG.warn(filename + " cannot be read. "
121                    + NetworkTopology.DEFAULT_RACK + " will be returned.", e);
122                map.clear();
123              }
124            }
125          }
126        }
127      
128        @Override
129        public synchronized List<String> resolve(List<String> names) {
130          if (!initialized) {
131            initialized = true;
132            load();
133          }
134      
135          List<String> results = new ArrayList<String>(names.size());
136          for (String name : names) {
137            String result = map.get(name);
138            if (result != null) {
139              results.add(result);
140            } else {
141              results.add(NetworkTopology.DEFAULT_RACK);
142            }
143          }
144          return results;
145        }
146        
147      }
148    }