[Arduino] Thermistor, Peltier module 을 활용해 Rising time과 Return Time 계산하기 / 파이썬 / 아두이노

2025. 4. 16. 15:19·HCI/Arduino
728x90
반응형

이번 포스트에서는 아두이노와 파이썬을 활용해 Peltier 모듈을 제어하고, 온도 상승 시간 (Rising time) 및 복귀 시간 (Return time) 을 계산하는 실험을 정리합니다. 실시간 PID 제어를 통해 일정한 온도 변화 자극을 만들고, 이후 수집된 데이터를 기반으로 파이썬에서 자동 분석을 수행합니다.


 실험 개요

  • 목표: ΔT 자극을 주고 난 뒤, 목표 온도 도달 시간과 baseline 온도로 복귀하는 시간을 측정
  • 센서: Thermistor (NTC)
  • 액츄에이터: Peltier 모듈
  • 제어 방식: PID 제어
  • 샘플링 주기: 10Hz (100ms)
  • 데이터 수집 방식: Serial 통신 → CSV 저장

하드웨어 구성

  • Arduino Mega
  • Peltier module (Thermoelectric cooler)
  • Thermistor
  • Motor Driver (PWM 제어)
  • 외부 전원 (12V 이상)

아두이노 코드 (PID 제어 및 Serial 수신)

#include <PID_v1.h>

#define PWM 10
#define PHASE1 2
#define PHASE2 3

int Vi = 1023;
float R1 = 22000;
float T = 0;
float c1 = -1.185559046e-03;
float c2 = 5.505203063e-04;
float c3 = -9.653138374e-07;

double input = 0, output = 0;
double init_setpoint = 32.5;
double setpoint = init_setpoint;
double Kp = 384, Ki = 2, Kd = 32;

PID myPID(&input, &output, &setpoint, Kp, Ki, Kd, DIRECT);

String serialBuffer = "";
unsigned long startTime = 0;
bool started = false;

void setup() {
  Serial.begin(115200);
  pinMode(PWM, OUTPUT);
  pinMode(PHASE1, OUTPUT);
  pinMode(PHASE2, OUTPUT);

  myPID.SetMode(AUTOMATIC);
  myPID.SetOutputLimits(-255, 255);

  Serial.println("READY");
}

void loop() {
  while (Serial.available() > 0) {
    char c = Serial.read();
    if (c == '\n') {
      serialBuffer.trim();
      if (serialBuffer.equalsIgnoreCase("start")) {
        startTime = millis();
        started = true;
        Serial.println("Start received. Time reset.");
      } else if (serialBuffer.length() > 0) {
        double delta = serialBuffer.toFloat();
        setpoint = init_setpoint + delta;
        Serial.print("Delta Received: ");
        Serial.println(delta);
      }
      serialBuffer = "";
    } else {
      serialBuffer += c;
    }
  }

  input = ReadTemperature();
  myPID.Compute();

  double pwmValue = abs(output);
  digitalWrite(PHASE1, input < setpoint ? LOW : HIGH);
  digitalWrite(PHASE2, input < setpoint ? HIGH : LOW);
  analogWrite(PWM, pwmValue);

  if (started) {
    LogData(pwmValue);
  }

  delay(100);  // 10Hz
}

double ReadTemperature() {
  int Vo = analogRead(A0);
  if (Vo <= 0) return -999.0;
  float R2 = R1 * (Vi / (float)Vo - 1.0);
  float logR2 = log(R2);
  T = (1.0 / (c1 + c2 * logR2 + c3 * pow(logR2, 3))) - 273.15;
  return T;
}

void LogData(double pwmValue) {
  double delta = setpoint - init_setpoint;
  String time = String(millis() - startTime);
  String message = time + "," +
                   String(input, 2) + "," +
                   String(setpoint, 2) + "," +
                   String(delta, 2) + "," +
                   String(pwmValue, 2) + "," +
                   "Received:" + String(delta, 2);
  Serial.println(message);
}

🐍 파이썬 코드 (ΔT 시퀀스 전송 및 로그 저장)

  • 각 ΔT마다 50회 자극 + 50회 복귀 (총 100개) 전송
  • 아두이노는 10Hz 주기로 수신하여 로그 출력

CSV 파일은 logs/20250416/delta_3.0/run_1.csv 형식으로 저장됩니다.

# -------------------------
# 전체 파이썬 코드: 아두이노와 10Hz 통신, 반복 ΔT 로그 저장 및 평균 시각화
# -------------------------

import serial
import csv
import time
import threading
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import glob

# -------------------------
# 설정
# -------------------------
PORT = 'COM4'
BAUD = 115200
DURATION = 10
WAIT_BETWEEN_RUNS = 10
REPEAT = 5  # 각 ΔT에 대해 10회 반복
DATE_STR = time.strftime('%Y%m%d')
BASE_DIR = os.path.join("logs", DATE_STR)
os.makedirs(BASE_DIR, exist_ok=True)

# -------------------------
# 로그 수신 쓰레드
# -------------------------
def log_receiver(ser, duration_s, filepath):
    start_time = time.time()
    with open(filepath, mode='w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Millis', 'Input_Temperature', 'Setpoint', 'Delta', 'PWM', 'Received'])
        while time.time() - start_time < duration_s:
            try:
                line = ser.readline().decode('utf-8').strip()
                if line and ',' in line and 'Received:' in line:
                    parts = line.split(',')
                    if len(parts) == 6:
                        writer.writerow(parts)
            except:
                continue

# -------------------------
# ΔT 리스트 전송 (10Hz), 로그 저장
# -------------------------
def send_delta_list_10hz(delta_value, run_index):
    delta_folder = os.path.join(BASE_DIR, f"delta_{delta_value:.1f}")
    os.makedirs(delta_folder, exist_ok=True)
    filepath = os.path.join(delta_folder, f"run_{run_index}.csv")

    ser = serial.Serial(PORT, BAUD, timeout=1)
    time.sleep(2)
    ser.reset_input_buffer()

    # 로그 수신 시작
    log_thread = threading.Thread(target=log_receiver, args=(ser, DURATION, filepath))
    log_thread.start()

    # ΔT 전송
    ser.write(b"start\n")
    time.sleep(0.1)
    deltas = [delta_value] * 50 + [0.0] * 50  # 50개 자극, 50개 복귀
    for i, delta in enumerate(deltas):
        msg = f"{delta:.2f}\n"
        ser.write(msg.encode())
        time.sleep(0.1)

    log_thread.join()
    ser.close()

    print(f"[Log] Saved to: {filepath}")
    print("[Rest] Waiting for baseline recovery...")
    time.sleep(WAIT_BETWEEN_RUNS)

# -------------------------
# 시각화 함수
# -------------------------
def visualize_single_average(delta_value):
    delta_folder = os.path.join(BASE_DIR, f"delta_{delta_value:.1f}")
    run_files = sorted(glob.glob(os.path.join(delta_folder, "run_*.csv")))
    all_runs = []

    for f in run_files:
        try:
            df = pd.read_csv(f)
            if 'Millis' not in df.columns: continue
            if df['Setpoint'].max() > 0 or df['Setpoint'].min() < 0:
                df['Time_s'] = df['Millis'] / 1000.0
                df['Input_Temperature'] = df['Input_Temperature'].clip(20, 40)
                df['Setpoint'] = df['Setpoint'].clip(20, 40)
                all_runs.append(df)
        except Exception as e:
            print(f"⚠️ Error reading {f}: {e}")
            continue

    if len(all_runs) == 0:
        print(f"❌ No valid runs for ΔT {delta_value:.1f}")
        return

    min_len = min(len(df) for df in all_runs)
    input_matrix = np.array([df['Input_Temperature'].values[:min_len] for df in all_runs])
    setpoint_matrix = np.array([df['Setpoint'].values[:min_len] for df in all_runs])
    time_axis = all_runs[0]['Time_s'].values[:min_len]

    mean_input = input_matrix.mean(axis=0)
    mean_setpoint = setpoint_matrix.mean(axis=0)

    plt.figure(figsize=(10, 5))
    for df in all_runs:
        plt.plot(df['Time_s'].values[:min_len], df['Input_Temperature'].values[:min_len], color='gray', alpha=0.3)

    plt.plot(time_axis, mean_input, label='Mean Input Temp', color='darkgreen', linewidth=2)
    plt.plot(time_axis, mean_setpoint, label='Mean Setpoint', color='orange', linestyle='--', linewidth=2)

    plt.ylim(20, 40)
    plt.xlim(0, time_axis[-1])
    plt.xlabel("Time (s)")
    plt.ylabel("Temperature (°C)")
    plt.title(f"ΔT {delta_value:.1f} - Trials + Average")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()

    plot_path = os.path.join(delta_folder, f"delta_{delta_value:.1f}_trials_and_average_plot.png")
    plt.savefig(plot_path)
    plt.close()
    print(f"[Plot] Saved: {plot_path}")

# -------------------------
# 실행
# -------------------------
if __name__ == "__main__":
    DELTA_VALUES = np.round(np.arange(-6.0, 6.1, 1.0), 2)  # -6.0 to +6.0 (step 1.0)
    for delta in DELTA_VALUES:
        for run in range(1, REPEAT + 1):
            send_delta_list_10hz(delta, run_index=run)
        visualize_single_average(delta)

📈 ΔT 평균 시각화

각 ΔT에 대해 평균 온도 및 목표 온도 곡선을 시각화하여 delta_3.0_trials_and_average_plot.png 형식으로 저장됩니다.
아래 이미지 참고


위의 두 코드를 통해 데이터를 확보하고 나면, 아래의 코드를 통해서 상승시간과 복귀 시간을 알 수 있습니다.

⏱ 상승 및 복귀 시간 계산

import os
import glob
import pandas as pd
import numpy as np

# -------------------------
# 설정
# -------------------------
BASE_LOG_PATH = "logs/20250416"  # <-- 여기를 실제 경로로 바꾸세요
SAVE_PATH = os.path.join(BASE_LOG_PATH, "delta_summary_rise_return_times.csv")
BASELINE_TEMP = 32.5
TOLERANCE = 0.2  # 허용 오차 범위 (°C)

# -------------------------
# 함수: 온도 기준으로 상승/복귀 시간 계산
# -------------------------
def calculate_rise_and_return_times_by_temperature(df):
    df = df.copy()
    df['Time_s'] = df['Millis'] / 1000.0
    df['Delta'] = df['Delta'].astype(float)
    df['Setpoint'] = df['Setpoint'].astype(float)
    df['Input_Temperature'] = df['Input_Temperature'].astype(float)

    main_delta = df['Delta'][df['Delta'] != 0.0].iloc[0] if (df['Delta'] != 0.0).any() else 0.0
    if main_delta == 0.0:
        return main_delta, None, None

    stim_start_time = df[df['Delta'] != 0.0]['Time_s'].iloc[0]
    return_start_time = df[(df['Time_s'] > stim_start_time) & (df['Delta'] == 0.0)]['Time_s'].iloc[0]

    target_temp = BASELINE_TEMP + main_delta

    # 상승 시간
    if main_delta > 0:
        rise_condition = df['Input_Temperature'] >= (target_temp - TOLERANCE)
    else:
        rise_condition = df['Input_Temperature'] <= (target_temp + TOLERANCE)

    rise_df = df[(df['Time_s'] >= stim_start_time) & rise_condition]
    rise_time = rise_df['Time_s'].iloc[0] - stim_start_time if not rise_df.empty else None

    # 복귀 시간
    if main_delta > 0:
        return_condition = df['Input_Temperature'] <= (BASELINE_TEMP + TOLERANCE)
    else:
        return_condition = df['Input_Temperature'] >= (BASELINE_TEMP - TOLERANCE)

    return_df = df[(df['Time_s'] >= return_start_time) & return_condition]
    return_time = return_df['Time_s'].iloc[0] - return_start_time if not return_df.empty else None

    return main_delta, rise_time, return_time

# -------------------------
# 모든 delta 폴더 순회하여 결과 정리
# -------------------------
all_results = []

delta_folders = sorted([
    f for f in os.listdir(BASE_LOG_PATH)
    if f.startswith("delta_") and os.path.isdir(os.path.join(BASE_LOG_PATH, f))
])

for folder in delta_folders:
    folder_path = os.path.join(BASE_LOG_PATH, folder)
    csv_files = glob.glob(os.path.join(folder_path, "run_*.csv"))
    try:
        delta_value = float(folder.replace("delta_", ""))
    except ValueError:
        continue

    for csv_file in csv_files:
        try:
            df = pd.read_csv(csv_file)
            delta, rise, ret = calculate_rise_and_return_times_by_temperature(df)
            all_results.append({
                "Folder": folder,
                "File": os.path.basename(csv_file),
                "Delta": delta,
                "Rise_Time_s": round(rise,2),
                "Return_Time_s": round(ret,2)
            })
        except Exception as e:
            print(f"⚠️ Error reading {csv_file}: {e}")

# -------------------------
# 결과 저장
# -------------------------
summary_df = pd.DataFrame(all_results)
summary_df.to_csv(SAVE_PATH, index=False)
print(f"✅ Saved summary to {SAVE_PATH}")

📄 결과 저장 예시:

Folder File Delta Rise_Time_s Return_Time_s
delta_6.0 run_1.csv 6.0 2.3 4.1
delta_-3.0 run_5.csv -3.0 1.9 3.7

 

 

728x90
반응형

'HCI > Arduino' 카테고리의 다른 글

[Arduino] dual thermal feedback  (0) 2025.05.22
[Arduino] Thermistor, Peltier module 을 활용해 Rising time과 Return Time 다항식 만들기 / 파이썬 / 아두이노  (0) 2025.04.16
[Arduino] Thermistor 센서 값 받아 아두이노 출력 하기  (2) 2024.09.27
'HCI/Arduino' 카테고리의 다른 글
  • [Arduino] dual thermal feedback
  • [Arduino] Thermistor, Peltier module 을 활용해 Rising time과 Return Time 다항식 만들기 / 파이썬 / 아두이노
  • [Arduino] Thermistor 센서 값 받아 아두이노 출력 하기
sillon
sillon
꾸준해지려고 합니다..
    반응형
  • sillon
    sillon coding
    sillon
  • 전체
    오늘
    어제
    • menu (614)
      • notice (2)
      • python (68)
        • 자료구조 & 알고리즘 (23)
        • 라이브러리 (19)
        • 기초 (8)
        • 자동화 (14)
        • 보안 (1)
      • coding test - python (301)
        • Programmers (166)
        • 백준 (76)
        • Code Tree (22)
        • 기본기 문제 (37)
      • coding test - C++ (5)
        • Programmers (4)
        • 백준 (1)
        • 기본기문제 (0)
      • 공부정리 (5)
        • 신호처리 시스템 (0)
        • Deep learnig & Machine lear.. (41)
        • Data Science (18)
        • Computer Vision (17)
        • NLP (40)
        • Dacon (2)
        • 모두를 위한 딥러닝 (강의 정리) (4)
        • 모두의 딥러닝 (교재 정리) (9)
        • 통계 (2)
      • HCI (23)
        • Haptics (7)
        • Graphics (11)
        • Arduino (4)
      • Project (21)
        • Web Project (1)
        • App Project (1)
        • Paper Project (1)
        • 캡스톤디자인2 (17)
        • etc (1)
      • OS (10)
        • Ubuntu (9)
        • Rasberry pi (1)
      • App & Web (9)
        • Android (7)
        • javascript (2)
      • C++ (5)
        • 기초 (5)
      • Cloud & SERVER (8)
        • Git (2)
        • Docker (1)
        • DB (4)
      • Paper (7)
        • NLP Paper review (6)
      • 데이터 분석 (0)
        • GIS (0)
      • daily (2)
        • 대학원 준비 (0)
      • 영어공부 (6)
        • job interview (2)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    소수
    programmers
    백준
    Python
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
sillon
[Arduino] Thermistor, Peltier module 을 활용해 Rising time과 Return Time 계산하기 / 파이썬 / 아두이노
상단으로

티스토리툴바