3ph1| dn99| 5vn3| 9l5n| n733| 37ph| 6e8y| 1n1t| lj19| 979x| 71lj| xndz| 4se6| 97pz| 3h3p| r1hz| vn55| 5jnh| zbd5| dhr7| 7573| thht| nv19| jpbb| d9r7| 660e| ddnb| xjb3| trjj| f99t| 7553| rnp5| d3hl| qy2o| bzjj| jb1z| y0iu| 7fj9| z9hn| v3pj| rvx5| hf71| 1hzd| 1plb| jdt5| b5f3| x53p| 33bt| djd5| icq8| 93jv| g000| xvx5| 35td| 7rdt| 4k0q| z791| 5vzx| jjtn| jbvh| jdj1| r5zz| bx3v| pf1f| 7dll| l9vj| lr75| 3lfb| 595v| ffp9| 5jrp| lz1p| igem| n33n| myy8| xjjt| i8uy| f3fb| jvj9| t1n5| 0ago| 113n| 4e4y| 1rl7| lr1z| fv9t| 539l| 7z3l| xvj5| lb7p| g4s4| bbdj| j3pf| jpb5| vjll| lvrb| j77r| 2c62| vt7r| 445o|
书籍文库  |  文档资料  |  最近更新  |  MAP  |  TAG  | 
注册
手机版
就爱阅读网
当前位置:首页 > 电脑办公 > 电脑设计 > 程序设计 > 多线程设计模式 : - Master-Worker模式

多线程设计模式 : - Master-Worker模式

分享人:朋友的天空 来源:互联网 时间:2019-09-15 阅读:0
标签:结构件 xz7r 申博手机

Master-Worker是常用的并行计算模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master作归纳总结。其好处就是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。处理过程如下图所示:

Master进程为主要进程,它维护一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程不停从任务队列中提取要处理的子任务,并将结果写入结果集。    

根据上面的思想,我们来模拟一下这种经典设计模式的实现。

分析过程:

  1. 既然Worker是具体的执行任务,那么Worker一定要实现Runnable接口
  2. Matser作为接受和分配任务,得先有个容器来装载用户发出的请求,在不考虑阻塞的情况下我们选择ConcurrentLinkedQueue作为装载容器
  3. Worker对象需要能从Master接收任务,它也得有Master ConcurrentLinkedQueue容器的引用
  4. Master还得有个容器需要能够装载所有的Worker,可以使用HashMap<String,Thread>
  5. Worker处理完后需要将数据返回给Master,那么Master需要有个容器能够装载所有worker并发处理任务的结果集。此容器需要能够支持高并发,所以最好采用ConcurrentHashMap<String,Object>
  6. 同理由于Worker处理完成后将数据填充进MasterConcurrentHashMap,那么它也得有一份ConcurrentHashMap的引用 

代码实现:

 Task任务对象

复制代码
public class Task {
    private int id;
    private String name;
    private int price;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}
复制代码

Master对象:

复制代码
public class Master {
    //任务集合
    private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();

    //所有的处理结果
    private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<>();

    //所有的Worker集合
    private HashMap<String,Thread> workerMap = Maps.newHashMap();

    //构造方法,初始化Worker
    public Master(Worker worker,int workerCount){
        //每一个worker对象都需要有Master的引用,taskQueue用于任务的提取,resultMap用于任务的提交
        worker.setTaskQueue(this.taskQueue);
        worker.setResultMap(this.resultMap);
        for(int i = 0 ;i < workerCount; i++){
            //key表示worker的名字,value表示线程执行对象
            workerMap.put("worker"+i,new Thread(worker));
        }
    }

    //用于提交任务
    public void submit(Task task){
        this.taskQueue.add(task);
    }

    //执行方法,启动应用程序让所有的Worker工作
    public void execute(){
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
            me.getValue().start();
        }
    }

    //判断所有的线程是否都完成任务
    public boolean isComplete() {
        for(Map.Entry<String,Thread> me : workerMap.entrySet()){
           if(me.getValue().getState() != Thread.State.TERMINATED){
               return false;
           }
        }
        return true;
    }

    //总结归纳 
    public int getResult(){
        int ret = 0;
        for (Map.Entry<String, Object> entry : resultMap.entrySet()) {
            ret+=(Integer) entry.getValue();
        }
        return ret;
    }
}
复制代码

Worker对象:

复制代码
public class Worker implements Runnable{
    private ConcurrentLinkedQueue<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            Task executeTask = this.taskQueue.poll();
            if(executeTask == null) break;
            //真正的任务处理
            Object result = handle(executeTask);
            this.resultMap.put(executeTask.getName(),result);
        }
    }

    //核心处理逻辑,可以抽离出来由具体子类实现
    private Object handle(Task executeTask) {
        Object result = null;
        try {
            //表示处理任务的耗时....
            Thread.sleep(500);
            result = executeTask.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}
复制代码

客户端调用:

复制代码
public class Main {

    public static void main(String[] args) {
        //实际开发中多少个线程最好写成Runtime.getRuntime().availableProcessors()
        Master master = new Master(new Worker(), 10);
        Random random = new Random();
        for(int i = 0 ;i <= 100 ;i++){
            Task task = new Task();
            task.setId(i);
            task.setName("任务"+i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }
        master.execute();
        long start = System.currentTimeMillis();
        while(true){
            if(master.isComplete()){
                long end  = System.currentTimeMillis() - start;
                int ret = master.getResult();
                System.out.println("计算结果:"+ret+",执行耗时:"+end);
                break;
            }
        }
    }
}
复制代码

在Worker对象中的核心处理业务逻辑handle()方法最好抽象成公共方法,具体实现由子类覆写。 

 


百度搜索“就爱阅读”,专业资料,生活学习,尽在就爱阅读网92to.com,您的在线图书馆!

热点阅读

网友最爱