Tải bản đầy đủ (.pdf) (20 trang)

báo cáo lưu trữ và xử lý dữ liệu lớn đề tài xây dựng data pipeline xử lý dữ liệu từ thị trường chứng khoán

Bạn đang xem bản rút gọn của tài liệu. Xem và tải ngay bản đầy đủ của tài liệu tại đây (6.67 MB, 20 trang )

<span class="text_page_counter">Trang 1</span><div class="page_container" data-page="1">

<b>TRƯỜNG ĐẠI HỌC BÁCH KHOA HÀ NỘI</b>

VIỆN CÔNG NGHỆ THÔNG TIN VÀ TRUYỀN THÔNG**********

<b>BÁO CÁO</b>

<b>LƯU TRỮ VÀ XỬ LÝ DỮ LIỆU LỚNĐỀ TÀI: Xây dựng data pipeline xử lý dữ liệu từ thị </b>

<b>trường chứng khốn</b>

<b>Nhóm 28</b>

</div><span class="text_page_counter">Trang 2</span><div class="page_container" data-page="2">

<b>2.2 2 Luồng xử lý dữ liệu chính của hệ thống</b> 4

<b>c. Xử lý dữ liệu tại pyspark_application 10d. Cung cấp API cung cấp thông tin các bài báo thu thập được 13</b>

<b>Đóng góp thành viên</b>

STT H và tênọ MSSV Công vi c th c hi nệ ự ệ1 Trầần Đ c H iứ ả 20194270 Nghiên c u và cài ứ

đ t môi trặ ường docker, x lý d li uử ữ ệ2 Nguyêễn Phương

20194336 Crawl d li u các ữ ệbài báo, têần x lý ửd li u báoữ ệ3 Nguyêễn M nh Duyạ 20194262 Crawl d li u t api ữ ệ ừ

ssi, têần x lý d ử ữli u ssiệ4 Nguyêễn Tr ng Hùngọ 20194290 Xầy d ng server ự

cung cầấp api các bàibáo

</div><span class="text_page_counter">Trang 3</span><div class="page_container" data-page="3">

<b>Chương 1: Giới thiệu đề tài</b>

Trong năm 2022, thị trường chứng khoán Việt Nam (TTCK) cũng ghi nhận những kỷ lục vềsố lượng nhà đầu tư mới tham gia, có tháng lên tới gần nửa triệu tài khoản được mở mới, khốingoại mua ròng kỷ lục hay thanh khoản một số cổ phiếu lên tới cả trăm triệu đơn vị/phiên.Thị trường cũng ghi nhận những biến động khó lường và đầy cảm xúc của nhiều cổ phiếu.Trong tháng 11 và 12, TTCK cũng ghi nhận những phiên hồi phục mạnh nhất thế giới. Rấtnhiều kỷ lục đã được ghi nhận trong năm qua, trong đó nhiều mã cổ phiếu có những lúc mangđến niềm vui lớn, nhưng cũng có lúc là nỗi buồn xót xa đối với nhiều nhà đầu tư trên sàn.Khơng ít mã cổ phiếu lên đỉnh cao mọi thời đại, tăng gấp 5-7, thậm chí vài chục lần, trongmột khoảng thời gian ngắn nhưng rồi rớt rất nhanh, khiến các nhà đầu tư hoang mang lo sợ.

Chính vì vậy một công cụ trợ giúp nhà đầu tư thấy được toàn cảnh thị trường một cách chitiết và toàn diện trở thành một nhu cầu quan trọng đối với mỗi nhà đầu tư

Nhìn thấy nhu cầu cao như vậy nhóm chúng em quyết định xây dựng áp dụng các công nghệlưu trữ và xử lý dữ liệu lớn cho dữ liệu chứng khoán tại thị trường Việt Nam nhằm cung cấpcho nhà đầu tư các thơng tin hữu ích khi tham gia thị trường, giúp nhà đầu tư có thể đưa racác quyết định ít rủi ro nhất.

Các cơng nghệ mà nhóm sử dụng trong hệ thống phân tích thông tin tuyển dụng này bao gồmHadoop File System, Spark, Kafka, Mongodb, ExpressJS, ElasticSearch và Kibana.

</div><span class="text_page_counter">Trang 4</span><div class="page_container" data-page="4">

<b>Chương 2: Tổng quan hệ thống 2.1 Data Pipeline</b>

Hệ thống đảm bảo 2 luồng dữ liệu nguồn được xử lý riêng biệt

- Đầu tiên là dữ liệu chứng khốn thơ được lấy từ public api của ssi được ghi vào hadoop và gửi đường dẫn file lên kafka sau đó spark lắng nghe kafka và đọc dữ liệu thô ra xử lý rồi lưu 1 bản trở lại hadoop và lưu 1 bản lên elasticsearch để phục vụ hiểnthị dữ liệu.

- Thứ 2 là luồng xử lý dữ liệu các bài báo ban đầu dữ liệu được lấy từ link rss của các nguồn báo khác nhau và gửi lên kafka, spark sẽ lắng nghe kafka và xử lý dữ liệu đượcgửi đến. Sau đó spark lưu 1 bản vào hadoop, 1 bản vào mongodb và 1 bản vào kibana.

<b>2.2 Các thành phần của hệ thống</b>

<b>a. Article crawler</b>

Một chương trình chạy trên môi trường NodeJS sử dụng thư viện Axios và CherioJS để tìm nạp các bài báo từ link RSS của VNExpress và trích xuất nội dung bài báo bằng cách lấy nội dung của các thẻ p từ mã HTML rồi gửi danh sách các bài báo lên Kafka topic ‘article’.

<b>b. Stock_iboard_crawler</b>

Một chương trình chạy trên python:3.7 định kỳ (1 phút) sẽ gọi api của SSI iboard để lấy thơng tin hiện tại của các mã chứng khốn. Sau đó ghi dữ liệu này lên hadoop và gửi đường dẫn hadoop cùng đường dẫn file config, dấu thời gian lên Kafka topic ‘crawl_data’

‘/project20221/articles.parquet’ ở định dạng các file parquet.

<b>e. Pyspark_application</b>

Ứng dụng spark được viết trên môi trường python:3.7 lắng nghe và nhận công việc từ

</div><span class="text_page_counter">Trang 5</span><div class="page_container" data-page="5">

Kafka rồi thực hiện các thao tác xử lý, lưu trữ dữ liệu

<b>f. Mongodb</b>

Cơ sở dữ liệu có 1 bảng articles để lưu trữ thông tin các bài báo sử dụng Mongodb Atlas

<b>g. NodeJS server: Article_server</b>

Server chạy trên môi trường NodeJS kết nối với Mongodb và cung cấp 1 API trả về danh sách các bài báo được lưu trữ.

<b>h. Elasticsearch Kibana</b>

Cơng cụ lưu trữ và trực quan hóa dữ liệu

</div><span class="text_page_counter">Trang 6</span><div class="page_container" data-page="6">

<b>2.2 2 Luồng xử lý dữ liệu chính của hệ thống</b>

Luồng xử lý dữ liệu ssi

</div><span class="text_page_counter">Trang 7</span><div class="page_container" data-page="7">

Luồng xử lý dữ liệu các bài báo kinh tế

</div><span class="text_page_counter">Trang 8</span><div class="page_container" data-page="8">

<b>Chương 3. Triển khai 3.1 Triển khai</b>

namenode:2.0.0-hadoop3.2.1-datanode 9864 hadoop3.2.1-java8

bde2020/hadoop-datanode:2.0.0-Spark spark-master 8080 hadoop3.3

bde2020/spark-master:3.3.0-spark-worker 8081 hadoop3.3

bde2020/spark-worker:3.3.0-Elasticsearch elasticsearch 9200 docker.elastic.co/elasticsearch/elasticsearch:7.15.1

kibana 5601 docker.elastic.co/kibana/kibana:7.15.1

Kafka zookeeper 2181 wurstmeister/zookeeperkafka 9092 wurstmeister/kafka

App pyspark_application myracoon/racoon_pyspark:mongo

stock_iboard_crawler myracoon/racoon_pyspark:mongo

articles_crawler myracoon/racoon_node:latestarticles_server 3000 myracoon/racoon_node:latest

<b>3.1.2 Thực hiện</b>

<b>a. Crawl dữ liệu từ vnexpress</b>

Nhóm lặp 10 phút 1 lần sử dụng RSS được cung cấp bởi vnexpress:

‘ để có được danh sách các bài báo bằng thư viện axios ở định dạng xml và dùng xml2json để đưa về json sử dụng.

Sau đó với mỗi bài link sẽ sử dụng axios để lấy html rồi sử dụng Cheerio.js để lấy nội dung các thẻ p trong mã html của bài báo

</div><span class="text_page_counter">Trang 9</span><div class="page_container" data-page="9">

Cuối cùng gửi tất cả lên Kafka topic ‘article’

<b>b. Crawl dữ liệu từ SSI Iboard</b>

Đối với dữ liệu từ bảng giá SSI nhóm sử dụng python với thư viện request để gọi api

</div><span class="text_page_counter">Trang 10</span><div class="page_container" data-page="10">

Đối với dữ liệu từ bảng giá SSI nhóm sử dụng python với thư viện request để gọi api, thông tin để gọi api như đương dẫn, body, … được lưu trong các file json tại ‘/stock_iboard_crawler/config’ mà sẽ được stock_iboard_crawler ghi lên hadoop mỗi lần khởi động lại ví dụ:

Dữ liệu sau khi lấy về sẽ được ghi 1 bản vào hadoop và gửi đường dẫn hadoop này lên Kafka cùng với đường dẫn file config và dấu thời gian.

</div><span class="text_page_counter">Trang 11</span><div class="page_container" data-page="11">

<b>c. Xử lý dữ liệu tại pyspark_application</b>

Đầu tiên khi khởi động pyspark_application sẽ tạo một ứng dụng spark, kết nối đến vàlắng nghe kafka, tạo 1 kết nối đến mongodb

Tạo client spark

Nhóm thực hiện tạo client spark với 4 core tối đa vì trong mơi trường docker nhóm chỉ có thể chạy 1 spark worker với 8 core mà cần 4 core nữa cho ssi_iboard_crawler.Sau đó nếu chưa có thơng tin các mã chứng khốn thì chương trình sẽ sử dụng thư viện request để tìm nạp tất cả thơng tin các mã chứng khoán và ghi ra

‘/data/stock.json’

</div><span class="text_page_counter">Trang 12</span><div class="page_container" data-page="12">

<b>-Dữ liệu báo từ topic ‘article’</b>

Sau khi có đầy đủ dữ liệu chương trình sẽ lắng nghe và xử lý dữ liệu từ kafka. Đối vớidữ liệu từ topic ‘article’ là danh sách các bài báo thì chương trình sẽ thực hiện tiền xử lý để loại bỏ các bài báo trùng lặp bằng cách tính cột id = ‘nguồn báo’ + ‘id của bài báo’ và xác định trùng lặp bằng cách kiểm tra id này đã tồn tại trong dataframe chứa các bài báo đã được ghi vào hadoop trong quá khứ chưa, cùng với việc chuẩn hóa lại thời gian xuất bản.

Sau đó sẽ sử dụng spark map để tìm ra các tag của bài báo (các mã chứng khoán đượcnhắc đến trong nội dung bài báo) bằng cách sử dụng regex để chỉ lấy ra chữ, số, dấu cách từ nội dung bài báo -> tách thành từng từ -> so khớp với từng mã chứng khoán -> trả về cột tag là chuỗi các mã chứng khốn cách nhau bởi dấu cách.

/pyspark_application/services/article.pySau đó ghi thông tin này vào hadoop và mongodb.

</div><span class="text_page_counter">Trang 13</span><div class="page_container" data-page="13">

Tiếp theo tách từng bài báo theo từng lần mã cổ phiếu xuất hiện, xóa đi các hàng có tag rỗng và ghi vào elasticsearch

<b>-Dữ liệu từ topic ‘crawl_data’</b>

Dữ liệu từ SSI có 2 vấn đề một là dữ liệu sẽ có thể bị thiếu đi các cột dữ liệu do chưa đến giờ hoặc SSI chưa cập nhật thì api sẽ khơng trả về, và các cột có trả về thì cũng có thể thiếu, bằng ký tự rỗng hoặc bằng null. Vì vậy cần phải tiền xử lý.

Đối với các vấn đề trên nhóm sử dụng hàm fill_data

Cùng với withColumn, dataframe.na.fill để tạo các mới nếu chưa tồn tại, cast lại các cột có giá trị thuộc kiểu string về đúng kiểu có thể tính tốn là double, long, … điền giá trị mặc định vào các chỗ trống.

Sau khi kết thúc quá trình tiền xử lý, nhóm sử dụng spark map để lặp qua từng hàng một và thêm vào các cột thông tin của doanh nghiệp tương ứng với mã cổ phiếu và sàn giao dịch.

Sau đó vì dữ liệu doanh nghiệp được lấy từ api chưa hề qua xử lý vậy nên sẽ tồn tại các giá trị null và các trường như vốn điều lệ, số lượng nhân viên đều ở dạng string. Để xử lý điều đó nhóm thực hiện 2 đoạn code sau để đưa về kiểu giá trị đúng và điền giá trị mặc định vào các chỗ trống.

</div><span class="text_page_counter">Trang 14</span><div class="page_container" data-page="14">

/pyspark_application/services/ssi_stock_data.pySau khi xử lý lưu 1 bản dữ liệu vào Hadoop và 1 bản vào elasticsearch

<b>d. Cung cấp API cung cấp thông tin các bài báo thu thập được</b>

Dữ liệu các bài báo sau khi được ghi vào database mongodb sẽ có thể đọc thơng qua một api cung cấp bởi 1 server nodejs. Api cung cấp ở cổng 3000 container article_server hoặc được map với cổng 3000 ở máy local

</div><span class="text_page_counter">Trang 16</span><div class="page_container" data-page="16">

Ngoài các biểu đồ về toàn bộ thị trường với dữ liệu trên nhóm cịn có thể xây dựng cơng cụ để theo dõi sự thay đổi của 1 cổ phiếu bất kỳ.

Theo dõi mã cổ phiếu AAA trên sàn hose ngày 31 / 1

Theo dõi mã cổ phiếu AAA trên sàn hose sáng ngày 9 / 2

Với Kibana nhóm cịn thử nghiệm so sánh mức độ thay đổi của một cổ phiếu về giá và tỷ trọng giao dịch so với các cổ phiếu khác cùng ngành để giúp người dùng dễ

</div><span class="text_page_counter">Trang 17</span><div class="page_container" data-page="17">

dàng nhận định rủi ro, đưa ra các quyết định sáng suốt hơn trong việc lựa chọn đầu tư.

So sánh thay đổi về giá của AAA so với các cổ phiếu cùng ngành 1353 - Hóa chất

So sánh tỷ trọng khối lượng của AAA so với các cổ phiếu cùng sub sector 1353 - Hóachất

<b>Dữ liệu các bài báo</b>

Ngồi sử dụng cho api cung cấp thơng tin các bài báo dữ liệu từ các bài báo cịn được phần tích để tìm ra các mã doanh nghiệp được nhắc đến trong bài báo để tìm được độ phổ biến của một mã cổ phiếu trên phương tiện truyền thông.

</div><span class="text_page_counter">Trang 18</span><div class="page_container" data-page="18">

Độ phổ biến trên các phương tiện truyền thông

</div><span class="text_page_counter">Trang 19</span><div class="page_container" data-page="19">

<b>Chương 4: Kết luận và định hướng</b>

Sau quá trình làm bài tập lớn, nhóm đã tìm hiểu vả triển khai được cơ bản về các công nghệ Kafka, Spark, Hadoop, Elasticsearch cùng Kibana từ đó đã xây dựng được một cơng cụ thực sự có tác dụng giúp cho nhà đầu tư muốn có cái nhìn bao qt, tồn diện về thị trường chứng khốn.

Định hướng tương lai có thể ứng dụng các cơng nghệ mới như học máy, các thuật tốn phân tích văn bản và đa dạng hóa các nguồn tin như các bình luận trên nền tảng mạng xã hội cũng như mở rộng thêm các nguồn báo để có thể có được cái nhìn rõ nhất về suy nghĩ của thị trường.

Đến đây thì project cũng đã hồn thiện. Nhóm 28 chúng em xin cảm ơn thầy đã hướng dẫn trong suốt quá trình làm project này. Chúng em xin chân thành cảm ơn!

</div><span class="text_page_counter">Trang 20</span><div class="page_container" data-page="20">

<b>TÀI LIỆU THAM KHẢO</b>

[1] TS. Trần Việt Trung, Slide lưu trữ và xử lý dữ liệu lớn[2] Kafka documentation , Pyspark documentation, MongoDB documentation, MongoDB Documentation

[7] Spark, Apache Spark Installation on Windows - Spark

[9] PySpark Document, Apache Spark Installation on Windows - Spark

[10] PySpark UDF, PySpark UDF (User Defined Function) - Spark

[12] Mongoose Document, Mongoose v6.4.6: Documents (mongoosejs.com)

[13] ExpressJS, Express "Hello World" example (expressjs.com)

</div>

×