// Java ReactiveX (RxJava) development example

package com.wjf.zhibo;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import jdk.nashorn.internal.runtime.logging.Logger;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates two RxJava fan-out patterns on a dedicated thread pool:
 * <ol>
 *   <li>running several independent calls in parallel and merging their results with
 *       {@code Observable.zip};</li>
 *   <li>running one call per list element in parallel with {@code flatMap}.</li>
 * </ol>
 * Every "remote call" here is simulated with a fixed sleep.
 */
@SpringBootTest
@Log4j2
class ZhiboApplicationTests {

	/** Duration of every simulated remote call, in milliseconds. */
	private static final long SIMULATED_CALL_MILLIS = 1000L;

	/**
	 * Pool that runs the simulated calls: 10 threads, a 5-slot queue, and caller-runs on overflow,
	 * so work submitted beyond capacity executes on the submitting thread instead of being rejected.
	 */
	private final static ThreadPoolExecutor selfExecutor = createExecutor();

	/** Single scheduler view over {@link #selfExecutor}, shared instead of being rebuilt per call. */
	private static final Scheduler SELF_SCHEDULER = Schedulers.from(selfExecutor);

	/**
	 * Builds the worker pool.
	 *
	 * @return the configured executor
	 */
	private static ThreadPoolExecutor createExecutor() {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
				new LinkedBlockingDeque<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
		// Core threads are non-daemon; let them exit after the keep-alive time so an idle pool
		// does not keep its threads alive indefinitely once the test has finished.
		executor.allowCoreThreadTimeOut(true);
		return executor;
	}

	/**
	 * Runs both demonstrations: the zipped parallel calls first, then the per-element fan-out,
	 * logging the elapsed time of the second one.
	 *
	 * @throws Exception if building one of the source observables fails
	 */
	@Test
	void contextLoads() throws Exception {

		// Run the five calls concurrently, then combine their results into one string.
		// NOTE(review): "plane" is emitted before "pre", unlike the argument order — confirm intended.
		Observable.zip(getDynamic(), getShare(), getPre(), getPlane(), getFiducial(),
				(dynamic, share, pre, plane, fiducial) ->
						dynamic + " " + share + " " + plane + " " + pre + " " + fiducial)
				.subscribeOn(SELF_SCHEDULER)
				.blockingSubscribe(e -> log.info("blockingSubscribe :" + e));

		long startTime = System.currentTimeMillis();

		// Fan out over a list: one simulated RPC per id, each scheduled on the pool.
		List<Integer> idList = new ArrayList<>(100);
		for (int i = 0; i < 100; i++) {
			idList.add(i);
		}

		Observable.fromIterable(idList)
				.flatMap(id -> Observable.just(id).subscribeOn(SELF_SCHEDULER).map(this::rpcTest))
				.blockingSubscribe(v -> log.info("blockingSubscribe:  " + v));

		long endTime = System.currentTimeMillis();
		log.info("当前方法总耗时:" + (endTime - startTime));
	}

	/**
	 * Intentionally empty entry point; the examples run through {@link #contextLoads()}.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {

	}

	/**
	 * Simulates a blocking RPC that echoes its argument after a fixed delay.
	 *
	 * @param param value to echo
	 * @return {@code param}, unchanged
	 */
	private Integer rpcTest(Integer param) {
		try {
			Thread.sleep(SIMULATED_CALL_MILLIS);
			log.info("rpcTest:执行 " + param);
		} catch (InterruptedException e) {
			// Restore the flag so code further up the stack can still observe the interruption.
			Thread.currentThread().interrupt();
			log.warn("rpcTest interrupted for param " + param, e);
		}
		return param;
	}

	/**
	 * Builds a single-item observable that sleeps for the simulated call duration on the shared
	 * pool, logs {@code endMessage}, and then emits {@code result} followed by completion.
	 *
	 * @param result     value to emit
	 * @param endMessage message logged once the simulated call has finished
	 * @return a cold observable subscribed on {@link #SELF_SCHEDULER}
	 */
	private Observable<String> simulatedCall(String result, String endMessage) {
		return Observable.fromCallable(() -> {
			try {
				Thread.sleep(SIMULATED_CALL_MILLIS);
				/* invoke the real business service here */
			} catch (InterruptedException ex) {
				// Keep the interrupt visible to the pool, but still emit the result as before.
				Thread.currentThread().interrupt();
				log.info(ex.getMessage(), ex);
			}
			log.info(endMessage);
			return result;
		}).subscribeOn(SELF_SCHEDULER);
	}

	/** @return observable emitting {@code "getDynamic"} after the simulated call */
	private Observable<String> getDynamic() throws Exception {
		return simulatedCall("getDynamic", " get getDynamic info end");
	}

	/** @return observable emitting {@code "getPre"} after the simulated call */
	private Observable<String> getPre() throws Exception {
		return simulatedCall("getPre", " get getPre info end");
	}

	/** @return observable emitting {@code "getPlane"} after the simulated call */
	private Observable<String> getPlane() throws Exception {
		return simulatedCall("getPlane", " get plane info end");
	}

	/** @return observable emitting {@code "getWork"} after the simulated call */
	private Observable<String> getWork() throws Exception {
		return simulatedCall("getWork", " get getWork info end");
	}

	/** @return observable emitting {@code "getShare"} after the simulated call */
	private Observable<String> getShare() throws Exception {
		// The log message previously said "getWork" (copy-paste slip); it now names this call.
		return simulatedCall("getShare", " get getShare info end");
	}

	/** @return observable emitting {@code "getFiducial"} after the simulated call */
	private Observable<String> getFiducial() throws Exception {
		return simulatedCall("getFiducial", " get getFiducial info end");
	}
}