如何让我的kafka消息值不返回乱码? [英] How do I make my kafka message value not return gibberish?

查看:1129
本文介绍了如何让我的kafka消息值不返回乱码?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用Confluent.Kafka来获取数据,我的代码为(c#):



  public   static   class  Confluent 
{
public static void 耗尽()
{
var conf = new ConsumerConfig
{
GroupId = Guid.NewGuid()。ToString(),
BootstrapServers = pdkafka.test.com:9092 // localhost:9092,
// 注意:AutoOffsetReset属性确定事件中的起始偏移量
// 对于
// ,消费者群体尚未提交任何承诺的抵消额感兴趣的主题/分区。默认情况下,自动提交偏移
// ,因此在此示例中,仅消耗第一次从主题'my-topic'中的
// 最早的消息开始你运行程序。
AutoCommitIntervalMs = 5000
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.SaslPlaintext ,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = test
SaslPassword = 123

};

使用 var c = new ConsumerBuilder<忽略,字符串>(conf).Build())
{
c.Subscribe( prj.basedata); // c.Subscribe(test ); //传入主题名称

CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress + =(_,e)= >
{
e.Cancel = ; // 阻止进程终止。
cts.Cancel();
};

尝试
{
while true
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($ 消息消息'{cr.Value}'at:'{cr.TopicPartitionOffset })。
}
catch (ConsumeException e)
{
Console.WriteLine($ 发生错误:{e.Error.Reason});
}
}
}
catch (OperationCanceledException)
{
// 确保消费者干净地离开小组并提交最终抵消。
c.Close();
}
}
}
}





但是回报消息值如下:

 \0\0\0\\\0015 \ u0002 \ u001aSandra LTC75 \ 0 \ 0 \ 0 \0\0\0\0\\\\\\10604012\\\\\\ODM\\\W\\\W\\\X \\\НX\\\X\\\ރY\\\ܯ[\ u0002\\\3\\\\\\8810074\\\\bSONY \0\0\\\\\\-\\\W\\\X\\\W\\\\\\LCM\\\\ fPCJ100\\\W\\\W\\\X\\\НX\\\X\\\ ރY\\\      [\ u0002 \ u0018TOM WH HUANG \\\ \ u00187PD02M450001 \\\\\\\\\\\\\\\\\\\\\\\\\\\\\ fActive\0\\\\\\1\\\\\\-\0\\\\\\132\\\\\\\\\\\\I\\\ÎY





我的尝试:



应该是什么我这样做是为了正常吗?



请帮忙,非常感谢。

解决方案

消息消息{cr.Value}位于:{cr.TopicPartitionOffset}。);
}
catch (ConsumeException e)
{
Console.WriteLine(


< blockquote> 发生错误:{e.Error.Reason});
}
}
}
catch (OperationCanceledException)
{
// 确保消费者干净地离开小组并提交最终抵消。
c.Close();
}
}
}
}





但是回报消息值如下:

 \0\0\0\\\0015 \ u0002 \ u001aSandra LTC75 \ 0 \ 0 \ 0 \0\0\0\0\\\\\\10604012\\\\\\ODM\\\W\\\W\\\X \\\НX\\\X\\\ރY\\\ܯ[\ u0002\\\3\\\\\\8810074\\\\bSONY \0\0\\\\\\-\\\W\\\X\\\W\\\\\\LCM\\\\ fPCJ100\\\W\\\W\\\X\\\НX\\\X\\\ ރY\\\      [\ u0002 \ u0018TOM WH HUANG \\\ \ u00187PD02M450001 \\\\\\\\\\\\\\\\\\\\\\\\\\\\\ fActive\0\\\\\\1\\\\\\-\0\\\\\\132\\\\\\\\\\\\I\\\ÎY





我的尝试:



应该是什么我这样做是为了正常吗?



请帮忙,非常感谢。


Quote:

但返回消息的值如下:

 \0\0\0\ 0 \\ u0015 \ u0002 \ u001aSandra LTC75 \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ \\\W\\\X\\\НX\\\X\\\ރY\\\ܯ[ \\\\\\3\\\\\\8810074\\\\bSONY\0\0\\\\\\-\\\W\\\X\\ u0002W\\\\\\LCM\\\\fPCJ100\\\W\\\W\\\X\\\Н   X\\\     X\\\      Y\\\      [\ u0002 \ u0018TOM WH HUANG \\\ \ u00187PD02M450001 \\\\\\\\\\\\\\\\\\\\\\\ \0\\\\\\1\\\\\\-\0\\\\\\132\\\\\\\\\\\\I\\\ÎY



显然,您收到的消息包含不可打印的字符,这是在消息中编码复杂数据的常用技术,截至今天,我们使用xml编码。您需要知道消息是如何编码才能对其进行解码。

另一个问题是您尝试在utf8中显示消息时可能没有。

建议:将消息粘贴到程序员的编辑器中并切换到十六进制模式,这可能会有所帮助。


i am using Confluent.Kafka to Consume data, and my code as(c#):

public static class Confluent
{
    public static void Consume()
    {
        var conf = new ConsumerConfig
        {
            GroupId = Guid.NewGuid().ToString(),
            BootstrapServers = "pdkafka.test.com:9092",//"localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic 'my-topic' the first time you run the program.
            AutoCommitIntervalMs = 5000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername= "test",
            SaslPassword= "123",

        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("prj.basedata");//c.Subscribe("test");//传入Topic Name

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}



but the return message value like this:

\0\0\0\0\u0015\u0002\u001aSandra  LTC75\0\0\0\0\0\0\0\u0002\u001010604012\u0002\u0006ODM\u0002����W\u0002�����W\u0002�����X\u0002�Н��X\u0002�����X\u0002���ރY\u0002���ܯ[\u0002\u00023\u0002\u000e8810074\u0002\bSONY\0\0\u0002\u0002-\u0002�����W\u0002����X\u0002�����W\u0002\u0006LCM\u0002\fPCJ100\u0002����W\u0002�����W\u0002�����X\u0002�Н��X\u0002�����X\u0002���ރY\u0002���ܯ[\u0002\u0018TOM WH HUANG\u0002\u00187PD02M450001\0\u0002\fActive\0\u0002\u00021\u0002\u0002-\0\u0002\u0006132\u0002�\u0003\u0002\u0002I\u0002��Î�Y



What I have tried:

what should i do to make this normal?

please help,thanks a lot.

解决方案

"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine(


"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { // Ensure the consumer leaves the group cleanly and final offsets are committed. c.Close(); } } } }



but the return message value like this:

\0\0\0\0\u0015\u0002\u001aSandra  LTC75\0\0\0\0\0\0\0\u0002\u001010604012\u0002\u0006ODM\u0002����W\u0002�����W\u0002�����X\u0002�Н��X\u0002�����X\u0002���ރY\u0002���ܯ[\u0002\u00023\u0002\u000e8810074\u0002\bSONY\0\0\u0002\u0002-\u0002�����W\u0002����X\u0002�����W\u0002\u0006LCM\u0002\fPCJ100\u0002����W\u0002�����W\u0002�����X\u0002�Н��X\u0002�����X\u0002���ރY\u0002���ܯ[\u0002\u0018TOM WH HUANG\u0002\u00187PD02M450001\0\u0002\fActive\0\u0002\u00021\u0002\u0002-\0\u0002\u0006132\u0002�\u0003\u0002\u0002I\u0002��Î�Y



What I have tried:

what should i do to make this normal?

please help,thanks a lot.


Quote:

but the return message value like this:

\0\0\0\0\u0015\u0002\u001aSandra  LTC75\0\0\0\0\0\0\0\u0002\u001010604012\u0002\u0006ODM\u0002����W\u0002�����W\u0002�����X\u0002�Н��X\u0002�����X\u0002���ރY\u0002���ܯ[\u0002\u00023\u0002\u000e8810074\u0002\bSONY\0\0\u0002\u0002-\u0002�����W\u0002����X\u0002�����W\u0002\u0006LCM\u0002\fPCJ100\u0002����W\u0002�����W\u0002�����X\u0002�Н��X\u0002�����X\u0002���ރY\u0002���ܯ[\u0002\u0018TOM WH HUANG\u0002\u00187PD02M450001\0\u0002\fActive\0\u0002\u00021\u0002\u0002-\0\u0002\u0006132\u0002�\u0003\u0002\u0002I\u0002��Î�Y


Obviously, the message you receive contain non printable chars, This is a common technique to encode complex data in a message, As of today, we use xml encoding. You need to know how the message is encoded in order to decode it.
Another problem is that you try to display the message in utf8 when it is probably not.
Advice: paste the message in a programmer's editor and switch to hex mode, it may help.


这篇关于如何让我的kafka消息值不返回乱码?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆