๐ Apache Flink: ์ค์๊ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ์ ๋ํ์! ๐

์๋ ํ์ธ์, ์ฌ๋ฌ๋ถ! ์ค๋์ ์ ๋ง ํซํ ์ฃผ์ ๋ก ์ฐพ์์์ด์. ๋ฐ๋ก Apache Flink! ์ด ๋ ์, ์ค์๊ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์คํ ๊ตฌ์ถ์ ์ ๋๊ฐ์๋ผ๊ณ ํด๋ ๊ณผ์ธ์ด ์๋์ฃ . ใ ใ ใ ์ฌ๋ฌ๋ถ, ์ค๋น๋์ จ๋์? ์ง๊ธ๋ถํฐ Flink์ ์ธ๊ณ๋ก ํ๋ฉ~ ๋น ์ ธ๋ณผ๊ฒ์! ๐โโ๏ธ
๐ก ์ ๊น! ์๊ณ ๊ฐ์ธ์!
Apache Flink๋ ๋น ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ํ๋ช ์ ์ผ์ผํค๊ณ ์๋ ์คํ์์ค ํ๋ก์ ํธ์์. ์ค์๊ฐ์ผ๋ก ์์ฒญ๋ ์์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์์ด์, ์์ฆ ๊ฐ์ ๋ฐ์ดํฐ ํ์ ์๋์ ๊ผญ ํ์ํ ๋ ์์ด์ฃ !
๐ Flink, ๋ ๋๋์ฒด ๋ญ๋?
Flink๋ ๋ ์ผ์ด๋ก '์ฌ๋น ๋ฅด๋ค'๋ ๋ป์ด์์. ์ด๋ฆ๋ถํฐ ๋ฒ์จ ์คํผ๋๊ฐ ๋๊ปด์ง์ง ์๋์? ใ ใ Apache Flink๋ ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ง์ด์์. ์ฝ๊ฒ ๋งํด์, ์์ฒญ๋๊ฒ ๋ง์ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ๋น ๋ฅด๊ฒ ์ฒ๋ฆฌํ ์ ์๋ ์ํผ ํ์๋ฅผ ๊ฐ์ง ๋ ์์ด์ฃ !
์ฌ๋ฌ๋ถ, ํน์ ๋ทํ๋ฆญ์ค์์ ์ํ ๋ณด๋ค๊ฐ "์ด ์ํ ์ฌ๋ฐ๋ค~ ๋น์ทํ ๊ฑฐ ์๋?" ํ๊ณ ์ถ์ฒ ๋ฐ์๋ณธ ์ ์๋์? ๊ทธ๊ฒ ๋ฐ๋ก ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ํ ์์์! Flink ๊ฐ์ ์์คํ ์ด ์ฌ๋ฌ๋ถ์ ์ทจํฅ์ ์ค์๊ฐ์ผ๋ก ๋ถ์ํด์ ์ถ์ฒํด์ฃผ๋ ๊ฑฐ์ฃ . ๋๋ฐ ใทใทใท
์์ ๊ทธ๋ฆผ์ ๋ณด์ธ์. Flink๋ ๋ง์น ๊ฑฐ๋ํ ๋ น์ ์์ฒ๋ผ ๋ฐ์ดํฐ๋ฅผ ๋นจ์๋ค์ด๊ณ ์ฒ๋ฆฌํด์ ์ ์ฉํ ์ ๋ณด๋ก ๋ง๋ค์ด๋ด๋ ๊ฑฐ์์. ๋ฉ์ง์ง ์๋์? ๐
๐ค ๊ทผ๋ฐ ์ Flink๊ฐ ๊ทธ๋ ๊ฒ ํน๋ณํ๋ฐ?
์, ์ด์ Flink๊ฐ ์ ๊ทธ๋ ๊ฒ ํน๋ณํ์ง ์์ธํ ์์๋ณผ๊น์? ์ฌ๋ฌ๋ถ, ๋ฒจํธ ๋งค์ธ์. ์ง๊ธ๋ถํฐ Flink์ ๋งค๋ ฅ ํฌ์ธํธ๋ฅผ ํ๋ํ๋ ํํค์ณ๋ณผ ๊ฑฐ์์!
- ์ด๊ณ ์ ์ฒ๋ฆฌ ๋ฅ๋ ฅ: Flink๋ ๋ง ๊ทธ๋๋ก ๊ด์์ด์์! ๐ ์ด๋น ์๋ฐฑ๋ง ๊ฐ์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค๋, ์์์ด ๊ฐ๋์?
- ์ ํ์ฑ ๋ณด์ฅ: "Exactly-once" ์ฒ๋ฆฌ๋ฅผ ์ง์ํด์ ๋ฐ์ดํฐ ์์ค์ด๋ ์ค๋ณต ์์ด ์ ํํ๊ฒ ์ฒ๋ฆฌํด์. ์๋ฒฝ์ฃผ์์๋ค์ ์ต์ ๊ธฐ๋ฅ์ด์ฃ ! โจ
- ์ํ ๊ด๋ฆฌ: ๋ณต์กํ ์ํ๋ฅผ ๊ด๋ฆฌํ ์ ์์ด์. ์ด๊ฒ ๋ฌด์จ ๋ง์ด๋๊ณ ์? ์ฝ๊ฒ ๋งํด, ๊ณผ๊ฑฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ์ตํ๊ณ ํ์ฉํ ์ ์๋ค๋ ๊ฑฐ์์!
- ๋ค์ํ ์๊ฐ ๊ฐ๋ ์ง์: ์ด๋ฒคํธ ์๊ฐ, ์ฒ๋ฆฌ ์๊ฐ ๋ฑ ๋ค์ํ ์๊ฐ ๊ฐ๋ ์ ์ง์ํด์. ์๊ฐ ์ฌํ๋ ๊ฐ๋ฅํ๋ค๊ตฌ์? ใ ใ ใ
- ํ๋ถํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ: ๋ค์ํ ๋ฐ์ดํฐ ์์ค์ ์ฐ๊ฒฐํ ์ ์๋ ์ปค๋ฅํฐ, ๋ณต์กํ ์ฒ๋ฆฌ๋ฅผ ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ฑ์ด ์์ด์. ๋ง์น ๋ ๊ณ ๋ธ๋ก์ฒ๋ผ ์กฐ๋ฆฝํด์ ์ฌ์ฉํ ์ ์์ฃ !
์์ฐ! ์ด ์ ๋๋ฉด Flink๊ฐ ์ ์ธ๊ธฐ ์๋์ง ์ ๊ฒ ๊ฐ์ง ์๋์? ๐
๐ ๏ธ Flink๋ก ๋ญ ํ ์ ์์๊น?
์, ์ด์ Flink๋ก ๋ฌด์์ ํ ์ ์๋์ง ๊ตฌ์ฒด์ ์ธ ์๋ฅผ ๋ค์ด๋ณผ๊ฒ์. ์ฌ๋ฌ๋ถ์ ์์๋ ฅ์ ์๊ทนํด๋ณผ ์๊ฐ์ด์์!
๐ ์ค์๊ฐ ์ถ์ฒ ์์คํ
์จ๋ผ์ธ ์ผํ๋ชฐ์์ ๊ณ ๊ฐ์ ํ๋์ ์ค์๊ฐ์ผ๋ก ๋ถ์ํด์ ๋ง์ถคํ ์ํ์ ์ถ์ฒํ ์ ์์ด์. "์ด ์ํ์ ๋ณธ ์ฌ๋๋ค์ด ์ด๋ฐ ๊ฒ๋ ๋ดค์ด์~" ์ด๋ฐ ๊ฑฐ ๋ง์ด ๋ณด์ จ์ฃ ? ๊ทธ๊ฒ ๋ฐ๋ก Flink์ ํ์ด์์!
๐จ ์ค์๊ฐ ์ฌ๊ธฐ ํ์ง
๊ธ์ต ๊ฑฐ๋์์ ์ด์ํ ํจํด์ ์ค์๊ฐ์ผ๋ก ๊ฐ์งํ ์ ์์ด์. ๋๊ตฐ๊ฐ ์ฌ๋ฌ๋ถ์ ์นด๋๋ก ์ด์ํ ๊ณณ์์ ๊ฒฐ์ ํ๋ ค๊ณ ํ ๋, ๋ฐ๋ก ์๋ฆผ์ด ์ค๋ ๊ทธ๋ฐ ๊ฑฐ์!
๐ ์ค์๊ฐ ๋์๋ณด๋
๋น์ฆ๋์ค ์ฑ๊ณผ๋ฅผ ์ค์๊ฐ์ผ๋ก ๋ชจ๋ํฐ๋งํ ์ ์๋ ๋์๋ณด๋๋ฅผ ๋ง๋ค ์ ์์ด์. ๋ง์น ์ฃผ์ ์ฐจํธ์ฒ๋ผ ์ค์๊ฐ์ผ๋ก ๋ณํ๋ ๊ทธ๋ํ๋ฅผ ์์ํด๋ณด์ธ์!
์ด๋ฐ ์์ผ๋ก Flink๋ ์ ๋ง ๋ค์ํ ๋ถ์ผ์์ ํ์ฉ๋ ์ ์์ด์. ์ฌ๋ฌ๋ถ์ ์์ด๋์ด์ ๋ฐ๋ผ ๋ฌด๊ถ๋ฌด์งํ ๊ฐ๋ฅ์ฑ์ด ์์ฃ !
๐๏ธ Flink๋ก ์ค์๊ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์คํ ๊ตฌ์ถํ๊ธฐ
์, ์ด์ ๋ณธ๊ฒฉ์ ์ผ๋ก Flink๋ก ์ค์๊ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ ์์คํ ์ ๊ตฌ์ถํด๋ณผ๊น์? ์ฌ๋ฌ๋ถ, ๊ธด์ฅํ์ง ๋ง์ธ์. ์ฒ์ฒํ, ํ๋์ฉ ๋ฐ๋ผ์ ๋ณด์ธ์!
1. ํ๊ฒฝ ์ค์
๋จผ์ Flink๋ฅผ ์ค์นํด์ผ ํด์. ๋คํํ Flink๋ ์ค์น๊ฐ ์์ฃผ ๊ฐ๋จํด์!
# Flink ๋ค์ด๋ก๋
wget https://downloads.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz
# ์์ถ ํด์
tar xzf flink-1.14.0-bin-scala_2.11.tgz
# Flink ๋๋ ํ ๋ฆฌ๋ก ์ด๋
cd flink-1.14.0
์ง์~ ์ด๋ ๊ฒ ํ๋ฉด Flink ์ค์น ๋! ๋๋ฌด ์ฝ์ฃ ? ใ ใ ใ
2. Flink ์คํํ๊ธฐ
์ด์ Flink๋ฅผ ์คํํด๋ณผ๊น์?
# Flink ํด๋ฌ์คํฐ ์์
./bin/start-cluster.sh
์ด๋ ๊ฒ ํ๋ฉด Flink ํด๋ฌ์คํฐ๊ฐ ์์๋ผ์. ์น ๋ธ๋ผ์ฐ์ ์์ http://localhost:8081๋ก ์ ์ํ๋ฉด Flink์ ๋์๋ณด๋๋ฅผ ๋ณผ ์ ์์ด์. ๋ฉ์ง์ฃ ? ๐
3. ๊ฐ๋จํ Flink ํ๋ก๊ทธ๋จ ์์ฑํ๊ธฐ
์, ์ด์ ์ง์ง Flink ํ๋ก๊ทธ๋จ์ ์์ฑํด๋ณผ ๊ฑฐ์์. ๊ธด์ฅ๋๋์? ๊ฑฑ์ ๋ง์ธ์, ์์ฃผ ๊ฐ๋จํ ์์ ๋ก ์์ํ ๊ฑฐ์์!
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
public class SimpleFlinkJob {
public static void main(String[] args) throws Exception {
// ์คํ ํ๊ฒฝ ์์ฑ
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ๋ฐ์ดํฐ ์์ค ์์ฑ (์ฌ๊ธฐ์๋ ๊ฐ๋จํ 1๋ถํฐ 5๊น์ง์ ์ซ์๋ฅผ ์ฌ์ฉ)
DataStream<integer> numbers = env.fromElements(1, 2, 3, 4, 5);
// ๋ฐ์ดํฐ ์ฒ๋ฆฌ: ๊ฐ ์ซ์๋ฅผ 2๋ฐฐ๋ก ๋ง๋ค๊ธฐ
DataStream<integer> doubled = numbers.map(n -> n * 2);
// ๊ฒฐ๊ณผ ์ถ๋ ฅ
doubled.print();
// ํ๋ก๊ทธ๋จ ์คํ
env.execute("Simple Flink Job");
}
}
</integer></integer>
์ฐ์~ ์ฒซ ๋ฒ์งธ Flink ํ๋ก๊ทธ๋จ์ ์์ฑํ์ด์! ๐ ์ด ํ๋ก๊ทธ๋จ์ ๋ญ ํ๋ ๊ฑธ๊น์? ๊ฐ๋จํด์. 1๋ถํฐ 5๊น์ง์ ์ซ์๋ฅผ ๋ฐ์์ ๊ฐ๊ฐ์ 2๋ฐฐ๋ก ๋ง๋ค์ด ์ถ๋ ฅํ๋ ๊ฑฐ์์. ์คํํ๋ฉด 2, 4, 6, 8, 10์ด ์ถ๋ ฅ๋ ๊ฑฐ์์!
4. ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌํ๊ธฐ
์, ์ด์ ์กฐ๊ธ ๋ ์ค์ ์ ์ธ ์์ ๋ฅผ ๋ณผ๊น์? ์ค์๊ฐ์ผ๋ก ๋ค์ด์ค๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ํ๋ก๊ทธ๋จ์ ๋ง๋ค์ด๋ณผ ๊ฑฐ์์!
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
public class RealtimeProcessingJob {
public static void main(String[] args) throws Exception {
// ์คํ ํ๊ฒฝ ์์ฑ
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ๋ฐ์ดํฐ ์์ค ์์ฑ (์ค์ ๋ก๋ Kafka๋ ๋ค๋ฅธ ๋ฉ์์ง ์์คํ
์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ฌ ์ ์์ด์)
DataStream<string> inputStream = env.socketTextStream("localhost", 9999);
// ๋ฐ์ดํฐ ์ฒ๋ฆฌ: 5์ด ๋์์ ๋ฐ์ดํฐ๋ฅผ ๋ชจ์์ ๋จ์ด ์ ์ธ๊ธฐ
DataStream<wordcount> wordCounts = inputStream
.flatMap((String line, Collector<wordcount> out) -> {
for (String word : line.split("\\s")) {
out.collect(new WordCount(word, 1L));
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count");
// ๊ฒฐ๊ณผ ์ถ๋ ฅ
wordCounts.print();
// ํ๋ก๊ทธ๋จ ์คํ
env.execute("Realtime Word Count");
}
public static class WordCount {
public String word;
public long count;
public WordCount() {}
public WordCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + ": " + count;
}
}
}
</wordcount></wordcount></string>
์ฐ์~ ์ด์ ์ง์ง ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ํ๋ก๊ทธ๋จ์ ๋ง๋ค์์ด์! ๐ ์ด ํ๋ก๊ทธ๋จ์ ๋ญ ํ๋ ๊ฑธ๊น์? ์ค๋ช ํด๋๋ฆด๊ฒ์:
- ๋จผ์ , ๋ก์ปฌํธ์คํธ์ 9999 ํฌํธ์์ ํ ์คํธ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์์. (์ค์ ๋ก๋ Kafka ๊ฐ์ ๋ฉ์์ง ์์คํ ์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ฌ ์ ์์ด์)
- ๋ฐ์์จ ํ ์คํธ๋ฅผ ๋จ์ด๋ก ๋๋๊ณ , ๊ฐ ๋จ์ด๋ง๋ค ์นด์ดํธ๋ฅผ 1๋ก ์ค์ ํด์.
- 5์ด ๋์์ ๋ฐ์ดํฐ๋ฅผ ๋ชจ์์ ๊ฐ ๋จ์ด์ ์ถํ ํ์๋ฅผ ์ธ์.
- ๊ฒฐ๊ณผ๋ฅผ ์ถ๋ ฅํด์.
์ด ํ๋ก๊ทธ๋จ์ ์คํํ๊ณ , ๋ค๋ฅธ ํฐ๋ฏธ๋์์ nc -lk 9999
๋ช
๋ น์ด๋ก ํ
์คํธ๋ฅผ ์
๋ ฅํ๋ฉด, 5์ด๋ง๋ค ๊ฐ ๋จ์ด์ ์ถํ ํ์๋ฅผ ๋ณผ ์ ์์ด์. ์ ๋ง ๋ฉ์ง์ง ์๋์? ๐
๐ Flink์ ๊ณ ๊ธ ๊ธฐ๋ฅ๋ค
์, ์ด์ ๊ธฐ๋ณธ์ ์ธ ๊ฒ๋ค์ ์์์ผ๋ Flink์ ๋ ๋ฉ์ง ๊ธฐ๋ฅ๋ค์ ์ดํด๋ณผ๊น์? ์ฌ๋ฌ๋ถ, ์ค๋น๋์ จ๋์? ์ง๊ธ๋ถํฐ Flink์ ์จ๊ฒจ์ง ๋ณด๋ฌผ๋ค์ ํํค์ณ๋ณผ ๊ฑฐ์์! ๐ดโโ ๏ธ
1. ์ํ ๊ด๋ฆฌ (State Management)
Flink์ ์ํ ๊ด๋ฆฌ ๊ธฐ๋ฅ์ ์ ๋ง ๋๋จํด์. ๋ณต์กํ ์ฐ์ฐ์ ํ ๋ ์ด์ ๋ฐ์ดํฐ์ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ์ตํ๊ณ ์์ด์ผ ํ ๋๊ฐ ์์์์? Flink๋ ์ด๋ฐ ์ํฉ์ ์๋ฒฝํ๊ฒ ์ฒ๋ฆฌํ ์ ์์ด์!
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class AverageCalculator extends RichFlatMapFunction<integer double> {
private transient ValueState<tuple2 integer>> sum;
@Override
public void open(Configuration config) {
ValueStateDescriptor<tuple2 integer>> descriptor =
new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<tuple2 integer>>() {}));
sum = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Integer input, Collector<double> out) throws Exception {
Tuple2<integer integer> currentSum = sum.value();
if (currentSum == null) {
currentSum = new Tuple2<>(0, 0);
}
currentSum.f0 += input;
currentSum.f1 += 1;
sum.update(currentSum);
if (currentSum.f1 >= 10) {
out.collect((double) currentSum.f0 / currentSum.f1);
sum.clear();
}
}
}
</integer></double></tuple2></tuple2></tuple2></integer>
์ด ์ฝ๋๋ ๋ญ ํ๋ ๊ฑธ๊น์? ๊ฐ๋จํด์! ์ ๋ ฅ์ผ๋ก ๋ค์ด์ค๋ ์ซ์๋ค์ ํ๊ท ์ ๊ณ์ฐํ๋ ๊ฑฐ์์. ๊ทธ๋ฐ๋ฐ ๋งค๋ฒ ์๋ก ๊ณ์ฐํ๋ ๊ฒ ์๋๋ผ, ์ด์ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ์ตํ๊ณ ์๋ค๊ฐ ์๋ก์ด ์ซ์๊ฐ ๋ค์ด์ฌ ๋๋ง๋ค ์ ๋ฐ์ดํธ๋ฅผ ํด์. ๊ทธ๋ฆฌ๊ณ 10๊ฐ์ ์ซ์๊ฐ ๋ชจ์ด๋ฉด ํ๊ท ์ ์ถ๋ ฅํ๊ณ ๋ค์ ์ฒ์๋ถํฐ ์์ํ๋ ๊ฑฐ์ฃ . ์์ ๋๋ํ์ง ์๋์? ๐
2. ์๊ฐ ๊ธฐ๋ฐ ์ฒ๋ฆฌ (Time-based Processing)
Flink๋ ์๊ฐ ๊ฐ๋ ์ ์ ๋ง ์ ๋ค๋ค์. ์ด๋ฒคํธ ์๊ฐ(Event Time)๊ณผ ์ฒ๋ฆฌ ์๊ฐ(Processing Time)์ ๊ตฌ๋ถํด์ ์ฌ์ฉํ ์ ์์ฃ . ์ด๊ฒ ์ ์ค์ํ๋๊ณ ์? ์ค์ ์ธ๊ณ์์๋ ๋ฐ์ดํฐ๊ฐ ๋ฐ์ํ ์๊ฐ๊ณผ ์ฒ๋ฆฌ๋๋ ์๊ฐ์ด ๋ค๋ฅผ ์ ์๊ฑฐ๋ ์!
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeBasedProcessing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ์ด๋ฒคํธ ์๊ฐ ์ฌ์ฉ ์ค์
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<myevent> events = env.addSource(new MyEventSource())
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<myevent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent event) {
return event.getTimestamp();
}
}
);
events
.keyBy(event -> event.getKey())
.timeWindow(Time.minutes(5))
.reduce((e1, e2) -> e1.getValue() > e2.getValue() ? e1 : e2)
.print();
env.execute("Time-based Processing Example");
}
}
</myevent></myevent>
์ฐ์~ ์ด ์ฝ๋ ์ข ๋ฉ์ง๋ฐ์? ๐ ์ด ํ๋ก๊ทธ๋จ์ 5๋ถ ๋จ์๋ก ๊ฐ ํค๋ณ๋ก ๊ฐ์ฅ ํฐ ๊ฐ์ ๊ฐ์ง ์ด๋ฒคํธ๋ฅผ ์ฐพ์๋ด์. ๊ทธ๋ฐ๋ฐ ์ฌ๊ธฐ์ ์ค์ํ ๊ฑด, ๋ฐ์ดํฐ๊ฐ ๋ฆ๊ฒ ๋์ฐฉํด๋ (์ต๋ 10์ด๊น์ง) ์ฌ๋ฐ๋ฅธ ์๊ฐ๋์ ํฌํจ์์ผ ์ฒ๋ฆฌํ๋ค๋ ๊ฑฐ์์. ์ค์ ์ธ๊ณ์ ๋ณต์กํ ์ํฉ์ ์ ํํ๊ฒ ๋ชจ๋ธ๋งํ ์ ์๋ ๊ฑฐ์ฃ !
3. ์ฒดํฌํฌ์ธํ ๊ณผ ์ฅ์ ๋ณต๊ตฌ (Checkpointing and Fault Tolerance)
Flink์ ๋ ๋ค๋ฅธ ๊ฐ์ ์ ๋ฐ๋ก ์ฅ์ ๋์ ๋ฅ๋ ฅ์ด์์. ์ฒดํฌํฌ์ธํ ์ด๋ผ๋ ๊ธฐ๋ฅ์ ํตํด ์ฃผ๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌ ์ํ๋ฅผ ์ ์ฅํ๊ณ , ๋ง์ฝ ์์คํ ์ ๋ฌธ์ ๊ฐ ์๊ฒจ๋ ๋ง์ง๋ง ์ฒดํฌํฌ์ธํธ๋ถํฐ ๋ค์ ์์ํ ์ ์์ด์. ์์ ๋๋ฐ์ด์ฃ ? ๐ฒ
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ์ฒดํฌํฌ์ธํ
์ค์
env.enableCheckpointing(5000); // 5์ด๋ง๋ค ์ฒดํฌํฌ์ธํธ ์์ฑ
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// ์ํ ๋ฐฑ์๋ ์ค์
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
์ด ์ค์ ์ผ๋ก Flink๋ 5์ด๋ง๋ค ์ฒดํฌํฌ์ธํธ๋ฅผ ๋ง๋ค๊ณ , ์ฅ์ ๊ฐ ๋ฐ์ํด๋ ์ ํํ ํ ๋ฒ(Exactly Once) ์ฒ๋ฆฌ๋ฅผ ๋ณด์ฅํด์. ๋ํ ์ฒดํฌํฌ์ธํธ๋ฅผ HDFS์ ์ ์ฅํด์ ์์คํ ์ด ์์ ํ ๋ค์ด๋์ด๋ ๋ณต๊ตฌํ ์ ์๊ฒ ํด์ค์. ์์ ์์ ํ์ฃ ? ๐
4. ๋ณต์กํ ์ด๋ฒคํธ ์ฒ๋ฆฌ (Complex Event Processing)
Flink๋ ๋จ์ํ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ๋์ด์ ๋ณต์กํ ์ด๋ฒคํธ ํจํด์ ๊ฐ์งํ๊ณ ์ฒ๋ฆฌํ ์ ์์ด์. ์ด๊ฑธ CEP(Complex Event Processing)๋ผ๊ณ ํ๋๋ฐ, ์ ๋ง ๊ฐ๋ ฅํด์!
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
DataStream<event> input = ...
Pattern<event> pattern = Pattern.<event>begin("start")
.where(new SimpleCondition<event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("start");
}
})
.next("middle")
.where(new SimpleCondition<event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("middle");
}
})
.followedBy("end")
.where(new SimpleCondition<event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
})
.within(Time.seconds(10));
PatternStream<event> patternStream = CEP.pattern(input, pattern);
DataStream<alert> result = patternStream.select(
(Map<string list>> pattern) -> {
return new Alert("Pattern Detected: " + pattern);
}
);
</string></alert></event></event></event></event></event></event></event>
์ด ์ฝ๋๋ ๋ญ ํ๋ ๊ฑธ๊น์? ์... ์์ํด๋ณด์ธ์. ์ฌ๋ฌ๋ถ์ด ๋ณด์ ์์คํ ์ ๋ง๋ค๊ณ ์๋ค๊ณ ํด์. ๊ทธ๋ฐ๋ฐ ํน์ ํจํด์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ๋ฉด ํดํน ์๋์ผ ์ ์๋ค๋ ๊ฑธ ์์์ด์. ์ด ์ฝ๋๋ ๋ฐ๋ก ๊ทธ๋ฐ ํจํด์ ๊ฐ์งํ๋ ๊ฑฐ์์! "start" ์ด๋ฒคํธ, ๊ทธ ๋ค์์ "middle" ์ด๋ฒคํธ, ๊ทธ๋ฆฌ๊ณ ๋ง์ง๋ง์ผ๋ก "end" ์ด๋ฒคํธ๊ฐ 10์ด ์ด๋ด์ ๋ฐ์ํ๋ฉด ์๋ฆผ์ ๋ณด๋ด๋ ๊ฑฐ์ฃ . ์์ ์ฒฉ๋ณด ์ํ ๊ฐ์ง ์๋์? ๐
๐ Flink์ ์ค์ ์ฌ์ฉ ์ฌ๋ก
์, ์ด์ Flink๊ฐ ์ผ๋ง๋ ๋๋จํ์ง ์๊ฒ ๋์ จ์ฃ ? ๊ทธ๋ผ ์ด์ ์ค์ ๋ก ์ด๋ค ๊ธฐ์ ๋ค์ด Flink๋ฅผ ์ฌ์ฉํ๊ณ ์๋์ง ์ดํด๋ณผ๊น์? ์ฌ๋ฌ๋ถ, ๋๋ผ์ง ๋ง์ธ์. ์ ๋ง ๋๋จํ ๊ธฐ์ ๋ค์ด Flink๋ฅผ ์ฌ์ฉํ๊ณ ์๊ฑฐ๋ ์! ๐ฎ
1. ์๋ฆฌ๋ฐ๋ฐ (Alibaba)
์ค๊ตญ์ ๊ฑฐ๋ ์ ์์๊ฑฐ๋ ๊ธฐ์ ์๋ฆฌ๋ฐ๋ฐ๋ Flink๋ฅผ ๋๊ท๋ชจ๋ก ์ฌ์ฉํ๊ณ ์์ด์. ํนํ '๊ด๊ตฐ์ '๋ผ๋ ๋๊ท๋ชจ ์ผํ ์ถ์ ๋ Flink์ ์ง๊ฐ๊ฐ ๋ฐํ๋๋ค๊ณ ํด์.
๐ ์๋ฆฌ๋ฐ๋ฐ์ Flink ํ์ฉ
- ์ค์๊ฐ ๊ฑฐ๋ ๋ชจ๋ํฐ๋ง
- ์ฌ๊ธฐ ๊ฑฐ๋ ํ์ง
- ์ค์๊ฐ ์ฌ๊ณ ๊ด๋ฆฌ
- ๊ฐ์ธํ๋ ์ํ ์ถ์ฒ
์๋ฆฌ๋ฐ๋ฐ๋ Flink๋ฅผ ์ฌ์ฉํด์ ์ด๋น ์์ต ๊ฑด์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ค๊ณ ํด์. ์... ์์์ด ๊ฐ๋์? ๊ทธ ์์ฒญ๋ ์์ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๋ค๋! ๐คฏ
2. ์ฐ๋ฒ (Uber)
์ฐจ๋ ๊ณต์ ์๋น์ค๋ก ์ ๋ช ํ ์ฐ๋ฒ๋ Flink์ ์ด๋ ฌํ ํฌ์ด์์. ์ฐ๋ฒ๋ Flink๋ฅผ ์ฌ์ฉํด ์ค์๊ฐ์ผ๋ก ์ฐจ๋๊ณผ ์น๊ฐ์ ๋งค์นญํ๊ณ , ๋ค์ํ ๋ถ์์ ์ํํด์.
๐ ์ฐ๋ฒ์ Flink ํ์ฉ
- ์ค์๊ฐ ์ฐจ๋-์น๊ฐ ๋งค์นญ
- ๋์ ๊ฐ๊ฒฉ ์ฑ ์
- ์ด์ ์ ํ๋ ๋ถ์
- ์๋น์ค ํ์ง ๋ชจ๋ํฐ๋ง
์ฐ๋ฒ๋ Flink๋ฅผ ํตํด ์๋ฐฑ๋ง ๋ช ์ ์ด์ ์์ ์น๊ฐ์ ์ค์๊ฐ์ผ๋ก ์ฐ๊ฒฐํ๊ณ ์์ด์. ์ฌ๋ฌ๋ถ์ด ์ฐ๋ฒ๋ฅผ ์ด์ฉํ ๋๋ง๋ค, ๊ทธ ๋ค์์๋ Flink๊ฐ ์ด์ฌํ ์ผํ๊ณ ์๋ ๊ฑฐ์ฃ ! ๐
3. ๋ทํ๋ฆญ์ค (Netflix)
์คํธ๋ฆฌ๋ฐ ์๋น์ค์ ์, ๋ทํ๋ฆญ์ค๋ Flink๋ฅผ ์ฌ์ฉํ๊ณ ์์ด์. ๋ทํ๋ฆญ์ค๋ Flink๋ฅผ ํตํด ์ฌ์ฉ์ ๊ฒฝํ์ ๊ฐ์ ํ๊ณ , ์ฝํ ์ธ ์ถ์ฒ ์์คํ ์ ๊ฐํํ๊ณ ์์ฃ .
๐ฌ ๋ทํ๋ฆญ์ค์ Flink ํ์ฉ
- ์ค์๊ฐ ์์ฒญ ํจํด ๋ถ์
- ๊ฐ์ธํ๋ ์ฝํ ์ธ ์ถ์ฒ
- ์คํธ๋ฆฌ๋ฐ ํ์ง ๋ชจ๋ํฐ๋ง
- ์ด์ ์งํ ๊ฐ์ง ๋ฐ ๋์
๋ทํ๋ฆญ์ค๊ฐ ์ฌ๋ฌ๋ถ์ ์ทจํฅ์ ๋๋ฌด ์ ์๊ณ ์๋ค๊ณ ๋๋ ์ ์๋์? ๊ทธ ๋น๊ฒฐ์ด ๋ฐ๋ก Flink์์! Flink๊ฐ ์ฌ๋ฌ๋ถ์ ์์ฒญ ๊ธฐ๋ก์ ์ค์๊ฐ์ผ๋ก ๋ถ์ํด์ ์ต์ ์ ์ฝํ ์ธ ๋ฅผ ์ถ์ฒํด์ฃผ๋ ๊ฑฐ์ฃ . ์์ ์ ๊ธฐํ์ง ์๋์? ๐ฟ
4. ๋ผ์ธ (LINE)
- ์ง์์ธ์ ์ฒ - ์ง์ ์ฌ์ฐ๊ถ ๋ณดํธ ๊ณ ์ง
์ง์ ์ฌ์ฐ๊ถ ๋ณดํธ ๊ณ ์ง
- ์ ์๊ถ ๋ฐ ์์ ๊ถ: ๋ณธ ์ปจํ ์ธ ๋ ์ฌ๋ฅ๋ท์ ๋ ์ AI ๊ธฐ์ ๋ก ์์ฑ๋์์ผ๋ฉฐ, ๋ํ๋ฏผ๊ตญ ์ ์๊ถ๋ฒ ๋ฐ ๊ตญ์ ์ ์๊ถ ํ์ฝ์ ์ํด ๋ณดํธ๋ฉ๋๋ค.
- AI ์์ฑ ์ปจํ ์ธ ์ ๋ฒ์ ์ง์: ๋ณธ AI ์์ฑ ์ปจํ ์ธ ๋ ์ฌ๋ฅ๋ท์ ์ง์ ์ฐฝ์๋ฌผ๋ก ์ธ์ ๋๋ฉฐ, ๊ด๋ จ ๋ฒ๊ท์ ๋ฐ๋ผ ์ ์๊ถ ๋ณดํธ๋ฅผ ๋ฐ์ต๋๋ค.
- ์ฌ์ฉ ์ ํ: ์ฌ๋ฅ๋ท์ ๋ช ์์ ์๋ฉด ๋์ ์์ด ๋ณธ ์ปจํ ์ธ ๋ฅผ ๋ณต์ , ์์ , ๋ฐฐํฌ, ๋๋ ์์ ์ ์ผ๋ก ํ์ฉํ๋ ํ์๋ ์๊ฒฉํ ๊ธ์ง๋ฉ๋๋ค.
- ๋ฐ์ดํฐ ์์ง ๊ธ์ง: ๋ณธ ์ปจํ ์ธ ์ ๋ํ ๋ฌด๋จ ์คํฌ๋ํ, ํฌ๋กค๋ง, ๋ฐ ์๋ํ๋ ๋ฐ์ดํฐ ์์ง์ ๋ฒ์ ์ ์ฌ์ ๋์์ด ๋ฉ๋๋ค.
- AI ํ์ต ์ ํ: ์ฌ๋ฅ๋ท์ AI ์์ฑ ์ปจํ ์ธ ๋ฅผ ํ AI ๋ชจ๋ธ ํ์ต์ ๋ฌด๋จ ์ฌ์ฉํ๋ ํ์๋ ๊ธ์ง๋๋ฉฐ, ์ด๋ ์ง์ ์ฌ์ฐ๊ถ ์นจํด๋ก ๊ฐ์ฃผ๋ฉ๋๋ค.
์ฌ๋ฅ๋ท์ ์ต์ AI ๊ธฐ์ ๊ณผ ๋ฒ๋ฅ ์ ๊ธฐ๋ฐํ์ฌ ์์ฌ์ ์ง์ ์ฌ์ฐ๊ถ์ ์ ๊ทน์ ์ผ๋ก ๋ณดํธํ๋ฉฐ,
๋ฌด๋จ ์ฌ์ฉ ๋ฐ ์นจํด ํ์์ ๋ํด ๋ฒ์ ๋์์ ํ ๊ถ๋ฆฌ๋ฅผ ๋ณด์ ํฉ๋๋ค.
ยฉ 2025 ์ฌ๋ฅ๋ท | All rights reserved.
๋๊ธ 0๊ฐ