Akashic Records

학습사이트 http://books.toscrape.com 크롤링 결과 DB 저장 본문

Web Crawling for Beginners

학습사이트 http://books.toscrape.com 크롤링 결과 DB 저장

Andrew's Akashic Records 2024. 4. 26. 15:12
728x90

위는 파이썬 웹 크롤러 프로그램의 개념적 다이어그램을 나타낸 이미지입니다. 이 이미지는 컴퓨터 화면에 코드가 표시되고, 웹 서버와의 데이터 요청 및 응답을 상징하는 화살표가 흐름을 보여주는 모습을 단순하고 교육적인 스타일로 표현하고 있습니다. 컴퓨터는 'Web Crawler'로, 웹 서버는 'Website'로 레이블이 붙어 있습니다.

 

파이썬에서 MySQL 데이터베이스를 사용하여 유틸리티 모듈을 만드는 것은 매우 효과적인 방법입니다. 여기서 설명하는 모듈은 MyBatis와 유사하게 데이터베이스 연결, 쿼리 실행, 트랜잭션 관리 등을 쉽게 처리할 수 있도록 도와줍니다. 다음은 기본적인 파이썬 데이터베이스 유틸리티 모듈을 설계하는 방법에 대한 개요입니다.

 

1. 필요한 라이브러리 설치

MySQL과의 연동을 위해 mysql-connector-python 패키지를 사용할 수 있습니다. 이 라이브러리는 MySQL 서버와의 연결 및 쿼리 실행을 지원합니다.

pip install mysql-connector-python

 

2. 데이터베이스 연결 관리

데이터베이스 연결을 관리하는 클래스를 만들어 보겠습니다. 이 클래스는 데이터베이스에 연결하고 연결을 종료하는 기능을 제공합니다.

import mysql.connector
from mysql.connector import Error

class DatabaseManager:
    def __init__(self, host, database, user, password):
        self.connection = None
        self.host = host
        self.database = database
        self.user = user
        self.password = password

    def connect(self):
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password
            )
        except Error as e:
            print(f"Error connecting to MySQL database: {e}")
            return None
        return self.connection

    def close_connection(self):
        if self.connection and self.connection.is_connected():
            self.connection.close()

 

3. 쿼리 실행 지원

MyBatis 스타일로 쿼리를 매핑하고 실행하는 메소드를 추가합니다. 이를 위해 SQL 쿼리와 파라미터를 메소드로 전달받아 실행할 수 있도록 합니다.

    def execute_query(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            result = cursor.fetchall()
            return result
        except Error as e:
            print(f"Error executing query: {e}")
            return None
        finally:
            cursor.close()

    def execute_update(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params)
            self.connection.commit()
            return cursor.rowcount
        except Error as e:
            self.connection.rollback()
            print(f"Error executing update: {e}")
            return None
        finally:
            cursor.close()

 

4. 트랜잭션 지원

트랜잭션을 시작하고, 커밋하며, 롤백하는 메소드를 추가합니다.

    def start_transaction(self):
        self.connection.start_transaction()

    def commit_transaction(self):
        self.connection.commit()

    def rollback_transaction(self):
        self.connection.rollback()

이 모듈은 데이터베이스 연결, 쿼리 실행, 트랜잭션 관리를 단순화하고 코드 재사용성을 높입니다. 각 메소드에 적절한 예외 처리와 로깅을 추가하여 더 견고하게 만들 수 있습니다. 사용자의 필요에 따라 더 많은 기능을 추가하거나 조정할 수 있습니다.

 

사용 예시코드

DatabaseManager 클래스를 사용하여 데이터베이스와 상호작용하는 예제 코드를 작성하겠습니다. 이 예제에서는 MySQL 데이터베이스에 연결하고, 특정 테이블에서 데이터를 조회한 다음, 데이터를 수정하는 기본적인 작업을 수행합니다.

 

예제 시나리오

  • 데이터베이스 이름: sample_db
  • 사용자 이름: user
  • 비밀번호: password
  • 호스트: localhost
  • 테이블 이름: employees (id, name, position 컬럼을 가짐)

예제 코드

# DatabaseManager 클래스 정의는 앞선 설명에 따라 이미 완료된 상태입니다.

def main():
    # 데이터베이스 설정 정보
    db_config = {
        'host': 'localhost',
        'database': 'sample_db',
        'user': 'user',
        'password': 'password'
    }

    # DatabaseManager 인스턴스 생성 및 데이터베이스 연결
    db_manager = DatabaseManager(**db_config)
    connection = db_manager.connect()

    if connection:
        print("Connection established!")

        # 쿼리 실행: 모든 직원 조회
        query = "SELECT id, name, position FROM employees"
        employees = db_manager.execute_query(query)

        if employees:
            print("Employees:")
            for emp in employees:
                print(emp)

        # 쿼리 실행: 직원 정보 업데이트
        update_query = "UPDATE employees SET position = %s WHERE name = %s"
        params = ('Senior Developer', 'John Doe')
        updated_rows = db_manager.execute_update(update_query, params)

        if updated_rows is not None:
            print(f"Updated {updated_rows} rows.")

        # 데이터베이스 연결 종료
        db_manager.close_connection()
    else:
        print("Failed to establish a connection.")

if __name__ == "__main__":
    main()

 

설명

  1. 데이터베이스 연결: DatabaseManager 클래스의 인스턴스를 생성하고, MySQL 서버에 연결합니다.
  2. 데이터 조회: execute_query 메소드를 사용하여 employees 테이블의 모든 직원 정보를 조회합니다.
  3. 데이터 업데이트: execute_update 메소드를 사용하여 특정 직원의 직위를 업데이트합니다. 이 메소드는 파라미터를 사용하여 SQL 인젝션을 방지합니다.
  4. 연결 종료: 모든 작업이 끝나면 데이터베이스 연결을 종료합니다.

이 코드는 Python 스크립트의 main 함수에서 실행되며, 터미널 또는 커맨드 프롬프트에서 실행할 수 있습니다. 데이터베이스의 실제 연결 정보에 따라 db_config를 수정해야 할 수도 있습니다.

 

Oracle DatabaseManager

Oracle 데이터베이스를 위한 DatabaseManager 클래스를 구현하는 것은 MySQL 버전과 유사합니다. Oracle 데이터베이스와의 연동을 위해서는 cx_Oracle 라이브러리를 사용할 수 있습니다. 이 라이브러리를 사용하여 데이터베이스 연결, 쿼리 실행, 트랜잭션 관리 등을 처리할 수 있습니다.

 

필요한 라이브러리 설치

Oracle 데이터베이스와 연동하기 전에 cx_Oracle 패키지를 설치해야 합니다.

pip install cx_Oracle

 

Oracle DatabaseManager 클래스 구현

import cx_Oracle
from cx_Oracle import DatabaseError

class OracleDatabaseManager:
    def __init__(self, dsn, user, password):
        self.dsn = dsn
        self.user = user
        self.password = password
        self.connection = None

    def connect(self):
        try:
            self.connection = cx_Oracle.connect(
                user=self.user,
                password=self.password,
                dsn=self.dsn
            )
            return self.connection
        except DatabaseError as e:
            print(f"Error connecting to Oracle database: {e}")
            return None

    def close_connection(self):
        if self.connection:
            self.connection.close()

    def execute_query(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params or {})
            return cursor.fetchall()
        except DatabaseError as e:
            print(f"Error executing query: {e}")
            return None
        finally:
            cursor.close()

    def execute_update(self, query, params=None):
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params or {})
            self.connection.commit()
            return cursor.rowcount
        except DatabaseError as e:
            self.connection.rollback()
            print(f"Error executing update: {e}")
            return None
        finally:
            cursor.close()

    def start_transaction(self):
        self.connection.begin()

    def commit_transaction(self):
        self.connection.commit()

    def rollback_transaction(self):
        self.connection.rollback()

 

사용 예제

def main():
    # Oracle 데이터베이스 설정 정보
    dsn = cx_Oracle.makedsn('host', 1521, service_name='orcl')
    db_config = {
        'dsn': dsn,
        'user': 'user',
        'password': 'password'
    }

    # OracleDatabaseManager 인스턴스 생성 및 연결
    db_manager = OracleDatabaseManager(**db_config)
    connection = db_manager.connect()

    if connection:
        print("Connection established to Oracle!")

        # 쿼리 실행: 모든 직원 조회
        query = "SELECT id, name, position FROM employees"
        employees = db_manager.execute_query(query)

        if employees:
            print("Employees:")
            for emp in employees:
                print(emp)

        # 쿼리 실행: 직원 정보 업데이트
        update_query = "UPDATE employees SET position = :1 WHERE name = :2"
        params = ('Senior Developer', 'John Doe')
        updated_rows = db_manager.execute_update(update_query, params)

        if updated_rows is not None:
            print(f"Updated {updated_rows} rows.")

        # 연결 종료
        db_manager.close_connection()
    else:
        print("Failed to establish a connection.")

if __name__ == "__main__":
    main()

이 코드는 Oracle 데이터베이스에 연결하여 기본적인 데이터 조회 및 업데이트 작업을 수행합니다. 사용 중인 Oracle 서버의 호스트, 포트, 서비스 이름에 맞게 DSN 정보를 조정해야 합니다.

 

학습사이트 http://books.toscrape.com 크롤링 결과 테이블

CRAWLER_BOOKS 테이블 구조

 

 

크롤링 코드

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from collections import deque
import cx_Oracle
from database import OracleDatabaseManager
import sys
import io

def get_books_from_page(soup):
    """ 페이지에서 책들의 정보를 추출하는 함수 """
    books = []
    for book in soup.find_all('article', class_='product_pod'):
        title = book.find('h3').find('a')['title']
        price = book.find('p', class_='price_color').text
        stock = book.find('p', class_='instock availability').text.strip()
        books.append((title, price, stock))
    return books

def bfs_crawl(start_url, max_depth=3):
    """ 너비 우선 탐색을 사용하여 사이트 전체를 크롤링 """
    queue = deque([(start_url, 0)])  # URL과 현재 깊이를 저장합니다.
    visited = set()
    all_books = []

    while queue:
        current_url, depth = queue.popleft()
        if depth > max_depth:
            break
        if current_url in visited:
            continue
        
        visited.add(current_url)

        response = requests.get(current_url)
        soup = BeautifulSoup(response.text, 'html.parser')
        all_books.extend(get_books_from_page(soup))

        # 다음 페이지 링크 찾기
        next_button = soup.find('li', class_='next')
        if next_button:
            next_url = next_button.find('a')['href']
            next_page_url = urljoin(current_url, next_url)
            queue.append((next_page_url, depth+1))

    return all_books


def main():
    # 시작 URL
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    start_url = 'http://books.toscrape.com/'
    books = bfs_crawl(start_url, max_depth=3)

    # 데이터베이스 설정 정보
    dsn = cx_Oracle.makedsn('xxx.xxx.xxx.xxx', 1521, service_name='orcl')
    db_config = {
        'dsn': dsn,
        'user': 'myuser',
        'password': 'mypassword'
    }
        
    # DatabaseManager 인스턴스 생성 및 데이터베이스 연결
    db_manager = OracleDatabaseManager(**db_config)
    connection = db_manager.connect() 
    if connection:
        print("Connection established!")
        
        ## 트랜제션 시작
        db_manager.start_transaction
        
        init_query = "delete from CRAWLER_BOOKS"
        db_manager.execute_update(init_query)

        insert_query = "insert into CRAWLER_BOOKS(TITLE, PRICE, STOCK) values (:1,:2,:3)"
        for book in books:
            db_manager.execute_update(insert_query, book)

        db_manager.commit_transaction
        # 데이터베이스 연결 종료
        db_manager.close_connection()
    else:
        print("Failed to establish a connection.")    
    
if __name__ == "__main__":
    main()

 

크롤링 결과를 저장하기 전에 기존 결과를 삭제한후 크롤링 결과를 저장한다.

DB 작업 전에 트랜잭션을 시작하고 작업이 끝나면 commit 한 후 연결을 종료한다.

이 코드는 단순한 테스트 코드로 에러처리 부분이 제외되었으니 참고하기 바랍니다.

728x90
Comments