자바 nio selector 이용하여 응답을 기다리지 않고 메시지를 보낼때 속도를 조절한 아이디어 어디 없을까요?

emptynote의 이미지

자바 nio selector 이거 물건입니다.
저같이 시스템 프로그래밍 경험이 없어도 비동기 서버를 손쉽게 만들게 해 주기때문입니다.
그렇지만 클라이언트용 서버 접속 API 는 쉽지 않네요.

비동기 특성을 이용하면 응답을 기다리지 않고 메시지를 보낼 수 있는데 여기에 문제가 발생합니다.
서버 처리 용량을 고려 않고 무조건 응답을 기다리지 않고 메시지를 보내게 되면 서버 자원이 고갈되어 응답을 보낼 수 없게 됩니다.

이것에 대한 해법 무엇이 좋을까요?
그래서 찾은 답이 클라이언트에서 보내는 속도 조절입니다.

그런데 어떻게 속도를 조절하면 좋을까요?

이게 제 고민입니다.

제가 이것을 구현한 방법은 서버로 보낸 메시지들은 '송신 끝난 메시지 큐' 와 '송신중 메시지 큐' 이렇게 2개 큐에 분리하여 저장하고

2개 큐에 저장된 메시지 총 갯수는 최대 갯수 제한을 걸어 제한을 하며

'송신 끝난 메시지 큐' 에 담긴 메시지는 '생존 시간'을 주워 클라이언트 측에서 서버로 메시지를 보내는 속도를 조절하고 있습니다.

그런데 이 방법은 속도를 조절하는 주체가 클라이언트라서 서버 입장에서 능동적으로 자원 고갈에 대해서 대비할 수 없습니다.

서버에 대해서 이야기 하면 끝이 없으니

지금 여기서 도움을 얻고자 하는것은 '자바 nio selector 이용하여 응답을 기다리지 않고 메시지를 보낼때 속도를 조절한 아이디어' 입니다.

저랑 다른 아이디어 갖고 계신 분 서로 아이디어를 공유를 했으면 합니다.

swish95의 이미지

서버쪽에서 데이타를 받아서 파일로 쓰든 처리를 하든 끝나야 보내도록 하면 되지 않을까요?

UDP 야 보내면 끝이니까 그렇지만 TCP 로 하면 받는 쪽에서 명확한 처리가 가능하니까
받는 서버쪽에서 기다리라고 하면 클라이언트는 기다리지 않을까요?

------------------------------------------------------------
ProgrammingHolic

emptynote의 이미지

그것도 한 방법이지만 비동기 특성이 죽습니다.

응답을 기다리지 않고 요청을 해야 비동기입니다.

저는 오픈 소스인 RPC 서버를 기반으로 하는 개발 프레임워크 '코다' 개발자입니다.

제가 이 방식에 익숙하고 무엇보다도 나름 이 방식이 장점을 가졌다고 생각하기때문에 맨땅에 헤딩중입니다.

서버 접속 API 가 있고 2 종류가 있으며

(1) 자바 JDK 1.2 구 IO 를 이용한 connection pool

(2) 자바 JDK NIO 를 이용한 connection pool

2개를 지원중이며

(1) 항의 connection pool 에서 얻은 '연결'(=소켓?)은 (1-1) '동기 메시지' : 요청후 응답을 받은 후 다시 요청을 보낼 수 있고

(2) 항의 connection pool 에서 얻은 '연결'(=소켓?)은 (2-1) '비동기 메시지' : 응답을 기다리지 않고 요청 할수 도 있고 (2-2) '동기 메시지' : 요청후 응답을 받은 후 재 요청 할 수 있습니다.

'동기 메시지' 는 응답을 받은 후 재 요청을 할 수 있으므로 서버측에 응답이 쌓일 일이 없어 결과적으로 자원이 고갈 되지 않았습니다.

'비동기 메시지' 의 경우 서버측에서는 순식간에 클라이언트에 보낼 응답이 어마 무시하게 쌓여 서버 자원이 고갈됩니다.

아쉽게도 말씀하신 방법은 '동기 메시지' 로 이미 구현을 해서 잘 사용중입니다.

제가 고민중인 문제는 '비동기 메시지' 를 어떻게 속도 조절하며 보낼 것인가 입니다.

cogniti3의 이미지

그게 마땅한 방법이 없을 겁니다. 예를 들어,
1억명이 코다 서버에 접속한다고 칩시다. 결국엔 사실상 물리적인 서버 또는 네트웍을 증설하는거 밖에는 마땅한 방법이 없을 겁니다. 서버 손좀 보면 1억명 처리할 거 2억명 처리하고 그럴 수는 있겠지만요.

jick의 이미지

서버가 스스로 자원 관리를 하는 수밖에 없을 것 같은데요.

말하자면, 서버가 스스로 "현재 큐에 쌓여있는 작업의 양"을 계속 체크하고 있어야 한다는 거죠. 이 큐에 쌓여있는 작업은 클라이언트에서 방금 받은 요청, 현재 처리중인 요청, 그리고 아직 클라이언트한테 보내지 못한 결과를 다 포함합니다. (물론 진짜 제대로 하려면 이 세 가지의 용량을 별도로 체크하고 있을 수도 있습니다.)

그래서 큐가 지정한 사이즈를 넘어가면 작업이 처리되어 큐의 크기가 줄어들 때까지 더 이상 새로운 요청은 받지 않는 거죠. 서버가 계속 요청을 거부(?)하면 서버 커널의 TCP 버퍼에 들어오는 요청이 쌓일 테고, 이 버퍼마저 가득 차면 서버가 클라이언트에게 "더 이상 데이터를 받을 수 없음" 메시지를 보낼 거고, 그러면 클라이언트의 TCP 버퍼에 데이터가 쌓이기 시작할 거고... 이 버퍼마저 가득차면 더 이상 클라이언트가 메시지를 보낼 수 없게 됩니다.

물론, 악의적인 클라이언트가 있을 경우 마음먹고 요청을 폭주시키면 다른 선의의(?) 클라이언트가 아무 요청도 처리할 수 없는 상태가 될 수 있습니다. 그러므로 이런 경우까지 고려하려면 이제 dos (denial-of-service) protection 같은 걸 연구해야 할 텐데... 뭐 거기까지 가면 기존 주제에서 너무 벗어난 것 같으니 그만합시다. :)

emptynote의 이미지

답변 감사합니다.

서버에 관한 이야기는 끝이 없으니 하지 않는것 이해 부탁드립니다.

서버는 버림의 미학을 선택하여 큐가 가득차 있다면 가감하게 버리도록 했습니다.

-----------------------------------------------------------------------------
이제 부터 본론입니다.

속도 조절을 하지 않는 클라이언트는 악의를 갖고 공격하는 클라이언트다

이러게 악의를 갖는 클라이언트를 정의하고 있습니다.

하여 제가 작성하여 제공할

응답을 기다리지 않고 요청을 보내는 기능을 가진

'클라이언트용 서버 접속 API' 에 '속도 조절' 기능을 넣고자 고민을 해서

나름 아이디를 내어서 구현을 했고

더 나은 아이디가 있는지 조언을 구한것입니다.

아래 소스 보시면 2개 핵심 큐 'finishedStreamBufferArrayDeque' 와 'workingStreamBufferArrayDeque' 를 보실 수 있을겁니다.

2개 큐는 논리적인 하나의 큐로써 최대 크기를 갖고 있으며

서버로 보내야할 메시지는 'workingStreamBufferArrayDeque' 에 담기고

전송이 다 끝난 시점에서 'finishedStreamBufferArrayDeque' 로 옮겨지게 되어

생존시간 동안은 유효성을 갖고 finishedStreamBufferArrayDeque 안에서 존재하게 됩니다.

이렇게 생존시간을 두게 되면 논리적인 하나의 큐에 넣는 속도가 조절되어

결국은 서버로 전송되는 속도 역시 조절 되게 되는것이 구현 핵심 아이디어입니다.

============ 관련 소스

/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *  
 * <a href="http://www.apache.org/licenses/LICENSE-2.0
" rel="nofollow">http://www.apache.org/licenses/LICENSE-2.0
</a> *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package kr.pe.codda.client.connection.asyn;
 
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
 
import kr.pe.codda.common.etc.CommonStaticFinalVars;
import kr.pe.codda.common.exception.NoMoreWrapBufferException;
import kr.pe.codda.common.exception.OutgoingStreamTimeoutException;
import kr.pe.codda.common.exception.RetryException;
import kr.pe.codda.common.exception.TimeoutDelayException;
import kr.pe.codda.common.io.ClientOutgoingStreamIF;
import kr.pe.codda.common.io.StreamBuffer;
 
/**
 * 클라이언트용 송신 스트림, 내부적으로는 스트립 버퍼 환영 큐(=ArrayDeque)로 관리한다.
 * 
 * @author Won Jonghoon
 *
 */
public class ClientOutgoingStream implements ClientOutgoingStreamIF {
	protected Logger log = Logger.getLogger(CommonStaticFinalVars.CORE_LOG_NAME);
 
	private final ReentrantLock lock = new ReentrantLock();
 
	private final ClientIOEventControllerIF asynClientIOEventController;
	private final SelectionKey ownerSelectionKey;
	private final int streamBufferQueueCapacity;
 
	private final ArrayDeque<StreamBuffer> finishedStreamBufferArrayDeque;
	private final ArrayDeque<StreamBuffer> workingStreamBufferArrayDeque;
	private final long aliveTimePerWrapBuffer;
 
	private transient StreamBuffer workingStreamBuffer = null;
 
	/**
	 * 생성자
	 * 
	 * @param asynClientIOEventController     비동기 클라이언트 입출력 이벤트 제어자
	 * @param ownerSelectionKey               소유 세렉션 키
	 * @param outputStreamBufferQueueCapacity 메시지가 담기는 '스트림 버퍼'를 원소로 갖는 환영 큐 크기
	 * @param aliveTimePerWrapBuffer          랩버퍼 1개당 생존 시간, 단위 : nanoseconds
	 */
	public ClientOutgoingStream(ClientIOEventControllerIF asynClientIOEventController, SelectionKey ownerSelectionKey,
			int outputStreamBufferQueueCapacity, long aliveTimePerWrapBuffer) {
		if (null == asynClientIOEventController) {
			throw new IllegalArgumentException("the parameter asynClientIOEventController is null");
		}
 
		if (null == ownerSelectionKey) {
			throw new IllegalArgumentException("the parameter ownerSelectionKey is null");
		}
 
		if (outputStreamBufferQueueCapacity <= 0) {
			throw new IllegalArgumentException(
					"the parameter outputStreamBufferQueueCapacity is less than or equal to zero");
		}
 
		if (aliveTimePerWrapBuffer <= 0) {
			throw new IllegalArgumentException("the parameter aliveTimePerWrapBuffer is less than or equal to zero");
		}
 
		this.asynClientIOEventController = asynClientIOEventController;
		this.ownerSelectionKey = ownerSelectionKey;
		this.streamBufferQueueCapacity = outputStreamBufferQueueCapacity;
		this.aliveTimePerWrapBuffer = aliveTimePerWrapBuffer;
 
		workingStreamBufferArrayDeque = new ArrayDeque<StreamBuffer>(streamBufferQueueCapacity);
		finishedStreamBufferArrayDeque = new ArrayDeque<StreamBuffer>(streamBufferQueueCapacity);
	}
 
	@Override
	public void add(StreamBuffer messageStreamBuffer, long timeout)
			throws OutgoingStreamTimeoutException, RetryException, TimeoutDelayException, InterruptedException {
		if (null == messageStreamBuffer) {
			throw new IllegalArgumentException("the parameter messageStreamBuffer is null");
		}
 
		// FIXME!
		// log.info("call offer in client");
 
		boolean isLocked = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
 
		if (!isLocked) {
			throw new OutgoingStreamTimeoutException("fail to get this client outgoing stream's lock");
		}
 
		try {
			long lockBeginTime = System.nanoTime();
			long endTimeForTimeout = lockBeginTime + timeout * CommonStaticFinalVars.ONE_MILLISECONDS_EXPRESSED_IN_NANOSECONDS;
 
			final int streamBufferCount = finishedStreamBufferArrayDeque.size() + workingStreamBufferArrayDeque.size();
 
			// FIXME!
			// log.info("streamBufferCount="+streamBufferCount);
 
			if (streamBufferCount == streamBufferQueueCapacity) {
				// FIXME!
				// log.info("최대 치 도달에 따른 기다림 시작");
 
				if (finishedStreamBufferArrayDeque.isEmpty()) {
					throw new RetryException();
				}
 
				StreamBuffer finishedStreamBuffer = finishedStreamBufferArrayDeque.peekFirst();
				long expiredTime = finishedStreamBuffer.getExpiredTime();
 
				/*
				// FIXME!
				log.info("lockBeginTime="+lockBeginTime);
				log.info("expiredTime="+expiredTime);
				log.info("endTimeForTimeout="+endTimeForTimeout);
				log.info("비교결과1="+ (lockBeginTime < expiredTime));
				log.info("비교결과2="+ (endTimeForTimeout < expiredTime));
				*/
 
 
				if (lockBeginTime < expiredTime) {
 
					if (endTimeForTimeout < expiredTime) {
						throw new TimeoutDelayException(timeout - (System.nanoTime() - lockBeginTime) / CommonStaticFinalVars.ONE_MILLISECONDS_EXPRESSED_IN_NANOSECONDS);
					}
					long waitingTime = (expiredTime - lockBeginTime);
 
					long millis = waitingTime / CommonStaticFinalVars.ONE_MILLISECONDS_EXPRESSED_IN_NANOSECONDS;
					int nanos = (int) (waitingTime % CommonStaticFinalVars.ONE_MILLISECONDS_EXPRESSED_IN_NANOSECONDS);
 
					Thread.sleep(millis, nanos);
 
				}
 
				finishedStreamBufferArrayDeque.removeFirst();
 
			}
 
			workingStreamBufferArrayDeque.addLast(messageStreamBuffer);
 
			if (null == workingStreamBuffer) {
				workingStreamBuffer = messageStreamBuffer;
			}
 
			turnOnSocketWriteMode();
		} finally {
			lock.unlock();
		}
	}
 
	@Override
	public int write(SocketChannel writableSocketChannel) throws IOException, NoMoreWrapBufferException {
		if (null == workingStreamBuffer) {
			return 0;
		}
 
		int ret = workingStreamBuffer.write(writableSocketChannel);
 
		if (! workingStreamBuffer.hasRemaining()) {
 
			boolean isLocked = lock.tryLock();
 
			if (isLocked) {
				try {
 
					StreamBuffer finishedStreamBuffer = workingStreamBufferArrayDeque.removeFirst();
					finishedStreamBuffer.releaseAllWrapBuffers();
 
					finishedStreamBuffer.setExpiredTimeBasedOnPosition(aliveTimePerWrapBuffer);
					finishedStreamBufferArrayDeque.add(finishedStreamBuffer);
 
					// FIXME!
					// log.info("작업중인 스트림을 종료된 스트림으로 이동");
 
					if (workingStreamBufferArrayDeque.isEmpty()) {
						// FIXME!
						// log.info("송신할 스트림 없음");
 
						workingStreamBuffer = null;
						/** socket write event turn off */
						ret = -1;
 
						turnOffSocketWriteMode();
					} else {
						workingStreamBuffer = workingStreamBufferArrayDeque.peekFirst();
 
						// FIXME!
						// log.info("작업 버퍼의 내용 송신 완료로 인한 새 작업 버퍼로 교체");
					}
				} finally {
					lock.unlock();
				}
			}
		}
 
		return ret;
 
	}
 
	/**
	 * SelectionKey.OP_WRITE 등록
	 * 
	 * @throws CancelledKeyException 멤버 변수 'ownerSelectionKey' 가 접속 종료등으로 등록 취소 되었을
	 *                               경우 던지는 예외
	 */
	private void turnOnSocketWriteMode() throws CancelledKeyException {
		ownerSelectionKey.interestOps(ownerSelectionKey.interestOps() | SelectionKey.OP_WRITE);
 
		asynClientIOEventController.wakeup();
	}
 
	/**
	 * SelectionKey.OP_WRITE 취소
	 * 
	 * @throws CancelledKeyException 멤버 변수 'ownerSelectionKey' 가 접속 종료등으로 등록 취소 되었을
	 *                               경우 던지는 예외
	 */
	private void turnOffSocketWriteMode() throws CancelledKeyException {
		ownerSelectionKey.interestOps(ownerSelectionKey.interestOps() & ~SelectionKey.OP_WRITE);
	}
 
	@Override
	public void close() {
		while (!workingStreamBufferArrayDeque.isEmpty()) {
			workingStreamBufferArrayDeque.removeFirst().releaseAllWrapBuffers();
		}
	}
}

swish95의 이미지

님이 말한 그런식을 지원하려면 작성글에서 제시한 방법 말고는 마땅한 방법은 없겠죠
네트워크가 됐든 디스크가 됐든 그런 방식이면 서버 자원은 고갈 될거니까요

그런데 그런식으로 응답을 기다리는 큐를 검사하게 되면 전제로 한 비동기로 보내는 의미와 모순이 되는게 아닐까 조심스럽게 의견 개진 해봅니다. ^^

------------------------------------------------------------
ProgrammingHolic

emptynote의 이미지

큐는 입력 메시지 추가하는 속도만 조절한 뿐 응답 안 기다립니다.

응답을 기다리면 비동기가 아니죠.