Flink: Kafka consumer does not get messages from Kafka
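Problem description
I am running Kafka and Flink as Docker containers on my Mac.
I have implemented a Flink job that should consume messages from a Kafka topic. I run a Python producer that sends messages to that topic.
The job starts without any problem, but zero messages arrive. I believe the messages are sent to the correct topic, because I have a Python consumer that is able to consume them.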
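A host-side check of the kind mentioned above could look roughly like the following sketch. It assumes the kafka-python package (the same library the producer further down uses) and mirrors the producer's JSON encoding; the function names `decode_event` and `consume` are made up for illustration and are not part of the original post.

```python
import json
from typing import Any, Dict


def decode_event(raw: bytes) -> Dict[str, Any]:
    """Reverse the producer's value_serializer: UTF-8 bytes -> JSON -> dict."""
    return json.loads(raw.decode("utf-8"))


def consume(bootstrap_servers: str, topic: str) -> None:
    """Print every event in the topic, then stop after 5 s without new messages."""
    # Imported here so decode_event stays usable without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # read from the beginning when no offset is committed
        consumer_timeout_ms=5000,      # end the iteration after 5 s of silence
        value_deserializer=decode_event,
    )
    for record in consumer:
        print(record.partition, record.offset, record.value)
    consumer.close()
```

From the host, this would be called as `consume("localhost:9092", "kafka_fake_swg_event_topic_in")`. If it prints events, the producer and the topic are fine and the problem lies elsewhere.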
Flink job (Java):
package com.p81.datapipeline.swg;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SWGEventJob {

    private static final Logger LOG = LoggerFactory.getLogger(SWGEventJob.class);

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String inputTopic = parameterTool.get("kafka_input_topic", "kafka_fake_swg_event_topic_in");
        final String outputTopic = parameterTool.get("kafka_output_topic", "kafka_fake_swg_event_topic_out");
        final String consumerGroup = parameterTool.get("kafka_consumer_group", "p81_swg_event_consumer_group");
        final String bootstrapServers = parameterTool.get("kafka_bootstrap_servers", "broker:29092");

        LOG.info("inputTopic : " + inputTopic);
        LOG.info("outputTopic : " + outputTopic);
        LOG.info("consumerGroup : " + consumerGroup);
        LOG.info("bootstrapServers : " + bootstrapServers);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<SWGEvent> swgEventConsumer = createSWGEventConsumer(inputTopic, bootstrapServers, consumerGroup);
        swgEventConsumer.setStartFromEarliest();
        DataStream<SWGEvent> dataStream = env.addSource(swgEventConsumer).name(String.format("SWG Event Kafka Consumer [%s]", inputTopic));

        FlinkKafkaProducer<SWGEvent> swgEventProducer = createSWGEventProducer(outputTopic, bootstrapServers);
        dataStream.map(new SWGEventAnonymizer()).addSink(swgEventProducer).name(String.format("SWG Event Kafka Producer [%s]", outputTopic));

        env.execute("P81 Dummy SWG Event Flink Job");
    }

    private static FlinkKafkaConsumer<SWGEvent> createSWGEventConsumer(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer<>(topic, new SWGEventDeserializationSchema(), properties);
    }

    private static FlinkKafkaProducer<SWGEvent> createSWGEventProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, new SWGEventSerializationSchema());
    }
}
Flink job log:
2021-11-25 10:03:25,282 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - inputTopic : kafka_fake_swg_event_topic_in
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - outputTopic : kafka_fake_swg_event_topic_out
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - consumerGroup : p81_swg_event_consumer_group
2021-11-25 10:03:25,284 INFO com.p81.datapipeline.swg.SWGEventJob [] - bootstrapServers : broker:29092
2021-11-25 10:03:26,155 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified. Setting it to 3600000 ms
2021-11-25 10:03:26,202 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 62c766b4ace055cf91f97f1e46f621d1 is submitted.
2021-11-25 10:03:26,202 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,301 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 62c766b4ace055cf91f97f1e46f621d1 (P81 Dummy SWG Event Flink Job).
2021-11-25 10:03:26,302 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 62c766b4ace055cf91f97f1e46f621d1 (P81 Dummy SWG Event Flink Job).
2021-11-25 10:03:26,306 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_15 .
2021-11-25 10:03:26,307 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@252e8634
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'jobmanager'
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2021-11-25 10:03:26,310 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3931aba0 for P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,311 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1) under job master id 00000000000000000000000000000000.
2021-11-25 10:03:26,318 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-11-25 10:03:26,318 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1) switched from state CREATED to RUNNING.
2021-11-25 10:03:26,319 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from CREATED to SCHEDULED.
2021-11-25 10:03:26,320 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-11-25 10:03:26,321 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration
2021-11-25 10:03:26,322 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_15 for job 62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,324 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_15 for job 62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,327 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-11-25 10:03:26,328 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 62c766b4ace055cf91f97f1e46f621d1: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2021-11-25 10:03:26,394 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from SCHEDULED to DEPLOYING.
2021-11-25 10:03:26,395 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (attempt #0) with attempt id 87c54365842acb250dc6984b1ca9b466 to 172.18.0.4:35157-adeb80 @ kafka_taskmanager_1.kafka_default (dataPort=41077) with allocation id 968834ad9a512d16050107a088449490
2021-11-25 10:03:26,546 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from DEPLOYING to INITIALIZING.
2021-11-25 10:03:27,597 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from INITIALIZING to RUNNING.
Producer job (Python), run on the host, not in Docker:
import json
import os
import time
from dataclasses import dataclass, asdict
from random import randint
from kafka import KafkaProducer
import logging

logging.basicConfig(level=logging.INFO)

_METHODS = ['GET'] * 17 + ['POST', 'PUT', 'DELETE']
_ACTIONS = ['ALLOW', 'WARNING', 'BLOCK']
_URLS = ['x']


@dataclass
class SWGEvent:
    url: str
    action: str
    agentId: int
    agentIP: str
    HTTPMethod: str
    timestamp: int


def _get_fake_swg_event() -> SWGEvent:
    url = _URLS[randint(0, len(_URLS) - 1)]
    action = _ACTIONS[randint(0, len(_ACTIONS) - 1)]
    agent_id = randint(1, 1000)
    agent_ip = f'{randint(1, 255)}.{randint(1, 255)}.{randint(1, 255)}.{randint(1, 255)}'
    http_method = _METHODS[randint(0, len(_METHODS) - 1)]
    timestamp = int(time.time())
    return SWGEvent(url, action, agent_id, agent_ip, http_method, timestamp)


def produce(producer: KafkaProducer, topic_name: str) -> None:
    x = 0
    while x < 500:
        event: SWGEvent = _get_fake_swg_event()
        result = producer.send(topic_name, asdict(event))
        x += 1
        time.sleep(1)
    producer.flush()
    logging.info(f'send result: {str(result)}')


if __name__ == '__main__':
    kafka_server = os.getenv('KAFKA_SERVER')
    topic_name = os.getenv('TOPIC_NAME')
    logging.info(f'Producer.Working with server {kafka_server} and topic {topic_name}')
    producer = KafkaProducer(bootstrap_servers=kafka_server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    produce(producer, topic_name)
Output of the Python code:
INFO:root:Producer.Working with server localhost:9092 and topic kafka_fake_swg_event_topic_in
docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'

  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
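The two entries in KAFKA_ADVERTISED_LISTENERS are the reason the Flink job and the Python producer use different bootstrap addresses: the job runs inside the Compose network and connects to broker:29092, while the producer runs on the host and connects to localhost:9092. The sketch below only parses that setting to make the mapping explicit; `parse_advertised_listeners` and `bootstrap_for` are hypothetical helpers written for this illustration, not part of Kafka or Docker.

```python
from typing import Dict

# Value copied from KAFKA_ADVERTISED_LISTENERS in the compose file above.
ADVERTISED_LISTENERS = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"


def parse_advertised_listeners(value: str) -> Dict[str, str]:
    """Map each listener name to the host:port the broker advertises for it."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        listeners[name] = address
    return listeners


def bootstrap_for(client_in_compose_network: bool) -> str:
    """Pick the address a client should use (hypothetical helper, illustration only)."""
    listeners = parse_advertised_listeners(ADVERTISED_LISTENERS)
    return listeners["PLAINTEXT"] if client_in_compose_network else listeners["PLAINTEXT_HOST"]


if __name__ == "__main__":
    print(bootstrap_for(True))   # Flink job inside Docker
    print(bootstrap_for(False))  # Python producer on the host
```

Both addresses match what the job log and the producer output in the question show, so the listener setup is consistent with messages actually reaching the job.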
docker ps
CONTAINER ID   IMAGE                                   COMMAND                  CREATED        STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   23 hours ago   Up 23 hours     0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   23 hours ago   Up 23 hours     8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   23 hours ago   Up 17 hours     0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   23 hours ago   Up 28 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   23 hours ago   Up 23 hours     2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   23 hours ago   Up 18 hours     6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1
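Question: What should be fixed in order to get the messages into the Flink job?
Update #1: It looks like the job is working fine. Events are consumed by the Kafka consumer and produced by the Kafka producer. (I learned this by looking at the Flink task manager log.) So the actual question is: why does the Flink UI show zero activity?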
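Recommended answer
The Flink metrics you are looking at only measure traffic that happens within the Flink cluster itself (using Flink's serializers and network stack), and ignore the communication at the edges of the job graph (using the connectors' serializers and networking).
In other words, sources never report records coming in, and sinks never report records going out.
Furthermore, in your job all of the operators can be chained together, so Flink's network is not used at all.
Yes, this is confusing.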
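The accounting described above can be illustrated with a small toy model. This is not Flink code and does not use any Flink API; `TaskCounters` and `simulate` are names invented for this sketch. It assumes, as the answer states, that a record is counted only when it crosses an edge between two Flink tasks, and that chained operators form a single task.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TaskCounters:
    """Per-task counters, analogous to the records received/sent figures in the Flink UI."""
    records_received: int = 0
    records_sent: int = 0


def simulate(operators: List[str], chained: bool, n_records: int) -> Dict[str, TaskCounters]:
    """Toy model: a record is counted only when it crosses an edge between two tasks.

    Reads from Kafka into the source and writes from the sink to Kafka go
    through the connector, so they are never counted.
    """
    # With chaining, all operators are fused into one task; otherwise one task each.
    tasks = [" -> ".join(operators)] if chained else list(operators)
    counters = {name: TaskCounters() for name in tasks}
    # Count traffic on every task-to-task edge.
    for upstream, downstream in zip(tasks, tasks[1:]):
        counters[upstream].records_sent += n_records
        counters[downstream].records_received += n_records
    return counters


if __name__ == "__main__":
    ops = ["Source: Kafka Consumer", "Map", "Sink: Kafka Producer"]
    for chained in (True, False):
        print(f"chained={chained}")
        for task, c in simulate(ops, chained, n_records=500).items():
            print(f"  {task}: received={c.records_received}, sent={c.records_sent}")
```

With chaining, the single fused task shows zero on both counters even though 500 records passed through it, which matches what the question observes. In an actual job, calling `disableOperatorChaining()` on the `StreamExecutionEnvironment` is one way to split the chain so that the counters between operators become non-zero, at the cost of extra serialization between them.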