2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > 用java把通达信导出的交易记录文件高速批量导入mysql数据库

用java把通达信导出的交易记录文件高速批量导入mysql数据库

时间:2023-08-26 07:34:20

相关推荐

用java把通达信导出的交易记录文件高速批量导入mysql数据库

有段时间需要把通达信的交易记录导入到mysql数据库,主要是1分钟和5分钟数据。对比测试过innoDB和MyISAM引擎,不论从数据访问速度还是数据文件空间占用上比较,MyISAM都大占优势。Mysql数据版本为:5.5.47Ā

导入mysql数据库性能最好的应该使用load data,但是为了通用性,同时也想通过java程序实现更极致的性能。故记录下过程。

程序性能:在我笔记本上测试下来,导入速度大约在2.5万3万每秒。

电脑配置:双核四线程,比较低配了。

处理思路:

读取导出交易数据文件目录中的所有文件名,存放到String[]中

启动4个线程读取文件,把数据加工成:insert into <table>values(),(),()这样的语句统一放入BlockingQueue<String>中。

当BlockingQueue<String>存放的数据量达到600,开始执行批量insert到数据库。直到处理完所有文件。

建表:

-- 一分钟数据表名为trade1f,五分钟数据表名为trade5f,字段完全一致。

CREATE TABLE `trade1f` (

`id` int(10) unsigned NOT NULL AUTO_INCREMENT,

`dttime` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '时间',

`code` char(6) DEFAULT NULL COMMENT '股票代码',

`openPrc` float DEFAULT NULL COMMENT '开盘价',

`maxPrc` float DEFAULT NULL COMMENT '最高价',

`minPrc` float DEFAULT NULL COMMENT '最低价',

`closePrc` float DEFAULT NULL COMMENT '收盘价',

`vol` bigint(20) DEFAULT NULL COMMENT '成交量',

`volMoney` bigint(20) DEFAULT NULL COMMENT '成交金额',

`bg` enum('SH','SZ') DEFAULT NULL COMMENT 'SH OR SZ',

`createDt` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (`id`),

KEY `dttime` (`dttime`),

KEY `code` (`code`)

) ENGINE=MyISAMAUTO_INCREMENT=1 DEFAULT CHARSET=utf8

源代码如下:

package itil.stock.bs;

import java.io.File;

import java.io.FilenameFilter;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.ArrayList;

import java.util.Calendar;

import java.util.Date;

import java.util.List;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.TimeUnit;

import com.jfinal.plugin.activerecord.Db;

import com.jfinal.plugin.activerecord.DbKit;

import com.jfinal.plugin.activerecord.Record;

import mon.Kit;

public class GatherFromTdxFile3 {

private static final int batchSize=30;

private static final int valueSize=20;

private static final String sdfStr="yyyy-MM-dd HH:mm:ss";

private static final String insert="INSERT into <table>(dttime,code,openPrc,maxPrc,minPrc,closePrc,vol,volMoney,bg)values";

private static final BlockingQueue<String> insertQueue = new LinkedBlockingQueue<String>(100000);

//此逻辑是使用insert into values(),(),()....一个sql插入多条,一批次含有多条sql

//这里不按文件来执行插入,直到对列里满足待插入的数据数量才执行插入数据库,故一次提交会包含多个文件的数据

public static void hdTradeData(String fileName){

final String fname = fileName.substring(fileName.lastIndexOf("\\")+1);//SZ#300737.txt

String bg = fname.substring(0,2);// SZ or SH

List<String> listData = Kit.readFileByLines(fileName, "GBK");

String info = listData.get(0);

String tableName = "trade5f";

if(info.contains("1分钟线")){

tableName="trade1f";

}else if(info.contains("日线 ")){

tableName="tradeday";

}

boolean isNotDayData = !(tableName.equals("tradeday"));

String[] sk = info.split("\\s");

String code = sk[0];

//String name = info.replace(code,"").replace("1分钟线 不复权", "").replace("5分钟线 不复权", "").trim();

String openPrc,maxPrc,minPrc,closePrc;//用批量方式插入,生成sql语句,不需要管数据类型

String vol,volMoney;

StringBuilder sbValues = null;

int all=0;

// String lastDtStr=null;

// Timestamp dbLastDttime = null;

// boolean ckNone = false;

for(int line=2;line<listData.size()-1;line++){//跳过前两行与最后一行

String lineStr = listData.get(line);

String[] data = lineStr.split("\\s");

final String dtStr = data[0];

String dttimeStr = null;

if(isNotDayData){

String time = data[1];

dttimeStr = dtStr+" "+time.substring(0, 2)+":"+time.substring(2)+":"+"00";

}else{

dttimeStr = dtStr+" 00:00:00";

}

//**********检查当前code与日期是存否在**********

//本逻辑只检测已导入的当天日期最后一条,只有时间在最后一条之后才插入数据库

//如果能保证数据文件不存在重复,注释此段代码,将大大提升导入效率

//使用ThreadLocal提升初始化SimpleDateFormat效率

/*Date dttime = Kit.parse(dttimeStr,sdfStr);

if(!dtStr.equals(lastDtStr)){

lastDtStr=dtStr;

Record ckRec = Db.findFirst("select max(dttime) dttime from "+tableName+" t "

+ "where t.code='"+code+"' and t.dttime='"+dttimeStr+"'");

if(ckRec.get("dttime")!=null){

dbLastDttime=ckRec.getTimestamp("dttime");

}else{

ckNone=true;

}

if(dbLastDttime!=null && !dttime.after(dbLastDttime)){

continue;

}

}

if(!ckNone && dbLastDttime!=null && !dttime.after(dbLastDttime)){

continue;

}*/

//**********检查当前code与日期是存否在**********

all++;

if(all%valueSize==1){

String tmp = insert.replace("<table>", tableName);

sbValues = new StringBuilder(200*valueSize);

sbValues.append(tmp);

}

if(isNotDayData){

openPrc=data[2];

maxPrc=data[3];

minPrc=data[4];

closePrc=data[5];

vol = data[6];

volMoney = data[7];

}else{

openPrc=data[1];

maxPrc=data[2];

minPrc=data[3];

closePrc=data[4];

vol = data[5];

volMoney = data[6];

}

sbValues.append("(\'").append(dttimeStr).append("\',");

sbValues.append('\'').append(code).append("\',");

sbValues.append(openPrc).append(",");

sbValues.append(maxPrc).append(",");

sbValues.append(minPrc).append(",");

sbValues.append(closePrc).append(",");

sbValues.append(vol).append(",");

sbValues.append(volMoney).append(",");

sbValues.append('\'').append(bg).append("\'),");

if(all%valueSize==0){

putQueue(sbValues.substring(0, sbValues.length()-1).toString());

sbValues = null;

if(insertQueue.size()>600){

saveListToDb(getListForSave(false));

}

}

}

if(sbValues!=null){

putQueue(sbValues.substring(0, sbValues.length()-1).toString());

sbValues = null;

}

}

public static void saveListToDb(List<String> sqlList){

if(sqlList!=null && sqlList.size()>0){

Db.batch(sqlList, batchSize);

//System.out.println("saved:"+sqlList.size()*valueSize);

sqlList.clear();

}

}

public static void putQueue(String sql){

try {

insertQueue.put(sql);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

public static synchronized List<String> getListForSave(boolean isforceSave){

if(!isforceSave && insertQueue.size()<10){

return null;

}

List<String> sqlList = new ArrayList<String>(insertQueue.size()*batchSize*200);

while (insertQueue.size()>0) {

final String sql = insertQueue.poll();

if(sql!=null){

sqlList.add(sql);

}

}

return sqlList;

}

//检查一个文件第一条数据是否存在

public static Boolean checkExist(String dir,String[] fileList){

Boolean chk = true;

for(String rec:fileList){

chk = checkExist(dir+"\\"+rec);

if(chk!=null){

return chk;

}

}

return true;

}

public static Boolean checkExist(String dir){

List<String> listData = Kit.readFileByLines(dir, "GBK");

if(listData.size()<4){

return null;//对于被检测的文件,数据不合法将返回null,重新检测下一个文件

}

String info = listData.get(0);

String tableName = "trade5f";

if(info.contains("1分钟线")){

tableName="trade1f";

}

String lineStr = listData.get(2);//取第一行数据

String[] data = lineStr.split("\\s");

final String dtStr = data[0];

String time = data[1];

final String dttimeStr = dtStr+" "+time.substring(0, 2)+":"+time.substring(2)+":"+"00";

Record rec = Db.findFirst("select 1 from "+tableName+" where dttime='"+dttimeStr+"' limit 1");

System.out.println("检查时间:"+dttimeStr+" 是否存在记录");

if(rec==null){

return false;

}

return true;

}

//下载数据保存到数据库

public static void hdFileData(final String dir) {

File file = new File(dir);

if(!file.isDirectory()){

if(checkExist(dir)){

System.out.println("已存在记录,程序退出........");

return;

}

hdTradeData(dir);

return;

}

String[] fileList = file.list(new FilenameFilter(){

public boolean accept(File dir,String name){

return name.startsWith("SH#")||name.startsWith("SZ#");

}

});

if(checkExist(dir,fileList)){

System.out.println("已存在记录,程序退出........");

return;

}

BlockingQueue<String> queue = new LinkedBlockingQueue<String>(5000);

for(String rec:fileList){

try {

queue.put(dir+"\\"+rec);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

int threadNum = Integer.parseInt(Kit.readPropByKey("threadNum", 4));

System.out.println("待处理文件数:"+queue.size()+" 处理线程数:"+threadNum);

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadNum);

long begin = System.currentTimeMillis();

while (queue.size()>0) {

final String rec = queue.poll();

fixedThreadPool.execute(new Runnable() {

public void run() {

hdTradeData(rec);

}

});

}

// 关闭服务后, 阻塞到所有任务被执行完毕或者超时发生,或当前线程被中断

try {

fixedThreadPool.shutdown(); // 平缓关闭服务

fixedThreadPool.awaitTermination(10, TimeUnit.HOURS);

} catch (InterruptedException e) {

e.printStackTrace();

fixedThreadPool.shutdownNow();

}

saveListToDb(getListForSave(true));//最后一次强制执行

long end = System.currentTimeMillis();

System.out.println("处理完成,时间:"+(end-begin)/1000.0+"秒");

}

public static void gen30fData(){

Record t5fMaxRec = Db.findFirst("select max(dttime) dttime from trade5f");

Date t5fMaxDate = t5fMaxRec.getDate("dttime");

if(!Kit.format(t5fMaxRec.getTimestamp("dttime"), sdfStr).endsWith(" 15:00:00")){

System.out.println("5f数据不是15点结束");

return;

}

Record t30fMaxRec = Db.findFirst("select max(dttime) dttime from trade30f");

String sql = null;

if(t30fMaxRec.get("dttime")==null){

sql = "select dttime,code,openPrc,maxPrc,minPrc,closePrc,vol,volMoney,bg from trade5f order by code,dttime";

}else if(t5fMaxDate.getTime()<=(t30fMaxRec.getDate("dttime")).getTime()){

System.out.println("数据已存在");

return;

}else if(t5fMaxDate.after(t30fMaxRec.getDate("dttime"))){

Calendar cld = Calendar.getInstance();

cld.setTime(t30fMaxRec.getDate("dttime"));

cld.add(Calendar.DATE, 1);

sql = "select dttime,code,openPrc,maxPrc,minPrc,closePrc,vol,volMoney,bg from trade5f where dttime>='"+Kit.format(cld.getTime(), "yyyy-MM-dd")+"' and dttime<='"+Kit.format(t5fMaxDate, sdfStr)+"' order by code,dttime";

}else{

System.out.println("else case.........");

return;

}

long begin = System.currentTimeMillis();

List<Record> list = new ArrayList<Record>(batchSize);

int all=0;

try{

Connection conn = DbKit.getConfig().getConnection();

PreparedStatement ps = conn.prepareStatement(sql,

ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

ps.setFetchSize(Integer.MIN_VALUE);

ResultSet rs = ps.executeQuery();

Record preRec = new Record();

preRec.set("code", null);

float openPrc = 0;

float closePrc = 0;

float minPrc = 0;

float maxPrc = 0;

long vol=0;

long volMoney=0;

while(rs.next()){

String dttimeStr = Kit.format(rs.getTimestamp("dttime"), sdfStr);

String code = rs.getString("code");

if(dttimeStr.endsWith(":35:00")||dttimeStr.endsWith(":05:00")){//取整点和30分时的下一5f作为初始化

openPrc=rs.getFloat("openPrc");

minPrc = rs.getFloat("minPrc");

maxPrc = rs.getFloat("maxPrc");

vol = rs.getLong("vol");

volMoney = rs.getLong("volMoney");

}

if(code.equals(preRec.getStr("code"))){

if(rs.getFloat("minPrc")<minPrc){

minPrc = rs.getFloat("minPrc");

}

if(rs.getFloat("maxPrc")>maxPrc){

maxPrc = rs.getFloat("maxPrc");

}

vol+=rs.getLong("vol");

volMoney+=rs.getLong("volMoney");

if(dttimeStr.endsWith(":00:00")||dttimeStr.endsWith(":30:00")){

closePrc = rs.getFloat("closePrc");

Record t30f = new Record();

t30f.set("dttime", rs.getTimestamp("dttime"));

t30f.set("code", code);

t30f.set("openPrc", openPrc);

t30f.set("maxPrc", maxPrc);

t30f.set("minPrc", minPrc);

t30f.set("closePrc", closePrc);

t30f.set("vol", vol);

t30f.set("volMoney", volMoney);

t30f.set("bg", rs.getString("bg"));

list.add(t30f);

all++;

if(all%batchSize==0){

Kit.batchSave("trade30f", list);

list.clear();

System.out.println("saved:"+all);

}

}

}

preRec.set("code", code);

}

} catch (Exception e) {

e.printStackTrace();

}

Kit.batchSave("trade30f", list);

list.clear();

System.out.println("saved:"+all);

long end = System.currentTimeMillis();

System.out.println("处理结束,用时:"+(end-begin)/1000.0+"秒");

}

public static void main(String[] args) {

int cnt = 10000000;

long begin = System.currentTimeMillis();

for(int i=0;i<cnt;i++){

SimpleDateFormat sdf = Kit.getDateFormat(sdfStr);//new SimpleDateFormat(sdfStr);

try {

sdf.parse("-02-11 16:36:39");

} catch (ParseException e) {

e.printStackTrace();

}

}

long end = System.currentTimeMillis();

System.out.println((end-begin)/1000.0);

begin = System.currentTimeMillis();

for(int i=0;i<cnt;i++){

Kit.parse("-02-11 16:36:39",sdfStr);

}

end = System.currentTimeMillis();

System.out.println((end-begin)/1000.0);

}

}

为了方便,以上程序使用的是jfinal框架的数据库操作,使用JDBC也是同样道理,批量提交的逻辑大概是:

Connection conn=xxxx

Statement st = conn.createStatement();

st.addBatch(sql语句1);st.addBatch(sql语句2);st.addBatch(sql语句3);st.addBatch(sql语句n);…………

st.executeBatch();

程序里也包含了用5分钟数据高效生成30分钟数据的逻辑,可参考。

如果电脑性能好,请修改线程池中的线程数,比如8线程可修改为7:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);

程序中的初始化数字请自行修改批次大小:

batchSize=30;

valueSize=20;

Kit.readFileByLines是把文件按行读到List中,代码为:

public static List<String> readFileByLines(String fileName,String encode) {

List<String> list = new ArrayList<String>();

try {

InputStreamReader isr = new InputStreamReader(new FileInputStream(

fileName), encode);

BufferedReader reader = new BufferedReader(isr);

String line = null;

while ((line = reader.readLine()) != null) {

if (line.length() > 0)

list.add(line);

}

reader.close();

isr.close();

} catch (Exception e) {

e.printStackTrace();

}

return list;

}

其它调用到Kit啥的大家都看得懂,自行实现。

程序为了实现更高性能,对于数据已存在的检测只对第一个有效文件的第一行进行检测,如果已存在数据,将不再进行导入。

mysql几个关键的配置:

myisam_max_sort_file_size=40G

key_buffer_size=2048M

innodb_buffer_pool_size=4096M

java调用程序:

GatherFromTdxFile3.hdFileData("C:\\new_tdx2\\T0002\\export");

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。