FFmpeg: Parallel encoding with custom thread pool


Question

One of the things I'm trying to achieve is parallel encoding via FFmpeg's C API. This appears to work out of the box quite nicely; however, I've moved the goalposts slightly:

In an existing application, I already have a thread pool at hand. Instead of having FFmpeg create another thread pool, I would like to reuse the existing one. Having studied the latest FFmpeg trunk docs, this very much looks possible.

Using some FFmpeg sample code, I've created a sample application to demonstrate what I'm trying to achieve (see below). The sample app generates a video-only MPEG-2 TS using the mp2v codec.

The problem I'm experiencing is that the custom 'thread_execute' and 'thread_execute2' functions are never invoked, even though the codec appears to indicate that threading is supported. Please be aware that I have not plumbed in the thread pool yet. My first goal is just to get FFmpeg to call the custom function pointers.

I've tried to get assistance on the FFmpeg mailing lists, but to no avail.

#include <iostream>
#include <thread>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <cstring>
#include <future>

extern "C"
{
#include <libavutil/avassert.h>
#include <libavutil/channel_layout.h>
#include <libavutil/opt.h>
#include <libavutil/timestamp.h>
#include <libavformat/avformat.h>
//#include <libswscale/swscale.h>
#include <libswresample/swresample.h>
}

#define STREAM_DURATION   1000.0
#define STREAM_FRAME_RATE 25 /* 25 images/s */
#define STREAM_PIX_FMT    AV_PIX_FMT_YUV420P /* default pix_fmt */

#define SCALE_FLAGS SWS_BICUBIC

// a wrapper around a single output AVStream
typedef struct OutputStream {
    AVStream *st;
    AVCodecContext *enc;

    /* pts of the next frame that will be generated */
    int64_t next_pts;
    int samples_count;

    AVFrame *frame;
    AVFrame *tmp_frame;

    float t, tincr, tincr2;

    struct SwsContext *sws_ctx;
    struct SwrContext *swr_ctx;
} OutputStream;

/////////////////////////////////////////////////////////////////////////////
//  The ffmpeg variation raises compiler warnings.
char *cb_av_ts2str(char *buf, int64_t ts)
{
    std::memset(buf,0,AV_TS_MAX_STRING_SIZE);
    return av_ts_make_string(buf,ts);
}

/////////////////////////////////////////////////////////////////////////////
//  The ffmpeg variation raises compiler warnings.
char *cb_av_ts2timestr(char *buf, int64_t ts, AVRational *tb)
{
    std::memset(buf,0,AV_TS_MAX_STRING_SIZE);
    return av_ts_make_time_string(buf,ts,tb);
}

/////////////////////////////////////////////////////////////////////////////
//  The ffmpeg variation raises compiler warnings.
char *cb_av_err2str(char *errbuf, size_t errbuf_size, int errnum)
{
    std::memset(errbuf,0,errbuf_size);
    return av_make_error_string(errbuf,errbuf_size,errnum);
}

int thread_execute(AVCodecContext* s, int (*func)(AVCodecContext *c2, void *arg2), void* arg, int* ret, int count, int size)
{
    // Do it all serially for now
    std::cout << "thread_execute" << std::endl;

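    for (int k = 0; k < count; ++k)
    {
        // Job k gets the k-th element of the arg array; ret may be NULL.
        int r = func(s, (char*)arg + k * size);
        if (ret) { ret[k] = r; }
    }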

    return 0;
}

int thread_execute2(AVCodecContext* s, int (*func)(AVCodecContext* c2, void* arg2, int, int), void* arg, int* ret, int count)
{
    // Do it all serially for now
    std::cout << "thread_execute2" << std::endl;

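    for (int k = 0; k < count; ++k)
    {
        // The last argument is the thread number (0 when running serially), not the job count.
        int r = func(s, arg, k, 0);
        if (ret) { ret[k] = r; }
    }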

    return 0;
}


static void log_packet(const AVFormatContext *fmt_ctx, const AVPacket *pkt)
{
    // One buffer per formatted value; sharing a single buffer would make
    // every %s print the same string.
    char s[6][AV_TS_MAX_STRING_SIZE];

    AVRational *time_base = &fmt_ctx->streams[pkt->stream_index]->time_base;

    printf("pts:%s pts_time:%s dts:%s dts_time:%s duration:%s duration_time:%s stream_index:%d\n",
           cb_av_ts2str(s[0],pkt->pts), cb_av_ts2timestr(s[1],pkt->pts, time_base),
           cb_av_ts2str(s[2],pkt->dts), cb_av_ts2timestr(s[3],pkt->dts, time_base),
           cb_av_ts2str(s[4],pkt->duration), cb_av_ts2timestr(s[5],pkt->duration, time_base),
           pkt->stream_index);
}

static int write_frame(AVFormatContext *fmt_ctx, const AVRational *time_base, AVStream *st, AVPacket *pkt)
{
    /* rescale output packet timestamp values from codec to stream timebase */
    av_packet_rescale_ts(pkt, *time_base, st->time_base);
    pkt->stream_index = st->index;

    /* Write the compressed frame to the media file. */
    log_packet(fmt_ctx, pkt);
    return av_interleaved_write_frame(fmt_ctx, pkt);
}

/* Add an output stream. */
static void add_stream(OutputStream *ost, AVFormatContext *oc,
                       AVCodec **codec,
                       enum AVCodecID codec_id)
{
    AVCodecContext *c;
    int i;

    /* find the encoder */
    *codec = avcodec_find_encoder(codec_id);
    if (!(*codec)) {
        fprintf(stderr, "Could not find encoder for '%s'\n",
                avcodec_get_name(codec_id));
        exit(1);
    }

    ost->st = avformat_new_stream(oc, NULL);
    if (!ost->st) {
        fprintf(stderr, "Could not allocate stream\n");
        exit(1);
    }
    ost->st->id = oc->nb_streams-1;
    c = avcodec_alloc_context3(*codec);
    if (!c) {
        fprintf(stderr, "Could not alloc an encoding context\n");
        exit(1);
    }
    ost->enc = c;

    switch ((*codec)->type)
    {
        case AVMEDIA_TYPE_AUDIO:
            c->sample_fmt  = (*codec)->sample_fmts ?
                             (*codec)->sample_fmts[0] : AV_SAMPLE_FMT_FLTP;
            c->bit_rate    = 64000;
            c->sample_rate = 44100;
            if ((*codec)->supported_samplerates) {
                c->sample_rate = (*codec)->supported_samplerates[0];
                for (i = 0; (*codec)->supported_samplerates[i]; i++) {
                    if ((*codec)->supported_samplerates[i] == 44100)
                        c->sample_rate = 44100;
                }
            }
            c->channels        = av_get_channel_layout_nb_channels(c->channel_layout);
            c->channel_layout = AV_CH_LAYOUT_STEREO;
            if ((*codec)->channel_layouts) {
                c->channel_layout = (*codec)->channel_layouts[0];
                for (i = 0; (*codec)->channel_layouts[i]; i++) {
                    if ((*codec)->channel_layouts[i] == AV_CH_LAYOUT_STEREO)
                        c->channel_layout = AV_CH_LAYOUT_STEREO;
                }
            }
            c->channels        = av_get_channel_layout_nb_channels(c->channel_layout);
            ost->st->time_base = (AVRational){ 1, c->sample_rate };
            break;

        case AVMEDIA_TYPE_VIDEO:
            c->codec_id = codec_id;

            c->bit_rate = 400000;
            /* Resolution must be a multiple of two. */
            c->width    = 352;
            c->height   = 288;
            /* timebase: This is the fundamental unit of time (in seconds) in terms
             * of which frame timestamps are represented. For fixed-fps content,
             * timebase should be 1/framerate and timestamp increments should be
             * identical to 1. */
            ost->st->time_base = (AVRational){ 1, STREAM_FRAME_RATE };
            c->time_base       = ost->st->time_base;

            c->gop_size      = 12; /* emit one intra frame every twelve frames at most */
            c->pix_fmt       = STREAM_PIX_FMT;
            if (c->codec_id == AV_CODEC_ID_MPEG2VIDEO) {
                /* just for testing, we also add B-frames */
                c->max_b_frames = 2;
            }
            if (c->codec_id == AV_CODEC_ID_MPEG1VIDEO) {
                /* Needed to avoid using macroblocks in which some coeffs overflow.
                 * This does not happen with normal video, it just happens here as
                 * the motion of the chroma plane does not match the luma plane. */
                c->mb_decision = 2;
            }
            break;

        default:
            break;
    }

    if (c->codec->capabilities & AV_CODEC_CAP_FRAME_THREADS ||
        c->codec->capabilities & AV_CODEC_CAP_SLICE_THREADS)
    {
        if (c->codec->capabilities & AV_CODEC_CAP_FRAME_THREADS)
        {
            c->thread_type = FF_THREAD_FRAME;
        }
        if (c->codec->capabilities & AV_CODEC_CAP_SLICE_THREADS)
        {
            c->thread_type = FF_THREAD_SLICE;
        }

        c->execute = &thread_execute;
        c->execute2 = &thread_execute2;
        c->thread_count = 4;

        // NOTE: Testing opaque.
        c->opaque = (void*)0xff;
    }

    /* Some formats want stream headers to be separate. */
    if (oc->oformat->flags & AVFMT_GLOBALHEADER)
        c->flags |= AV_CODEC_FLAG_GLOBAL_HEADER;
}

/**************************************************************/
/* video output */

static AVFrame *alloc_picture(enum AVPixelFormat pix_fmt, int width, int height)
{
    AVFrame *picture;
    int ret;

    picture = av_frame_alloc();
    if (!picture)
        return NULL;

    picture->format = pix_fmt;
    picture->width  = width;
    picture->height = height;

    /* allocate the buffers for the frame data */
    ret = av_frame_get_buffer(picture, 32);
    if (ret < 0) {
        fprintf(stderr, "Could not allocate frame data.\n");
        exit(1);
    }

    return picture;
}

static void open_video(AVFormatContext *oc, AVCodec *codec, OutputStream *ost, AVDictionary *opt_arg)
{
    int ret;
    AVCodecContext *c = ost->enc;
    //AVDictionary *opt = NULL;

    //av_dict_copy(&opt, opt_arg, 0);

    /* open the codec */
    ret = avcodec_open2(c, codec, NULL);
    //av_dict_free(&opt);
    if (ret < 0) {
        char s[AV_ERROR_MAX_STRING_SIZE];
        fprintf(stderr, "Could not open video codec: %s\n", cb_av_err2str(s,AV_ERROR_MAX_STRING_SIZE,ret));
        exit(1);
    }

    /* allocate and init a re-usable frame */
    ost->frame = alloc_picture(c->pix_fmt, c->width, c->height);
    if (!ost->frame) {
        fprintf(stderr, "Could not allocate video frame\n");
        exit(1);
    }

    /* If the output format is not YUV420P, then a temporary YUV420P
     * picture is needed too. It is then converted to the required
     * output format. */
    ost->tmp_frame = NULL;
    if (c->pix_fmt != AV_PIX_FMT_YUV420P) {
        ost->tmp_frame = alloc_picture(AV_PIX_FMT_YUV420P, c->width, c->height);
        if (!ost->tmp_frame) {
            fprintf(stderr, "Could not allocate temporary picture\n");
            exit(1);
        }
    }

    /* copy the stream parameters to the muxer */
    ret = avcodec_parameters_from_context(ost->st->codecpar, c);
    if (ret < 0) {
        fprintf(stderr, "Could not copy the stream parameters\n");
        exit(1);
    }
}

/* Prepare a dummy image. */
static void fill_yuv_image(AVFrame *pict, int frame_index,
                           int width, int height)
{
    int x, y, i;

    i = frame_index;

    /* Y */
    for (y = 0; y < height; y++)
        for (x = 0; x < width; x++)
            pict->data[0][y * pict->linesize[0] + x] = x + y + i * 3;

    /* Cb and Cr */
    for (y = 0; y < height / 2; y++) {
        for (x = 0; x < width / 2; x++) {
            pict->data[1][y * pict->linesize[1] + x] = 128 + y + i * 2;
            pict->data[2][y * pict->linesize[2] + x] = 64 + x + i * 5;
        }
    }
}

static AVFrame *get_video_frame(OutputStream *ost)
{
    AVCodecContext *c = ost->enc;

    /* check if we want to generate more frames */
    if (av_compare_ts(ost->next_pts, c->time_base,
                      STREAM_DURATION, (AVRational){ 1, 1 }) >= 0)
        return NULL;

    /* when we pass a frame to the encoder, it may keep a reference to it
     * internally; make sure we do not overwrite it here */
    if (av_frame_make_writable(ost->frame) < 0)
        exit(1);

    if (c->pix_fmt != AV_PIX_FMT_YUV420P) {
        /* as we only generate a YUV420P picture, we must convert it
         * to the codec pixel format if needed */
        /*if (!ost->sws_ctx) {
            ost->sws_ctx = sws_getContext(c->width, c->height,
                                          AV_PIX_FMT_YUV420P,
                                          c->width, c->height,
                                          c->pix_fmt,
                                          SCALE_FLAGS, NULL, NULL, NULL);
            if (!ost->sws_ctx) {
                fprintf(stderr,
                        "Could not initialize the conversion context\n");
                exit(1);
            }
        }
        fill_yuv_image(ost->tmp_frame, ost->next_pts, c->width, c->height);
        sws_scale(ost->sws_ctx,
                  (const uint8_t * const *)ost->tmp_frame->data, ost->tmp_frame->linesize,
                  0, c->height, ost->frame->data, ost->frame->linesize);*/
    } else {
        fill_yuv_image(ost->frame, ost->next_pts, c->width, c->height);
    }

    ost->frame->pts = ost->next_pts++;

    return ost->frame;
}

/*
 * encode one video frame and send it to the muxer
 * return 1 when encoding is finished, 0 otherwise
 */
static int write_video_frame(AVFormatContext *oc, OutputStream *ost)
{
    int ret;
    AVCodecContext *c;
    AVFrame *frame;
    int got_packet = 0;
    AVPacket pkt = { 0 };

    c = ost->enc;

    frame = get_video_frame(ost);

    if (frame)
    {
        ret = avcodec_send_frame(ost->enc, frame);
        if (ret < 0)
        {
            char s[AV_ERROR_MAX_STRING_SIZE];
            fprintf(stderr, "Error encoding video frame: %s\n", cb_av_err2str(s, AV_ERROR_MAX_STRING_SIZE, ret));
            exit(1);
        }
    }

    av_init_packet(&pkt);

    ret = avcodec_receive_packet(ost->enc,&pkt);
    if (ret < 0)
    {
        if (ret == AVERROR(EAGAIN)) { ret = 0; }
        else
        {
            char s[AV_ERROR_MAX_STRING_SIZE];
            fprintf(stderr, "Error receiving packet: %s\n", cb_av_err2str(s,AV_ERROR_MAX_STRING_SIZE,ret));
            exit(1);
        }
    }
    else
    {
        got_packet = 1;
        ret = write_frame(oc, &c->time_base, ost->st, &pkt);
    }

    if (ret < 0) {
        char s[AV_ERROR_MAX_STRING_SIZE];
        fprintf(stderr, "Error while writing video frame: %s\n", cb_av_err2str(s,AV_ERROR_MAX_STRING_SIZE,ret));
        exit(1);
    }

    return (frame || got_packet) ? 0 : 1;
}

static void close_stream(AVFormatContext *oc, OutputStream *ost)
{
    avcodec_free_context(&ost->enc);
    av_frame_free(&ost->frame);
    av_frame_free(&ost->tmp_frame);
    //sws_freeContext(ost->sws_ctx);
    //swr_free(&ost->swr_ctx);
}

/**************************************************************/
/* media file output */

int main(int argc, char **argv)
{
    OutputStream video_st = { 0 }, audio_st = { 0 };
    const char *filename;
    AVOutputFormat *fmt;
    AVFormatContext *oc;
    AVCodec /**audio_codec,*/ *video_codec;
    int ret;
    int have_video = 0, have_audio = 0;
    int encode_video = 0, encode_audio = 0;
    AVDictionary *opt = NULL;
    int i;

    /* Initialize libavcodec, and register all codecs and formats. */
    av_register_all();
    avformat_network_init();

    if (argc < 2) {
        printf("usage: %s output_file\n"
                   "API example program to output a media file with libavformat.\n"
                   "This program generates a synthetic audio and video stream, encodes and\n"
                   "muxes them into a file named output_file.\n"
                   "The output format is automatically guessed according to the file extension.\n"
                   "Raw images can also be output by using '%%d' in the filename.\n"
                   "\n", argv[0]);
        return 1;
    }

    filename = argv[1];
    for (i = 2; i+1 < argc; i+=2) {
        if (!strcmp(argv[i], "-flags") || !strcmp(argv[i], "-fflags"))
            av_dict_set(&opt, argv[i]+1, argv[i+1], 0);
    }

    const char *pfilename = filename;

    /* allocate the output media context */
    avformat_alloc_output_context2(&oc, NULL, "mpegts", pfilename);
    if (!oc) {
        printf("Could not deduce output format from file extension: using MPEG.\n");
        avformat_alloc_output_context2(&oc, NULL, "mpeg", pfilename);
    }
    if (!oc)
        return 1;

    fmt = oc->oformat;

    /* Add the audio and video streams using the default format codecs
     * and initialize the codecs. */
    if (fmt->video_codec != AV_CODEC_ID_NONE) {
        add_stream(&video_st, oc, &video_codec, fmt->video_codec);
        have_video = 1;
        encode_video = 1;
    }
    /*if (fmt->audio_codec != AV_CODEC_ID_NONE) {
        add_stream(&audio_st, oc, &audio_codec, fmt->audio_codec);
        have_audio = 1;
        encode_audio = 1;
    }*/

    /* Now that all the parameters are set, we can open the audio and
     * video codecs and allocate the necessary encode buffers. */
    if (have_video)
        open_video(oc, video_codec, &video_st, opt);

    //if (have_audio)
    //    open_audio(oc, audio_codec, &audio_st, opt);

    av_dump_format(oc, 0, pfilename, 1);

    /* open the output file, if needed */
    if (!(fmt->flags & AVFMT_NOFILE)) {
        ret = avio_open(&oc->pb, pfilename, AVIO_FLAG_WRITE);
        if (ret < 0) {
            char s[AV_ERROR_MAX_STRING_SIZE];
            fprintf(stderr, "Could not open '%s': %s\n", pfilename,
                    cb_av_err2str(s,AV_ERROR_MAX_STRING_SIZE,ret));
            return 1;
        }
    }

    /* Write the stream header, if any. */
    ret = avformat_write_header(oc, &opt);
    if (ret < 0) {
        char s[AV_ERROR_MAX_STRING_SIZE];
        fprintf(stderr, "Error occurred when opening output file: %s\n",
                cb_av_err2str(s,AV_ERROR_MAX_STRING_SIZE,ret));
        return 1;
    }

    while (encode_video || encode_audio) {
        /* select the stream to encode */
        if (encode_video &&
            (!encode_audio || av_compare_ts(video_st.next_pts, video_st.enc->time_base,
                                            audio_st.next_pts, audio_st.enc->time_base) <= 0)) {
            encode_video = !write_video_frame(oc, &video_st);
        } else {
            //encode_audio = !write_audio_frame(oc, &audio_st);
        }

        //std::this_thread::sleep_for(std::chrono::milliseconds(35));
    }

    /* Write the trailer, if any. The trailer must be written before you
     * close the CodecContexts open when you wrote the header; otherwise
     * av_write_trailer() may try to use memory that was freed on
     * av_codec_close(). */
    av_write_trailer(oc);

    /* Close each codec. */
    if (have_video)
        close_stream(oc, &video_st);
    if (have_audio)
        close_stream(oc, &audio_st);

    if (!(fmt->flags & AVFMT_NOFILE))
        /* Close the output file. */
        avio_closep(&oc->pb);

    /* free the stream */
    avformat_free_context(oc);

    return 0;
}


Environment:

  • Ubuntu Zesty (17.04)
  • FFmpeg 3.2.4 (via the package manager)
  • gcc 6.3 (C++)

Recommended answer

You have to do the following:

  1. Call avcodec_alloc_context3(...). This call sets the default execute and execute2 functions in the new context.
  2. Set c->thread_count = number_of_threads_in_your_thread_pool().
  3. Call avcodec_open2(...).
  4. Set c->execute and c->execute2 to point to your functions.
  5. Call ff_thread_free(c). This function isn't exposed in the libavcodec headers, but you can declare it yourself by adding the following line:

extern "C" void ff_thread_free(AVCodecContext *s);
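
As a rough illustration of step 4, a replacement for execute has to honour the same contract as libavcodec's default: job k receives the k-th element of the arg array (stride size bytes), ret may be null, and the call must not return until every job has finished. The sketch below is not FFmpeg code. FakeCtx is a stand-in for AVCodecContext so the example builds without FFmpeg, std::async stands in for the application's own thread pool, and double_job is a hypothetical job; all three are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

// Stand-in for AVCodecContext so the sketch builds without FFmpeg (assumption).
struct FakeCtx { void *opaque; };

using JobFn = int (*)(FakeCtx *c, void *arg);

// Same shape as AVCodecContext::execute: run `count` independent jobs,
// where job k is handed the k-th element of the `arg` array.
int pooled_execute(FakeCtx *c, JobFn func, void *arg, int *ret, int count, int size)
{
    assert(count >= 0 && size >= 0);

    std::vector<std::future<int>> jobs;
    jobs.reserve(static_cast<std::size_t>(count));
    for (int k = 0; k < count; ++k) {
        void *job_arg = static_cast<char *>(arg) + static_cast<std::size_t>(k) * size;
        // std::async is a placeholder for submitting to an existing pool.
        jobs.push_back(std::async(std::launch::async, func, c, job_arg));
    }

    // Wait for every job before returning, collecting results in job order.
    for (int k = 0; k < count; ++k) {
        int r = jobs[static_cast<std::size_t>(k)].get();
        if (ret)            // the caller may pass a null result array
            ret[k] = r;
    }
    return 0;
}

// Hypothetical job: doubles the int it is given and returns the new value.
int double_job(FakeCtx *, void *arg)
{
    int *v = static_cast<int *>(arg);
    *v *= 2;
    return *v;
}
```

Calling pooled_execute(&ctx, double_job, data, ret, 4, sizeof(int)) on an array of four ints doubles each element on its own task. In a real application the same body would take an AVCodecContext* and be assigned to c->execute after avcodec_open2(...), as the steps above describe.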

The drawback is that libavcodec will have created its internal thread pool by the time avcodec_open2(...) returns, and that pool is only deleted by the ff_thread_free() call. The internal thread pool is very efficient, but it is not a good fit if you plan to encode multiple video feeds in parallel, because libavcodec will then create a separate thread pool for each feed being encoded.
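
To make the multi-feed case concrete, here is a minimal sketch of one pool shared by several encoders through an execute2-shaped callback. It is not FFmpeg code: SharedPool, FeedCtx (a stand-in for AVCodecContext, which in real code might carry the pool pointer in its opaque field), pooled_execute2 and square_job are all hypothetical names. The worker index is passed as threadnr, so jobs running at the same time never share a thread number, which is why the pool size should match the thread_count set in step 2.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal fixed-size pool; every task is told which worker is running it.
class SharedPool {
public:
    explicit SharedPool(int n) {
        for (int i = 0; i < n; ++i)
            workers_.emplace_back([this, i] { run(i); });
    }
    ~SharedPool() {
        { std::lock_guard<std::mutex> lk(m_); stop_ = true; }
        cv_.notify_all();
        for (auto &t : workers_) t.join();
    }
    int size() const { return static_cast<int>(workers_.size()); }
    void submit(std::function<void(int)> task) {
        { std::lock_guard<std::mutex> lk(m_); tasks_.push(std::move(task)); }
        cv_.notify_one();
    }
private:
    void run(int worker) {
        for (;;) {
            std::function<void(int)> task;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
                if (tasks_.empty()) return;   // stop requested and queue drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(worker);
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void(int)>> tasks_;
    std::vector<std::thread> workers_;
    bool stop_ = false;
};

// Stand-in for AVCodecContext (assumption).
struct FeedCtx { SharedPool *pool; };

using Job2Fn = int (*)(FeedCtx *c, void *arg, int jobnr, int threadnr);

// Same shape as AVCodecContext::execute2; blocks until all jobs are done.
int pooled_execute2(FeedCtx *c, Job2Fn func, void *arg, int *ret, int count)
{
    assert(c && c->pool);
    std::mutex m;
    std::condition_variable done;
    int remaining = count;
    for (int k = 0; k < count; ++k) {
        c->pool->submit([&, k](int worker) {
            int r = func(c, arg, k, worker);
            if (ret) ret[k] = r;
            std::lock_guard<std::mutex> lk(m);
            if (--remaining == 0) done.notify_one();
        });
    }
    std::unique_lock<std::mutex> lk(m);
    done.wait(lk, [&] { return remaining <= 0; });
    return 0;
}

// Hypothetical job: returns jobnr squared, or -1 if threadnr is out of range.
int square_job(FeedCtx *c, void *, int jobnr, int threadnr)
{
    if (threadnr < 0 || threadnr >= c->pool->size()) return -1;
    return jobnr * jobnr;
}
```

Two feeds can then call pooled_execute2 from their own encoding threads against the same SharedPool instance, and their jobs interleave on one set of workers instead of each feed owning a separate pool.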
