文档结构  
可译网翻译有奖活动正在进行中,查看详情 现在前往 注册?
原作者:Nikita Koksharov (2016-09-13)    来源:Dzone [英文]
coyee    计算机    2016-09-14    1评/10503阅
翻译进度:已翻译   参与翻译: 淡蓝色 (5)

能够立即执行或者调度一个任务或工作成为一个典型的现代分布式Java应用程序需求。对于那些使用Redis的人来说,这种需求也变得更重要。

Redisson现在提供了一个新的方便的方法来执行这样的分布式任务执行和调度,它是通过标准JDK的ExecutorService和ScheduledExecutorService API实现的,被提交的任务在Redisson节点服务器上运行,它们连接同一个Redis数据库。

Image title

Redisson 节点

Redisson节点是标准的Java应用,唯一目标就是执行被提交的任务,每个Redisson节点可以看成是分布式环境中远程worker。

第 1 段(可获 1.46 积分)

它也可以通过一个Redisson实例在主要应用中跨多个进程。

所有任务类都是动态加载的,所以你不需要将他们在放在Redisson节点的路径中并重新启动任务类的变化。

任务定义

一个任务应该实现 java.util.concurrent.Callable或java.lang.Runnable interface接口。

Here is an example using Callable interface:

下面是一个Callable接口实现的示例:

public class CallableTask implements Callable<Long> {

    @RInject
    private RedissonClient redissonClient;

    private long anyParam;

    public CallableTask() {
    }

    public CallableTask(long anyParam) {
        this.anyParam = anyParam;
    }

    @Override
    public Long call() throws Exception {
        // ...
    }

}

下面是Runnable 接口实现:

public class RunnableTask implements Runnable {

    @RInject
    private RedissonClient redissonClient;

    private long anyParam;

    public RunnableTask() {
    }

    public RunnableTask(long anyParam) {
        this.anyParam = anyParam;
    }

    @Override
    public void run() {
        // ...
    }

}

任务能够通过构造器赋予参数。一个被提交的任务能够通过@RInject注释访问Redisson实例。这里提供了对所有Redisson功能的任务访问:Redis 基于 Maps, Multimaps, Sets, Lists, Queues, Locks, Semaphores, PublishSubscribe,和许多其他的 objects, collections, locks, and services

第 2 段(可获 1.54 积分)

提交任务执行

通过ExecutorService API使我们提交一个任务变得很容易,因为RExecutorService 已经实现了java.util.concurrent.ExecutorService。 

RExecutorService executorService = redisson.getExecutorService("myExecutor");

executorService.submit(new RunnableTask());
// or with parameter
executorService.submit(new RunnableTask(41));

executorService.submit(new CallableTask());
// or with parameter
executorService.submit(new CallableTask(53));

提交任务调度执行

调度任务被提交是由通过RScheduledExecutorService实现 java.util.concurrent.ScheduledExecutorService接口完成的

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");

executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);
// or 
executorService.schedule(new RunnableTask(), 5, TimeUnit.SECONDS);

executorService.scheduleAtFixedRate(new RunnableTask(), 10, 25, TimeUnit.HOURS);
// or
executorService.scheduleWithFixedDelay(new RunnableTask(), 5, 10, TimeUnit.HOURS);

调度任务使用Cron表达式

正如我们所理解的现代应用程序的复杂程度可以有多高,任务调度程序服务允许用户定义更加复杂的调度使用cron表达式。它的格式可以和与Quartz cron format完全兼容。

第 3 段(可获 1.15 积分)
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");

executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
// or
executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// or
executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

任务取消

任何任务都可以在任何阶段被取消:

Future<?> f = executorService.schedule(...);
// or
Future<?> f = executorService.submit(...);

f.cancel(true);

不需要额外的工作来处理取消除了在当一个任务已经在运行阶段的情况下。在这种情况下,取消处理类似于Java执行线程中断。

下面是将所有值聚合在大的Redis map中,这个过程会花费很长时间

public class CallableTask implements Callable<Long> {

    @RInject
    private RedissonClient redissonClient;

    @Override
    public Long call() throws Exception {
        RMap<String, Integer> map = redissonClient.getMap("myMap");
        Long result = 0;
        for (Integer value : map.values()) {           
        // check if task has been canceled
           if (Thread.currentThread().isInterrupted()) {
                // task has been canceled
                return null;
           }
           result += value;
        }
        return result;
    }

}

 

第 4 段(可获 0.85 积分)

扩展异步模式

所有的标准方法都暴露在 java.util.concurrent.ScheduledExecutorServicejava.util.concurrent.ExecutorService,接口提交任务给ExecutorService是同步的,处理结果接受可以使用异步的,这是通过标准java.util.concurrent.Future实现。Redisson还提供一套RExecutorServiceAsync.*Async方法也可以用来以异步方式提交任务,并允许获取任务id和绑定一个未来的对象。

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");

RFuture<MyResultObject> future = executorService.submitAsync(new CallableTask());
// or
RScheduledFuture<MyResultObject> future = executorService.scheduleAsync(new RunnableTask(), 5, TimeUnit.SECONDS);
// or
RScheduledFuture<MyResultObject> future = executorService.scheduleAtFixedRateAsync(new RunnableTask(), 10, 25, TimeUnit.HOURS);

future.addListener(new FutureListener<MyResultObject>() {
     public void operationComplete(Future<MyResultObject> f) { 
         // ...
     }
});

// cancel task by id
String taskId = future.getId();
// ...
executorService.cancelScheduledTask(taskId);

Redisson 节点是我们已添加到集合的有用工具集,由 Redisson 提供的最新改变游戏规则的特征。我们致力于在未来一段日子带来更多的功能,请耐心等待,要有耐心。

第 5 段(可获 1.35 积分)

文章评论

不服高人有罪
这个的意义在哪?
仅仅为了让task在远程执行?
这样的话至少多了2次网络传输工作吧,而且storm也可以提供这样的计算功能啊。