jpa 与 https 请求多线程弹簧 [英] jpa with https request multithreading spring

查看:26
本文介绍了jpa 与 https 请求多线程弹簧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在处理 spring JPAHTTP post 请求,逐行获取数据,然后将数据发布到 API 的 HTTP 请求中,它对我来说效果很好,但我在这里处理大量数据,所以我必须使用多线程,但我是 Java 和 Spring 新手,我该如何实现以使用 10 个线程,并且每个线程每次并行读取 1k 次?

I'm working with spring JPA and HTTP post request, fetching the data row by row then post the data into HTTP request to API and its worked fine with me, but here im working with bulk number of data, so i have to use multi-threading but im new with java and spring how do I implement to work with 10 thread and each one of them reads 1k per each time in parallel that here ?

我已经阅读了有关 10 个线程的多线程的一些内容,其中每个线程每次读取 1k 行,我的数据库中有大约 1000 万条记录

i have read something about multithreading for 10 threads each thread of them read 1k row per each time, I have around 10 million records in my database

访问DataJpaApplication 类:

AccessingDataJpaApplication class :

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
    @Autowired

    private Bulk_repositoryRepository bulk_repositoryRepository;


    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }
    Date currentDate = new Date();

    @Override
    public void run(String... args) throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        headers.setBasicAuth("user", "pass");

        while(true) {
            Date currentDate = new Date();
            logger.info("Just Started"); 
            for (Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate)) {
                System.out.print(churnss);
                logger.info(churnss.toString());
                AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

                logger.info(AddOffer.toString());
                HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

                ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);

                if(responseEntity.getStatusCode() == HttpStatus.OK){
                    String response = responseEntity.getBody();
                    churnss.setStatus(1);
                    churnss.setProcessDate(new Date());
                    churnss.setFulfilment_status(response);
                    logger.info(churnss.toString() + ", Response: " + response);
                    bulk_repositoryRepository.save(churnss);
                }else {
                    logger.warn("Record Id: " + churnss.getId() + ", Http Failed Response: " + responseEntity.getStatusCode());
                }
            }
            Thread.sleep(1000);
        }
    }

}

Bulk_repository 类:

Bulk_repository class:

@Entity
@Table(name = "BULK_REPOSITORY")
public class Bulk_repository {

   @Id
   @GeneratedValue(strategy=GenerationType.AUTO)
   @Column(name = "id")
   private long id;

   @Column(name = "msisdn")
   private String msisdn;

   @Column(name = "camp_start_date")   
   private Date campStartDate;

   @Column(name = "camp_end_date")
   private Date campEndDate;

   @Column(name = "camp_type")
   private int campType;

   @Column(name = "camp_cd")
   private String camp_cd;

   @Column(name = "status")
   private int status;

   @Column(name = "process_date")
   private Date processDate;

   @Column(name = "entry_date")
   private Date entryDate;

   @Column(name = "entry_user")
   private String entry_user;

   @Column(name = "param1")
   private String param1;

   @Column(name = "param2")
   private String param2;

   @Column(name = "param3")
   private String param3;

   @Column(name = "param4")
   private String param4;

   @Column(name = "param5")
   private String param5;

   @Column(name = "error_desc")
   private String error_desc;

   @Column(name = "fulfilment_status")
   private int fulfilment_status;
   ##then getter and setters and tostring

Bulk_repositoryRepository 类:

Bulk_repositoryRepository class :

public interface Bulk_repositoryRepository extends CrudRepository<Bulk_repository, Long> {

      Date today = new Date();
      List<Bulk_repository>findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(int status, int campType,Date today0, Date today1);
      Bulk_repository findById(long id);
}

AddOfferRequest 类:

AddOfferRequest class :

public class AddOfferRequest {

    private String ChannelID="113";
    private String MSISDN;
    private String ServiceID;

    public AddOfferRequest() {
    }
    public AddOfferRequest(String channelID,String mSISDN,String serviceID ) {
        this.MSISDN = mSISDN;
        this.ServiceID = serviceID;

    }
    ## then getter and setter and tostring

我已经创建了 AsyncConfiguration 类:

i have created AsyncConfiguration class:

package com.example.accessingdatajpa;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;


@Configuration
@EnableAsync
public class AsyncConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);
    @Bean (name = "taskExecutor")
    public Executor taskExecutor() {
        LOGGER.debug("Creating Async Task Executor");
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("CarThread-");
        executor.initialize();
        return executor;
    }
}

但直到现在我无法理解如何将 findby 和 http 帖子与多线程结合起来

but till now i can't undertand how can combaine the findby and http post with multithreading

推荐答案

重写您的代码.代替 List 返回 Stream.这将延迟从数据库加载记录,而不是尝试一次完成所有操作.

Rewrite your code. Instead of a List<Bulk_repository> return a Stream<Bulk_repository>. This will lazily load the records from the database, instead of trying to do everything at once.

然后使用TaskExecutor来执行每个线程不同的请求,给它一个任务,当有空闲线程时就会执行.

Then use the TaskExecutor to execute the different requests per thread, just give a task to it and it will be executed when there is a free thread.

@SpringBootApplication
public class AccessingDataJpaApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

    @Autowired
    private Bulk_repositoryRepository bulk_repositoryRepository;

    @Autowired
    private AsyncTaskExecutor executor;

    @Autowired
    private RestTemplate rest;

    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }

    @Override
    public void run(String... args) throws Exception {
        Date currentDate = new Date();

        Stream< Bulk_repository> results = Bulk_repository churnss : bulk_repositoryRepository.findAllByStatusAndCampTypeAndCampStartDateLessThanEqualAndCampEndDateGreaterThanEqual(0,2,currentDate,currentDate);

        results.forEach(it -> executor.submit(this.process(it)));
        Thread.sleep(1000);
    }

    private void process(RestTemplate rest, Bulk_repository churnss) {
      AddOfferRequest AddOffer = new AddOfferRequest("113", churnss.getMsisdn(),churnss.getParam1());

      HttpEntity<AddOfferRequest> entity = new HttpEntity<AddOfferRequest>(AddOffer,headers);

      try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(
                        "api link", HttpMethod.POST, entity, String.class);
         if(responseEntity.getStatusCode() == HttpStatus.OK){
           String response = responseEntity.getBody();
           churnss.setStatus(1);
           churnss.setProcessDate(new Date());
           churnss.setFulfilment_status(response);
           bulk_repositoryRepository.save(churnss);
         }else {
           logger.warn("Record Id: {}, Http Failed Response: {}",churnss.getId(), responseEntity.getStatusCode());
                }
      } catch (RestClientException rce) {
          logger.warn("Record Id: {} Http Failed. ", churnss.getId(), rce);
      }               
    }

}

注意:这是从我的头顶输入的,未经测试.不过应该提供一些指导.

NOTE: This was typed from the top of my head and isn't tested. However should provide some guidance.

这篇关于jpa 与 https 请求多线程弹簧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆