AI 동시성 이슈 극복 과정과 성능 개선 (feat. AWS Lambda)

장진수 · 원프레딕트 백엔드 엔지니어
October 25, 2023

기존에 발행된 글을 정리해서 다시 소개합니다.

Intro

Substation 2.0 제품을 개발하며 새로운 기능을 만드는 와중에, 기존 백엔드 서버에서 동작하던 AI 연산부에서 치명적인 이슈를 발견한 적이 있습니다.

해당 이슈는 스프린트 플래닝 과정에서 PO와 당시 옆에 있었던 프론트 대빵, 그리고 팀 내 개발자 등 당시 180cm가 넘는 거구들과 화이트보드 앞에서 열띤 토의를 했고(심리적 압박감 무엇..?), 아래 이슈로 인해서 백엔드와 AI를 분리하기로 했습니다.

스프린트를 거듭할수록 왜 이런 부류의 개발 난이도가 계속 발생하는가?!

Issue

Substation2.0 서비스에서 AI 알고리즘이 동작하는 순간은 크게 3가지로 분류됩니다.

  1. 신규 고객이 서브스테이션에 변압기 및 DGA(절연유 가스) 데이터를 처음 등록할 때 (Bulk Upload)
  2. 기존 변압기의 DGA 가스 이력을 새로 추가할 때 (Web Upload)
  3. 작업자가 변압기 점검일지를 작성할 때

이 중 3번째 상황에서 여러 사람이 점검일지를 동시에 작성하게 되면, AI 알고리즘도 동시에 여러 개가 돌아가야 합니다. 하지만 AI 개발자가 처음 알고리즘을 개발했을 때는 동시 실행이 불가능한 구조로 설계를 해뒀습니다.

즉, 백엔드 서버가 동시에 여러 개의 API 요청을 받을 수 있고, 동시에 여러 개의 DB 트랜잭션도 가능하지만, AI 연산은 오직 1개의 프로세스만 가능한 구조입니다.

기존 동작 Process
기존 동작 Process

AI 연산 중에 또다른 AI 요청이 들어오면 Exception...
AI 연산 중에 또다른 AI 요청이 들어오면 Exception...

어떻게 해결하지?

AI 알고리즘 연산부를 동시 처리가 가능하도록 바꾸자

해당 이슈를 처음 발견했을 때는 AI 연산 코드를 동시 처리 되도록 수정하면 되지 싶어서 해당 코드를 훑어봤습니다. 하지만 하나의 함수가 600~700줄이 넘는 코드를 리팩토링할 엄두가 나지 않았고, ADSP 라는 자잘한 데이터 분석 자격증을 가지고는 있지만 주1 당시엔 도메인 지식도 부족했고, 실무에서 사용하는 AI 코드를 건드리는 것은 Risk가 크다고 판단됐습니다. 현 스프린트 일정 내에 처리하기엔 배보다 배꼽이 더 큰 상황이었습니다.

AI 연산을 큐에 쌓아두자

AI 연산하는 부분을 큐에 쌓아 두어서, 순차적으로 돌리게 하자라는 의견도 나왔습니다.

얼핏 보면 그럴듯한 해결 방법으로 생각할 수 있지만, 이 방법은 제품의 사용자가 많아지면 많아질수록 큐 대기 시간이 증가하게 되는 문제가 있습니다. 트래픽이 많아지거나 시간이 지날수록 고객에게 좋은 서비스를 제공하지 못하는 구조는 개발자로서 제 자존심이 허락하지 않습니다.

Backend 서버와 AI를 분리하자

Clean Architecture나 DDD 관련 서적을 찾아보면, 개발할 때 유지보수 측면에서 코드 간 응집도는 높이고 결합도는 낮게 설계하도록 권장하고 있습니다. 평소에도 Backend와 AI 간에 관심사의 분리가 필요하다고 생각했고, DB와 강결합 되어 있는 AI 코드를 간섭이 없는 순수한 함수 형태로 떼어내서 이 AI 연산부를 별도의 여러 대의 서버에서 구현하게 하면 구조적으로 해결 가능할 것 같았습니다.

AI 연산부를 AWS Lambda 로 분리
AI 연산부를 AWS Lambda 로 분리

Lambda 도입

AI 서버를 분리하기에 앞서, “하나의 EC2 서버에 쿠버네티스와 비슷한 환경을 구현해 여러 AI 컨테이너를 띄울 것인가?”, “2, 3개의 EC2 서버를 띄울 것인가?” 아니면 “Lambda를 이용해 함수를 호출할 때만 비용이 나가게 할 것인가?”에 대한 고민이 있었습니다.

개발 난이도만 본다면 여러 대의 EC2를 사용해서 구현하는 게 가장 쉽고, 쿠버네티스, Lambda 순으로 어려울 거라 생각됐지만, 비용적인 측면을 고려한다면 압도적으로 Lambda가 저렴했습니다. 또한 최소한의 비용으로 최대의 효율이 뽑아내도록 하는 것이 백엔드 개발자로서 숙명이라 생각해 Lambda로 결정했습니다.

Aws Lambda
AWS Lambda

EFS (Elastic File System)

https://aws.amazon.com/ko/efs

AI 코드를 Lambda로 분리하기 위해서는 필요한 Python 패키지를 설치해야 하는데, AI 분야에서 사용하는 라이브러리는 보통 용량이 큰 편이었습니다.

Lambda에서 실행할 수 있는 형태로 올리기 위해서는 Zip 압축 파일로 올리거나, S3 Bucket에 올려야 하는데, 각각 50MB, 160MB의 용량 제한이 있습니다. 의존성까지 포함해서 2GB가 넘어가는 프로그램을 Lambda에 올리기 위해서는 대안이 필요했습니다.

DevOps 팀의 열정적인 분에게 찾아가 조언을 구하니 AWS EFS(Elastic File System)을 활용해서 패키지를 설치하고, EC2에서 가져다 쓰게 설정하면 해결이 가능할 것 같다는 조언을 받을 수 있었습니다. 이를 구성하기까지 많은 시행착오가 있었지만, 덕분에 Lambda를 돌릴 수 있는 환경을 구축했습니다.

이렇게 해도 해결하기 어렵다면 Lambda Layer를 고려해 보려 했지만, 다행히 성공했기에 Pass...

Aws EFS
AWS EFS
https://docs.aws.amazon.com/lambda/latest/dg/configuration-layers.html

Lambda 함수 마운트 폴더 설정

import sys

sys.path.append("/mnt/efs/packages")   # 마운트 폴더 설정

from MtrAPI import MtrAPI


def lambda_handler(event, context):
    ai_processing_result = MtrAPI(event)

    return ai_processing_result

이렇게 기존 Substation 제품의 AI 동시성 이슈에 대해서 구조적으로 Backend 파트와 AI 연산 파트를 분리해서 결합도를 낮췄고, 분리된 AI 파트를 AWS Lambda를 활용해서 별도의 AI 서버를 통한 로드 밸런싱 없이 다중 처리가 가능하도록 수정했습니다.

새로운 이슈의 등장

해당 기능을 만들면서 단일 변압기 설비에 대한 AI 연산이 많이 발생할 것으로 기대했으나, 시간이 지나 추가된 기획에 의하면 오히려 단일 설비보다 회사 전체 설비에 대한 AI 연산을 돌리는 기능이 많아질 것으로 확인되었습니다. 이런...

설비 하나의 AI 연산은 약 4~5초 걸리지만, 회사 전체 설비에 대해 AI 연산이 필요하다면 구조적인 개선뿐만 아니라 성능 개선 또한 같이 이루어져야 했습니다.

Boto3 비동기?

처음에는 AWS Lambda 함수 호출을 비동기로 처리하는 것을 고려해 봤습니다. AWS에서 Lambda를 공식적으로 지원해 주는 Python 라이브러리로 Boto3 가 있습니다. 아래와 같은 방식으로 Boto3 Session을 만들어서 쓰면 AI 연산이 Lambda에서 동기식으로 정상 동작하는 것을 확인할 수 있습니다.

# 동기 방식
def execute_lambda(payload):
    lambda_client = get_boto3_session()
    function_name = "substation_lambda"

    # Lambda 함수 실행
    response = lambda_client.invoke(
        FunctionName=function_name, Payload=json.dumps(payload)
    )
    result = json.loads(response["Payload"].read().decode("utf-8"))
    return result

다만 ‘성능’과 ‘동시성’ 처리를 위해 비동기 방식으로 요청하기 위해 여러 삽질이 있었고, Boto3 1.26.94 버전 공식 매뉴얼에서 해당 기능이 deprecated 되었고, 응답 코드만 받아올 수 있다는 결론을 얻게 되었습니다. 주2

# 비동기 방식
async def execute_lambda_async(payload):
    lambda_client = get_boto3_session()

    # Lambda 함수 실행
    response = await asyncio.get_running_loop().run_in_executor(
        None,
        lambda_client.invoke,
        {"FunctionName": "substation_lambda", "Payload": payload},
    )
    return response["Payload"].read().decode("utf-8")

여기까지 오기에도 많은 시간과 에너지를 소모했지만, 뭔가 2% 부족했습니다.

실제로 많은 변전소 설비와 DGA 데이터를 담고 있는 Excel 파일을 Bulk Upload할 때 동기식으로 연산하게 되면 많은 시간이 소요되었습니다.

동기식 처리시 소요 시간

    start_time = time.time()
    for asset in asset_list:
        # AI Input 데이터 추출
        feature = get_feature(session, asset.id)

        # AI 연산 시작
        result = execute_lambda(feature)
        if result.get("status") != 200:
            raise Exception("Ai Lambda 연산 중에 에러가 발생했습니다")

        # AI 연산 결과 저장
        save_result(session, result.get("response"))

    end_time = time.time()
    elapsed_time = end_time - start_time  # 경과 시간 계산

    print(f"실행 시간: {elapsed_time}초")

실행시간: 760.14 초

여러 개의 AI 연산을 동기식으로 처리
여러 개의 AI 연산을 동기식으로 처리

병렬처리

Boto3 라이브러리에서 비동기를 지원하지 않는다면 파이썬 기본 기능을 이용해 자체적으로 ThreadProcess를 활용해 구현해 보기로 했습니다. 참고로 보통 아래의 상황에서는 Thread나 Process를 사용해서 해당 작업을 병렬로 실행하는 것을 권장합니다.

  • 애플리케이션이 I/O를 처리할 때는 운영 체제가 스트림을 처리해서 결과를 돌려줄 때까지 계속 대기한다. 이 시간 동안 애플리케이션을 멈춰 있다.
  • 애플리케이션이 CPU를 사용하여 계산할 때 CPU 중 하나만 사용한다. 다른 CPU는 유휴상태로 있다.

Thrad VS Process

Python 언어에서는 C#, Java와 달리 GIL(Global Interpreter Lock)이란 골칫거리 녀석이 있습니다. GIL은 ‘파이썬 전역 인터프리터 락’의 줄임말로 CPython이 Byte 코드를 실행할 때마다 Thread에 Lock이 걸리게 됩니다. 그러다 보면 Thread 여러 개로 애플리케이션을 확장하더라도 GIL로 인해 모든 Thread가 이 GIL을 차지하기 위해 경합하게 되고, 성능이 떨어지게 됩니다.

반면에 멀티 Process를 사용하게 되면 한 번에 많은 작업을 실행할 수 있습니다. GIL이 관여하지 않으므로 Thread보다 빠르지만, Process 사이에는 어떤 메모리 공간도 공유되지 않기 때문에 Process에서 다른 Process로 전환하기 위해서는 상태 없이 (stateless) 동작해야 한다는 조건을 만족해야 합니다. 또한 Process를 사용하게 되면 네트워크 비용이 더 증가할 수 있다는 단점이 있습니다.

정리하면 I/O 작업을 동시에 처리할 때는 Thread가 효율적이고, CPU 사용량을 최대로 끌어올리려면 많은 프로세스를 사용하는 게 효율적입니다.

Lambda 같은 경우도 Backend 서버에서 Lambda 함수를 호출하는 I/O 작업이기에 Thread가 좀 더 효율적일 것이라 기대했습니다. 다만 Python의 GIL에 의한 성능저하가 우려되어 Thread를 사용할 때와 Process를 사용할 때를 비교해, 더 효율적인 것을 사용하기로 했습니다.

병렬처리 오류

해당 파이썬 코드에서 사용할 병렬처리 라이브러리로 concurrent.futures를 선택했습니다. 기존 코드에서 Lambda를 돌릴 함수와 해당 함수에 Input 값을 넣어줄 부분을 묶어서 하나의 병렬처리를 위한 함수로 구현하면 Thread와 Process 모두 동일한 에러가 발생했습니다.

TypeError("'dict' object is not callable")

ProcessPoolExecutor

        with futures.ProcessPoolExecutor(max_workers=4) as executor:
            futures_list = [
                executor.submit(process_asset(session, asset))
                for asset in assets
            ]
            # AI 알고리즘 연산 결과 저장
            for future in futures_list:
                result = future.result()
                if result is not None:
                    save_result(session, result.get("response"))


def process_asset(session, asset):
    try:
        # AI 연산 Input 데이터 가공
        feature = get_feature(session, asset.id)
        # AI 연산 시작
        result = execute_lambda(feature)
        if result.get("status") != 200:
            raise Exception("Ai Lambda 연산 중에 에러가 발생했습니다")
        return result

TreadPoolExecutor

        with futures.ThreadPoolExecutor(max_workers=4) as executor:
            futures_list = [
                executor.submit(process_asset(session, asset))
                for asset in assets
            ]
            # AI 알고리즘 연산 결과 저장
            for future in futures_list:
                result = future.result()
                if result is not None:
                    save_result(session, result.get("response"))


def process_asset(session, asset):
    try:
        # AI 연산 Input 데이터 가공
        feature = get_feature(session, asset.id)
        # AI 연산 시작
        result = execute_lambda(feature)
        if result.get("status") != 200:
            raise Exception("Ai Lambda 연산 중에 에러가 발생했습니다")
        return result

가능성이 있어 보이는 코드에는 모두 예외 처리를 했음에도 예외를 타지 않는 에러가 발생했고, ChatGPT의 도움을 받아보니 아래 공유 데이터에 대한 이슈가 해결의 실마리처럼 보였습니다.

“스레드 간에 공유되는 데이터에 대한 접근 충돌이 있는지 확인합니다. 스레드 간의 동시 접근을 막기 위해 동기화 메커니즘을 사용할 수 있습니다.”

ChatGPT 와의 대화
ChatGPT 와의 대화

병렬처리 해결

GPT의 도움을 받아 고민해 보니 병렬 처리에 사용되는 process_asset 함수가 마음에 걸렸습니다.

Lambda 함수를 실행하기 전에 Input 값을 만들어 주는 함수가 공유 데이터에 대한 이슈와 더불어 DB session까지 물고 있었기에 문제의 원인이라 생각됐고, 전반적인 레거시 코드를 수정하고 병렬처리로 사용할 process_asset 함수를 Lambda 실행 부분만 포함하도록 분리해 정상적으로 실행되게 만들 수 있었습니다.

[Before]

def process_asset(session, asset):
    # AI 연산 Input 데이터 가공
    feature = get_feature(session, asset.id)
    # AI 연산 시작
    result = execute_lambda(feature)
    if result.get("status") != 200:
        raise Exception("Ai Lambda 연산 중에 에러가 발생했습니다")
    return result

[After]

def process_asset(feature):
    # AI 연산 시작
    result = execute_lambda(feature)
    if result.get("status") != 200:
        raise Exception("Ai Lambda 연산 중에 에러가 발생했습니다")
    return result

병렬처리 성능 비교

4개 Thread로 돌릴 때 시간 확인

TreadPoolExecutor - 4 worker

    start_time = time.time()

    # AI 연산 데이터 추출
    features = get_features(session, company_id)
    # AI Lambda 연산
    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures_list = [
            executor.submit(process_asset, feature) for feature in features
        ]
    # AI 알고리즘 연산 결과 저장
    for future in futures_list:
        result = future.result()
        if result is not None:
            save_result(session, result.get("response"))

    end_time = time.time()
    elapsed_time = end_time - start_time # 경과 시간 계산

실행 시간: 126.87955212593079초

8개 Thread로 돌릴 때 시간 확인

TreadPoolExecutor - 8 worker

    start_time = time.time()

    # AI 연산 데이터 추출
    features = get_features(session, company_id)
    # AI Lambda 연산
    with futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures_list = [
            executor.submit(process_asset, feature) for feature in features
        ]
    # AI 알고리즘 연산 결과 저장
    for future in futures_list:
        result = future.result()
        if result is not None:
            save_result(session, result.get("response"))

    end_time = time.time()
    elapsed_time = end_time - start_time # 경과 시간 계산

실행 시간: 197.9756360054016초

4개 Process로 돌릴 때 시간 확인

ProcessPoolExecutor - 4 worker

    start_time = time.time()

    # AI 연산 데이터 추출
    features = get_features(session, company_id)
    # AI Lambda 연산
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures_list = [
            executor.submit(process_asset, feature) for feature in features
        ]
    # AI 알고리즘 연산 결과 저장
    for future in futures_list:
        result = future.result()
        if result is not None:
            MtrAPI_RusultUpload(session, result.get("response"))

    end_time = time.time()
    elapsed_time = end_time - start_time  # 경과 시간 계산

실행시간: 276.74906373023987

8개 Process 로 돌릴 때 시간 확인

ProcessPoolExecutor - 8 worker

    start_time = time.time()

    # AI 연산 데이터 추출
    features = get_features(session, company_id)
    # AI Lambda 연산
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures_list = [
            executor.submit(process_asset, feature) for feature in features
        ]
    # AI 알고리즘 연산 결과 저장
    for future in futures_list:
        result = future.result()
        if result is not None:
            MtrAPI_RusultUpload(session, result.get("response"))

    end_time = time.time()
    elapsed_time = end_time - start_time  # 경과 시간 계산

실행시간:212.99634408950806

여러 개의 AI 연산을 병렬로 처리
여러 개의 AI 연산을 병렬로 처리

결론

기존의 Substation 제품에서 동시에 AI 연산을 할 수 없었던 근본적인 문제를 해결하기 위해 Lambda를 활용해서 동시다발적으로 처리할 수 있도록 아키텍처를 변경했고, 예상한 대로 I/O 작업을 하는 병렬처리 작업에서는 Multi Process보다 Multi Thread가 성능상 이점을 가진다는 것을 확인할 수 있었습니다.

Excel Bulk Upload 기준 성능 향상 지표
표
쓰레드와 프로세스 성능비교

  1. 데이터분석 준전문가 (Advanced Data Analytics Semi-Professional)
  2. invoke_async - Boto3 Docs 1.26.94 documentation
원프레딕트는 더 나은 제품을 고민하며 기술적인 문제를 함께 풀어낼 동료를 찾고 있습니다.
자세한 내용은 채용 사이트를 참고해 주세요.