linux内核AIO功能 [英] linux kernel aio functionality

查看:290
本文介绍了linux内核AIO功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我测试的内核异步IO功能(不是POSIX AIO),我试图弄清楚它是如何工作的。下面的code是一个完整的程序,我只是反复写一个阵列使用O_DIRECT打开的文件。我得到一个错误的回调函数写字节无缘预计1024年得到0(见work_done的fprintf中声明())。

对于那些不熟悉内核AIO时,code以下执行以下操作:


  1. 初始化一些结构

  2. prepare AIO(io_ prep_pwrite)

  3. 提交IO请求(io_submit)

  4. 检查事件完成(io_getevents)

  5. 调用回调函数,看看是否一切正常。

我在第5步中得到一个错误,如果我不使用O_DIRECT打开文件,事情做工精细,但它比拥有异步写入的目的。
谁能告诉我什么,我做错了什么?这是内核AIO的正确用法,例如,是我使用的回调是否正确?是否有关于O_DIRECT的使用情况如何?

任何限制

我编译使用'GCC -Wall test.c的-laio

先谢谢了。

  / *
 *文件:myaiocp.c
 *作者:kmehta
 *
 *创建于2011年7月11日,12:50 PM
 *
 *
 *测试内核AIO。
 *程序创建一个二维矩阵和多次写入它来创建所需大小的文件。
 *写操作使用内核AIO函数执行(io_ prep_pwrite,io_submit等)
 * /
#定义_GNU_SOURCE
的#define _XOPEN_SOURCE 600#包括LT&;&stdio.h中GT;
#包括LT&;&stdlib.h中GT;
#包括LT&;&getopt.h GT;
#包括LT&;&pthreads.h中GT;
#包括LT&;&fcntl.h GT;
#包括LT&;&string.h中GT;
#包括LT&; SYS / uio.h>
#包括LT&; SYS / time.h中>
#包括LT&;&omp.h GT;
#包括LT&;&unistd.h中GT;
#包括LT&; SYS / types.h中>
#包括LT&; SYS / stat.h>
#包括LT&;&errno.h中GT;
#包括LT&;&libaio.h GT;焦炭** BUF;
长seg_size;
诠释seg_rows;
双TOTAL_SIZE;
字符*文件名;
静态INT wait_count = 0;无效io_task();
清理无效();
无效allocate_2D_matrix(INT []);
INT FILE_OPEN(字符*);
无效wr_done(io_context_t CTX,结构IOCB * IOCB,长期资源,长RES2);INT主(INT ARGC,字符** argv的){
    TOTAL_SIZE = 1048576; // 1MB
    seg_size = 1024; // 1kB的
    seg_rows = 1024;
    文件名=aio.out;    INT变暗[] = {seg_rows,seg_size};
    allocate_2D_matrix(DIMS); //创建二维矩阵    io_task();
    清理(​​);    返回0;
}/ *
 *创建一个二维矩阵
 * /
无效allocate_2D_matrix(中间体变暗[2]){
    INT I;
    字符*的数据;    //创建矩阵
    数据=(字符*)释放calloc(1,DIMS [0] *变暗[1] *的sizeof(字符));
    如果(数据== NULL){
        的printf(\\ n无法矩阵分配内存\\ n);
        出口(1);
    }    BUF =(字符**)的malloc(DIMS [0] * sizeof的(字符*));
    如果(BUF == NULL){
        的printf(\\ n无法矩阵分配内存\\ n);
        出口(1);
    }    对于(I = 0; I&下;变暗[0];我++){
        的buf [I] =及(数据[我*变暗[1]]);
    }
}静态无效IO_ERROR(为const char * FUNC,INT RC)
{
    如果(RC == -ENOSYS)
        fprintf中(标准错误,AIO在此内核\\ n);
    否则如果(RC℃,)
        fprintf中(标准错误,%S:%S \\ n,FUNC,字符串错误(-rc));
    其他
        fprintf中(标准错误,%S:误差%d个\\ N,FUNC,RC);    出口(1);
}/ *
 *回调函数
 * /
静态无效work_done(io_context_t CTX,结构IOCB * IOCB,长期资源,长RES2)
{    如果(RES2!= 0){
        IO_ERROR(AIO写,RES2);
      }      如果(RES = iocb->!u.c.nbytes){
            fprintf中(标准错误,写字节无缘期望%禄拿到%LD \\ N,
                  iocb-> u.c.nbytes,RES2);
            出口(1);
      }
      wait_count - ;
      的printf(%D,wait_count);
}/ *
 *等待程序。获取事件和调用回调函数work_done()
 * /
INT io_wait_run(io_context_t CTX,长ITER)
{
      结构io_event事件[ITER]
      结构io_event * EP;
      INT RET,N;      / *
       *起床同时aio_maxio事件。
       * /
      RET = N = io_getevents(CTX,ITER,国际热核实验堆,事件,NULL);
      的printf(得到%d \\事件N,N);
      / *
       *调用每个事件的回调函数。
       * /
      为(EP =事件; N--大于0; EP ++){
            io_callback_t CB =(io_callback_t)EP-GT&;数据;结构IOCB * IOCB = EP-GT&; OBJ; CB(CTX,IOCB,EP>资源,EP> RES2);
      }
      返回RET;
}无效io_task(){
    长偏移= 0;
    INT bufIndex = 0;    //打开文件
    INT FD = FILE_OPEN(文件名);    //初始化结构
    我长;
    长ITER = TOTAL_SIZE / seg_size; //没有。迭代以达到所需的文件大小(TOTAL_SIZE)
    io_context_t myctx;
    如果(0 = io_queue_init(ITER,&安培;!myctx))
    {
        PERROR(无法初始化IO队列);
        出口(EXIT_FAILURE);
    }
    结构IOCB * IOQ [ITER]    //遍历ITER倍,达到所需的文件大小
    对于(i = 0; I< ITER;我++){
        结构IOCB * IO =(结构IOCB *)malloc的(的sizeof(结构IOCB));
        io_ prep_pwrite(IO,FD,BUF [bufIndex],seg_size,抵消);
        io_set_callback(IO,work_done);
        IOQ [I] = 10;        胶印+ = seg_size;
        bufIndex ++;
        如果(bufIndex> seg_rows - 1)//如果整个矩阵写的,再次从索引0开始
            bufIndex = 0;
    }    的printf(DONE preparing现在提交.. \\ n);
    如果(ITER!= io_submit(myctx,ITER,IOQ))
    {
        PERROR(失败上提交);
        出口(EXIT_FAILURE);
    }    的printf(现在正在等待完成... \\ n);
    wait_count = ITER;
    中期业绩;    而(wait_count){
        RES = io_wait_run(myctx,ITER);
        如果(RES℃,)
            IO_ERROR(io_wait_run,RES);
    }    关闭(FD);
}无效清理(){
    免费(BUF [0]);
    免费(BUF);
}INT FILE_OPEN(字符*文件名){
    INT的fd;
    如果(-1 ==(FD =开放(文件名,O_DIRECT | O_CREAT | O_WRONLY | O_TRUNC,0666))){
        的printf(\\ n错误打开文件\\ n);
        出口(-1);
    }    返回FD;
}


解决方案

首先,而不是使用抓好 libaio的 POSIX AIO


  

是否有关于O_DIRECT的任何使用限制?


我不是100%肯定这是真正的问题,但 O_DIRECT 有一些要求(从TLPI报价居多):


  • 数据缓冲区转移必须存储器边界即是块大小的倍数对齐(使用 posix_memalign

  • 的文件或设备偏移量,数据传输开始必须是块大小的倍数

  • 要传送必须是块大小的倍数的数据的长度

一目了然,我可以看到你不采取奥比precautions在 allocate_2D_matrix 对齐内存。


  

如果我不使用O_DIRECT打开文件,事情做工精细,但它
  比拥有异步的目的写道。


这碰巧不是这种情况。异步I / O工作良好,没有 O_DIRECT (例如认为系统调用的数量削减)。

I am testing kernel asynchronous io functions (not posix aio) and am trying to figure out how it works. The code below is a complete program where I simply write an array repeatedly to a file opened using O_DIRECT. I get an error in the callback function "write missed bytes expect 1024 got 0" (see the fprintf statement in work_done()).

For those not familiar with kernel aio, the code below does the following:

  1. Init some structs
  2. Prepare aio (io_prep_pwrite)
  3. Submit io requests (io_submit)
  4. Check for event completion (io_getevents)
  5. Call a callback function to see if everything went ok.

I get an error at step 5. If I do not open the file using O_DIRECT, things work fine, but it beats the purpose of having async writes. Can someone tell me what I am doing wrong? Is this the correct usage of kernel aio, for example, is my use of callbacks correct? Are there any restrictions on the usage of O_DIRECT?

I compile using 'gcc -Wall test.c -laio'

Thanks in advance.

/* 
 * File:   myaiocp.c
 * Author: kmehta
 *
 * Created on July 11, 2011, 12:50 PM
 *
 *
 * Testing kernel aio. 
 * Program creates a 2D matrix and writes it multiple times to create a file of desired size. 
 * Writes are performed using kernel aio functions (io_prep_pwrite, io_submit, etc.)
 */
#define _GNU_SOURCE
#define _XOPEN_SOURCE 600

#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <fcntl.h>
#include <string.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <omp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <libaio.h>

char ** buf;
long seg_size;
int seg_rows;
double total_size;
char * filename;
static int wait_count = 0;

void io_task();
void cleanup();
void allocate_2D_matrix(int[]);
int file_open(char *);
void wr_done(io_context_t ctx, struct iocb* iocb, long res, long res2);

int main(int argc, char **argv) {
    total_size  = 1048576;      //1MB
    seg_size    = 1024;         //1kB
    seg_rows    = 1024;
    filename    = "aio.out";

    int dims[] = {seg_rows, seg_size};
    allocate_2D_matrix(dims);   //Creates 2D matrix

    io_task();
    cleanup();

    return 0;
}

/*
 * Create a 2D matrix
 */
void allocate_2D_matrix(int dims[2]) {
    int i;
    char *data;

    //create the matrix
    data = (char *) calloc(1, dims[0] * dims[1] * sizeof (char));
    if (data == NULL) {
        printf("\nCould not allocate memory for matrix.\n");
        exit(1);
    }

    buf = (char **) malloc(dims[0] * sizeof (char *));
    if (buf == NULL) {
        printf("\nCould not allocate memory for matrix.\n");
        exit(1);
    }

    for (i = 0; i < dims[0]; i++) {
        buf[i] = &(data[i * dims[1]]);
    }
}

static void io_error(const char *func, int rc)
{
    if (rc == -ENOSYS)
        fprintf(stderr, "AIO not in this kernel\n");
    else if (rc < 0)
        fprintf(stderr, "%s: %s\n", func, strerror(-rc));
    else
        fprintf(stderr, "%s: error %d\n", func, rc);

    exit(1);
}

/*
 * Callback function
 */
static void work_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{

    if (res2 != 0) {
        io_error("aio write", res2);
      }

      if (res != iocb->u.c.nbytes) {
            fprintf(stderr, "write missed bytes expect %lu got %ld\n",
                  iocb->u.c.nbytes, res2);
            exit(1);
      }
      wait_count --;
      printf("%d ", wait_count);
}

/*
 * Wait routine. Get events and call the callback function work_done()
 */
int io_wait_run(io_context_t ctx, long iter)
{
      struct io_event events[iter];
      struct io_event *ep;
      int ret, n;

      /*
       * get up to aio_maxio events at a time.
       */
      ret = n = io_getevents(ctx, iter, iter, events, NULL);
      printf("got %d events\n", n);
      /*
       * Call the callback functions for each event.
       */
      for (ep = events ; n-- > 0 ; ep++) {
            io_callback_t cb = (io_callback_t)ep->data ; struct iocb *iocb = ep->obj ; cb(ctx, iocb, ep->res, ep->res2);
      }
      return ret;
}

void io_task() {
    long offset = 0;
    int bufIndex = 0;

    //Open file
    int fd = file_open(filename);

    //Initialize structures
    long i; 
    long iter = total_size / seg_size;  //No. of iterations to reach desired file size (total_size)
    io_context_t myctx;
    if(0 != io_queue_init(iter, &myctx))
    {
        perror("Could not initialize io queue");
        exit(EXIT_FAILURE);
    }
    struct iocb * ioq[iter];

    //loop through iter times to reach desired file size
    for (i = 0; i < iter; i++) {
        struct iocb *io = (struct iocb*) malloc(sizeof (struct iocb));
        io_prep_pwrite(io, fd, buf[bufIndex], seg_size, offset);
        io_set_callback(io, work_done);
        ioq[i] = io;

        offset += seg_size;
        bufIndex ++;
        if (bufIndex > seg_rows - 1)    //If entire matrix written, start again from index 0
            bufIndex = 0;
    }

    printf("done preparing. Now submitting..\n");
    if(iter != io_submit(myctx, iter, ioq))
    {
        perror("Failure on submit");
        exit(EXIT_FAILURE);
    }

    printf("now awaiting completion..\n");
    wait_count = iter;
    int res;

    while (wait_count) {
        res = io_wait_run(myctx, iter);
        if (res < 0)
            io_error("io_wait_run", res);
    }

    close(fd);
}

void cleanup() {
    free(buf[0]);
    free(buf);
}

int file_open(char *filename) {
    int fd;
    if (-1 == (fd = open(filename, O_DIRECT | O_CREAT | O_WRONLY | O_TRUNC, 0666))) {
        printf("\nError opening file. \n");
        exit(-1);
    }

    return fd;
}

解决方案

First of all, good job using libaio instead of POSIX aio.

Are there any restrictions on the usage of O_DIRECT ?

I'm not 100% sure this is the real problem, but O_DIRECT has some requirements (quoting mostly from TLPI):

  • The data buffer being transferred must be aligned on a memory boundary that is a multiple of the block size (use posix_memalign)
  • The file or device offset at which data transfer commences must be a multiple of the block size
  • The length of the data to be transferred must be a multiple of the block size

At a glance, I can see you are not taking aby precautions to align memory in allocate_2D_matrix.

If I do not open the file using O_DIRECT, things work fine, but it beats the purpose of having async writes.

This happens not to be the case. Asynchronous I/O works well without O_DIRECT (for instance think of the number of system calls slashed).

这篇关于linux内核AIO功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆