如何在Go中将单个Postgres DB连接重用于行插入? [英] How to reuse a single Postgres DB connection for row inserts in Go?

查看:68
本文介绍了如何在Go中将单个Postgres DB连接重用于行插入?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Go语言,通过到DB的单个连接(在下面的代码的init函数中打开)来为从Rabbitmq收到的每条新消息插入一行数据到Postgres表中.

I'm trying to use Go to insert a row of data into a Postgres table for each new message received from rabbitmq, using a single connection to the DB which is opened in the init function of the code below.

代码不仅仅打开一个连接,而是打开497并最大化,这导致行插入停止...

Rather than opening just one connection, the code opens 497 and maxes out which causes the row inserts to stop...

我已尝试使用以下问题中的信息在函数内部打开数据库连接,其中说我应该打开一个连接并使用全局db允许主函数将sql语句传递给init函数中打开的连接.

I have tried using the info in these questions opening and closing DB connection in Go app and open database connection inside a function which say I should open one connection and use global db to allow the main function to pass the sql statement to the connection opened in the init function.

我以为我已经做到了,但是每行新连接都会打开,因此一旦达到postgres连接限制,代码就会停止工作.

I thought I had done this, however a new connection is being opened for each new row so the code stops working once the postgres connection limit is reached...

我是Go语言的新手,并且编程经验有限,在过去的两天里,我一直在尝试了解/解决此问题,我确实可以通过一些帮助来了解我在哪里出错了……

I am new to Go and have limited programming experience, I have been trying to understand/resolve this issue for the past two days and I could really do with some help understanding where I am going wrong with this...

var db *sql.DB

func init() {
    var err error
    db, err = sql.Open ( "postgres", "postgres://postgres:postgres@SERVER/PORT/DB")
    if err != nil {
        log.Fatal("Invalid DB config:", err)
    }
    if err = db.Ping(); err != nil {
        log.Fatal("DB unreachable:", err)
    }
}

func main() {

// RABBITMQ CONNECTION CODE IS HERE

// EACH MESSAGE RECEIVED IS SPLIT TO LEGEND, STATUS, TIMESTAMP VARIABLES

// VARIABLES ARE PASSED TO sqlSatement    

        sqlStatement := `
        INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
        VALUES ($1, $2, $3)
`
        // sqlStatement IS THEN PASSED TO db.QueryRow

        db.QueryRow(sqlStatement, Legend, Status, TimeStamp)
    }
}()

<-forever
}

完整代码如下所示:

package main

import (
    "database/sql"
    "log"
    _ "github.com/lib/pq"

    "github.com/streadway/amqp"
    "strings"
)
var db *sql.DB

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func init() {
    var err error
    db, err = sql.Open ( "postgres", "postgres://postgres:postgres@192.168.1.69:5432/test?sslmode=disable")
    if err != nil {
        log.Fatal("Invalid DB config:", err)
    }
    if err = db.Ping(); err != nil {
        log.Fatal("DB unreachable:", err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://Admin:Admin@192.168.1.69:50003/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "HEARTBEAT", // name
        false,       // durable
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {

        for d := range msgs {
            myString := string(d.Body[:])
            result := strings.Split(myString, ",")
            Legend := result[0]
            Status := result[1]
            TimeStamp := result[2]

            sqlStatement := `
    INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
    VALUES ($1, $2, $3)
    `
            //
            db.QueryRow(sqlStatement, Legend, Status, TimeStamp)
        }
    }()

    <-forever
}

推荐答案

首先,*sql.DB不是连接,而是连接池,它将打开所需数量的连接和postgres服务器的数量允许.仅当池中没有可用的空闲连接时,它才会打开新连接.

First off, *sql.DB is not a connection but a pool of connections, it will open as many connection as it needs to and as many as the postgres server allows. It only opens new connections when there is no idle one in the pool ready for use.

所以问题是数据库打开的连接没有被释放,为什么?因为您使用的是QueryRow而不是对返回的*Row值调用Scan.

So the issue is that the connections that DB opens aren't being released, why? Because you're using QueryRow without calling Scan on the returned *Row value.

在内部*Row拥有一个*Rows实例,该实例可以访问自己的连接,并且在调用Scan时该连接会自动释放.如果未调用Scan,则不会释放连接,这将导致DB池在下次调用QueryRow时打开新连接.因此,由于您不释放任何连接,因此DB会继续打开新的连接,直到达到postgres设置所指定的限制,然后对QueryRow的下一个调用将挂起,因为它等待连接变为空闲状态.

Under the hood *Row holds a *Rows instance which has access to its own connection and that connection is released automatically when Scan is called. If Scan is not called then the connection is not released which causes the DB pool to open a new connection on the next call to QueryRow. So since you're not releasing any connections DB keeps opening new ones until it hits the limit specified by the postgres settings and then the next call to QueryRow hangs because it waits for a connection to become idle.

因此,如果您不关心输出,则需要使用Exec,或者需要在返回的*Row上调用Scan.

So you either need to use Exec if you don't care about the output, or you need to call Scan on the returned *Row.

这篇关于如何在Go中将单个Postgres DB连接重用于行插入?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆