728x90
반응형
이번 포스트에서는 아두이노와 파이썬을 활용해 Peltier 모듈을 제어하고, 온도 상승 시간 (Rising time) 및 복귀 시간 (Return time) 을 계산하는 실험을 정리합니다. 실시간 PID 제어를 통해 일정한 온도 변화 자극을 만들고, 이후 수집된 데이터를 기반으로 파이썬에서 자동 분석을 수행합니다.
실험 개요
- 목표: ΔT 자극을 주고 난 뒤, 목표 온도 도달 시간과 baseline 온도로 복귀하는 시간을 측정
- 센서: Thermistor (NTC)
- 액츄에이터: Peltier 모듈
- 제어 방식: PID 제어
- 샘플링 주기: 10Hz (100ms)
- 데이터 수집 방식: Serial 통신 → CSV 저장
하드웨어 구성
- Arduino Mega
- Peltier module (Thermoelectric cooler)
- Thermistor
- Motor Driver (PWM 제어)
- 외부 전원 (12V 이상)
아두이노 코드 (PID 제어 및 Serial 수신)
#include <PID_v1.h>
#define PWM 10
#define PHASE1 2
#define PHASE2 3
int Vi = 1023;
float R1 = 22000;
float T = 0;
float c1 = -1.185559046e-03;
float c2 = 5.505203063e-04;
float c3 = -9.653138374e-07;
double input = 0, output = 0;
double init_setpoint = 32.5;
double setpoint = init_setpoint;
double Kp = 384, Ki = 2, Kd = 32;
PID myPID(&input, &output, &setpoint, Kp, Ki, Kd, DIRECT);
String serialBuffer = "";
unsigned long startTime = 0;
bool started = false;
void setup() {
Serial.begin(115200);
pinMode(PWM, OUTPUT);
pinMode(PHASE1, OUTPUT);
pinMode(PHASE2, OUTPUT);
myPID.SetMode(AUTOMATIC);
myPID.SetOutputLimits(-255, 255);
Serial.println("READY");
}
void loop() {
while (Serial.available() > 0) {
char c = Serial.read();
if (c == '\n') {
serialBuffer.trim();
if (serialBuffer.equalsIgnoreCase("start")) {
startTime = millis();
started = true;
Serial.println("Start received. Time reset.");
} else if (serialBuffer.length() > 0) {
double delta = serialBuffer.toFloat();
setpoint = init_setpoint + delta;
Serial.print("Delta Received: ");
Serial.println(delta);
}
serialBuffer = "";
} else {
serialBuffer += c;
}
}
input = ReadTemperature();
myPID.Compute();
double pwmValue = abs(output);
digitalWrite(PHASE1, input < setpoint ? LOW : HIGH);
digitalWrite(PHASE2, input < setpoint ? HIGH : LOW);
analogWrite(PWM, pwmValue);
if (started) {
LogData(pwmValue);
}
delay(100); // 10Hz
}
double ReadTemperature() {
int Vo = analogRead(A0);
if (Vo <= 0) return -999.0;
float R2 = R1 * (Vi / (float)Vo - 1.0);
float logR2 = log(R2);
T = (1.0 / (c1 + c2 * logR2 + c3 * pow(logR2, 3))) - 273.15;
return T;
}
void LogData(double pwmValue) {
double delta = setpoint - init_setpoint;
String time = String(millis() - startTime);
String message = time + "," +
String(input, 2) + "," +
String(setpoint, 2) + "," +
String(delta, 2) + "," +
String(pwmValue, 2) + "," +
"Received:" + String(delta, 2);
Serial.println(message);
}
🐍 파이썬 코드 (ΔT 시퀀스 전송 및 로그 저장)
- 각 ΔT마다 50회 자극 + 50회 복귀 (총 100개) 전송
- 아두이노는 10Hz 주기로 수신하여 로그 출력
CSV 파일은 logs/20250416/delta_3.0/run_1.csv
형식으로 저장됩니다.
# -------------------------
# 전체 파이썬 코드: 아두이노와 10Hz 통신, 반복 ΔT 로그 저장 및 평균 시각화
# -------------------------
import serial
import csv
import time
import threading
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import glob
# -------------------------
# 설정
# -------------------------
PORT = 'COM4'
BAUD = 115200
DURATION = 10
WAIT_BETWEEN_RUNS = 10
REPEAT = 5 # 각 ΔT에 대해 10회 반복
DATE_STR = time.strftime('%Y%m%d')
BASE_DIR = os.path.join("logs", DATE_STR)
os.makedirs(BASE_DIR, exist_ok=True)
# -------------------------
# 로그 수신 쓰레드
# -------------------------
def log_receiver(ser, duration_s, filepath):
start_time = time.time()
with open(filepath, mode='w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Millis', 'Input_Temperature', 'Setpoint', 'Delta', 'PWM', 'Received'])
while time.time() - start_time < duration_s:
try:
line = ser.readline().decode('utf-8').strip()
if line and ',' in line and 'Received:' in line:
parts = line.split(',')
if len(parts) == 6:
writer.writerow(parts)
except:
continue
# -------------------------
# ΔT 리스트 전송 (10Hz), 로그 저장
# -------------------------
def send_delta_list_10hz(delta_value, run_index):
delta_folder = os.path.join(BASE_DIR, f"delta_{delta_value:.1f}")
os.makedirs(delta_folder, exist_ok=True)
filepath = os.path.join(delta_folder, f"run_{run_index}.csv")
ser = serial.Serial(PORT, BAUD, timeout=1)
time.sleep(2)
ser.reset_input_buffer()
# 로그 수신 시작
log_thread = threading.Thread(target=log_receiver, args=(ser, DURATION, filepath))
log_thread.start()
# ΔT 전송
ser.write(b"start\n")
time.sleep(0.1)
deltas = [delta_value] * 50 + [0.0] * 50 # 50개 자극, 50개 복귀
for i, delta in enumerate(deltas):
msg = f"{delta:.2f}\n"
ser.write(msg.encode())
time.sleep(0.1)
log_thread.join()
ser.close()
print(f"[Log] Saved to: {filepath}")
print("[Rest] Waiting for baseline recovery...")
time.sleep(WAIT_BETWEEN_RUNS)
# -------------------------
# 시각화 함수
# -------------------------
def visualize_single_average(delta_value):
delta_folder = os.path.join(BASE_DIR, f"delta_{delta_value:.1f}")
run_files = sorted(glob.glob(os.path.join(delta_folder, "run_*.csv")))
all_runs = []
for f in run_files:
try:
df = pd.read_csv(f)
if 'Millis' not in df.columns: continue
if df['Setpoint'].max() > 0 or df['Setpoint'].min() < 0:
df['Time_s'] = df['Millis'] / 1000.0
df['Input_Temperature'] = df['Input_Temperature'].clip(20, 40)
df['Setpoint'] = df['Setpoint'].clip(20, 40)
all_runs.append(df)
except Exception as e:
print(f"⚠️ Error reading {f}: {e}")
continue
if len(all_runs) == 0:
print(f"❌ No valid runs for ΔT {delta_value:.1f}")
return
min_len = min(len(df) for df in all_runs)
input_matrix = np.array([df['Input_Temperature'].values[:min_len] for df in all_runs])
setpoint_matrix = np.array([df['Setpoint'].values[:min_len] for df in all_runs])
time_axis = all_runs[0]['Time_s'].values[:min_len]
mean_input = input_matrix.mean(axis=0)
mean_setpoint = setpoint_matrix.mean(axis=0)
plt.figure(figsize=(10, 5))
for df in all_runs:
plt.plot(df['Time_s'].values[:min_len], df['Input_Temperature'].values[:min_len], color='gray', alpha=0.3)
plt.plot(time_axis, mean_input, label='Mean Input Temp', color='darkgreen', linewidth=2)
plt.plot(time_axis, mean_setpoint, label='Mean Setpoint', color='orange', linestyle='--', linewidth=2)
plt.ylim(20, 40)
plt.xlim(0, time_axis[-1])
plt.xlabel("Time (s)")
plt.ylabel("Temperature (°C)")
plt.title(f"ΔT {delta_value:.1f} - Trials + Average")
plt.legend()
plt.grid(True)
plt.tight_layout()
plot_path = os.path.join(delta_folder, f"delta_{delta_value:.1f}_trials_and_average_plot.png")
plt.savefig(plot_path)
plt.close()
print(f"[Plot] Saved: {plot_path}")
# -------------------------
# 실행
# -------------------------
if __name__ == "__main__":
DELTA_VALUES = np.round(np.arange(-6.0, 6.1, 1.0), 2) # -6.0 to +6.0 (step 1.0)
for delta in DELTA_VALUES:
for run in range(1, REPEAT + 1):
send_delta_list_10hz(delta, run_index=run)
visualize_single_average(delta)
📈 ΔT 평균 시각화
각 ΔT에 대해 평균 온도 및 목표 온도 곡선을 시각화하여 delta_3.0_trials_and_average_plot.png
형식으로 저장됩니다.
아래 이미지 참고
위의 두 코드를 통해 데이터를 확보하고 나면, 아래의 코드를 통해서 상승시간과 복귀 시간을 알 수 있습니다.
⏱ 상승 및 복귀 시간 계산
import os
import glob
import pandas as pd
import numpy as np
# -------------------------
# 설정
# -------------------------
BASE_LOG_PATH = "logs/20250416" # <-- 여기를 실제 경로로 바꾸세요
SAVE_PATH = os.path.join(BASE_LOG_PATH, "delta_summary_rise_return_times.csv")
BASELINE_TEMP = 32.5
TOLERANCE = 0.2 # 허용 오차 범위 (°C)
# -------------------------
# 함수: 온도 기준으로 상승/복귀 시간 계산
# -------------------------
def calculate_rise_and_return_times_by_temperature(df):
df = df.copy()
df['Time_s'] = df['Millis'] / 1000.0
df['Delta'] = df['Delta'].astype(float)
df['Setpoint'] = df['Setpoint'].astype(float)
df['Input_Temperature'] = df['Input_Temperature'].astype(float)
main_delta = df['Delta'][df['Delta'] != 0.0].iloc[0] if (df['Delta'] != 0.0).any() else 0.0
if main_delta == 0.0:
return main_delta, None, None
stim_start_time = df[df['Delta'] != 0.0]['Time_s'].iloc[0]
return_start_time = df[(df['Time_s'] > stim_start_time) & (df['Delta'] == 0.0)]['Time_s'].iloc[0]
target_temp = BASELINE_TEMP + main_delta
# 상승 시간
if main_delta > 0:
rise_condition = df['Input_Temperature'] >= (target_temp - TOLERANCE)
else:
rise_condition = df['Input_Temperature'] <= (target_temp + TOLERANCE)
rise_df = df[(df['Time_s'] >= stim_start_time) & rise_condition]
rise_time = rise_df['Time_s'].iloc[0] - stim_start_time if not rise_df.empty else None
# 복귀 시간
if main_delta > 0:
return_condition = df['Input_Temperature'] <= (BASELINE_TEMP + TOLERANCE)
else:
return_condition = df['Input_Temperature'] >= (BASELINE_TEMP - TOLERANCE)
return_df = df[(df['Time_s'] >= return_start_time) & return_condition]
return_time = return_df['Time_s'].iloc[0] - return_start_time if not return_df.empty else None
return main_delta, rise_time, return_time
# -------------------------
# 모든 delta 폴더 순회하여 결과 정리
# -------------------------
all_results = []
delta_folders = sorted([
f for f in os.listdir(BASE_LOG_PATH)
if f.startswith("delta_") and os.path.isdir(os.path.join(BASE_LOG_PATH, f))
])
for folder in delta_folders:
folder_path = os.path.join(BASE_LOG_PATH, folder)
csv_files = glob.glob(os.path.join(folder_path, "run_*.csv"))
try:
delta_value = float(folder.replace("delta_", ""))
except ValueError:
continue
for csv_file in csv_files:
try:
df = pd.read_csv(csv_file)
delta, rise, ret = calculate_rise_and_return_times_by_temperature(df)
all_results.append({
"Folder": folder,
"File": os.path.basename(csv_file),
"Delta": delta,
"Rise_Time_s": round(rise,2),
"Return_Time_s": round(ret,2)
})
except Exception as e:
print(f"⚠️ Error reading {csv_file}: {e}")
# -------------------------
# 결과 저장
# -------------------------
summary_df = pd.DataFrame(all_results)
summary_df.to_csv(SAVE_PATH, index=False)
print(f"✅ Saved summary to {SAVE_PATH}")
📄 결과 저장 예시:
Folder | File | Delta | Rise_Time_s | Return_Time_s |
---|---|---|---|---|
delta_6.0 | run_1.csv | 6.0 | 2.3 | 4.1 |
delta_-3.0 | run_5.csv | -3.0 | 1.9 | 3.7 |
728x90
반응형
'HCI > Arduino' 카테고리의 다른 글
[Arduino] Thermistor, Peltier module 을 활용해 Rising time과 Return Time 다항식 만들기 / 파이썬 / 아두이노 (0) | 2025.04.16 |
---|---|
[Arduino] Thermistor 센서 값 받아 아두이노 출력 하기 (2) | 2024.09.27 |