弄了3天,终于折腾对了。我的环境是 Ambari 安装的 Kafka 和 Storm:

- HDP 版本:HDP-2.6(STANDARD,2.6.3.0-235)
- Kafka:0.10.1
- Storm:1.1.0

只给 Kafka 配置了 SASL_PLAINTEXT 认证。
先上代码。
package example;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.testing.IdentityBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import example.MatchRules;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.CommonClientConfigs;
import java.util.Properties;
/**
 * Demo topology: KafkaSpout (SASL_PLAINTEXT) -> MatchRules bolt, with an
 * optional KafkaBolt writer that is currently disabled.
 *
 * With a program argument the topology is submitted to the cluster under the
 * name {@code args[0]}; without arguments it runs in a LocalCluster for
 * local testing.
 */
public class KafkaStormKafkaTopology {

    /** Comma-separated Kafka broker list (HDP brokers listen on 6667). */
    private static final String BOOTSTRAP_SERVERS = "192.168.0.1:6667,192.168.0.2:6667";
    /** Topic the spout consumes from. */
    private static final String INPUT_TOPIC = "test3";
    /** Topic the (currently disabled) KafkaBolt would write to. */
    private static final String OUTPUT_TOPIC = "outputTopicStorm";

    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        final TopologyBuilder builder = new TopologyBuilder();

        // SASL/PLAIN settings shared by the spout's consumer and the bolt's producer.
        Properties saslProps = buildSaslProps();

        // --- Consumer-side properties for the spout ---------------------------
        Properties consumerProps = new Properties();
        consumerProps.putAll(saslProps);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // BUG FIX: the original put StringSerializer under the *deserializer*
        // keys; a consumer needs deserializers.
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Spout reading from INPUT_TOPIC; each record becomes a
        // (topic, key, message) tuple via the record translator.
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
                .builder(BOOTSTRAP_SERVERS, INPUT_TOPIC)
                .setGroupId("storm")
                .setProp(consumerProps)
                .setRecordTranslator(
                        r -> new Values(r.topic(), r.key(), r.value()),
                        new Fields("topic", "key", "message"))
                .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        // --- Producer-side properties for the optional KafkaBolt --------------
        // Kept separate from consumerProps: a producer needs serializers, and
        // passing consumer-only keys would only produce config warnings.
        Properties producerProps = new Properties();
        producerProps.putAll(saslProps);
        producerProps.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>()
                .withProducerProperties(producerProps)
                .withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());

        // Wiring: KafkaSpout -> MatchRules. Re-enable the last line to write
        // MatchRules output back to Kafka through kafkaBolt.
        builder.setSpout("kafka-spout", kafkaSpout);
        builder.setBolt("identity", new MatchRules()).globalGrouping("kafka-spout");
        // builder.setBolt("kafka-bolt", kafkaBolt, 2).globalGrouping("identity");

        Config config = new Config();
        config.setNumWorkers(1);
        config.setMaxTaskParallelism(1);
        try {
            if (args != null && args.length > 0) {
                // Cluster mode: topology name taken from the first argument.
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } else {
                // Local mode for development/testing.
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("simple", config, builder.createTopology());
            }
        } catch (Exception ex) {
            // Boundary of main(): report and exit rather than propagate.
            ex.printStackTrace();
        }
    }

    /**
     * SASL/PLAIN security settings common to consumer and producer.
     *
     * SECURITY NOTE: the username/password are hard-coded for this demo; in
     * production load credentials from a JAAS file or an external secret store
     * instead of committing them to source control.
     */
    private static Properties buildSaslProps() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "serviceName=\"kafka\" "
                        + "username=\"kafka\" "
                        + "password=\"123456\";");
        return props;
    }
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kafka-storm-kafka</groupId>
<artifactId>kafka-storm-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- storm-core is "provided": the Storm cluster supplies it at runtime,
     so it must NOT be shaded into the topology jar. Version matches the
     HDP-2.6 Storm 1.1.0 install described above. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<!-- New-style Kafka spout/bolt (org.apache.storm.kafka.spout / .bolt). -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.1.0</version>
</dependency>
<!-- NOTE(review): kafka-clients 0.10.2.1 against 0.10.1 brokers — works for
     SASL/PLAIN here, but keep client <= broker minor version in mind. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<!-- dependency for org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Shade plugin builds the fat jar submitted to Storm. NOTE(review):
     version 1.4 is very old; consider upgrading, but it works here. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- Merge META-INF/services files from all shaded jars. -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>example.KafkaStormKafkaTopology</mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<!-- Strip signature files; stale signatures in a shaded jar cause
     "Invalid signature file digest" errors at runtime. -->
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
参考了github的storm-kafka-kerberos-master项目,https://github.com/pvillard31/storm-kafka-kerberos
原作者使用了kerberos,而我只是使用了简单的sasl,所以只是在原项目上做了修改。
我这里的 Kafka 用的是管理员用户,如果使用普通用户,需要通过 ACL 另外赋权。