storm 连接带sasl的kafka

论坛 期权论坛 脚本     
匿名技术用户   2020-12-22 05:20   43   0

弄了3天,终于折腾对了。我的环境是ambari装的kafka和storm,

HDP-2.6 HDP-2.6 STANDARD 2.6.3.0-235

kafka:0.10.1

storm:1.1.0

只给kafka配置了SASL_PLAINTEXT

先上代码。

package example;

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.testing.IdentityBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import example.MatchRules;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.CommonClientConfigs;
import java.util.Properties;
public class KafkaStormKafkaTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {

        final TopologyBuilder builder = new TopologyBuilder();
        final Fields fields = new Fields("topic", "key", "message");
  
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:6667,192.168.0.2:6667");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
        consumerProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //System.setProperties("java.security.auth.login.config", "d:\\kafka_2.11_auth\\client_cert\\kafka_client_jaas_admin.conf");
        consumerProps.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
              + "serviceName=\"kafka\" "
                    + "username=\"kafka\" "
                    + "password=\"123456\";");
        consumerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        consumerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        //consumer.subscribe(Arrays.asList("test3"));
        // Kafka spout getting data from "inputTopicStorm"
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
                .builder(consumerProps.getProperty("bootstrap.servers"), "test3")
                .setGroupId("storm")
                .setProp(consumerProps)
                .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), new Fields("topic", "key", "message"))
                .build();


        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        // Identity bolt (just for testing, doing nothing)
        IdentityBolt identityBolt = new IdentityBolt(fields);

        // Kafka bolt to send data into "outputTopicStorm"
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
                .withProducerProperties(consumerProps)
                .withTopicSelector(new DefaultTopicSelector("outputTopicStorm"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());

        // Building the topology: KafkaSpout -> Identity -> KafkaBolt
        builder.setSpout("kafka-spout", kafkaSpout);
       // builder.setBolt("identity", identityBolt).MatchRules("kafka-spout");
        builder.setBolt("identity",  new MatchRules()).globalGrouping("kafka-spout");
        //builder.setBolt("kafka-bolt", kafkaBolt, 2).globalGrouping("identity");

        // Submit the topology
        Config config = new Config();
        try{
         if (args != null && args.length > 0) 
            {    
                config.setNumWorkers(1);  
                config.setMaxTaskParallelism(1);  
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());  
            } 
            else 
            {  
                //这里是本地模式下运行的启动代码。  
                config.setNumWorkers(1);  
                config.setMaxTaskParallelism(1);  
                LocalCluster cluster = new LocalCluster();  
                cluster.submitTopology("simple", config,  
                  builder.createTopology());
                //Thread.sleep(10000);
               //cluster.killTopology("simple");
               //cluster.shutdown();
            }  
            //cluster.submitTopology("MySqlDemoTopology", config, builder.createTopology());
        }
        catch(Exception ex)
        {
         ex.printStackTrace();
        }
        //StormSubmitter.submitTopology("Kafka-Storm-Kafka", conf, builder.createTopology());

    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>kafka-storm-kafka</groupId>
 <artifactId>kafka-storm-kafka</artifactId>
 <version>0.0.1-SNAPSHOT</version>

 <dependencies>

  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>1.1.0</version>
   <scope>provided</scope>
  </dependency>

  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-kafka-client</artifactId>
   <version>1.1.0</version>
  </dependency>

  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.10.2.1</version>
  </dependency>
  
  <!-- dependency for org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink -->
  <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
  </dependency>
<dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20160810</version>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.3</version>
    <configuration>
     <source>1.8</source>
     <target>1.8</target>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
     <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
     <execution>
      <phase>package</phase>
      <goals>
       <goal>shade</goal>
      </goals>
      <configuration>
       <transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
         <mainClass>example.KafkaStormKafkaTopology</mainClass>
        </transformer>
       </transformers>
       <filters>
        <filter>
         <artifact>*:*</artifact>
         <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
         </excludes>
        </filter>
       </filters>
      </configuration>
     </execution>
    </executions>
   </plugin>
  </plugins>
 </build>

</project>

参考了github的storm-kafka-kerberos-master项目,https://github.com/pvillard31/storm-kafka-kerberos

原作者使用了kerberos,而我只是使用了简单的sasl,所以只是在原项目上做了修改。

我这里的kafka是管理员用户,如果使用普通用户,需要acl另外赋权。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP