類似Leetcode中多執行緒有序執行問題

最近因為公司內的一個比較老的軟體在併發執行時出了很多問題,最近主要將其最佳化,其類似於LeetCode上有些題目,工程主要涉及到硬體上,一般硬體數量不是很大,所以併發量並不是很大,在萬級以內,總結下改進方案。

首先是具體的情況: 軟體A可以管理幾千臺機器,要併發的在幾千臺機器上執行一些操作,軟體A在收到上千條REST API call時,需要將將請求簡單排個序 (因為有時會向同一臺機器傳送多條請求,這樣就有先後執行順序), 收到請求後,每個請求對應一臺機器而且每個請求會包含多個task,不同的機器可以並行執行,但對於同一臺機器上的任務要序列執行。怎麼設計多執行緒同步問題合理 ?

具體設計:

(1) 收到一個請求會建立一個繼承於Runnable的Job, 並將其加入到ThreadPool中的同步佇列jobQueue中,並觸發ThreadPool啟動。

(2) 由第一步啟動TheadPool, 處理同步佇列jobQueue中的任務,然後將這些新任務Task排序並加入到第三步的task manager的同步等待佇列addedQueue中,並觸發第三步的task manager開始執行。如果jobQueue中一直沒有任務,會等待20秒後自動停止。

(3) 執行while迴圈,從同步等待佇列addedQueue中拿取任務並執行,直到沒有任務後結束。

程式碼如下:

Task 任務

public class Task implements Runnable { public static enum TaskOrder { PreTask(“PreTask”), WorkTask(“WorkTask”), PostTask(“PostTask”); private String name; private TaskOrder(String name) { this。name = name; } public String toString() { return this。name; } public int getOrder() { return this。ordinal(); } } private String uuid = “”; private TaskOrder order; private volatile boolean taskComplete = false; private List blocked = new ArrayList<>(); // mock 任務 private AtomicInteger record = new AtomicInteger(2); public Task(String uuid, TaskOrder order) { this。uuid = uuid; this。order = order; } public void addBlock(Task task) { blocked。add(task); } public void addBlockList(List taskList) { blocked。addAll(taskList); } public List getBlocked() { return this。blocked; } public String getUuid() { return uuid; } public void setUuid(String uuid) { this。uuid = uuid; } public void setBlocked(List blocked) { this。blocked = blocked; } public boolean getTaskComplete() { return this。taskComplete; } public TaskOrder getOrder() { return order; } public AtomicInteger getRecord() { return record; } @Override public String toString() { return “Task [uuid=” + uuid + “, order= ” + order。getOrder() + “ ”+ order + “, ” + “ number=” + record。get() + “]” ; } @Override public void run() { taskComplete = false; AtomicInteger record = new AtomicInteger(2); while(record。decrementAndGet() > 0) { try { Thread。sleep(20000); } catch (InterruptedException e) { // TODO Auto-generated catch block e。printStackTrace(); } } taskComplete = true; }}

執行任務的TaskManager

public class TaskManager implements Runnable { private static volatile Thread taskManagerProcessingThread = null; private static volatile boolean taskManagerThreadRunning = false; private List taskQueue = new ArrayList<>(); private static volatile LinkedBlockingQueue addedQueue = new LinkedBlockingQueue<>(); private static ReentrantLock modifyLock = new ReentrantLock(); static class CurrentTaskStatus { private static volatile CurrentTaskStatus currentTaskStatus = new CurrentTaskStatus(); private CurrentTaskStatus(){ } public static CurrentTaskStatus getCurrentTaskInstance() { return currentTaskStatus; } private Map> uuidTaskMap = new ConcurrentHashMap<>(); private List allTaskList = new ArrayList<>(); private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); public void setCurrentTaskList(List taskList) { rwLock。writeLock()。lock(); try { allTaskList = new ArrayList<>(taskList); uuidTaskMap = new ConcurrentHashMap<>(); allTaskList。stream()。forEach(task -> { if (uuidTaskMap。containsKey(task。getUuid())) { uuidTaskMap。get(task。getUuid())。add(task); } else { List list = new ArrayList(); list。add(task); uuidTaskMap。put(task。getUuid(), list); } }); } finally { rwLock。writeLock()。unlock(); } } public void AddCurrentTaskList(LinkedBlockingQueue taskList) { rwLock。writeLock()。lock(); try { taskList。stream()。forEach(task -> { if (!uuidTaskMap。containsKey(task。getUuid()) || (uuidTaskMap。containsKey(task。getUuid()) && !uuidTaskMap。get(task。getUuid())。contains(task))) { allTaskList。add(task); if (uuidTaskMap。containsKey(task。getUuid())) { uuidTaskMap。get(task。getUuid())。add(task); } else { List list = new ArrayList(); list。add(task); uuidTaskMap。put(task。getUuid(), list); } } }); } finally { rwLock。writeLock()。unlock(); } } public void AddCurrentTask(Task task) { rwLock。writeLock()。lock(); try { if (!uuidTaskMap。containsKey(task。getUuid()) || (uuidTaskMap。containsKey(task。getUuid()) && !uuidTaskMap。get(task。getUuid())。contains(task))) { allTaskList。add(task); if (uuidTaskMap。containsKey(task。getUuid())) { uuidTaskMap。get(task。getUuid())。add(task); } else { List list = new ArrayList(); list。add(task); uuidTaskMap。put(task。getUuid(), list); } } } finally { rwLock。writeLock()。unlock(); } } public List getTaskListbyUUID(String uuid) { List taskList = new ArrayList<>(); rwLock。readLock()。lock(); try { if (this。uuidTaskMap。containsKey(uuid)) { return this。uuidTaskMap。get(uuid); } } finally { rwLock。readLock()。unlock(); } return taskList; } public Map> getTaskUUIDMap() { Map> map = new ConcurrentHashMap<>(); rwLock。readLock()。lock(); try { map = this。uuidTaskMap; } finally { rwLock。readLock()。unlock(); } return map; } public List getTaskQueue() { List list = new ArrayList<>(); rwLock。readLock()。lock(); try { list = this。allTaskList; } finally { rwLock。readLock()。unlock(); } return list; } } public TaskManager() { } public static void addTask(List taskList) { modifyLock。lock(); try { List taskToAddList = new ArrayList<>(); Map> taskUUIDMap = CurrentTaskStatus。getCurrentTaskInstance()。getTaskUUIDMap(); // Determine if task is already in the queue。 for (Task taskDef : taskList) { if (taskUUIDMap。containsKey(taskDef。getUuid())) { List runningTask = taskUUIDMap。get(taskDef。getUuid()); if (!runningTask。contains(taskDef)) { taskToAddList。add(taskDef); } else { } } else { taskToAddList。add(taskDef); } } for (Task taskDef : taskToAddList) { determineBlocks(taskDef); addedQueue。add(taskDef); CurrentTaskStatus。getCurrentTaskInstance()。AddCurrentTask(taskDef); } startManager(); } catch(Exception e) { } finally { modifyLock。unlock(); } } // Can refine according to order public static void determineBlocks(Task task) { List taskList = CurrentTaskStatus。getCurrentTaskInstance()。getTaskListbyUUID(task。getUuid()); task。addBlockList(taskList); } private static void startManager() { modifyLock。lock(); try { if (taskManagerThreadRunning == false || taskManagerProcessingThread == null) { taskManagerThreadRunning = true; taskManagerProcessingThread = new Thread(new TaskManager()); taskManagerProcessingThread。start(); } } catch(Exception e) { } finally { modifyLock。unlock(); } } @Override public void run() { while (taskManagerThreadRunning) { modifyLock。lock(); try { if (!addedQueue。isEmpty()) { addedQueue。stream()。forEach(task -> { if (!this。taskQueue。contains(task)) { this。taskQueue。add(task); } }); addedQueue。clear(); } CurrentTaskStatus。getCurrentTaskInstance()。setCurrentTaskList(this。taskQueue); if (this。taskQueue。isEmpty()) { taskManagerProcessingThread = null; taskManagerThreadRunning = false; break; } } finally { modifyLock。unlock(); } for(int i = 0; i < taskQueue。size(); i++) { Task task = taskQueue。get(i); List blocked = task。getBlocked(); if (!blocked。isEmpty()) { Iterator iterator = blocked。iterator(); while(iterator。hasNext()) { Task blockedTask = iterator。next(); if (!this。taskQueue。contains(blockedTask)) { iterator。remove(); } } } blocked = task。getBlocked(); if (blocked。isEmpty()) { if (task。getRecord()。get() <= 0) { Utils。printLog(“ ==Completed: ” + task。toString()); taskQueue。remove(i——); } else { Utils。printLog(“ ==Running: ” + task。toString()); task。getRecord()。decrementAndGet(); } } } } Utils。printLog(“==End TaskManager==\n”); }}

接收任務的job

public class Job implements Runnable { private String uuid = “”; public Job(String uuid) { this。uuid = uuid; } public String getUuid() { return uuid; } public void setUuid(String uuid) { this。uuid = uuid; } @Override public void run() { // 根據需求,要建立額外兩個任務,一個是preTask,一個是postTask Task preTask = new Task(this。uuid, TaskOrder。PreTask); Task workTask = new Task(this。uuid, TaskOrder。WorkTask); Task postTask = new Task(this。uuid, TaskOrder。PostTask); List taskList = new ArrayList<>(); taskList。add(preTask); taskList。add(workTask); taskList。add(postTask); TaskManager。addTask(taskList); }}

Thread Pool

public class TaskThreadPool { private static volatile ExecutorService executorService = null; private static volatile ReentrantLock modifyLock = new ReentrantLock(); private static volatile boolean executorAvailable = false; private static LinkedBlockingQueue jobQueue = new LinkedBlockingQueue<>(); private static final long MAX_WAIT_TIME = 20 * 1000; private static volatile long startTime = 0; private static void startMonitorExecutors(long time) { startTime = time; new Thread(new Runnable() { @Override public void run() { boolean running = true; while(running) { while(!jobQueue。isEmpty()) { try { executorService。execute(jobQueue。take()); } catch (InterruptedException e) { } startTime = System。currentTimeMillis(); } executorAvailable = false; long expireTime = System。currentTimeMillis() - startTime; if (expireTime > MAX_WAIT_TIME && !executorAvailable) { modifyLock。lock(); try { if (!executorAvailable) { if (executorService != null) { executorService。shutdown(); } executorService = null; executorAvailable = false; running = false; Utils。printLog(“Stop Thread Pool”); break; } } finally { modifyLock。unlock(); } } try { Thread。sleep(1000); } catch (InterruptedException e) { } } } })。start(); } /** * Add jobs parallel。 * @param thread * @return */ public static boolean executorTask(Runnable thread) { try { jobQueue。put(thread); } catch (InterruptedException e) { e。printStackTrace(); } return startJobs(); } /* * start job。 */ public static boolean startJobs() { if (!executorAvailable) { modifyLock。lock(); try { if (!executorAvailable) { if (executorService == null) { int number = Runtime。getRuntime()。availableProcessors(); if (number > 6) { number = 6; } else if (number < 3) { number = 3; } executorService = new ThreadPoolExecutor(number, number, 0L, TimeUnit。MILLISECONDS, new LinkedBlockingQueue()); startMonitorExecutors(System。currentTimeMillis()); Utils。printLog(“Start Thread Pool”); } executorAvailable = true; } } catch (Exception e) { e。printStackTrace(); } finally { modifyLock。unlock(); } } return true; }}

測試用例

public class UpateEngineTest { public static void main(String[] args) { Utils。printLog(“===============Started============”); // 啟動兩個Thread併發傳送請求 final String preName = Thread。currentThread()。getName() + “ - ”; for(int i = 0; i < 2; i++) { new Thread(new Runnable() { public void run() { for(int j = 0; j < 20; j++) { // deviceName表示裝置名稱 String deviceName = preName + String。valueOf(j); TaskThreadPool。executorTask(new Job(deviceName)); } } })。start(); } Utils。printLog(“===============Ended============”); } }

測試結果:(結果輸出太多,就只截圖了一部分)。

===============Started===========================Ended============Start Thread Pool ==Running: Task [uuid=main - 2, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 0, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 5, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 3, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 1, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 4, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 2, order= 0 PreTask, number=1] ==Running: Task [uuid=main - 0, order= 0 PreTask, number=1] ==Running: Task [uuid=main - 5, order= 0 PreTask, number=1] ==Running: Task [uuid=main - 3, order= 0 PreTask, number=1] ==Running: Task [uuid=main - 1, order= 0 PreTask, number=1] ==Running: Task [uuid=main - 4, order= 0 PreTask, number=1] ==Running: Task [uuid=main - 6, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 7, order= 0 PreTask, number=2] ==Running: Task [uuid=main - 10, order= 0 PreTask, number=2] ==Completed: Task [uuid=main - 2, order= 0 PreTask, number=0] ==Running: Task [uuid=main - 2, order= 1 WorkTask, number=2] ==Completed: Task [uuid=main - 0, order= 0 PreTask, number=0] ==Running: Task [uuid=main - 0, order= 1 WorkTask, number=2] ==Completed: Task [uuid=main - 5, order= 0 PreTask, number=0]

從結果中查詢uuid=main - 2的任務,發現是有序執行的。

Line 4: ==Running: Task [uuid=main - 2, order= 0 PreTask, number=2] Line 10: ==Running: Task [uuid=main - 2, order= 0 PreTask, number=1] Line 19: ==Completed: Task [uuid=main - 2, order= 0 PreTask, number=0] Line 20: ==Running: Task [uuid=main - 2, order= 1 WorkTask, number=2] Line 45: ==Running: Task [uuid=main - 2, order= 1 WorkTask, number=1] Line 68: ==Completed: Task [uuid=main - 2, order= 1 WorkTask, number=0] Line 69: ==Running: Task [uuid=main - 2, order= 2 PostTask, number=2] Line 105: ==Running: Task [uuid=main - 2, order= 2 PostTask, number=1] Line 128: ==Completed: Task [uuid=main - 2, order= 2 PostTask, number=0] Line 137: ==Running: Task [uuid=main - 2, order= 0 PreTask, number=2] Line 168: ==Running: Task [uuid=main - 2, order= 0 PreTask, number=1] Line 194: ==Completed: Task [uuid=main - 2, order= 0 PreTask, number=0] Line 195: ==Running: Task [uuid=main - 2, order= 1 WorkTask, number=2] Line 228: ==Running: Task [uuid=main - 2, order= 1 WorkTask, number=1] Line 254: ==Completed: Task [uuid=main - 2, order= 1 WorkTask, number=0] Line 255: ==Running: Task [uuid=main - 2, order= 2 PostTask, number=2] Line 288: ==Running: Task [uuid=main - 2, order= 2 PostTask, number=1] Line 311: ==Completed: Task [uuid=main - 2, order= 2 PostTask, number=0]