코드 유사성 판단 경진 대회- private 3위
라이브러리 및 seed고정
#from google.colab import drive
# drive.mount('/content/drive')
!pip install transformers torch pandas tqdm
import pandas as pd
import numpy as np
import os
import random
from itertools import combinations, product
import re
import sklearn
from sklearn.model_selection import train_test_split
import torch
torch.set_float32_matmul_precision('high')
from torch.utils.data import Dataset, DataLoader
import transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
#from google.colab import files
# Seed 고정 함수
def seed_everything(seed: int = 42, contain_cuda: bool = False):
os.environ['PYTHONHASHSEED'] = str(seed)
random.seed(seed)
np.random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
print(f"Seed set as {seed}")
seed = 42
seed_everything(seed)
# CUDA 사용 가능 여부 확인 및 GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
데이터 생성
def preprocess_and_remove_extras(filepath):
with open(filepath, 'r', encoding='utf-8') as file:
content = file.read()
content = re.sub(re.compile("/\*.*?\*/", re.DOTALL), "", content) # 멀티 라인 주석 제거
content = re.sub(re.compile("//.*?\n"), "", content) # 싱글 라인 주석 제거
content = re.sub(re.compile("#include <.*?>\n"), "", content) # angle brackets를 사용하는 include 제거
content = re.sub(re.compile("#include \".*?\"\n"), "", content) # double quotes를 사용하는 include 제거
content = re.sub(re.compile("#define .*?\n"), "", content) # 매크로 정의 제거
content = re.sub(re.compile("[\t ]+"), " ", content) # 공백 및 탭 정리
content = re.sub(re.compile("\n\s*\n"), "\n", content)# 여러 줄바꿈을 하나로
# 공백이 아닌 줄만 선택하여 리스트로 만든 후, 문자열로 결합
processed_script = '\n'.join([line.strip() for line in content.splitlines() if line.strip()])
return processed_script
code_folder = "./data/train_code"
problem_folders = os.listdir(code_folder)
preprocess_scripts = []
problem_nums = []
# 500개 Sample code에 대한 전처리
for problem_folder in tqdm(problem_folders):
scripts = os.listdir(os.path.join(code_folder, problem_folder)) # code/problem000/.cpp 파일
problem_num = problem_folder # 문제 번호 폴더명
for script in scripts:
# .ipynb_checkpoints 폴더 무시
if script == ".ipynb_checkpoints":
continue
script_file = os.path.join(code_folder,problem_folder,script)
if os.path.isfile(script_file): # 경로가 실제 파일인지 확인
preprocessed_script = preprocess_and_remove_extras(script_file)
if preprocessed_script: # 전처리된 스크립트가 비어 있지 않다면
preprocess_scripts.append(preprocessed_script)
problem_nums.append(problem_folder) # 문제 번호 추가
df = pd.DataFrame(data= {'code':preprocess_scripts, 'problem_num':problem_nums})
# train과 validation data set 분리
train_df, valid_df, train_label, valid_label = train_test_split(
df,
df['problem_num'],
random_state=42,
test_size=0.1,
stratify=df['problem_num']
)
train_df = train_df.reset_index(drop=True) # Reindexing
valid_df = valid_df.reset_index(drop=True)
train_data 150만쌍
codes = train_df['code'].to_list() # code 컬럼을 list로 변환 - codes는 code가 쭉 나열된 형태임
problems = train_df['problem_num'].unique().tolist() # 문제 번호를 중복을 제외하고 list로 변환
problems.sort()
train_positive_pairs = []
train_negative_pairs = []
for problem in tqdm(problems):
# 각각의 문제에 대한 code를 골라 정답 코드로 저장, 아닌 문제는 other_codes로 저장
# 이때 train_df에는 problem_num이 정렬된 상태가 아니기 때문에 index가 다를 수 있음
solution_codes = train_df[train_df['problem_num'] == problem]['code'].to_list()
other_codes = train_df[train_df['problem_num'] != problem]['code'].to_list()
# positive_pairs 1500개 (총 500 * 1500 = 750,000개) 추출
# negative_pairs 1500개 (총 500 * 1500 = 750,000개) 추출
positive_pairs = list(combinations(solution_codes,2))
random.shuffle(positive_pairs)
positive_pairs = positive_pairs[:1500]
random.shuffle(other_codes)
other_codes = other_codes[:1500]
negative_pairs = []
for pos_codes, others in zip(positive_pairs, other_codes):
negative_pairs.append((pos_codes[0], others))
train_positive_pairs.extend(positive_pairs)
train_negative_pairs.extend(negative_pairs)
# total_positive_pairs와 negative_pairs의 정답 코드를 묶어 code1로 지정
# total_positive_pairs와 negative_pairs의 비교 대상 코드를 묶어 code2로 지정
# 해당 코드에 맞는 label 설정
code1 = [code[0] for code in train_positive_pairs] + [code[0] for code in train_negative_pairs]
code2 = [code[1] for code in train_positive_pairs] + [code[1] for code in train_negative_pairs]
label = [1]*len(train_positive_pairs) + [0]*len(train_negative_pairs)
# DataFrame으로 선언
create_train_data = pd.DataFrame(data={'code1':code1, 'code2':code2, 'similar':label})
create_train_df = create_train_data.sample(frac=1).reset_index(drop=True) # frac: 추출할 표본 비율
train_data 200만쌍
codes = train_df['code'].to_list() # code 컬럼을 list로 변환 - codes는 code가 쭉 나열된 형태임
problems = train_df['problem_num'].unique().tolist() # 문제 번호를 중복을 제외하고 list로 변환
problems.sort()
train_positive_pairs = []
train_negative_pairs = []
for problem in tqdm(problems):
# 각각의 문제에 대한 code를 골라 정답 코드로 저장, 아닌 문제는 other_codes로 저장
# 이때 train_df에는 problem_num이 정렬된 상태가 아니기 때문에 index가 다를 수 있음
solution_codes = train_df[train_df['problem_num'] == problem]['code'].to_list()
other_codes = train_df[train_df['problem_num'] != problem]['code'].to_list()
# positive_pairs 2000개 (총 500 * 2000 = 1000,000개) 추출
# negative_pairs 2000개 (총 500 * 2000 = 1000,000개) 추출
positive_pairs = list(combinations(solution_codes,2))
random.shuffle(positive_pairs)
positive_pairs = positive_pairs[:2000]
random.shuffle(other_codes)
other_codes = other_codes[:2000]
negative_pairs = []
for pos_codes, others in zip(positive_pairs, other_codes):
negative_pairs.append((pos_codes[0], others))
train_positive_pairs.extend(positive_pairs)
train_negative_pairs.extend(negative_pairs)
# total_positive_pairs와 negative_pairs의 정답 코드를 묶어 code1로 지정
# total_positive_pairs와 negative_pairs의 비교 대상 코드를 묶어 code2로 지정
# 해당 코드에 맞는 label 설정
code1 = [code[0] for code in train_positive_pairs] + [code[0] for code in train_negative_pairs]
code2 = [code[1] for code in train_positive_pairs] + [code[1] for code in train_negative_pairs]
label = [1]*len(train_positive_pairs) + [0]*len(train_negative_pairs)
# DataFrame으로 선언
create2_train_data = pd.DataFrame(data={'code1':code1, 'code2':code2, 'similar':label})
create2_train_df = create2_train_data.sample(frac=1).reset_index(drop=True) # frac: 추출할 표본 비율
valid_data
codes = valid_df['code'].to_list() # code 컬럼을 list로 변환 - codes는 code가 쭉 나열된 형태임
problems = valid_df['problem_num'].unique().tolist() # 문제 번호를 중복을 제외하고 list로 변환
problems.sort()
valid_positive_pairs = []
valid_negative_pairs = []
for problem in tqdm(problems):
# 각각의 문제에 대한 code를 골라 정답 코드로 저장, 아닌 문제는 other_codes로 저장
# 이때 train_df에는 problem_num이 정렬된 상태가 아니기 때문에 index가 다를 수 있음
solution_codes = valid_df[valid_df['problem_num'] == problem]['code'].to_list()
other_codes = valid_df[valid_df['problem_num'] != problem]['code'].to_list()
# positive_pairs 200개 (총 500 * 200 = 100,000개) 추출
# negative_pairs 200개 (총 500 * 200 = 100,000개) 추출
positive_pairs = list(combinations(solution_codes,2))
random.shuffle(positive_pairs)
positive_pairs = positive_pairs[:200]
random.shuffle(other_codes)
other_codes = other_codes[:200]
negative_pairs = []
for pos_codes, others in zip(positive_pairs, other_codes):
negative_pairs.append((pos_codes[0], others))
valid_positive_pairs.extend(positive_pairs)
valid_negative_pairs.extend(negative_pairs)
# total_positive_pairs와 negative_pairs의 정답 코드를 묶어 code1로 지정
# total_positive_pairs와 negative_pairs의 비교 대상 코드를 묶어 code2로 지정
# 해당 코드에 맞는 label 설정
code1 = [code[0] for code in valid_positive_pairs] + [code[0] for code in valid_negative_pairs]
code2 = [code[1] for code in valid_positive_pairs] + [code[1] for code in valid_negative_pairs]
label = [1]*len(valid_positive_pairs) + [0]*len(valid_negative_pairs)
# DataFrame으로 선언
valid_data = pd.DataFrame(data={'code1':code1, 'code2':code2, 'similar':label})
val_df = valid_data.sample(frac=1).reset_index(drop=True) # frac: 추출할 표본 비율
train data 합치기, 데이터 셋 클래스 정의
train1_df: 새로 만든 150만개의 create_train_df + sample train data = 총 152만개
train2_df: 새로 만든 200만개의 create_train_df + sample train data = 총 202만개
train1_df: 새로 만든 150만개의 create_train_df + sample train data = 총 152만개
train2_df: 새로 만든 200만개의 create_train_df + sample train data = 총 202만개
sample_data_path = './data/sample_train.csv' # 예시로 주어진 샘플 데이터도 학습데이터에 포함
sample_df = pd.read_csv(sample_data_path)
train1_df = pd.concat([create_train_df, sample_df], ignore_index=True) # 총 152만개의 train데이터 생성
train2_df = pd.concat([create2_train_df, sample_df], ignore_index=True) # 총 202만개의 train데이터 생성
# 전처리 적용
# 이미 create_train_data 만들 때 적용했지만, test.csv와, sample_train.csv에도 같은 전처리를 하기 위해 다시 정의
def remove_extras(code):
code = re.sub(re.compile("/\*.*?\*/", re.DOTALL), "", code) # 멀티 라인 주석 제거
code = re.sub(re.compile("//.*?\n"), "", code) # 싱글 라인 주석 제거
code = re.sub(re.compile("#include <.*?>\n"), "", code) # angle brackets를 사용하는 include 제거
code = re.sub(re.compile("#include \".*?\"\n"), "", code) # double quotes를 사용하는 include 제거
code = re.sub(re.compile("#define .*?\n"), "", code) # 매크로 정의 제거
code = re.sub(re.compile("[\t ]+"), " ", code) # 탭과 여러 공백을 하나의 공백으로
code = re.sub(re.compile("\n\s*\n"), "\n", code) # 여러 줄바꿈을 하나로
return code.strip()
# 데이터셋 클래스 정의
# 여기서 전처리 함수는 remove_extras
class CodePairDataset(Dataset):
def __init__(self, tokenizer, data, max_length=512, include_labels=True):
self.tokenizer = tokenizer
self.data = data
self.max_length = max_length
self.include_labels = include_labels
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
record = self.data.iloc[idx]
code1 = remove_extras(record['code1'])
code2 = remove_extras(record['code2'])
inputs = self.tokenizer(
code1, code2,
padding='max_length', truncation=True, max_length=self.max_length, return_tensors="pt"
)
inputs = {key: val.squeeze() for key, val in inputs.items()}
if self.include_labels:
inputs['labels'] = torch.tensor(record['similar'], dtype=torch.long)
return inputs
첫번째 모델
# 152만쌍 train, epoch 4, lr = 2e-5
# GraphCodeBERT 모델 및 토크나이저 로드
model_name = "microsoft/graphcodebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.truncation_side = 'left'
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2).to(device)
# 훈련 세트와 검증 세트에 대한 데이터셋 생성
train1_dataset = CodePairDataset(tokenizer, train1_df, max_length=512)
val_dataset = CodePairDataset(tokenizer, val_df, max_length=512, include_labels=True)
# 데이터 로더 준비
train1_loader = DataLoader(train1_dataset, batch_size=48, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=48, shuffle=False)
# 파인 튜닝을 위한 옵티마이저 설정
optimizer = AdamW(model.parameters(), lr=2e-5)
# 훈련 루프 수정
model.train()
for epoch in range(4): # 에폭 수 필요에 따라 조정
total_loss = 0
model.train()
for batch in tqdm(train1_loader):
batch = {k: v.to(device) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
total_loss += loss.item()
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 에폭당 평균 훈련 손실 계산
epoch_loss = total_loss / len(train1_loader)
print(f"Epoch {epoch+1}, Loss: {epoch_loss}")
# 검증 세트를 이용한 모델 평가
model.eval()
total_eval_accuracy = 0
for batch in tqdm(val_loader):
batch = {k: v.to(device) for k, v in batch.items()}
with torch.no_grad():
outputs = model(**batch)
logits = outputs.logits
predictions = torch.argmax(logits, dim=-1)
labels = batch["labels"]
# 정확도 계산
accuracy = (predictions == labels).cpu().numpy().mean() * 100
total_eval_accuracy += accuracy
# 에폭당 평균 검증 정확도 계산
avg_val_accuracy = total_eval_accuracy / len(val_loader)
print(f"Validation Accuracy: {avg_val_accuracy:.2f}%")
# 첫번째 모델 저장
torch.save(model.state_dict(), './data/model_graphcodebert4.pth')
두번째 모델
# 202만쌍 train, epoch 3, lr =2e-5
# GraphCodeBERT 모델 및 토크나이저 로드
# 초기화 해줌으로써, 첫번째 모델과 독립적으로 작동이 가능함
model_name = "microsoft/graphcodebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.truncation_side = 'left'
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2).to(device)
# 훈련 세트와 검증 세트에 대한 데이터셋 생성
train2_dataset = CodePairDataset(tokenizer, train2_df, max_length=512)
val_dataset = CodePairDataset(tokenizer, val_df, max_length=512, include_labels=True)
# 데이터 로더 준비
train2_loader = DataLoader(train2_dataset, batch_size=48, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=48, shuffle=False)
# 파인 튜닝을 위한 옵티마이저 설정
optimizer = AdamW(model.parameters(), lr=2e-5)
# 훈련 루프 수정
model.train()
for epoch in range(3): # 에폭 수 필요에 따라 조정
total_loss = 0
model.train()
for batch in tqdm(train2_loader):
batch = {k: v.to(device) for k, v in batch.items()}
outputs = model(**batch)
loss = outputs.loss
total_loss += loss.item()
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 에폭당 평균 훈련 손실 계산
epoch_loss = total_loss / len(train2_loader)
print(f"Epoch {epoch+1}, Loss: {epoch_loss}")
# 검증 세트를 이용한 모델 평가
model.eval()
total_eval_accuracy = 0
for batch in tqdm(val_loader):
batch = {k: v.to(device) for k, v in batch.items()}
with torch.no_grad():
outputs = model(**batch)
logits = outputs.logits
predictions = torch.argmax(logits, dim=-1)
labels = batch["labels"]
# 정확도 계산
accuracy = (predictions == labels).cpu().numpy().mean() * 100
total_eval_accuracy += accuracy
# 에폭당 평균 검증 정확도 계산
avg_val_accuracy = total_eval_accuracy / len(val_loader)
print(f"Validation Accuracy: {avg_val_accuracy:.2f}%")
# 두번째 모델 저장
torch.save(model.state_dict(), './data/model_graphcodebert6.pth')
최종 모델
#
저장 했던 두 모델을 불러와서 test데이터에 대한 예측을 수행합니다
# 마지막으로 test데이터를 평가
test_data_path = './data/test.csv'
# 테스트 데이터를 DataFrame으로 로드
test_df = pd.read_csv(test_data_path)
# sample_submission.csv 파일 로딩
sample_submission_path = './data/sample_submission.csv'
sample_submission = pd.read_csv(sample_submission_path)
model_path_1 = './data/model_graphcodebert4.pth'
model_path_2 = './data/model_graphcodebert6.pth'
# 모델 및 토크나이저 초기화
model_name = "microsoft/graphcodebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.truncation_side = 'left'
# 테스트 데이터셋 준비
test_dataset = CodePairDataset(tokenizer, test_df, max_length=512, include_labels=False)
test_loader = DataLoader(test_dataset, batch_size=48, shuffle=False)
def evaluate_model(model_path):
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2).to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
probabilities = []
with torch.no_grad():
for batch in test_loader:
batch = {k: v.to(device) for k, v in batch.items()}
outputs = model(**batch)
logits = outputs.logits
probs = torch.softmax(logits, dim=-1).cpu().numpy() # 확률로 변환
probabilities.extend(probs)
return np.array(probabilities)
# 각 모델을 평가하여 확률을 얻음
probabilities_1 = evaluate_model(model_path_1)
probabilities_2 = evaluate_model(model_path_2)
# 확률 평균을 최종 모델의 확률로 정함
final_probabilities = (probabilities_1 + probabilities_2) / 2
final_predictions = np.argmax(final_probabilities, axis=1)
# 최종 예측 결과를 sample_submission.csv에 저장
sample_submission['similar'] = final_predictions
sample_submission.to_csv('./data/final_submission.csv', index=False)
# 제출 파일 다운로드
#from google.colab import files
#files.download('./data/final_submission.csv')
후기
처음 나가는 nlp 대회라 쉽지 않았네요.. 그래도 좋은 성적으로 마무리 지어서 기쁩니다:)
댓글남기기