How to create a service with multiple threads which all use the same Hibernate session?

Problem description

My goal is to parse a large XML file and persist objects to DB based on the XML data, and to do it quickly. The operation needs to be transactional so I can rollback in case there is a problem parsing the XML or an object that gets created cannot be validated.

I am using the Grails Executor plugin to thread the operation. The problem is that each thread I create within the service has its own transaction and session. If I create 4 threads and 1 fails the session for the 3 that didn't fail may have already flushed, or they may flush in the future.

I was thinking if I could tell each thread to use the "current" Hibernate session that would probably fix my problem. Another thought I had was that I could prevent all sessions from flushing until it was known all completed without errors. Unfortunately I don't know how to do either of these things.
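The second idea, holding every write back until all threads are known to have finished cleanly, can be illustrated without Hibernate at all. This Java sketch is an assumption about how the work might be restructured, not Grails or Executor-plugin code: each worker only returns the rows it would create, and the calling thread copies them into a stand-in `store` list only if every worker succeeded. `StagedImport` and `runAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: workers only stage rows; one thread commits all-or-nothing.
class StagedImport {

    /**
     * Runs every worker concurrently. Staged rows are appended to {@code store}
     * only if all workers finish without throwing; otherwise the store is left
     * untouched and false is returned.
     */
    static boolean runAll(List<Callable<List<String>>> workers, List<String> store) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, workers.size()));
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Callable<List<String>> worker : workers) {
                futures.add(pool.submit(worker));
            }
            List<String> staged = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                staged.addAll(future.get()); // blocks until that worker is done
            }
            store.addAll(staged); // the single "commit", on the calling thread
            return true;
        } catch (ExecutionException e) {
            return false; // a worker failed: discard everything staged so far
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // stop any workers still running after a failure
        }
    }
}
```

Because only the calling thread writes, there is exactly one place where a real session and transaction would live, which sidesteps sharing a Hibernate `Session` between threads (sessions are not thread-safe).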

There is an additional catch too. There are many of these XML files to parse, and many that will be created in the future. Many of these XML files contain data that when parsed would create an object identical to one that was already created when a previous XML file was parsed. In such a case I need to make a reference to the existing object. I have added a transient isUnique variable to each class to address this. Using the Grails unique constraint does not work because it does not take hasMany relationships into account as I have outlined in my question here.
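Reusing the existing object is essentially a get-or-create lookup. As a framework-free sketch, assuming a `Bar` is identified by its `name` alone (the real classes have many more attributes), an index keyed by that identity hands back the same instance whenever file "B" repeats a name already seen in file "A". `BarRegistry` and `getOrCreate` are hypothetical names, not Grails API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical get-or-create index; assumes a Bar is identified by its name only.
class BarRegistry {

    static final class Bar {
        final String name;
        Bar(String name) { this.name = name; }
    }

    private final Map<String, Bar> byName = new HashMap<>();
    private int created = 0;

    /** Returns the existing Bar for this name, or creates and remembers a new one. */
    Bar getOrCreate(String name) {
        return byName.computeIfAbsent(name, n -> {
            created++; // counts only genuinely new objects
            return new Bar(n);
        });
    }

    int createdCount() { return created; }
}
```

Each lookup here is a single hash probe rather than a pass over every stored row; a database-backed version could get a similar effect from an indexed query on the identifying columns.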

The following example is very simple compared to the real thing. The XML files I'm parsing have deeply nested elements with many attributes.

Imagine the following domain classes:

class Foo {
    String ver

    Set<Bar> bars
    Set<Baz> bazs
    static hasMany = [bars: Bar, bazs: Baz]

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        ver(nullable: false)
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}

class Bar {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}


class Baz {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]

    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}
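`Util.getFirstDuplicate` below finds a match with `it == obj`, which Groovy turns into a call to `equals()` for non-`Comparable` objects. Unless the real domain classes override `equals` and `hashCode`, a newly built `Bar` will not compare equal to a persisted one with the same name. A sketch of value-based equality in Java, assuming `name` is the only identifying field (`ValueBar` is an illustrative name):

```java
import java.util.Objects;

// Sketch: value-based equality, assuming name is Bar's only identifying field.
final class ValueBar {
    private final String name;

    ValueBar(String name) { this.name = name; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ValueBar)) return false; // instanceof tolerates subclasses/proxies
        return Objects.equals(name, ((ValueBar) o).name);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(name); // must agree with equals()
    }
}
```

`instanceof` is used rather than `getClass()` because Hibernate can return proxy subclasses, the same reason `Util` calls `Hibernate.getClass`.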

And here is my Util.groovy class located in my src/groovy folder. This class contains the methods I use to determine if an instance of a domain class is unique and/or retrieve the already existing equal instance:

import org.hibernate.Hibernate

class Util {
    /**
     * Gets the first instance of the domain class of the object provided that
     * is equal to the object provided.
     *
     * @param obj
     * @return the first instance of obj's domain class that is equal to obj
     */
    static def getFirstDuplicate(def obj) {
        def objClass = Hibernate.getClass(obj)
        objClass.getAll().find{it == obj}
    }

    /**
     * Determines if an object is unique in its domain class
     *
     * @param obj
     * @return true if obj is unique, otherwise false
     */
    static def isUnique(def obj) {
        getFirstDuplicate(obj) == null
    }

    /**
     * Validates all of an object's constraints except those contained in the
     * provided blacklist, then saves the object if it is valid.
     *
     * @param obj
     * @return the validated object, saved if valid
     */
    static def validateWithBlacklistAndSave(def obj, def blacklist = null) {
        def propertiesToValidate = obj.domainClass.constraints.keySet().collectMany{!blacklist?.contains(it)?  [it] : []}
        if(obj.validate(propertiesToValidate)) {
            obj.save(validate: false)
        }
        obj
    }
}
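The `collectMany` expression in `validateWithBlacklistAndSave` keeps every constrained property name that is not in the (possibly null) blacklist. The same filter in Java, as an illustrative sketch that takes a plain collection of names instead of a Grails constraints map (`ConstraintFilter` is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Sketch of the blacklist filter; a null blacklist means "validate everything".
class ConstraintFilter {
    static List<String> propertiesToValidate(Collection<String> constrained,
                                             Collection<String> blacklist) {
        List<String> result = new ArrayList<>();
        for (String name : constrained) {
            if (blacklist == null || !blacklist.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }
}
```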

And imagine XML file "A" is similar to this:

    <foo ver="1.0">
        <!-- Start bar section -->
        <bar name="bar_1"/>
        <bar name="bar_2"/>
        <bar name="bar_3"/>
        ...
        <bar name="bar_5000"/>

        <!-- Start baz section -->
        <baz name="baz_1"/>
        <baz name="baz_2"/>
        <baz name="baz_3"/>
        ...
        <baz name="baz_100000"/>
    </foo>
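Extracting the data from a file shaped like "A" can be sketched with the JDK's built-in DOM parser (`javax.xml.parsers`) instead of Groovy's `XmlParser`. `FooXml` is a hypothetical holder class; it reads the `ver` attribute and the `name` attribute of each direct `bar` and `baz` child, skipping comments and whitespace:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Sketch: pull ver and the bar/baz names out of a <foo> document.
class FooXml {
    final String ver;
    final List<String> barNames = new ArrayList<>();
    final List<String> bazNames = new ArrayList<>();

    private FooXml(String ver) { this.ver = ver; }

    static FooXml parse(String xml) {
        Element foo;
        try {
            foo = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
        FooXml result = new FooXml(foo.getAttribute("ver"));
        NodeList children = foo.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node node = children.item(i);
            if (node.getNodeType() != Node.ELEMENT_NODE) continue; // skip comments and whitespace
            Element child = (Element) node;
            if ("bar".equals(child.getTagName())) result.barNames.add(child.getAttribute("name"));
            else if ("baz".equals(child.getTagName())) result.bazNames.add(child.getAttribute("name"));
        }
        return result;
    }
}
```

For a 20 MB file, a streaming API such as StAX (`javax.xml.stream`) avoids holding the whole tree in memory; DOM is used here only because it keeps the sketch short.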

And imagine XML file "B" is similar to this (identical to XML file "A" except that one new bar and one new baz have been added). When XML file "B" is parsed after XML file "A", three new objects should be created: 1.) a Bar with name = bar_5001, 2.) a Baz with name = baz_100001, and 3.) a Foo with ver = 2.0 whose lists of bars and bazs equal what is shown, reusing the instances of Bar and Baz that already exist from the import of XML file "A":

    <foo ver="2.0">
        <!-- Start bar section -->
        <bar name="bar_1"/>
        <bar name="bar_2"/>
        <bar name="bar_3"/>
        ...
        <bar name="bar_5000"/>
        <bar name="bar_5001"/>

        <!-- Start baz section -->
        <baz name="baz_1"/>
        <baz name="baz_2"/>
        <baz name="baz_3"/>
        ...
        <baz name="baz_100000"/>
        <baz name="baz_100001"/>
    </foo>
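Stated as set arithmetic, the expected outcome for file "B" is: the new `Bar`s are the names in B that are not already stored, and likewise for `Baz`. A small sketch, assuming names identify objects (`ImportDiff.newNames` is an illustrative helper, not part of the question's code):

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch: names present in the incoming file but not yet persisted.
class ImportDiff {
    static Set<String> newNames(Collection<String> incoming, Collection<String> existing) {
        Set<String> result = new LinkedHashSet<>(incoming); // keeps file order, drops repeats
        result.removeAll(existing);
        return result;
    }
}
```

Applied to the bar section this yields only `bar_5001`, and applied to the baz section only `baz_100001`.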

And a service similar to this:

class BigXmlFileUploadService {

    // Pass in a 20MB XML file
    def upload(def xml) {
        String rslt = null
        def xsd = Util.getDefsXsd()
        if(Util.validateXmlWithXsd(xml, xsd)) { // Validate the structure of the XML file
            def fooXml = new XmlParser().parseText(xml.getText()) // Parse the XML

            def bars = callAsync { // Make a thread for creating the Bar objects
                def bars = []
                for(barXml in fooXml.bar) { // Loop through each bar XML element inside the foo XML element
                    def bar = new Bar( // Create a new Bar object
                        name: barXml.attribute("name")
                    )
                    bar = retrieveExistingOrSave(bar) // If an instance of Bar that is equal to this one already exists then use it
                    bars.add(bar) // Add the new Bar object to the list of Bars
                }
                bars // Return the list of Bars
            }

            def bazs = callAsync { // Make a thread for creating the Baz objects
                def bazs = []
                for(bazXml in fooXml.baz) { // Loop through each baz XML element inside the foo XML element
                    def baz = new Baz( // Create a new Baz object
                        name: bazXml.attribute("name")
                    )
                    baz = retrieveExistingOrSave(baz) // If an instance of Baz that is equal to this one already exists then use it
                    bazs.add(baz) // Add the new Baz object to the list of Bazs
                }
                bazs // Return the list of Bazs
            }

            bars = bars.get() // Wait for thread then call Future.get() to get list of Bars
            bazs = bazs.get() // Wait for thread then call Future.get() to get list of Bazs

            def foo = new Foo( // Create a new Foo object with the list of Bars and Bazs
                ver: fooXml.attribute("ver"),
                bars: bars,
                bazs: bazs
            ).save()

            rslt = "Successfully uploaded ${xml.getName()}!"
        } else {
            rslt = "File failed XSD validation!"
        }
        rslt
    }

    private def retrieveExistingOrSave(def obj, def existingObjCache = null) { // existingObjCache is not used yet
        def dup = Util.getFirstDuplicate(obj)
        obj = dup ?: Util.validateWithBlacklistAndSave(obj, ["isUnique"])
        if(obj.errors.allErrors) {
            log.error "${obj} has errors ${obj.errors}"
            throw new RuntimeException() // Force transaction to rollback
        }
        obj
    }
}
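One consequence of calling `retrieveExistingOrSave` inside `callAsync`: its `RuntimeException` is raised on the worker thread, so it cannot directly roll back the caller's transaction. With a plain `java.util.concurrent` executor, used here as a stand-in for the plugin's `callAsync` (whose `Future` the service already relies on), the caller only sees the failure when it calls `Future.get()`, wrapped in an `ExecutionException`. `WorkerFailure` is a hypothetical name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: how an exception thrown on a worker thread reaches the caller of get().
class WorkerFailure {
    /** Returns the worker's exception message, or null if the worker succeeded. */
    static String failureMessage(boolean fail) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                if (fail) throw new RuntimeException("Bar has errors"); // thrown on the worker thread
                return "ok";
            });
            future.get(); // rethrows the worker's exception, wrapped
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original RuntimeException is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }
}
```

`ExecutionException` is a checked exception. Under Spring's default rollback rules only unchecked exceptions and errors trigger a rollback, so a service that lets it escape may need to rethrow it wrapped in a `RuntimeException` for the surrounding transaction to roll back.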

So the question is: how do I get everything that happens inside my service's upload method to act as if it happened in a single session, so that EVERYTHING can be rolled back if any one part fails?

Solution

The service can be optimized to address some of the pain points:

  • I agree with @takteek: parsing the XML would be time consuming, so plan to make that part async.
  • You do not need to flush on each creation of a child object. See below for the optimization.
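A common middle ground between flushing after every child and never flushing is to flush (and clear) the session every N inserts, the batching pattern described in the Hibernate documentation; this is a swapped-in technique, not part of the original answer. The Java sketch only models the counting with a hypothetical `BatchCounter`; its `flush()` is a stand-in that makes no Hibernate calls.

```java
// Sketch of "flush every N items" batching; flush() here only counts calls.
class BatchCounter {
    private final int batchSize;
    private int pending = 0;
    private int flushes = 0;

    BatchCounter(int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
    }

    /** Records one saved child; flushes once a full batch has accumulated. */
    void add() {
        if (++pending == batchSize) flush();
    }

    /** Flushes whatever is left, e.g. at the end of the import. */
    void finish() {
        if (pending > 0) flush();
    }

    private void flush() {
        flushes++;   // stand-in for session.flush(); session.clear()
        pending = 0;
    }

    int flushCount() { return flushes; }
}
```

With a batch size of 50, the 5,001 bars of file "B" need 101 flushes instead of 5,001. Flushing inside a transaction sends SQL to the database without committing, so a later rollback still undoes it.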

The service class would look something like this:

import org.springframework.transaction.interceptor.TransactionAspectSupport

// Pass in a 20MB XML file
def upload(def xml) {
    String rslt = null
    def xsd = Util.getDefsXsd()
    if (Util.validateXmlWithXsd(xml, xsd)) {
        def fooXml = new XmlParser().parseText(xml.getText())

        def foo = new Foo(ver: fooXml.attribute("ver")).save(flush: true) //ver is nullable: false, so set it before saving

        def bars = callAsync {
            saveBars(foo, fooXml)
        }

        def bazs = callAsync {
            saveBazs(foo, fooXml)
        }

        //Merge the detached instances returned by each thread back into
        //this session. Could also issue a flush here, but we do not need it yet.
        //By default the domain class is validated as well.
        foo = bars.get().merge() //Future returns foo
        foo = bazs.get().merge() //Future returns foo

        //Check whether the child objects are populated or not.
        //If either set of children is absent,
        //roll back the whole transaction
        handleTransaction {
            if (foo.bars && foo.bazs) {
                foo.save(flush: true)
            } else {
                //The else block is reached if either set of
                //children has not been associated with the parent yet.
                //This happens if there was a problem in
                //either of the threads: the corresponding
                //transaction would have been rolled back
                //in its own session, hence the empty association.

                //Set the transaction roll-back only
                TransactionAspectSupport
                    .currentTransactionStatus()
                    .setRollbackOnly()

                //Or throw an Exception and
                //let handleTransaction handle the rollback
                throw new Exception("Rolling back transaction")
            }
        }

        rslt = "Successfully uploaded ${xml.getName()}!"
    } else {
        rslt = "File failed XSD validation!"
    }
    rslt
}
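`if (foo.bars && foo.bazs)` relies on Groovy truth: a collection counts as false when it is null or empty. The equivalent check spelled out in Java, as a sketch (`AssociationCheck.bothPopulated` is an illustrative name):

```java
import java.util.Collection;

// Sketch of Groovy truth for the two associations: null or empty counts as false.
class AssociationCheck {
    static boolean bothPopulated(Collection<?> bars, Collection<?> bazs) {
        return bars != null && !bars.isEmpty()
            && bazs != null && !bazs.isEmpty();
    }
}
```

The check only notices a thread that attached no children at all; a thread that failed after adding some children would still pass it.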

def saveBars(Foo foo, fooXml) {
    handleTransaction {
        for (barXml in fooXml.bar) {
            def bar = new Bar(name: barXml.attribute("name"))
            foo.addToBars(bar)
        }
        //Optional, I think, as the session is flushed
        //at the end of the method
        foo.save(flush: true)
    }

    foo
}

def saveBazs(Foo foo, fooXml) {
    handleTransaction {
        for (bazXml in fooXml.baz) {
            def baz = new Baz(name: bazXml.attribute("name"))
            foo.addToBazs(baz)
        }

        //Optional, I think, as the session is flushed
        //at the end of the method
        foo.save(flush: true)
    }

    foo
}

def handleTransaction(Closure clos) {
    try {
        clos()
    } catch (e) {
        //Do not rethrow: mark the current transaction so that
        //it is rolled back when it completes
        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
    }
}
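`handleTransaction` swallows the exception and only marks the current transaction rollback-only, so the caller keeps running and the rollback happens when the transaction ends. A minimal sketch of that control flow, with a hypothetical `TxStatus` class standing in for Spring's `TransactionStatus` (which exposes `setRollbackOnly()` and `isRollbackOnly()`):

```java
// Stand-in for Spring's TransactionStatus; not the real class.
class TxStatus {
    private boolean rollbackOnly = false;
    void setRollbackOnly() { rollbackOnly = true; }
    boolean isRollbackOnly() { return rollbackOnly; }
}

// Sketch of handleTransaction's control flow.
class TxHelper {
    /** Runs the block; on an exception marks the status rollback-only instead of rethrowing. */
    static void handleTransaction(TxStatus status, Runnable block) {
        try {
            block.run();
        } catch (RuntimeException e) {
            status.setRollbackOnly(); // the surrounding transaction rolls back when it ends
        }
    }
}
```

Since the exception is not rethrown, code after a failed block keeps running; only a later `isRollbackOnly()` check reveals that something went wrong.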
