Connect to Accumulo inside Mapper using Kerberos

I am moving some software from an older Hadoop cluster (which uses username/password authentication) to a newer one running 2.6.0-cdh5.12.0, which has Kerberos authentication enabled.

I managed to get many of the existing Map/Reduce jobs that use Accumulo for input and/or output working by using the DelegationToken set in the AccumuloInputFormat/AccumuloOutputFormat classes.

However, I have one job that uses AccumuloInputFormat/OutputFormat for its input and output, but also, inside its Mapper.setup() method, connects to Accumulo via Zookeeper, so that Mapper.map() can look up each key/value it processes in another Accumulo table.

I have included the relevant code below, which shows the setup() method connecting to Zookeeper with a PasswordToken and then creating an Accumulo table Scanner, which is then used in the map() method.

So the question is: how do I replace the use of the PasswordToken with a KerberosToken when setting up the Accumulo Scanner in the Mapper.setup() method? I cannot find a way to "get" the DelegationToken that I set in the AccumuloInput/OutputFormat classes.

I tried context.getCredentials().getAllTokens() and searched for a token of type org.apache.accumulo.core.client.security.tokens.AuthenticationToken, but all the tokens returned there are of type org.apache.hadoop.security.token.Token.

Please note that I typed these code snippets rather than cutting and pasting, because the code runs on a network that is not connected to the Internet, so there may be typos. :)

//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault()
        .withInstance("Accumulo1")
        .withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
// a Connector is needed here to get the DelegationToken; the original snippet
// omitted it (connecting as the Kerberos user)
Connector conn = inst.getConnector(username, new KerberosToken());
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true);



//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;

public void setup(Context context) {
    Configuration cfg = context.getConfiguration();

    // properties set and passed from M/R Driver program
    String username = cfg.get("UserName");
    String password = cfg.get("Password");
    String accumuloInstName = cfg.get("InstanceName");
    String zookeepers = cfg.get("Zookeepers");
    String tableName = cfg.get("TableName");
    Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
    try {
      AuthenticationToken passwordToken = new PasswordToken(password);

      Connector conn = inst.getConnector(username, passwordToken);

      index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
    } catch(Exception e) {
       e.printStackTrace();
    }
}

public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
    String uuid = key.getRow().toString();
    index_scanner.clearColumns();
    index_scanner.setRange(Range.exact(uuid));
    for(Entry<Key, Value> entry : index_scanner) {
        // do some processing in here
    }
}
1 answer

The provided AccumuloInputFormat and AccumuloOutputFormat have a way to set the token in the job configuration via Accumulo*putFormat.setConnectorInfo(job, principal, token). You can also serialize the token to a file in HDFS, using AuthenticationTokenSerializer, and use the version of setConnectorInfo that accepts a file name.
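As a minimal sketch of the HDFS approach (the path `/user/me/accumulo.token` and the config key names `accumulo.token.file` / `accumulo.token.class` are hypothetical, chosen here for illustration; assumes Accumulo 1.7+, where `AuthenticationTokenSerializer` is a static inner class of `AuthenticationToken`):

```java
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// --- driver side: persist the DelegationToken so the Mapper can read it ---
AuthenticationToken dt = conn.securityOperations()
        .getDelegationToken(new DelegationTokenConfig());

// hypothetical HDFS location for the serialized token
Path tokenFile = new Path("/user/me/accumulo.token");
FileSystem fs = FileSystem.get(job.getConfiguration());
try (FSDataOutputStream out = fs.create(tokenFile, true)) {
    out.write(AuthenticationTokenSerializer.serialize(dt));
}

// record the file location and the token's class so the Mapper can
// deserialize it again (key names are made up for this sketch)
job.getConfiguration().set("accumulo.token.file", tokenFile.toString());
job.getConfiguration().set("accumulo.token.class", dt.getClass().getName());
```

The point of writing the class name alongside the bytes is that `AuthenticationTokenSerializer.deserialize(String className, byte[])` needs to know which concrete token type to instantiate on the Mapper side.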

If a KerberosToken is passed in, the job will create a DelegationToken to use; if a DelegationToken is passed in, it will just use that Token directly.
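A hedged sketch of the two options on the driver side (the variable names are illustrative; `new KerberosToken()` assumes the driver JVM already holds a Kerberos TGT from `kinit` or a keytab login):

```java
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;

// KerberosToken() picks up the currently logged-in Kerberos principal
KerberosToken krbToken = new KerberosToken();
Connector conn = inst.getConnector(principal, krbToken);

// Option A: pass the KerberosToken; the input format obtains a
// DelegationToken for the job on your behalf
AccumuloInputFormat.setConnectorInfo(job, principal, krbToken);

// Option B: obtain the DelegationToken explicitly and pass that instead
// (useful when you also want to serialize it for use inside the Mapper)
AuthenticationToken dt = conn.securityOperations()
        .getDelegationToken(new DelegationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, principal, dt);
```

Option B is the more flexible route for this question, since it gives the driver a token object it can also hand to the Mapper.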

The provided AccumuloInputFormat should handle its own Scanner, so normally you shouldn't have to create one in your Mapper if you have configured the input properly. However, if you are doing a join-type operation (a secondary Scanner) inside the Mapper, you can inspect how the AccumuloInputFormat's RecordReader obtains its Scanner and do the same thing.
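Putting it together, a sketch of the question's `setup()` with the `PasswordToken` replaced by a deserialized `DelegationToken` (this assumes the driver wrote the token to HDFS and set the hypothetical keys `accumulo.token.file` / `accumulo.token.class`, as described above; it is not the only possible wiring):

```java
import java.io.ByteArrayOutputStream;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.AuthenticationTokenSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public void setup(Context context) {
    Configuration cfg = context.getConfiguration();
    String username   = cfg.get("UserName");
    String instName   = cfg.get("InstanceName");
    String zookeepers = cfg.get("Zookeepers");
    String tableName  = cfg.get("TableName");
    try {
        // read the serialized token the driver wrote to HDFS
        Path tokenFile = new Path(cfg.get("accumulo.token.file"));
        FileSystem fs = FileSystem.get(cfg);
        byte[] bytes;
        try (FSDataInputStream in = fs.open(tokenFile);
             ByteArrayOutputStream buf = new ByteArrayOutputStream()) {
            IOUtils.copyBytes(in, buf, 4096, false);
            bytes = buf.toByteArray();
        }

        // rebuild the DelegationToken and connect with it instead of a PasswordToken
        AuthenticationToken token = AuthenticationTokenSerializer
                .deserialize(cfg.get("accumulo.token.class"), bytes);

        Instance inst = new ZooKeeperInstance(instName, zookeepers);
        Connector conn = inst.getConnector(username, token);
        index_scanner = conn.createScanner(tableName,
                conn.securityOperations().getUserAuthorizations(username));
    } catch (Exception e) {
        // fail the task rather than silently leaving index_scanner null
        throw new RuntimeException("failed to create secondary scanner", e);
    }
}
```

Compared to the original `setup()`, the only structural change is where the credential comes from; the Scanner creation and the `map()` side are unchanged.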


Source: https://habr.com/ru/post/1687730/
