package com.wjf.zhibo;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import jdk.nashorn.internal.runtime.logging.Logger;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@SpringBootTest
@Log4j2
class ZhiboApplicationTests {
private final static ThreadPoolExecutor selfExecutor = new ThreadPoolExecutor(10,10,1, TimeUnit.MINUTES,new LinkedBlockingDeque<>(5),new ThreadPoolExecutor.CallerRunsPolicy());
@SneakyThrows
@Test
void contextLoads() throws Exception {
/*
多线程调用,再合并
*/
Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial(),
(dynamic, share, pre, plane, fiducial) -> {
return dynamic+" "+share+" "+plane+" "+pre+" "+fiducial;
}).subscribeOn(Schedulers.from(selfExecutor)).blockingSubscribe(e->{
log.info("blockingSubscribe :"+e);
});
long startTime = System.currentTimeMillis();
// 数组调用
List<Integer> idList = new ArrayList<>();
for (int i = 0; i <100 ; i++) {
idList.add(i);
}
Observable.fromArray(idList.toArray(new Integer[0])).flatMap(
i->Observable.just(i).subscribeOn(Schedulers.from(selfExecutor)).map(v->rpcTest(v))
).blockingSubscribe(v->log.info("blockingSubscribe: "+v));
long endTime = System.currentTimeMillis();
log.info("当前方法总耗时:"+(endTime-startTime));
}
public static void main(String[] args) {
}
private Integer rpcTest(Integer param ){
try {
Thread.sleep(1000);
log.info("rpcTest:执行 "+param);
} catch (InterruptedException e) {
e.printStackTrace();
}
return param;
}
private Observable<String> getDynamic() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = "getDynamic";
try {
Thread.sleep(1000);
/* 调用服务业务处理*/
} catch (Exception ex) {
log.info(ex.getMessage(),ex);
}
log.info( " get getDynamic info end");
e.onNext(result);
e.onComplete();
}
}).subscribeOn(Schedulers.from(selfExecutor));
}
private Observable<String> getPre() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = "getPre";
try {
Thread.sleep(1000);
/* 调用服务业务处理*/
} catch (Exception ex) {
log.info(ex.getMessage(),ex);
}
log.info( " get getPre info end");
e.onNext(result);
e.onComplete();
}
}).subscribeOn(Schedulers.from(selfExecutor));
}
private Observable<String> getPlane() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = "getPlane";
try {
Thread.sleep(1000);
/* 调用服务业务处理*/
} catch (Exception ex) {
log.info(ex.getMessage(),ex);
}
log.info( " get plane info end");
e.onNext(result);
e.onComplete();
}
}).subscribeOn(Schedulers.from(selfExecutor));
}
private Observable<String> getWork() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = "getWork";
try {
Thread.sleep(1000);
/* 调用服务业务处理*/
} catch (Exception ex) {
log.info(ex.getMessage(),ex);
}
log.info( " get getWork info end");
e.onNext(result);
e.onComplete();
}
}).subscribeOn(Schedulers.from(selfExecutor));
}
private Observable<String> getShare() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = "getShare";
try {
Thread.sleep(1000);
/* 调用服务业务处理*/
} catch (Exception ex) {
log.info(ex.getMessage(),ex);
}
log.info( " get getWork info end");
e.onNext(result);
e.onComplete();
}
}).subscribeOn(Schedulers.from(selfExecutor));
}
private Observable<String> getFiducial() throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
String result = "getFiducial";
try {
Thread.sleep(1000);
/* 调用服务业务处理*/
} catch (Exception ex) {
log.info(ex.getMessage(),ex);
}
log.info( " get getFiducial info end");
e.onNext(result);
e.onComplete();
}
}).subscribeOn(Schedulers.from(selfExecutor));
}
}