자바에서 Observable은 Java 9 버전부터 java.util.concurrent.Flow 패키지에 속하는 인터페이스 입니다.
Observable 사용하는 이유는 리액티브 프로그래밍을 구현하기 위해 사용돼고 있습니다.
Publisher가 데이터 스트림을 생성하면 Subscriber가 구독해서 데이터를 소비하도록 동작합니다.
주요 구성요소
Publisher 발행자
데이터 스트림을 생성하고 Subscriber에 데이터를 등록합니다.
등록방법은 subscribe(Subscriber) 메서드를 이용하여 등록할 수 있습니다.
Subscriber 구독자
Publisher가 생성한 데이터 스트림을 구독해서 데이터를 소비합니다.
Subscriber 인터페이스를 구현한 클래스에서 onNext(), onError(), onComplete() 등의 메서드를 사용하여 발행된 데이터를 처리하고 에러 발생을 처리할 수 있도록 핸들링 합니다.
Subscription 구독
Publisher와 Subscriber 간의 연결을 나타내는 인터페이스 입니다. Subscriber는 이를 통해 Publisher로 부터 데이터를 요청하고 구독을 취소할 수 있습니다.
Processor 처리자
발행자와 구독자 사이에서 데이터를 가공하거나 중간에서 처리하는 역할을 수행합니다.
소스코드
public class Main {
public static void main(String[] args) {
// SubmissionPublisher 생성
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// Subscriber 생성
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 구독자가 요청한 만큼 아이템을 전송
}
@Override
public void onNext(String item) {
System.out.println("받은 아이템: " + item);
subscription.request(1); // 다음 아이템을 요청
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
publisher.close(); // 에러 발생 시 퍼블리셔 닫기
}
@Override
public void onComplete() {
System.out.println("완료");
publisher.close(); // 스트림이 완료되면 퍼블리셔 닫기
}
};
// 퍼블리셔와 구독자 연결
publisher.subscribe(subscriber);
// 아이템 전송
publisher.submit("Hello");
publisher.submit("World");
// 완료 통지
publisher.close();
}
}
subscription.request(1) 에서 매개변수 1은 구독자가 요청한 데이터의 아이템 개수를 나타냅니다.
코드에서는 매개변수 1을 사용하여 구독자가 다음 아이템을 하나씩 처리할 수 있도록 요청하는 것을 의미합니다.
백프레셔 개념
소비자가 데이터를 처리하는 속도가 생산자가 데이터를 생성하는 속도보다 느릴 때 발생됩니다. 이 경우 백프레셔는 생산자의 데이터 생산 속도를 소비자가 처리하는 속도에 맞추도록 요청합니다.
백프레셔 전략
1. 요청 기반 벡프레셔
소비자가 요청한 데이터 양을 제한하고 그 이상의 데이터는 생성되지 않도록 합니다.
onSubscription 객체를 사용해서 데이터를 요청합니다.
request() 메서드를 호출하여 생산자에게 요청하는 데이터 양을 지정할 수 있습니다.
Publisher는 Subscriber로부터 요청된 데이터 양만큼 데이터를 생성해서 전달합니다.
Subscriber가 요청한 양 이외 데이터는 생성하지 않습니다.
2. 버퍼링
생산자가 생성한 데이터를 일시적으로 버퍼에 저장하고 일정량 이상 데이터가 생성되었을 때 소비자에 전달합니다.
데이터 폭증을 관리할 수 있는 장점이 있습니다.
3. 드롭
데이터 생산이 너무 빠르면 소비자에게 데이터를 전달하지 않고 버립니다.
4. 반압
데이터를 생성하지 않도록 요청하거나 일시적으로 멈추도록 합니다.
벡프레셔 사용 목적은 메모리 부하와 처리량을 제어하여 시스템 안정성을 보장하기 위해 사용됩니다.
IO 패키지 (InputStream, read(), FileReader, FileOutputStream) (0) | 2024.02.29 |
---|---|
병렬 처리 (동시성, 병렬성), 병렬 스트림, 처리성능 요소 (0) | 2024.02.27 |
스트림과 병렬 처리 (0) | 2024.02.21 |
요소 수집 collect(), Collector Supplier (0) | 2024.02.20 |
요소 그룹핑, 집계 Collector groupingByConcurrent (1) | 2024.02.13 |
댓글 영역