Kafka #2. 설치

Zookeeper #1. 개요
Zookeeper #2. 설치와 설정
Zookeeper #3. 구동과 확인

Kafka #1. 개요
Kafka #2. 설치

Kafka 클러스터 구축을 위해서는 zookeeper 가 필요하다. 일반적으로 kafka 클러스터 구축은 3개 이상의 broker를 가지는데 이 ‘세 개 이상의 broker가 하나처럼 동작하기 위한 정보는 zookeeper가 관리’하기 때문에다.

기본적으로 프로듀서 혹은 컨슈머는 카프카에 연결할 때 직접 카프카 브로커에 연결하는 것이 아니라 주키퍼에 연결하여 카프카 브로커, 토픽, 파티션 등의 정보를 취득한 뒤 브로커에 연결하게 된다.1실제 카프카 연결 시 연결 옵션으로 브로커의 정보만 입력하더라도 주키퍼에 연결한 뒤 다시 브로커에 연결된다.

Zookeeper 설치

zoookeeper 설치는 zookeeper #1. 개요, #2. 설치와 설정, #3. 구동과 확인 포스트를 참고하면 된다.

다운로드 및 압축 해제

  • Apache kafka 공식 페이지에서 바이너리를 다운로드 한다.
  • 다운로드한 파일을 FTP 또는 SFTP등을 이용하여 서버에 업로드한다.
  • 또는 wget 명령을 이용하여 서버에서 다운로드 한다

설정

카프카 클러스터의 설정을 위해서는 ‘zookeeper’ 클러스터가 존재하는 상태에서 $KAFKA_HOME/config/server.config 파일에 기재된 설정만 잘 조절해주면 된다.

############################# Server Basics #############################
# 브로커 고유 ID. 각각의 브로커는 중복되지 않은 고유한 숫자 값을 가진다. 
broker.id=1

############################# Socket Server Settings #############################
# Broker 가 사용하는 호스트와 포트
listeners=PLAINTEXT://dist01.haedongg.net:9092

# Producer와 Consumer가 접근하는 호스트와 포트
# 다음의 연결 옵션을 제공한다 -PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
advertised.listeners=PLAINTEXT://dist01.haedongg.net:9092

# 네트워크 요청 처리 스레드
num.network.threads=3

# IO 발생 시 생기는 스레드 수
num.io.threads=8

# 소켓 서버가 사용하는 송수신 버퍼
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# Broker가 받은 데이터 관리를 위한 저장 공간
log.dirs=/home/kafka/data

# 여러 개의 디스크, 여러 개의 디렉토리를 사용하는 경우 콤마로 구분하여 추가한다.
# log.dirs=/home/kafka/data,/mnt/sdb1/kafka/data,/third_disk/mq-data

# 토픽 당 파티션의 수. 값 만큼 병렬 처리 가능하고, 파일 수도 늘어난다.
# 기본 값으로써 토픽을 생성할 때 파티션 숫자를 지정할 수 있고, 변경도 가능하다.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" 
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.

# 토픽에 설정된 replication의 인수가 지정한 값보다 크면 새로운 토픽을 생성하고 작을 경우 브로커의 숫자와 같게 된다.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# 수집 데이터 파일 삭제 주기. (시간, 168h=7days)
# 실제 수신된 메시지의 보존 기간을 의미한다. 
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# 토픽별 수집 데이터 보관 파일의 크기. 파일 크기를 초과하면 새로운 파일이 생성 된다. 
log.segment.bytes=1073741824

# 수집 데이터 파일 삭제 여부 확인 주기 (밀리초, 300000ms=5min)
log.retention.check.interval.ms=300000

# 삭제 대상 수집 데이터 파일의 처리 방법. (delete=삭제, compact=불필요 내용만 제거)
log.cleanup.policy=delete

# 수집 데이터 파일 삭제를 위한 스레드 수
log.cleaner.threads=1

############################# Zookeeper #############################
# zookeeper 정보.
# zookeeper 클러스터 모두 콤마(,)로 구분해서 기재한다.
zookeeper.connect=dist01.haedongg.net:2181,dist02.haedongg.net:2181,dist03.haedongg.net:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
# 초기 GroupCoordinator 재조정 지연 시간 (밀리초) 
# 운영환경에서는 3초를 권장 
group.initial.rebalance.delay.ms=3000

############################# 기타 ################################
# 최대 메시지 크기 (byte) 
# 예시는 15MB 까지의 메시지를 수신할 수 있다.
message.max.bytes=15728640

구동 및 확인

Kafka broker 구동을 위해서는 JAVA가 필요하다. JDK가 설치되어있고 환경변수($JAVA_HOME 및 $PATH)가 설정되어 있다면 별도의 설정 없이 카프카 브로커를 구동할 수 있다. 만약 JDK가 설치되어있지 않거나, 별도의 JAVA를 사용하고자 한다면 export 명령을 이용하여 변수를 선언하거나, $KAFKA_HOME/bin/kafka-run-class.sh 파일에 JAVA관련 변수를 넣어주도록 한다.

# Memory options
# 다음 $JAVA_HOME 관련 옵션을 적절히 수정한다.
# JAVA_HOME=/WHERE/YOU/INSTALLED/JAVA 이렇게 JAVA_HOME 변수를 지정하면 된다. 
# 만약 JAVA_HOME 변수가 선언되지 않았다면  PATH에 등록된 경로에서 java 명령을 찾게 된다. 
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

# Memory options
# 카프카 구동을 위한 java heap memory 옵션
if [ -z "$KAFKA_HEAP_OPTS" ]; then
#  KAFKA_HEAP_OPTS="-Xmx256M"   이 값이 기본
  KAFKA_HEAP_OPTS="-Xms256M -Xmx1G"
fi

broker 실행

  • broker로 구동할 모든 Kafka 서버에서 수행한다.
$KAFKA_HOME/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
ex) /home/apps/kafka/bin/kafka-server-start.sh -daemon /home/apps/kafka/config/server.properties
# 만약 -daemon 옵션을 추가하지 않으면 브로커가 background가 아닌 foreground에서 구동된다.
  • 프로세스 및 리슨 포트 확인
jps -l

18320 kafka.Kafka
18403 sun.tools.jps.Jps
17899 org.apache.zookeeper.server.quorum.QuorumPeerMain

netstat -nltp

Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      973/sshd
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1286/master
tcp6       0      0 :::8080                 :::*                    LISTEN      17899/java
tcp6       0      0 :::40434                :::*                    LISTEN      17899/java
tcp6       0      0 :::22                   :::*                    LISTEN      973/sshd
tcp6       0      0 ::1:25                  :::*                    LISTEN      1286/master
tcp6       0      0 :::34880                :::*                    LISTEN      18320/java
tcp6       0      0 :::9092                 :::*                    LISTEN      18320/java
tcp6       0      0 :::2181                 :::*                    LISTEN      17899/java
  • 종료
$KAFKA_HOME/bin/kafka-server-stop.sh

 

 

 

Kafka #1. 개요

Zookeeper #1. 개요
Zookeeper #2. 설치와 설정
Zookeeper #3. 구동과 확인

Kafka #1. 개요
Kafka #2. 설치

아파치 카프카(Apache Kafka)는 아파치 소프트웨어 재단이 스칼라로 개발한 오픈 소스 메시지 브로커 프로젝트이다. 이 프로젝트는 실시간 데이터 피드를 관리하기 위해 통일된, 높은 처리량, 낮은 지연시간을 지닌 플랫폼을 제공하는 것이 목표이다. 요컨대 분산 트랜잭션 로그로 구성된, 상당히 확장 가능한 pub/sub 메시지 큐로 정의할 수 있으며, 스트리밍 데이터를 처리하기 위한 기업 인프라를 위한 고부가 가치 기능이다.

큐 (Queue)

먼저 큐에 대한 이해가 필요한데, 말 그대로 ‘기다리는 줄(열)’, ‘대기 열’을 생각하면 된다. 전공자라면 자료구조, 알고리즘 과목 등에서 접하게 되는데 Stack 과 함께 배운다.
FIFO1First In First Out 구멍이 두 개인 파이프의 한쪽에 동전을 넣는다고 생각하면 된다. 계속 동전을 밀어 넣다 보면 가장 먼저 넣었던 동전부터 동전이 밀려나오는 구조가 된다.
이와 반되 되는 것이 Stack. 쌓는다는 의미의 stack으로 FILO2First In Last Out 컵에 동전을 쌓는 것을 상상하면 된다. 컵에 동전을 쌓아 넣다다가 동전을 꺼내면 나중에 넣었던 동전을 먼저 꺼내야 한다.

 

Message Queue

이렇게 먼저 넣은 값이 먼저 출력되는 구조를 이용해 메시지3여기서 말하는 메시지는 특별한 개념이 아니다. 송/수신되는 데이터를 종류, 형태와 상관 없이 통틀어 메시지라고 칭하는 것이다. 이 메시지는 단순한 텍스트일 수도 있고, 파일일 수도 있고 다양하다.를 전송하는 것이 Message Queue이다. Rabbit MQ, IBM MQ, Apache active MQ , Rocket MQ 등 다양한 종류의 Message Queue가 존재한다.
기본적인 개념은 모두 같지만 메시지를 주고 받는 방식에 따라 크게 두 종류로 구분할 수 있다.
– Push : 서버가 메시지를 클라이언트에 보내주는 방식
– Pull : 클라이언트가 메시지를 서버에서 가져오는 방식

별것 아닌 것 같지만 Pull 과 Push는 큰 차이를 가진다.

종류PushPull
장점· 메시지 발생 즉시 클라이언트에 전달· 필요한 클라이언트만 메시지를 가져가므로 서버 부하 감소
· 네트워크 부하 감소
단점· 서버가 모든 클라이언트에 연결해야 하므로 서버 부하 증가· 메시지를 전달한 클라이언트 확인 불가
주 용도카카오톡 등 메시지, 알림 전달분석용 서버 로그 데이터 등
Push 방식과 Pull 방식 비교

 

Apache Kafka

Apache Kafka는 Pull 방식의 Message Queue이다.
중요한 개념으로 메시지를 발생 시키는 (또는 서버의 Queue에 메시지를 보내는) Producer(또는 Publisher)와 메시지를 가져가는 Consumer(또는 subscriber)가 있다.4실제 이 프로듀서와 컨슈머는 추상적인 개념으로만 존재하며 Kafka 자체를 칭하지는 않고 Kafka를 구성하는 구성요소는 아니다!!

일반적인 개념에서 Kafka는 Broker를 의미한다.

즉, 누군가 메시지를 보내는 곳, 누군가 메시지를 꺼내갈 곳이 Kafka 이다. 5물론 kafka connect, mirror-maker 등 카프카가 직접 메시지를 읽어오는 개념이 존재하긴 하다. 하지만, 어쨌거나 프로듀서, 컨슈머 등을 이야기하는 개념에서의 kafka는 broker 를 떠올리면 된다.

이 Kafka Broker가 하는 일과 핵심적인 개념은 다음과 같다.

  • Queue : 가장 핵심적인 기능이다. 수신된 메시지를 저장한다. kafka가 인지하는 형식으로 디스크에 저장된다.
  • Offset 관리 : 프로듀서가 보낸 메시지와 컨슈머가 가져간 메시지의 offset, 즉, 몇 개의 메시지가 수신되었고 어떤 클라이언트가 어디서부터 어디까지의 메시지를 가져갔는지 기억하고 관리한다.
  • Topic : Topic은 앞서 설명한 메시지를 담는 파이프 하나를 떠올리면 된다. 메시지를 넣을 파이프를 만들고 이름을 붙여서 관리한다. 이 Topic은 여러개의 Pratition으로 나뉠 수 있다.
  • Replica : 데이터의 복제, Broker나 디스크의 장애 등에 대비해 데이터를 복제하는 기능이다.

 

Public cloud에서

AWS : MKS(Managed Kafka Service)
GCP : Pub/Sub (Publisher와 Subscriber)
AZURE: HDInsight Kafka

 

 

 

 

 

minio object storage 구축

MinIO는 GNU Affero General Public License v3.0에 따라 출시 된 고성능 개체 스토리지이며 Amazon S3 클라우드 스토리지 서비스와 API와 완벽하게 호환된다.

자세한 정보는 minio 홈페이지에서 확인할 수 있다.

설치
yum 등을 이용해서 설치도 가능하지만 quick-guide 페이지를 참조해 바이너리를 다운 받아 구동해본다.

haedong@haedong:~:]$ wget --no-check-certificate https://dl.min.io/server/minio/release/linux-amd64/minio
--2021-08-18 16:10:33--  https://dl.min.io/server/minio/release/linux-amd64/minio
...중략...
Proxy request sent, awaiting response... 200 OK
Length: 91971584 (88M) [application/octet-stream]
Saving to: ‘minio’
100%[==========================================================================================>] 91,971,584  17.4MB/s   in 5.3s
2021-08-18 16:10:39 (16.6 MB/s) - ‘minio’ saved [91971584/91971584]

구동

구동이 먼저 나온 이유는, 배포되는 minio 파일이 바로 실행파일이기 때문이다. 기본 옵션 플래그만 붙여서 실행하면 싱글노드 구동은 즉시 가능하다. 데이터가 저장될 디렉토리를 생성하고 서비스를 실행해보자. 딱히 설정파일 등을 만들 필요 없이 명령 한줄로 실행된다.

haedong@haedong:~:]$ mkdir data
haedong@haedong:~:]$ chmod 700 minio
#755 등을 줘도 무방하다.
haedong@haedong:~:]$ ./minio server --address :9000 --console-address :9090 /home/haedong/data
API: http://192.168.192.168:9000  http://127.0.0.1:9000
RootUser: minioadmin
RootPass: minioadmin

Console: http://192.168.192.168:9090 http://127.0.0.1:9090
RootUser: minioadmin
RootPass: minioadmin

Command-line: https://docs.min.io/docs/minio-client-quickstart-guide
   $ mc alias set myminio http://172.17.172.110:9000 minioadmin minioadmin

Documentation: https://docs.min.io
WARNING: Detected default credentials 'minioadmin:minioadmin', we recommend that you change these values with 'MINIO_ROOT_USER' and 'MINIO_ROOT_PASSWORD' environment variables

갤러리 추가

표준 출력으로 출력된 RootUser 는 username, RootPass는 password로 하여 서비스 콘솔에 접속할 수 있다. 로그인 이후 접속을 위한 endpoint access key를 생성하거나 Bucket을 생성하고 웹 UI를 통해 데이터를 업로드할 수 있다.

멀티 노드 구성
싱글노드와 달리 몇가지 정보를 추가해 주는 것이 관리에 용이하다.
minio 디렉토리를 생성하고 관리를 위한 정보들을 추가한 구동 스크립트를 생성하자.
vi ~/minio/start.sh

#!/bin/bash

MINIO_HOME=$HOME/minio
MINIO_BIN_DIR=$MINIO_HOME/bin
MINIO_DATA_DIR=$MINIO_HOME/data
MINIO_LOG_DIR=$MINIO_HOME/logs
export MINIO_ROOT_USER=haedonggang
export MINIO_ROOT_PASSWORD=haedonggang
LOG_FILE=$MINIO_LOG_DIR/minio.log
# 꼭 위와 같은 변수를 사용할 필요는 없지만 변수들을 이용해 설정을 해 두면 관리가 용이하다.

# host01 부터 host04까지 4개의 노드를 활용하는 경우
nohup $MINIO_BIN_DIR/minio server --address :9000 --console-address :9090 http://host01$MINIO_DATA_DIR http://host02$MINIO_DATA_DIR http://host03$MINIO_DATA_DIR http://host04$MINIO_DATA_DIR >> $LOG_FILE &


MINIO_PID=$!
if [ ! -z $MINIO_PID ] ; then
echo "$MINIO_PID" > minio.pid
fi

이후 각 노드에서 start.sh 스크립트를 이용해 서비스를 구동하면 된다. 딱히 마스터의 개념은 없으므로 각각 노드에서 스크립트를 한 번씩 실행하자.

이후 싱글노드 구성에서와 같이 9090포트를 이용해 접속하면 된다. 다만 노드가 4개이므로 어느 노드에 접속해도 동일한 화면을 확인할 수 있다.

Hadoop # 2. 싱글 Namenode 설치,설정

Hadoop #1. 개요

사전 작업

하둡을 구동할 계정은 hadoop, 그룹도 hadoop 으로 한다. hadoop 계정의 홈 디렉토리는 /home/hadoop 이다. 키-쌍 생성 관련 내용은 별도의 포스트를 통해 설명하겠다

 # 사용자 생성
[root@hadoop01 ~]# useradd -g hadoop hadoop
[root@hadoop01 ~]# su - hadoop
[root@hadoop01 ~]# passwd hadoop
hadoop 사용자의 비밀 번호 변경 중
새  암호:
새  암호 재입력:
passwd: 모든 인증 토큰이 성공적으로 업데이트 되었습니다.
 #여기까지 모든 서버에서 작업한다.
 # 키 쌍 생성
[hadoop@hadoop01 ~]$ ssh-keygen
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hadoop/.ssh/id_rsa):
Created directory '/home/hadoop/.ssh'.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /home/hadoop/.ssh/id_rsa.
Your public key has been saved in /home/hadoop/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:kQyuv0LCUMDOBwicLr4VyFSskm0wYIGHLIv1pTbALF0 hadoop@centos7
The key''s randomart image is:
+---[RSA 2048]----+
|%O=oE .          |
|XBO. ..o .       |
|O@+o o. +        |
|BBoo=.   .       |
|++..o.  S        |
| .o...           |
|  oo  .          |
| .  .  .         |
|     ..          |
+----[SHA256]-----+
[hadoop@hadoop01 ~]$ chmod 700 ~/.ssh
[hadoop@hadoop01 ~]$ cat ~/.ssh/id_rsa.pub >> authorized_keys
[hadoop@hadoop01 ~]$ chmod 600 ~/.ssh/*
[hadoop@hadoop01 ~]$ ls -l ~/.ssh/
합계 12
-rw------- 1 hadoop hadoop  396  9월 14 15:32 authorized_keys
-rw------- 1 hadoop hadoop 1675  9월 14 15:24 id_rsa
-rw------- 1 hadoop hadoop  396  9월 14 15:24 id_rsa.pub
 # 하둡 클러스터로 사용할 모든 서버에 키를 복사하고 권한을 변경한다.
[hadoop@hadoop01 ~]$ scp ~/.ssh/authorized_keys hadoop@hadoop02:/home/hadoop/.ssh/authorized_keys
The authenticity of host 'hadoop02 (192.168.0.2)' can't be established.
ECDSA key fingerprint is SHA256:nhIT6XvSamWF1mgXDkAuM64eZj5XCJww5T2NEojH2iU.
ECDSA key fingerprint is MD5:7b:a5:40:02:c3:cd:0f:e7:36:77:dd:3c:cc:3b:ba:d2.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
hadoop@hadoop02's password:
authorized_keys     
[hadoop@hadoop01 ~]$  ssh hadoop02 chmod 700 /home/hadoop/.ssh
[hadoop@hadoop01 ~]$  ssh hadoop02 chmod 600 /home/hadoop/.ssh/authorized_keys
hadoop@hadoop02's password:

설치

Apache Hadoop 홈페이지에서 바이너리를 다운로드 하고 압축을 해제한다.
※ hadoop 계정으로 진행

[hadoop@hadoop01 ~]$ wget http://mirror.apache-kr.org/hadoop/common/hadoop-3.1.4/hadoop-3.1.4.tar.gz
--2020-09-14 15:28:30--  http://mirror.apache-kr.org/hadoop/common/hadoop-3.1.4/hadoop-3.1.4.tar.gz
Resolving mirror.apache-kr.org (mirror.apache-kr.org)... 125.209.216.167
Connecting to mirror.apache-kr.org (mirror.apache-kr.org)|125.209.216.167|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 348326890 (332M) [application/octet-stream]
Saving to: ‘hadoop-3.1.4.tar.gz’
100%[=====================================================================================================================================>] 348,326,890 37.9MB/s   in 11s
2020-09-14 15:28:41 (30.1 MB/s) - ‘hadoop-3.1.4.tar.gz’ saved [348326890/348326890]
[hadoop@hadoop01 ~]$ tar -xvzf  hadoop-3.1.4.tar.gz
hadoop-3.1.4/
hadoop-3.1.4/lib/
중략
hadoop-3.1.4/libexec/mapred-config.sh
hadoop-3.1.4/libexec/hadoop-config.cmd
hadoop-3.1.4/libexec/hdfs-config.cmd
[hadoop@hadoop01 ~]$ ln -s  ~/hadoop-3.1.4 ~/hadoop
[hadoop@hadoop01 ~]$ ls -l
합계 340164
lrwxrwxrwx 1 hadoop hadoop        12  9월 14 15:31 hadoop -> hadoop-3.1.4
drwxr-xr-x 9 hadoop hadoop       169  9월 14 15:31 hadoop-3.1.4
-rw-rw-r-- 1 hadoop hadoop 348326890  8월 24 21:40 hadoop-3.1.4.tar.gz
drwxrwxr-x 2 hadoop hadoop         6  9월 14 15:22 perl5
 #나머지 노드에서 모두 동일한 작업을 수행한다. (다운로드, 압축 풀기, 심볼릭 링크 생성)
[hadoop@hadoop01 ~]$  vi  ~/.bas_profile
 # 여기부터 파일의 맨 뒤에 붙여넣기 한다.
export HADOOP_HOME=/home/hadoop/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
[hadoop@hadoop01 ~]$  source ~/.bas_profile
 나머지 노드들에서 동일하게 수행 해 준다.

설정

 # Hadoop 환경변수 설정 (하둡이 구동될 때 읽어들이는 환경변수)
[hadoop@hadoop01 ~]$ vi $HADOOP_HOME/etc/hadoop/hadoop-env.sh
 # 기존 내용을 모두 지우고 여기서부터 붙여넣기 한다.
 export HADOOP_HOME=/home/hadoop/hadoop
 export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

 # 시스템 사양에 맞추어 적절히 증감한다.
 export HADOOP_HEAPSIZE_MAX= 8g
 export HADOOP_HEAPSIZE_MIN=8g

export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
 # core-site.xml
[hadoop@hadoop01 ~]$ vi $HADOOP_HOME/etc/hadoop/core-site.xml
<!-- 모두 지우고 여기서부터 붙여넣기 한다. -->
<configuration>
  <property>
      <name>fs.defaultFS</name>
      <value>hdfs://hadoop01:8020</value>
<!-- 기본 HDFS 주소. 예를 들어 "hdfs dfs -ls / " 를 수행하면  / 앞에 value의 주소가 들어간다고 이해하면 된다 -->
  </property>
  <property>
      <name>hadoop.proxyuser.hive.groups</name>
      <value>*</value>
  </property>
  <property>
      <name>hadoop.proxyuser.hive.hosts</name>
      <value>*</value>
  </property>
</configuration>
 # hdfs-site.xml
[hadoop@hadoop01 ~]$ vi $HADOOP_HOME/etc/hadoop/hdfs-site.xml
<!-- 모두 지우고 여기서부터 붙여넣기 한다. -->
<configuration>
   <property>
        <name>dfs.replication</name>
        <value>3</value>
<!-- 
데이터 복제 횟수 
-->
   </property>
   <property>
        <name>dfs.namenode.name.dir</name>
        <value>/home/hadoop/namenode</value>
<!--
로컬(리눅스 시스템의) 디렉토리 경로이며, 반드시 hadoop을 구동하는 계정이 디렉토리에 대한 모든 권한을 가지고 있어야 한다.
-->
   </property>
   <property>
        <name>dfs.namenode.checkpoint.dir</name>
        <value>/home/hadoop/namesecondary</value>
<!-- 
namenode 디렉토리와 동일하게 로컬 디렉토리이고 모든 권한을 부여해야 한다. 
-->
   </property>
   <property>
        <name>dfs.datanode.data.dir</name>
        <value>/home/hadoop/datanode</value>
<!-- 
실제 hdfs 에 쌓인 데이터가 저장되는 경로. 여유공간이 넉넉한 파티션의 디렉토리를 지정해야 한다.
 datanode로 동작하는 클라이언트, 즉 workers 파일에 등록되어있는 호스트에 모두 동일하게 적용해야 한다.
 당연히 hadoop을 구동하는 계정이 모든 권한을 가진 디렉토리여야 한다.
-->
   </property>
   <property>
         <name>dfs.http.address</name>
         <value>hadoop01:50070</value>
<!--namenode http 주소 및 포트. hadoop web-UI -->
   </property>
   <property>
         <name>dfs.secondary.http.address</name>
         <value>hadoop01:50090</value>
<!-- 
Secondary namenode http 주소 및 포트
주의 할 점은 secondary-namenode는 백업, 복구를 담당하는 것이 아니라 보조의 역할이라는 점이다. 
-->
   </property>

<property>
         <name>dfs.block.size</name>
         <value>16777216</value>
<!-- 
HDFS의 block size, Byte 단위.  다음 값들을 참고하자.
8MB=8,388,608 / 16MB=16,777,216 / 32MB=33,554,432 / 64MB=67,108,864 / 128MB=134,217,728
-->
    </property>
    <property>
      <name>dfs.permissions.enabled</name>
      <value>false</value>
<!-- 
값이 false일 경우 파일 시스템의 퍼미션 관련 기능이 비활성화된다. 
-->
    </property>
    <property>
      <name>fs.trash.interval</name>
      <value>3</value>
<!-- 
휴지통과 같은 기능이다. 파일을 삭제하면 임시 공간으로 이동되고 지정한 시간 이후 삭제된다.
value의 값(=분)에 따라 휴지통이 비워진다.  
-->
    </property>
</configuration>
 # yarn-site.xml
[hadoop@hadoop01 ~]$ vi $HADOOP_HOME/etc/hadoop/yarn-site.xml
<!-- 모두 지우고 여기서부터 붙여넣기 한다. -->
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop02</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>hadoop02:8025</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>hadoop02:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>hadoop02:8050</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>hadoop02:8055</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
<!--
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler
- FIFO : 먼저 들어온 작업을 먼저 처리한다. 후순위 작업은 이전 작업이 끝날 때까지 대기한다.
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
- FAIR : 작업에 균등하게 자원을 할당한다.
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
- 트리 형태로 큐를 선언하고 각 큐 별로 이용할 수 있는 자원의 용량을 정하여 할당한다. 
-->
        </property>
</configuration>
 # mapreduce-site.xml
[hadoop@hadoop01 ~]$ vi $HADOOP_HOME/etc/hadoop/mapreduce-site.xml
<!-- 모두 지우고 여기서부터 붙여넣기 한다. -->
<configuration>
     <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
     </property>
     <property>
        <name>mapred.local.dir</name>
        <value>/home/hadoop/mapred</value>
     </property>
     <property>
        <name>mapred.system.dir</name>
        <value>/home/hadoop/mapred</value>
     </property>
     <property>
        <name>mapred.map.memory.mb</name>
        <value>2048</value>
<!-- map 작업을 위한 최대 리소스 제한 -->
     </property>
     <property>
        <name>mapred.map.java.opts</name>
        <value>-Xmx1024M</value>
<!-- map 작업을 위한 최대 child jvms  -->
     </property>
     <property>
        <name>mapred.reduce.memory.mb</name>
        <value>3072</value>
<!-- reduce 작업을 위한 최대 리소스 제한  -->
     </property>
     <property>
        <name>mapred.reduce.java.opts</name>
        <value>-Xmx2560M</value>
     <!-- reduce 작업을 위한 최대 child jvms  -->
     </property>
     <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop02:10020</value>
<!-- Job history server 주소 (기본 포트는 10020)  -->
     </property>
     <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop02:19888</value>
<!-- Job history server Web UI주소 (기본 포트는 19888)  -->
     </property>
     <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/home/hadoop/mr-history/tmp</value>
     </property>
     <property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>/home/hadoop/mmr-histroy/done</value>
     </property>
</configuration>
[hadoop@hadoop01 ~]$ vi $HADOOP_CONF_DIR/workers
 # datanode 구동을 위한 노드를 기록한다.
hadoop01
hadoop02
192.168.0.3
hadoop04.net
 # 수정한 설정 파일을 모든 노드에 복사한다.
[hadoop@hadoop01 ~]$ scp $HADOOP_HOME/etc/hadoop/* hadoop02:/home/hadoop/etc/hadoop/


구동 및 확인. 그리고 종료

 # 구동 스크립트를 이용한 방법

 # 하둡 구동
[hadoop@hadoop01 ~]$  $HADOOP_HOME/sbin/start-all.sh
WARNING: Attempting to start all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: This is not a recommended production deployment configuration.
WARNING: Use CTRL-C to abort.
Starting namenodes on [centos7]
Starting datanodes
Starting secondary namenodes [centos7]
Starting resourcemanager
Starting nodemanagers

 # 프로세스 확인
[hadoop@hadoop01 ~]$  jps -l
92880 org.apache.hadoop.yarn.server.nodemanager.NodeManager
92339 org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode
93780 sun.tools.jps.Jps
92685 org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
92111 org.apache.hadoop.hdfs.server.datanode.DataNode

 # 종료
[hadoop@hadoop01 ~]$  sbin/stop-all.sh
WARNING: Stopping all Apache Hadoop daemons as hadoop in 10 seconds.
WARNING: Use CTRL-C to abort.
Stopping namenodes on [centos7]
Stopping datanodes
Stopping secondary namenodes [centos7]
Stopping nodemanagers
Stopping resourcemanager

Hadoop # 1. 개요

개요

아파치 하둡(Apache Hadoop, High-Availability Distributed Object-Oriented Platform)은 대량의 자료를 처리할 수 있는 큰 컴퓨터 클러스터에서 동작하는 분산 응용 프로그램을 지원하는 프리웨어 자바 소프트웨어 프레임워크이다. 원래 너치의 분산 처리를 지원하기 위해 개발된 것으로, 아파치 루씬의 하부 프로젝트이다. 분산처리 시스템인 구글 파일 시스템을 대체할 수 있는 하둡 분산 파일 시스템(HDFS: Hadoop Distributed File System)과 맵리듀스를 구현한 것이다.

베이스 아파치 하둡 프레임워크는 다음의 모듈을 포함하고 있다:

하둡 커먼(Hadoop Common)
하둡 분산 파일 시스템(HDFS)
하둡 YARN
하둡 맵리듀스

쉽게 설명하면 자바로 만든 네트워크 클러스터링이 가능한 가상 파일 시스템 이다.

이런… 느낌이려나…

컨셉 자체는 매우 단순하다.
1. 네트워크로 연결 되어있다.
2. 데이터를 쪼개서 각 노드에 분산 저장한다.
3. 노드의 장애에 대비해 중복 저장한다.
4. 어디까지나 ‘파일을 저장하기 위한 파일 시스템이다.’

Zookeeper #3. 구동과 확인

Zookeeper #1. 개요
Zookeeper #2. 설치와 설정
Zookeeper #3. 구동과 확인

Kafka #1. 개요
Kafka #2. 설치

※ zookeeper 앙상블에 들어갈 모든 노드에서 구동 되어야 한다.

 # zookeeper 시작
[kafka@kafka ~]$ $ZOOKEEPER_HOME/bin/zkServer.sh start
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

 # zookeeper 종료
[kafka@kafka ~]$ $ZOOKEEPER_HOME/bin/zkServer.sh stop
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

 # zookeeper 재시작
[kafka@kafka ~]$  $ZOOKEEPER_HOME/bin/zkServer.sh restart
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
Using config: /home/kafka/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

 # zookeeper 프로세스 확인
[kafka@kafka ~]$  jps -l
53088 sun.tools.jps.Jps
38497 kafka.Kafka
52964 org.apache.zookeeper.server.quorum.QuorumPeerMain
87046 org.apache.kafka.connect.cli.ConnectStandalone
 # zookeeper 프로세스 확인
[kafka@kafka ~]$  jps -l
53088 sun.tools.jps.Jps
38497 kafka.Kafka
52964 org.apache.zookeeper.server.quorum.QuorumPeerMain
87046 org.apache.kafka.connect.cli.ConnectStandalone
 # 네트워크 확인
[kafka@kafka ~]$  netstat -nltp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:3389            0.0.0.0:*               LISTEN      -
tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:3350          0.0.0.0:*               LISTEN      -
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:5432          0.0.0.0:*               LISTEN      -
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      -
tcp6       0      0 :::35068                :::*                    LISTEN      52964/java
tcp6       0      0 192.168.103.113:9092    :::*                    LISTEN      38497/java
tcp6       0      0 :::2181                 :::*                    LISTEN      52964/java
tcp6       0      0 :::3888               :::*                    LISTEN      52964/java
tcp6       0      0 :::2888                 :::*                    LISTEN      52964/java
tcp6       0      0 :::111                  :::*                    LISTEN      -
tcp6       0      0 :::8080                 :::*                    LISTEN      52964/java
tcp6       0      0 :::8083                 :::*                    LISTEN      87046/java
tcp6       0      0 :::37045                :::*                    LISTEN      87046/java
tcp6       0      0 :::42901                :::*                    LISTEN      38497/java
tcp6       0      0 :::22                   :::*                    LISTEN      -

로그 확인
일단 [ERROR], [CRITICAL] 같은 단어가 보이면 뭐가 잘못됐건 잘못 된 것이다.
[WARN]의 경우는 보통 당장 구동에는 문제가 없지만 특정 작업, 조건 등에서 문제가 발생할 수
있다. 주의 깊게 봐야 한다.

[kafka@kafka ~]$ tail -100f $ZOOKEEPER_HOME/logs/zookeeper-kafka-server.out
2020-09-03 10:11:35,822 [myid:2] - WARN  [QuorumConnectionThread-[myid=2]-2:QuorumCnxManager@400] - Cannot open channel to 3 at election address /192.168.113.138:3888
java.net.ConnectException: 연결이 거부됨 (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:607)
        at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:383)
        at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(Qu ....
2020-09-03 10:11:35,828 [myid:2] - INFO  [WorkerReceiver[myid=2]:FastLeaderElection$ ... 후략

예제의 경우 다른 zookeeper 노드가 아직 구동되지 않아 발생한 [WARN] 메시지가 존재한다.
나머지 노드가 구동되고 연결되면 [WARN] 메시지는 사라진다.

로그의 모든 내용을 설명하기는 어렵고 자주 바라보고 눈에 익숙해 질 수 있도록 해야한다.

Zookeeper #2. 설치와 설정

Zookeeper #1. 개요
Zookeeper #2. 설치와 설정
Zookeeper #3. 구동과 확인

Kafka #1. 개요
Kafka #2. 설치

대충.. 이렇게 구성 해보자..
2888은 leader만 오픈 한다는 것을 기억하자.

1. 다운로드 및 압축 해제
Apache zookeeper 홈페이지에서 필요한 버전을 선택하거나 여기를 클릭하여1글 작성일 2020년 09월01일 기준으로 최신버전은 3.6.1이다.다운로드 한다.
– 다운로드한 파일을 FTP 또는 SFTP등을 이용하여 서버에 업로드한다.
– 또는 wget 명령을 이용하여 서버에서 다운로드 한다.

[kafka@kafka ~]$ wget http://apache.tt.co.kr/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
--2020-09-01 08:27:41--  http://apache.tt.co.kr/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
Resolving apache.tt.co.kr (apache.tt.co.kr)... 211.47.69.77
Connecting to apache.tt.co.kr (apache.tt.co.kr)|211.47.69.77|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9394700 (9.0M) [application/x-gzip]
Saving to: ‘apache-zookeeper-3.5.8-bin.tar.gz’

100%[========================================================================>] 9,394,700   72.0KB/s   in 2m 6s

2020-09-01 08:29:47 (72.8 KB/s) - ‘apache-zookeeper-3.5.8-bin.tar.gz’ saved [9394700/9394700]

[kafka@kafka ~]$ ll
합계 91272
-rw-r--r--  1 kafka kafka 12436328  5월  1 04:53 apache-zookeeper-3.6.1-bin.tar.gz
drwxrwxr-x 76 kafka kafka     4096  9월  1 08:29 data
drwxr-xr-x  7 kafka kafka      245  8월 13 07:54 jdk1.8.0_251
lrwxrwxrwx  1 kafka kafka       16  8월 12 09:00 kafka -> kafka_2.13-2.6.0
drwxr-xr-x  9 kafka kafka      182  8월 13 09:48 kafka_2.13-2.6.0
-rw-r--r--  1 kafka kafka 65537909  8월  5 07:01 kafka_2.13-2.6.0.tgz
drwxrwxr-x  2 kafka kafka        6  8월 12 09:07 perl5

– 압축을 해제하고 심볼릭 링크를 생성한다.2Symbolic-Link:윈도우즈의 바로가기 lnk파일과 유사하다. 3심볼릭 링크를 사용하면 환경 변수의 관리가 편리하다. 예제의 경우 압축을 해제하면 ‘apache-zookeeper-3.6.1-bin’ 라는 이름의 디렉토리가 생성되는데 이 이름으로 환경 변수를 등록하여 운영 중 zookeeper 판올림을 할경우 환경변수를 변경 해야 한다. 또 ‘zookeeper’라는 이름으로 변경을 했을 경우 동일한 디렉토리에 파일을 교체해야 하므로 별도로 파일을 이동해줘야 하지만 심볼링 링크로 연결 했을 경우는 링크만 변경 하는 것으로 바이너리를 교체하는 효과를 볼 수 있다.

[kafka@kafka ~]$ tar -xvzf apache-zookeeper-3.6.1-bin.tar.gz
apache-zookeeper-3.6.1-bin/docs/
apache-zookeeper-3.6.1-bin/docs/skin/
    :
중략
   :
apache-zookeeper-3.6.1-bin/lib/metrics-core-3.2.5.jar
apache-zookeeper-3.6.1-bin/lib/snappy-java-1.1.7.jar
[kafka@kafka ~]$ ln -s apache-zookeeper-3.6.1-bin zookeeper
[kafka@kafka ~]$ ll
합계 12148
drwxrwxr-x 6 kafka kafka      133  9월  1 08:35 apache-zookeeper-3.6.1-bin
-rw-r--r-- 1 kafka kafka 12436328  9월  1 08:34 apache-zookeeper-3.6.1-bin.tar.gz
lrwxrwxrwx 1 kafka kafka       26  9월  1 09:12 zookeeper -> apache-zookeeper-3.6.1-bin
 

2. 환경변수 설정
※ /home/kafka 디렉토리에 압축을 해제했고 kafka 계정을 사용하는 것으로 가정한다.
– ~/.bash_profile 에 환경 변수를 등록한다. 4~‘ 는 사용자의 home 디렉토리를 의미한다. 홈 디렉토리는 /etc/passwd 에 기록 되어있다.

[kafka@kafka zookeeper]$ vi ~/.bash_profile

export JAVJA_HOME=/home/kafka/jdk1.8.0_251
export PATH=$PATH:$JAVA_HOME/bin
 # JAVA_HOME 디렉토리 관련 설정. 

#여기부터 추가한다.
export ZOOKEEPER_HOME=/home/kafka/zookeeper
export ZOOBINDIR=$ZOOKEEPER_HOME/bin
 # zookeeper home 과 bin 설정
 # $ZOOKEEPER_HOME/bin/zkEnv.sh 에 $ZOOBINDIR이 명시되어있긴 하지만
 #편의를 위해 등록한다. zkEnv.sh 를 수정해도 된다.

export PATH=$PATH:$ZOOBINDIR
 #zookeeper bin 디렉토리를 PATH에 등록

 #변수를 별도록 export 하지 않으면 해당 변수는 '현재의 스크립트에서만 유효하다'

[kafka@zookeeper-kafka zookeeper]$ source ~/.bash_profile


3. zookeeper 설정
– 설정 파일 수정, node id 파일 생성, 클러스터간 설정 파일 동기화 세가지 작업이 필요하다.

가. $ZOOKEEPER_HOME/conf/zoo.cfg 파일을 생성하고 파일을 수정한다.

[kafka@kafka ~]$ vi $ZOOKEEPER_HOME/conf/zoo.cfg

tickTime=2000
 # 단위는 밀리초. 신호를 보내고 응답이 오기까지 설정된 시간만큼 대기한다.

initLimit=10
 # election 과정 이후 리더로 선출 된 zookeeper와 follower zookeeper들 간의 
 # 동기화 등에 사용되는   tick 시도 횟수. 즉 TickTime=2000에 initLimit=10일경우 
 # 총 20초동안 대기한다.
 # 관리하는 데이터의 양이 많을 경우 값을 늘려준다.

syncLimit=5
 # follower와 zookeeper 간의 동기화를 위한 틱 시도 횟수. 

dataDir=/home/kafka/zookeeper/datadir
 # zookeeper 데이터 디렉토리. tmp는 권장하지않는다.

clientPort=2181
 # zookeeper로 관리되는 클라이언트가 zookeeper에 연결 할 때 사용하는 포트

maxClientCnxns=7
# 최대 접속 가능한 클라이언트 갯수.
# 관리 대상 클라이언트가 많으면 값을 늘려준다.

autopurge.snapRetainCount=3
 # dataDir 디렉토리에 유지할 snapshot 갯수

autopurge.purgeInterval=1
 # 정리(삭제) 시도 주기, 간격. 
 # 단위는 시간.

server.1=192.168.100.111:2888:3888;2181
server.2=zookeeper2:2888:3888;2181
server.3=zookeeper3.fqdn.name:2888:3888;2181
 # 노드 정보. IP, host명(hosts 파일에 있는), FQDN 을 사용할 수 있다.
 # 2888은 leader노드가 follower 노드를 위해 열어두는 포트(동기화용).
 # 3888은 리더 선출을 위한 election 용 포트.
 # server. 뒤에 오는 숫자는 노드의 식별자가 된다. 

admin.enableServer=true
admin.serverPort=8000
admin.commandURL=/commands
 # 관리 서버 설정.
 # 사용하도록 설정하고 웹브라우저로 접속하면 커맨드 정보를 볼 수 있다.
[kafka@kafka ~]$  mkdir /home/kafka/zookeeper/datadir
[kafka@kafka ~]$  echo 1> /home/kafka/zookeeper/datadir/myid
 # myid에 적는 숫자는 zoo.cfg 에 기록한 서버 번호와 일치 해야 한다.