Flink: Kafka consumer does not get messages from Kafka

Problem description

I am running Kafka and Flink as Docker containers on a Mac.

I have implemented a Flink job that should consume messages from a Kafka topic. I run a Python producer that sends messages to that topic.

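The job starts without any problems, but no messages arrive. I believe the messages are sent to the correct topic, because I have a Python consumer that is able to consume them.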

Flink job (Java):

package com.p81.datapipeline.swg;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;


public class SWGEventJob {
    private static final Logger LOG = LoggerFactory.getLogger(SWGEventJob.class);

    public static void main(String[] args) throws Exception {

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String inputTopic = parameterTool.get("kafka_input_topic","kafka_fake_swg_event_topic_in");
        final String outputTopic = parameterTool.get("kafka_output_topic","kafka_fake_swg_event_topic_out");
        final String consumerGroup = parameterTool.get("kafka_consumer_group","p81_swg_event_consumer_group");
        final String bootstrapServers = parameterTool.get("kafka_bootstrap_servers","broker:29092");
        LOG.info("inputTopic : " + inputTopic);
        LOG.info("outputTopic : " + outputTopic);
        LOG.info("consumerGroup : " + consumerGroup);
        LOG.info("bootstrapServers : " + bootstrapServers);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<SWGEvent> swgEventConsumer = createSWGEventConsumer(inputTopic, bootstrapServers, consumerGroup);
        swgEventConsumer.setStartFromEarliest();
        DataStream<SWGEvent> dataStream = env.addSource(swgEventConsumer).name(String.format("SWG Event Kafka Consumer [%s]",inputTopic));
        FlinkKafkaProducer<SWGEvent> swgEventProducer = createSWGEventProducer(outputTopic, bootstrapServers);
        dataStream.map(new SWGEventAnonymizer()).addSink(swgEventProducer).name(String.format("SWG Event Kafka Producer [%s]",outputTopic));
        env.execute("P81 Dummy SWG Event Flink Job");
    }

    private static FlinkKafkaConsumer<SWGEvent> createSWGEventConsumer(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer<>(topic, new SWGEventDeserializationSchema(), properties);
    }

    private static FlinkKafkaProducer<SWGEvent> createSWGEventProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, new SWGEventSerializationSchema());
    }

}

Flink job log:

2021-11-25 10:03:25,282 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: true)
2021-11-25 10:03:25,284 INFO  com.p81.datapipeline.swg.SWGEventJob                         [] - inputTopic : kafka_fake_swg_event_topic_in
2021-11-25 10:03:25,284 INFO  com.p81.datapipeline.swg.SWGEventJob                         [] - outputTopic : kafka_fake_swg_event_topic_out
2021-11-25 10:03:25,284 INFO  com.p81.datapipeline.swg.SWGEventJob                         [] - consumerGroup : p81_swg_event_consumer_group
2021-11-25 10:03:25,284 INFO  com.p81.datapipeline.swg.SWGEventJob                         [] - bootstrapServers : broker:29092
2021-11-25 10:03:26,155 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Property [transaction.timeout.ms] not specified. Setting it to 3600000 ms
2021-11-25 10:03:26,202 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 62c766b4ace055cf91f97f1e46f621d1 is submitted.
2021-11-25 10:03:26,202 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,301 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 62c766b4ace055cf91f97f1e46f621d1 (P81 Dummy SWG Event Flink Job).
2021-11-25 10:03:26,302 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 62c766b4ace055cf91f97f1e46f621d1 (P81 Dummy SWG Event Flink Job).
2021-11-25 10:03:26,306 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_15 .
2021-11-25 10:03:26,307 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,309 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
2021-11-25 10:03:26,310 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 0 ms
2021-11-25 10:03:26,310 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@252e8634
2021-11-25 10:03:26,310 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'jobmanager'
2021-11-25 10:03:26,310 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2021-11-25 10:03:26,310 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3931aba0 for P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1).
2021-11-25 10:03:26,311 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1) under job master id 00000000000000000000000000000000.
2021-11-25 10:03:26,318 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-11-25 10:03:26,318 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job P81 Dummy SWG Event Flink Job (62c766b4ace055cf91f97f1e46f621d1) switched from state CREATED to RUNNING.
2021-11-25 10:03:26,319 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from CREATED to SCHEDULED.
2021-11-25 10:03:26,320 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-11-25 10:03:26,321 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
2021-11-25 10:03:26,322 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_15 for job 62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,324 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_15 for job 62c766b4ace055cf91f97f1e46f621d1.
2021-11-25 10:03:26,327 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-11-25 10:03:26,328 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 62c766b4ace055cf91f97f1e46f621d1: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2021-11-25 10:03:26,394 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from SCHEDULED to DEPLOYING.
2021-11-25 10:03:26,395 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (attempt #0) with attempt id 87c54365842acb250dc6984b1ca9b466 to 172.18.0.4:35157-adeb80 @ kafka_taskmanager_1.kafka_default (dataPort=41077) with allocation id 968834ad9a512d16050107a088449490
2021-11-25 10:03:26,546 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from DEPLOYING to INITIALIZING.
2021-11-25 10:03:27,597 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: SWG Event Kafka Consumer [kafka_fake_swg_event_topic_in] -> Map -> Sink: SWG Event Kafka Producer [kafka_fake_swg_event_topic_out] (1/1) (87c54365842acb250dc6984b1ca9b466) switched from INITIALIZING to RUNNING.

Producer (Python), running on the host machine, not in Docker:

import json
import os
import time
from dataclasses import dataclass, asdict
from random import randint

from kafka import KafkaProducer

import logging

logging.basicConfig(level=logging.INFO)

_METHODS = ['GET'] * 17 + ['POST', 'PUT', 'DELETE']
_ACTIONS = ['ALLOW', 'WARNING', 'BLOCK']

_URLS = ['x']


@dataclass
class SWGEvent:
    url: str
    action: str
    agentId: int
    agentIP: str
    HTTPMethod: str
    timestamp: int


def _get_fake_swg_event() -> SWGEvent:
    url = _URLS[randint(0, len(_URLS) - 1)]
    action = _ACTIONS[randint(0, len(_ACTIONS) - 1)]
    agent_id = randint(1, 1000)
    agent_ip = f'{randint(1, 255)}.{randint(1, 255)}.{randint(1, 255)}.{randint(1, 255)}'
    http_method = _METHODS[randint(0, len(_METHODS) - 1)]
    timestamp = int(time.time())
    return SWGEvent(url, action, agent_id, agent_ip, http_method, timestamp)


def produce(producer: KafkaProducer, topic_name: str) -> None:
    x = 0
    while x < 500:
        event: SWGEvent = _get_fake_swg_event()
        result = producer.send(topic_name, asdict(event))
        x += 1
        time.sleep(1)
    producer.flush()
    logging.info(f'send result: {str(result)}')


if __name__ == '__main__':
    kafka_server = os.getenv('KAFKA_SERVER')
    topic_name = os.getenv('TOPIC_NAME')
    logging.info(f'Producer.Working with server {kafka_server} and topic {topic_name}')
    producer = KafkaProducer(bootstrap_servers=kafka_server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    produce(producer, topic_name)

Output of the Python code:

INFO:root:Producer.Working with server localhost:9092 and topic kafka_fake_swg_event_topic_in
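
In kafka-python, `producer.send()` is asynchronous and returns a future, so the `send result` line in `produce()` only logs the future returned for the last record, not a confirmed delivery. A minimal sketch, assuming the same kafka-python `KafkaProducer` as the script above (the helper name `send_and_confirm` is hypothetical, not part of the original code), that blocks until the broker acknowledges a record and reports where it landed:

```python
from typing import Any, Tuple


def send_and_confirm(producer: Any, topic: str, payload: dict, timeout: float = 10.0) -> Tuple[str, int, int]:
    """Send one record and wait for the broker's acknowledgement.

    Hypothetical helper: `producer` is assumed to be a kafka-python KafkaProducer.
    send() returns a future; get() blocks until the record is acknowledged and
    raises a KafkaError subclass if delivery fails or the timeout expires.
    """
    future = producer.send(topic, payload)
    metadata = future.get(timeout=timeout)  # RecordMetadata with topic, partition, offset, ...
    return metadata.topic, metadata.partition, metadata.offset
```

Inside `produce()` this could replace the bare `producer.send(...)` call, e.g. `topic, partition, offset = send_and_confirm(producer, topic_name, asdict(event))`, followed by a log line per record. Blocking on every record is slower than fire-and-forget, but at one message per second that cost does not matter.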

docker-compose.yml

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8091

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8091:8091"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8091

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8091'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      KAFKA_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS,HEAD'
  jobmanager:
    image: flink:1.13.2-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.13.2-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

docker ps

CONTAINER ID   IMAGE                                   COMMAND                  CREATED        STATUS          PORTS                                            NAMES
2f465a0a4129   confluentinc/cp-kafka-rest:7.0.0        "/etc/confluent/dock…"   23 hours ago   Up 23 hours     0.0.0.0:8082->8082/tcp                           rest-proxy
eb25992c47d0   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   23 hours ago   Up 23 hours     8081/tcp, 0.0.0.0:8091->8091/tcp                 schema-registry
1081319da296   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   23 hours ago   Up 17 hours     0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp   broker
de9056ee250c   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   23 hours ago   Up 28 minutes   6123/tcp, 8081/tcp                               kafka_taskmanager_1
b38beefc35e3   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   23 hours ago   Up 23 hours     2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp       zookeeper
e6db23fa8842   flink:1.13.2-scala_2.12                 "/docker-entrypoint.…"   23 hours ago   Up 18 hours     6123/tcp, 0.0.0.0:8081->8081/tcp                 kafka_jobmanager_1
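
The compose file gives the broker two advertised listeners: `PLAINTEXT://broker:29092`, which only resolves inside the Docker network (used by the Flink job, Schema Registry and REST Proxy), and `PLAINTEXT_HOST://localhost:9092`, which is published to the Mac through the `0.0.0.0:9092->9092/tcp` mapping shown for `broker` above (used by the Python producer on the host). A client has to bootstrap against the listener that matches where it runs, because the broker hands back that listener's advertised address as the address to connect to. A small hypothetical helper, not part of the original setup, that picks the matching address from the `KAFKA_ADVERTISED_LISTENERS` value:

```python
from typing import Dict

# Value copied from KAFKA_ADVERTISED_LISTENERS in the docker-compose.yml above.
ADVERTISED = "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"


def parse_listeners(advertised: str) -> Dict[str, str]:
    """Map each listener name to its host:port, e.g. {'PLAINTEXT': 'broker:29092', ...}."""
    listeners = {}
    for entry in advertised.split(","):
        name, _, address = entry.strip().partition("://")
        listeners[name] = address
    return listeners


def bootstrap_for(inside_docker: bool, advertised: str = ADVERTISED) -> str:
    """Return the bootstrap address a client should use (hypothetical helper).

    Containers on the compose network (the Flink JobManager/TaskManager) reach the
    broker by its service name; processes on the Mac reach it via the published port.
    """
    listeners = parse_listeners(advertised)
    return listeners["PLAINTEXT"] if inside_docker else listeners["PLAINTEXT_HOST"]
```

By this mapping, the Flink job's default `broker:29092` and the producer's `KAFKA_SERVER=localhost:9092` are both consistent with the compose file.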

Question: What do I need to fix to get the messages into the Flink job?

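Update #1: It looks like it is actually working fine. Events are consumed by the Kafka consumer and produced by the Kafka producer (I found this out by looking at the Flink TaskManager logs). So the actual question is: why does the Flink UI show zero activity?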

Answer

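The Flink metrics you are looking at only measure traffic that happens inside the Flink cluster itself (using Flink's serializers and network stack); they ignore communication at the edges of the job graph, which goes through the connectors' own serializers and networking.

In other words, sources never report incoming records, and sinks never report outgoing records.

Furthermore, in your job all of the operators can be chained together into a single task, so Flink's network is not used at all.

Yes, this is confusing.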
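
Because the UI counters stay at zero when the source, map and sink are chained into one task, a more direct check than reading TaskManager logs is to read the output topic back from the host. A minimal sketch using kafka-python (the same client as the producer), assuming the host listener `localhost:9092` and the job's default output topic `kafka_fake_swg_event_topic_out`; the function names and the `OUTPUT_TOPIC` environment variable are hypothetical, and values are printed as raw bytes because the format written by `SWGEventSerializationSchema` is not shown:

```python
import os
from typing import Iterable


def drain(records: Iterable, limit: int = 10) -> int:
    """Print up to `limit` consumer records and return how many were printed."""
    seen = 0
    for record in records:
        # Raw bytes: the serialization done by SWGEventSerializationSchema is not shown.
        print(f"partition={record.partition} offset={record.offset} value={record.value!r}")
        seen += 1
        if seen >= limit:
            break
    return seen


def read_output_topic(limit: int = 10) -> int:
    """Read back what the Flink job wrote; returns 0 if nothing arrived before the timeout."""
    from kafka import KafkaConsumer  # imported here so drain() works without kafka-python

    consumer = KafkaConsumer(
        os.getenv("OUTPUT_TOPIC", "kafka_fake_swg_event_topic_out"),  # hypothetical env var
        bootstrap_servers=os.getenv("KAFKA_SERVER", "localhost:9092"),
        group_id=None,                 # no consumer group, so no offsets are committed
        auto_offset_reset="earliest",  # start from the oldest retained record
        consumer_timeout_ms=10_000,    # stop iterating after 10 s without new records
    )
    try:
        return drain(consumer, limit)
    finally:
        consumer.close()
```

Calling `read_output_topic()` from a Python shell on the Mac while the producer is running should return a non-zero count if records flow end to end, whatever the web UI shows.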
