2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > list 分批_Java大数据量(多线程)分段分批处理

list 分批_Java大数据量(多线程)分段分批处理

时间:2022-08-21 18:10:37

相关推荐

list 分批_Java大数据量(多线程)分段分批处理

Java大数据量(多线程)分段分批处理

发布时间:-07-13 10:44,

浏览次数:2609

, 标签:

Java

分段处理主类

github地址:

/zuojingang/common-tools-intergrated/blob/master/src/main/java/pers/zuo/component/piecewise/PiecewiseHandler.java

package ponent.piecewise; import java.util.ArrayList; import

java.util.List;import java.util.concurrent.Callable; import

java.util.concurrent.ExecutionException;import

java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;

import ponent.piecewise.bean.PiecewiseKey; import

ponent.piecewise.bean.PiecewiseResult;import

ponent.piecewise.bean.PiecewiseTask;/** * @author zuojingang * *

@param * the type of part process return */ public abstract class

PiecewiseHandler { public void nThreads( final Map

PiecewiseResult>>> nThreadResult,final int

totalNum)throws Exception { nThreads(nThreadResult, totalNum, D_THREAD_SIZE,

D_PART_SIZE); }/** * @param totalNum * @param threadSize * @return nThreads

process result. */ public void nThreads( final Map

PiecewiseResult>>> nThreadResult,final int

totalNum,final int threadSize, final int partSize) throws Exception { if (null

== nThreadResult ||0 >= totalNum || 0 >= threadSize) { return; }

ExecutorService fixThreadPool = Executors.newFixedThreadPool(D_N_THREAD);

List fTaskList =new ArrayList<>(); int fromIndex = 0; try { while

(totalNum > fromIndex) {final int thisFromIndex = fromIndex; final int

threadProcessNum = Math.min(totalNum - fromIndex, threadSize);final int

thisToIndex = thisFromIndex + threadProcessNum;if (0 < threadProcessNum) {

PiecewiseTask futureTask = PiecewiseBuilder.buildTask(new Callable() {

@Override public Boolean call() throws Exception { final Map

PiecewiseResult> threadResult = PiecewiseBuilder .initializeThreadResult();

nThreadResult.put(PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex),

PiecewiseBuilder.buildResult(threadResult)); singleThread(threadResult,

thisFromIndex, threadProcessNum, partSize);return true; } },

PiecewiseBuilder.buildKey(thisFromIndex, thisToIndex));

fixThreadPool.submit(futureTask); fTaskList.add(futureTask); } fromIndex +=

threadProcessNum; } boolean finished =true; for (PiecewiseTask futureTask :

fTaskList) {try { finished = finished && futureTask.get(); } catch

(InterruptedException | ExecutionException e) {

nThreadResult.get(futureTask.getTaskKey()).setException(e); } } }catch

(Exception e) {throw e; } finally { // the threadPool must manual-lock after use

fixThreadPool.shutdown(); } } public void singleThread(final Map

PiecewiseResult> threadResult,final int totalNum) {

singleThread(threadResult,0, totalNum); } public void singleThread(final

Map> threadResult,final int offset, final int

totalNum) { singleThread(threadResult, offset, totalNum, D_PART_SIZE); }/** *

@param offset * @param toIndex * @param partSize * @return process subList

values and include first index(offset) and exclude * latest index(offset +

totalNum) */ public void singleThread(final Map

PiecewiseResult> threadResult,final int offset, final int totalNum, final

int partSize) {if (0 >= totalNum || 0 >= partSize) { return; } final int

toIndex = offset + totalNum; int fromIndex = offset;while (toIndex > fromIndex)

{ int thisToIndex = Math.min(fromIndex + partSize, toIndex); V partResult =null

; Exception pe =null; try { partResult = partProcess(fromIndex, thisToIndex); }

catch (Exception e) { pe = e; }

threadResult.put(PiecewiseBuilder.buildKey(fromIndex, thisToIndex),

PiecewiseBuilder.buildResult(partResult, pe)); fromIndex = thisToIndex; } }/** *

@param offset * @param partSize * @return part process result */ protected

abstract V partProcess(final int fromIndex, final int toIndex) throws

Exception; public staticfinal int D_N_THREAD = 10; public static final int

D_THREAD_SIZE =10000; public static final int D_PART_SIZE = 1000; }

分段任务定制类

package ponent.piecewise.bean; import java.util.ArrayList; import

java.util.List;import java.util.concurrent.Callable; import

java.util.concurrent.FutureTask;/** * @author zuojingang * * @param

extends Number> the type of part process return */ public class PiecewiseTask

extends FutureTask { private final PiecewiseKey taskKey; public

PiecewiseTask(Callable callable, PiecewiseKey taskKey) {super

(callable);this.taskKey = taskKey; } public PiecewiseKey getTaskKey() { return

taskKey; } }

分段任务Key值类

package ponent.piecewise.bean; public class PiecewiseKey { private

final Integerfrom; private final Integer to; public PiecewiseKey(Integer from,

Integer to) { super();this.from = from; this.to = to; } public Integer getFrom

() {return from; } public Integer getTo() { return to; } @Override public int

hashCode() { final int prime = 31; int result = 1; result = prime * result + ((

from == null) ? 0 : from.hashCode()); result = prime * result + ((to == null) ?

0 : to.hashCode()); return result; } @Override public boolean equals(Object

obj) {if (this == obj) return true; if (obj == null) return false; if

(getClass() != obj.getClass())return false; PiecewiseKey other = (PiecewiseKey)

obj;if (from == null) { if (other.from != null) return false; } else if (!from

.equals(other.from)) return false; if (to == null) { if (other.to != null)

return false; } else if (!to.equals(other.to)) return false; return true; } }

分段任务返回值类

package ponent.piecewise.bean; public class PiecewiseResult {

private final V val; private Exception exception; public PiecewiseResult(V val)

{ super();this.val = val; } public PiecewiseResult(V val, Exception exception)

{ super();this.val = val; this.exception = exception; } public Exception

getException() { return exception; } public void setException(Exception

exception) {this.exception = exception; } public V getVal() { return val; } }

获取实例工具类

package ponent.piecewise.manager; import java.util.HashMap; import

java.util.Map;import java.util.concurrent.Callable; import

ponent.piecewise.bean.PiecewiseKey;import

ponent.piecewise.bean.PiecewiseResult;import

ponent.piecewise.bean.PiecewiseTask;public class PiecewiseBuilder {

public static PiecewiseKey buildKey(Integer from, Integer to) { return new

PiecewiseKey(from, to); }public static PiecewiseResult buildResult(V

val) {return new PiecewiseResult(val); } public static PiecewiseResult

buildResult(V val, Exception exception) { return new PiecewiseResult(val,

exception); }public static PiecewiseTask buildTask(Callable callable,

PiecewiseKey taskKey) {return new PiecewiseTask(callable, taskKey); } /** *

this method aimed for simple when define the nThreadResult * * @return */ public

static Map

PiecewiseResult>>>initializeNThreadResult() { return new HashMap<>(); } /**

* this method aimed for simple when define the threadResult * * @return */

public static Map> initializeThreadResult

() {return new HashMap<>(); } }

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。