您好,欢迎来到站长目录(28sn.com)!


Kafka快速入门(八)——Confluent Kafka简介

来源:网络整理 浏览:207次 时间:2020-11-29
Kafka快速入门(八)——Confluent Kafka简介一、Confluent Kafka简介1、Confluent Kafka简介

2014年,Kafka的创始人Jay Kreps、NahaNarkhede和饶军离开LinkedIn创立Confluent公司,专注于提供基于Kafka的企业级流处理解决方案,并发布了Confluent Kafka。Confluent Kafka分为开源版和企业版,企业版收费。

2、Confluent Kafka特性

Confluent Kafka开源版特性如下:
(1)Confluent Kafka Connectors:支持Kafka Connect JDBC Connector、Kafka Connect HDFS Connector、Kafka Connect Elasticsearch Connector、Kafka Connect S3 Connector。
(2)多客户端支持:支持C/C++、Python、Go、.Net、Java客户端。
(3)Confluent Schema Registry
(4)Confluent Kafka REST Proxy
Confluent Kafka企业版特性如下:
(1)Automatic Data Balancing
(2)Multi-DataCenter Replication
(3)Confluent Control Center
(4)JMS Client

3、Confluent Kafka客户端

Kafka快速入门(八)——Confluent Kafka简介

4、Confluent Kafka客户端特性支持

Kafka快速入门(八)——Confluent Kafka简介

二、 Confluent Kafka组件1、Schema Registry

当通过网络发送数据或将数据存储在文件中时,需要对数据进行序列化。常见跨语言的序列化库如Avro、Thrift和Protocol buffer,存在两个明显的缺点:
(1)数据生产者和数据消费者间缺乏契约。如果上游生产者随意更改数据格式,很难确保下游消费者能够正确解释数据。
(2)开销和冗余。所有消息都会显示保存相同字段名和类型信息,存在大量冗余数据。
无论是使用传统的Avro API自定义序列化类和反序列化类还是使用Twitter的Bijection类库实现Avro的序列化与反序列化,都会在每条Kafka记录里都嵌入schema,会让记录的大小成倍地增加。在读取记录时需要用到整个schema,Schema Registry可以让所有记录共用一个schema。
Kafka快速入门(八)——Confluent Kafka简介
生产者把写入消息需要用到的schema保存到Schema Registry,然后在消息中引用schema的ID。消费者使用ID从Schema Registry里拉取schema来反序列化消息。
User的schema文件如下:
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}

Confluent Schema Registry为开发人员提供了一个RESTful接口,以便为事件定义标准schema,在整个组织中共享标准schema,并以向后兼容和未来证明的方式安全地对其进行改进。

2、Control Center

Confluent Control Center是一个对整个产品进行管理的控制中心,最主要的功能对Kafka集群的各个生产者和消费者的进行性能监控,同时可以很容易地管理Kafka的连接、创建、编辑和管理与其它系统的连接。
Confluent Control Center只在Confluent Kafk企业版提供支持。
Kafka快速入门(八)——Confluent Kafka简介

3、KSQL

KSQL是面向Apache Kafka的一种流式SQL引擎,为使用Kafka处理数据提供了一种简单的、完全交互的SQL界面,支持众多功能强大的数据流处理操作,包括聚合、连接、加窗(windowing)和sessionization(捕获单一访问者的网站会话时间范围内所有的点击流事件)等等。
KSQL目前还无法执行查询,KSQL提供的是对Kafka中的数据流执行连续查询,即随着新数据不断流入,转换在连续进行。

4、Confluent Replicator

Confluent Platform可以轻松地在多个数据中心内维护多个Kafka群集。Confluent Replicator提供了管理数据中心之间的数据复制和topic配置的功能,应用场景如下:
(1)ative-active地理定位部署:允许用户访问最近(附近)的数据中心,以优化其架构,实现低延迟和高性能。
(2)集中分析:将来自多个Kafka集群的数据聚合到一个地方,以进行组织范围的分析。
(3)云迁移:可以使用Kafka完成本地应用与云之间的数据迁移。

5、Confluent Auto Data Balancer

随着集群的增长,topic和partition以不同的速度增长,随着时间的推移,添加和删除会导致跨数据中心资源的工作负载不平衡(数据倾斜)。Confluent Auto Data Balancer会监控集群中的Broker数量、partition大小、partition数量,允许转移数据以在整个群集中创建均匀的工作负载,同时限制重新平衡流量,以最大限度地减少重新平衡时对生产工作负载的影响。

6、Kafka Connect

Kafka Connect是Kafka的一个开源组件,是用来将Kafka与数据库、key-value存储系统、搜索系统、文件系统等外部系统连接起来的基础框架。
通过使用Kafka Connect框架以及现有的Connector可以实现从源数据读入消息到Kafka,再从Kafka读出消息到目的地的功能。
Confluent在Kafka Connect基础上提供了对多种常用系统的支持:
(1)Kafka Connect ActiveMQ Connector
(2)Kafka FileStream Connectors
(3)Kafka Connect HDFS
(4)Kafka Connect JDBC Connector
(5)Confluent Kafka Replicator
(6)Kafka Connect S3
(7)Kafka Connect Elasticsearch Connector
(8)Kafka Connect IBM MQ Connector
(9)Kafka Connect JMS Connector

7、MQTT Proxy333

MQTT Proxy提供了允许MQTT客户端以Kafka原生方式直接向Kafka发送消息的可伸缩的轻量级接口。
MQTT Proxy用于把基于物联网(IoT)的应用程序和Kafka 集成,可以用于联网汽车、制造业生产线和预测性维护应用程序中。
MQTT Proxy使用传输层安全(TLS)加密和基本身份验证,支持 MQTT 3.1.1 协议,可以发布所有三个MQTT服务质量级别的消息,服务级别包括把消息发送给消费者:1)最多一次(即发即弃);2)至少一次(需要确认);3)恰好一次。

三、Confluent Kafka Docker部署1、Confluent Kafka一站式部署

Confluent提供了Confluent Kafka所有组件的一站式部署,即cp-all-in-one项目,分为企业版本和开源社区版本,企业版比社区版本多了Control Center服务。
GitHub项目地址:
https://github.com/confluentinc/cp-all-in-one

2、Confluent Kafka企业版

docker-compose.yml文件:

version: '2'services:  zookeeper:    image: confluentinc/cp-zookeeper:5.5.0    hostname: zookeeper    container_name: zookeeper    ports:      - "2181:2181"    environment:      ZOOKEEPER_CLIENT_PORT: 2181      ZOOKEEPER_TICK_TIME: 2000  broker:    image: confluentinc/cp-server:5.5.0    hostname: broker    container_name: broker    depends_on:      - zookeeper    ports:      - "9092:9092"    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1      CONFLUENT_METRICS_ENABLE: 'true'      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'  schema-registry:    image: confluentinc/cp-schema-registry:5.5.0    hostname: schema-registry    container_name: schema-registry    depends_on:      - zookeeper      - broker    ports:      - "8081:8081"    environment:      SCHEMA_REGISTRY_HOST_NAME: schema-registry      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'  connect:    image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0    hostname: connect    container_name: connect    depends_on:      - zookeeper      - broker      - schema-registry    ports:      - "8083:8083"    environment:      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'      CONNECT_REST_ADVERTISED_HOST_NAME: connect      CONNECT_REST_PORT: 8083      CONNECT_GROUP_ID: compose-connect-group      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'      # CLASSPATH required due to CC-2422      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR  control-center:    image: confluentinc/cp-enterprise-control-center:5.5.0    hostname: control-center    container_name: control-center    depends_on:      - zookeeper      - broker      - schema-registry      - connect      - ksqldb-server    ports:      - "9021:9021"    environment:      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"      CONTROL_CENTER_REPLICATION_FACTOR: 1      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1      CONFLUENT_METRICS_TOPIC_REPLICATION: 1      PORT: 9021  ksqldb-server:    image: confluentinc/cp-ksqldb-server:5.5.0    hostname: ksqldb-server    container_name: ksqldb-server    depends_on:      - broker      - connect    ports:      - "8088:8088"    environment:      KSQL_CONFIG_DIR: "/etc/ksql"      KSQL_BOOTSTRAP_SERVERS: "broker:29092"      KSQL_HOST_NAME: ksqldb-server      KSQL_LISTENERS: "http://0.0.0.0:8088"      KSQL_CACHE_MAX_BYTES_BUFFERING: 0      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"      KSQL_KSQL_CONNECT_URL: "http://connect:8083"  ksqldb-cli:    image: confluentinc/cp-ksqldb-cli:5.5.0    container_name: ksqldb-cli    depends_on:      - broker      - connect      - ksqldb-server    entrypoint: /bin/sh    tty: true  ksql-datagen:    image: confluentinc/ksqldb-examples:5.5.0    hostname: ksql-datagen    container_name: ksql-datagen    depends_on:      - ksqldb-server      - broker      - schema-registry      - connect    command: "bash -c 'echo Waiting for Kafka to be ready... && \                       cub kafka-ready -b broker:29092 1 40 && \                       echo Waiting for Confluent Schema Registry to be ready... && \                       cub sr-ready schema-registry 8081 40 && \                       echo Waiting a few seconds for topic creation to finish... && \                       sleep 11 && \                       tail -f /dev/null'"    environment:      KSQL_CONFIG_DIR: "/etc/ksql"      STREAMS_BOOTSTRAP_SERVERS: broker:29092      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry      STREAMS_SCHEMA_REGISTRY_PORT: 8081  rest-proxy:    image: confluentinc/cp-kafka-rest:5.5.0    depends_on:      - zookeeper      - broker      - schema-registry    ports:      - 8082:8082    hostname: rest-proxy    container_name: rest-proxy    environment:      KAFKA_REST_HOST_NAME: rest-proxy      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

启动容器服务:
docker-compose -f docker-compose.yml up -d
关闭容器服务:
docker-compose -f docker-compose.yml down

3、Confluent Kafka社区版

docker-compose.yml文件:

version: '2'services:  zookeeper:    image: confluentinc/cp-zookeeper:5.5.0    hostname: zookeeper    container_name: zookeeper    ports:      - "2181:2181"    environment:      ZOOKEEPER_CLIENT_PORT: 2181      ZOOKEEPER_TICK_TIME: 2000  broker:    image: confluentinc/cp-kafka:5.5.0    hostname: broker    container_name: broker    depends_on:      - zookeeper    ports:      - "29092:29092"      - "9092:9092"    environment:      KAFKA_BROKER_ID: 1      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0  schema-registry:    image: confluentinc/cp-schema-registry:5.5.0    hostname: schema-registry    container_name: schema-registry    depends_on:      - zookeeper      - broker    ports:      - "8081:8081"    environment:      SCHEMA_REGISTRY_HOST_NAME: schema-registry      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'  connect:    image: cnfldemos/kafka-connect-datagen:0.3.2-5.5.0    hostname: connect    container_name: connect    depends_on:      - zookeeper      - broker      - schema-registry    ports:      - "8083:8083"    environment:      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'      CONNECT_REST_ADVERTISED_HOST_NAME: connect      CONNECT_REST_PORT: 8083      CONNECT_GROUP_ID: compose-connect-group      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR  ksqldb-server:    image: confluentinc/cp-ksqldb-server:5.5.0    hostname: ksqldb-server    container_name: ksqldb-server    depends_on:      - broker      - connect    ports:      - "8088:8088"    environment:      KSQL_CONFIG_DIR: "/etc/ksql"      KSQL_BOOTSTRAP_SERVERS: "broker:29092"      KSQL_HOST_NAME: ksqldb-server      KSQL_LISTENERS: "http://0.0.0.0:8088"      KSQL_CACHE_MAX_BYTES_BUFFERING: 0      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"      KSQL_KSQL_CONNECT_URL: "http://connect:8083"  ksqldb-cli:    image: confluentinc/cp-ksqldb-cli:5.5.0    container_name: ksqldb-cli    depends_on:      - broker      - connect      - ksqldb-server    entrypoint: /bin/sh    tty: true  ksql-datagen:    image: confluentinc/ksqldb-examples:5.5.0    hostname: ksql-datagen    container_name: ksql-datagen    depends_on:      - ksqldb-server      - broker      - schema-registry      - connect    command: "bash -c 'echo Waiting for Kafka to be ready... && \                       cub kafka-ready -b broker:29092 1 40 && \                       echo Waiting for Confluent Schema Registry to be ready... && \                       cub sr-ready schema-registry 8081 40 && \                       echo Waiting a few seconds for topic creation to finish... && \                       sleep 11 && \                       tail -f /dev/null'"    environment:      KSQL_CONFIG_DIR: "/etc/ksql"      STREAMS_BOOTSTRAP_SERVERS: broker:29092      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry      STREAMS_SCHEMA_REGISTRY_PORT: 8081  rest-proxy:    image: confluentinc/cp-kafka-rest:5.5.0    depends_on:      - zookeeper      - broker      - schema-registry    ports:      - 8082:8082    hostname: rest-proxy    container_name: rest-proxy    environment:      KAFKA_REST_HOST_NAME: rest-proxy      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

启动容器服务:
docker-compose -f docker-compose.yml up -d
关闭容器服务:
docker-compose -f docker-compose.yml down

4、Confluent Kafka云服务版

docker-compose.yml文件:

version: '2'services:  schema-registry:    image: confluentinc/cp-schema-registry:5.5.0    hostname: schema-registry    container_name: schema-registry    ports:      - '8085:8085'    environment:      SCHEMA_REGISTRY_HOST_NAME: schema-registry      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085      SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFIED_ALGORITHM: "https"      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SASL_SSL"      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: "PLAIN"  ksqldb-server:    image: confluentinc/cp-ksqldb-server:5.5.0    hostname: ksqldb-server    container_name: ksqldb-server    ports:      - "8089:8089"    environment:      KSQL_HOST_NAME: ksqldb-server      KSQL_CONFIG_DIR: "/etc/ksql"      KSQL_LISTENERS: "http://0.0.0.0:8089"      KSQL_AUTO_OFFSET_RESET: "earliest"      KSQL_COMMIT_INTERVAL_MS: 0      KSQL_CACHE_MAX_BYTES_BUFFERING: 0      KSQL_KSQL_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE      KSQL_KSQL_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO      KSQL_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS      KSQL_SECURITY_PROTOCOL: "SASL_SSL"      KSQL_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      KSQL_SASL_MECHANISM: "PLAIN"      KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"      KSQL_KSQL_STREAMS_PRODUCER_RETRIES: 2147483647      KSQL_KSQL_STREAMS_PRODUCER_CONFLUENT_BATCH_EXPIRE_MS: 9223372036854775807      KSQL_KSQL_STREAMS_PRODUCER_REQUEST_TIMEOUT_MS: 300000      KSQL_KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807      KSQL_KSQL_STREAMS_PRODUCER_DELIVERY_TIMEOUT_MS: 2147483647      KSQL_KSQL_STREAMS_REPLICATION_FACTOR: 3      KSQL_KSQL_INTERNAL_TOPIC_REPLICAS: 3      KSQL_KSQL_SINK_REPLICAS: 3      # Producer Confluent Monitoring Interceptors for Control Center streams monitoring      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"      KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN      KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: "SASL_SSL"      KSQL_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      # Consumer Confluent Monitoring Interceptors for Control Center streams monitoring      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"      KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN      KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: "SASL_SSL"      KSQL_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      KSQL_KSQL_CONNECT_URL: "http://connect:8083"  ksqldb-cli:    image: confluentinc/cp-ksqldb-cli:5.5.0    container_name: ksqldb-cli    entrypoint: /bin/sh    tty: true  control-center:    image: confluentinc/cp-enterprise-control-center:5.5.0    hostname: control-center    container_name: control-center    ports:      - "9021:9021"    environment:      CONTROL_CENTER_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8089"      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8089"      CONTROL_CENTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL      CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE      CONTROL_CENTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO      CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"      CONTROL_CENTER_STREAMS_SECURITY_PROTOCOL: SASL_SSL      CONTROL_CENTER_STREAMS_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      CONTROL_CENTER_STREAMS_SASL_MECHANISM: PLAIN      CONTROL_CENTER_REPLICATION_FACTOR: 3      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 3      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 3      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 3      CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 3      CONFLUENT_METRICS_TOPIC_REPLICATION: 3      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 3      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1      # Workaround for MMA-3564      CONTROL_CENTER_METRICS_TOPIC_MAX_MESSAGE_BYTES: 8388608      PORT: 9021  connect:    image: cnfldemos/cp-server-connect-datagen:0.3.2-5.5.0    hostname: connect    container_name: connect    ports:      - "8083:8083"    volumes:      - mi2:/usr/share/java/monitoring-interceptors/    environment:      CONNECT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS      CONNECT_REST_PORT: 8083      CONNECT_GROUP_ID: "connect"      CONNECT_CONFIG_STORAGE_TOPIC: demo-connect-configs      CONNECT_OFFSET_STORAGE_TOPIC: demo-connect-offsets      CONNECT_STATUS_STORAGE_TOPIC: demo-connect-status      CONNECT_REPLICATION_FACTOR: 3      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR      # CLASSPATH required due to CC-2422      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.0.jar      # Connect worker      CONNECT_SECURITY_PROTOCOL: SASL_SSL      CONNECT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      CONNECT_SASL_MECHANISM: PLAIN      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"      # Connect producer      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL      CONNECT_PRODUCER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN      # Connect consumer      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL      CONNECT_CONSUMER_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SASL_SSL      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM: PLAIN  rest-proxy:    image: confluentinc/cp-kafka-rest:5.5.0    ports:      - 8082:8082    hostname: rest-proxy    container_name: rest-proxy    environment:      KAFKA_REST_HOST_NAME: rest-proxy      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"      KAFKA_REST_SCHEMA_REGISTRY_URL: $SCHEMA_REGISTRY_URL      KAFKA_REST_CLIENT_BASIC_AUTH_CREDENTIALS_SOURCE: $BASIC_AUTH_CREDENTIALS_SOURCE      KAFKA_REST_CLIENT_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: $SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO      KAFKA_REST_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS      KAFKA_REST_SSL_ENDPOINT_IDENTIFIED_ALGORITHM: "https"      KAFKA_REST_SECURITY_PROTOCOL: "SASL_SSL"      KAFKA_REST_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      KAFKA_REST_SASL_MECHANISM: "PLAIN"      KAFKA_REST_CLIENT_BOOTSTRAP_SERVERS: $BOOTSTRAP_SERVERS      KAFKA_REST_CLIENT_SSL_ENDPOINT_IDENTIFIED_ALGORITHM: "https"      KAFKA_REST_CLIENT_SECURITY_PROTOCOL: "SASL_SSL"      KAFKA_REST_CLIENT_SASL_JAAS_CONFIG: $SASL_JAAS_CONFIG      KAFKA_REST_CLIENT_SASL_MECHANISM: "PLAIN"volumes:    mi2: {}

启动容器服务:
docker-compose -f docker-compose.yml up -d
关闭容器服务:
docker-compose -f docker-compose.yml down

推荐站点

  • 我爱发烧音乐我爱发烧音乐

    我爱发烧音乐囊括了从流行音乐到古典音乐多个类型的音乐作品,专栏推荐最新的音乐,提供音乐排名榜单!可供免费线上收听音乐,歌曲流畅,音效极佳! 网站提供的钢琴以及二胡专栏,可供收听者,陶冶情操,改善心情,是难得的轻音乐典藏!

    www.520fs.com
  • 世纪音乐网世纪音乐网

    世纪音乐网是专业的在线音乐试听MP3下载网站。歌曲总计30余万首,收录了网上最新歌曲和流行音乐,DJ舞曲,非主流音乐,经典老歌,劲舞团歌曲,搞笑歌曲,儿童歌曲,英文歌曲等。是您上网听歌的最佳网站。

    www.ssjj.com
  • 怒江大峡谷网怒江大峡谷网

    怒江大峡谷网内容包括:新闻、要闻、怒江报、视频、文化、民俗、人文、音乐、政务、公告、政策等地方信息。

    www.nujiang.cn
  • 杭州网杭州网

      杭州网是杭州地区唯一的新闻门户网站,由中共杭州市委宣传部、杭州日报报业集团和杭州广播电视集团共同组建的杭州网络传媒有限公司运营。

    www.hangzhou.com.cn
  • 深圳在线深圳在线

      深圳在线 www.szol.net是深圳本地最大、最早的地方生活资讯网站之一,网站名“深圳在线www.szol.net”由南方报业传媒集团编辑委员会总编辑、南方日报社总编辑、南方都市报总编辑、南方书画院名誉院长王春芙亲笔题名,深圳在线www.szol.net团队与深圳热线www.szonline.net、奥一网www.oeeee.com都源于全国最早成立于1996年的知名网络公司——深圳万用网。

    www.szol.net

鄂公网安备 42062502000001号