문의 주신 내용에 맞는 전문 컨설턴트 배정 후 연락드리겠습니다.
· 관리형 Flink
· Wordcount 예제
· 정리
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.2
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
package myorg;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;
import java.util.Properties;
public class StreamingJob {
private static final String region = "us-east-1";
private static final String inputStreamName = "TextInputStream";
private static final String outputStreamName = "WordCountOutputStream";
private static DataStream createSourceFromStaticConfig(
StreamExecutionEnvironment env) {
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
return env.addSource(new FlinkKinesisConsumer(inputStreamName,
new SimpleStringSchema(), inputProperties));
}
private static FlinkKinesisProducer createSinkFromStaticConfig() {
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
FlinkKinesisProducer sink = new FlinkKinesisProducer(new
SimpleStringSchema(), outputProperties);
sink.setDefaultStream(outputStreamName);
sink.setDefaultPartition("0");
return sink;
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream input = createSourceFromStaticConfig(env);
input.flatMap(new Tokenizer())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.map(new MapFunction() {
@Override
public String map(Tuple2 value) throws Exception {
return value.f0 + "," + value.f1.toString();
}
})
.addSink(createSinkFromStaticConfig());
env.execute("Word Count");
}
public static final class Tokenizer
implements FlatMapFunction{
@Override
public void flatMap(String value, Collectorout) {
String[] tokens = value.toLowerCase().split("W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
}
mvn clean install -Pinclude-kinesis -DskipTests
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
mvn clean package
import boto3
import json
from datetime import datetime
import calendar
import random
import time
# Kinesis stream the Flink word-count job reads from.
my_stream_name = 'TextInputStream'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')


def put_to_stream(thing_id, property_value, property_timestamp):
    """Put one randomly chosen sample paragraph onto the input stream.

    thing_id is used as the Kinesis partition key. property_value and
    property_timestamp are accepted for interface compatibility but are
    not included in the record payload.
    """
    # Adjacent string literals are concatenated; the original broke these
    # across lines without continuation, which is a syntax error.
    paragraphs = [
        'Apache Flink is a framework and distributed processing engine '
        'for stateful computations over unbounded and bounded data streams. '
        'Flink has been designed to run in all common cluster environments, '
        'perform computations at in-memory speed and at any scale.',
        'Any kind of data is produced as a stream of events. Credit card '
        'transactions, sensor measurements, machine logs, or user interactions '
        'on a website or mobile application, all of these data are generated '
        'as a stream.',
        'Amazon Kinesis Data Analytics is the easiest way to analyze streaming '
        'data, gain actionable insights, and respond to your business and '
        'customer needs in real time. Amazon Kinesis Data Analytics reduces '
        'the complexity of building, managing, and integrating streaming '
        'applications with other AWS services. SQL users can easily query '
        'streaming data or build entire streaming applications using templates '
        'and an interactive SQL editor. Java developers can quickly build '
        'sophisticated streaming applications using open source Java libraries '
        'and AWS integrations to transform and analyze data in real-time.',
        'Amazon Kinesis Data Analytics takes care of everything required to '
        'run your real-time applications continuously and scales automatically '
        'to match the volume and throughput of your incoming data. With Amazon '
        'Kinesis Data Analytics, you only pay for the resources your streaming '
        'applications consume. There is no minimum fee or setup cost.'
    ]
    message = random.choice(paragraphs)
    # Python 3 print function (original used the Python 2 print statement).
    print(message)
    kinesis_client.put_record(
        StreamName=my_stream_name,
        Data=message,
        PartitionKey=thing_id)


while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'
    put_to_stream(thing_id, property_value, property_timestamp)
    # Throttle: wait 1.5 seconds between records
    # (original comment said "5 second" but slept 1.5).
    time.sleep(1.5)
import boto3
import json
from datetime import datetime
import time
# import unicodedata
# import base64
# Kinesis stream the Flink word-count job writes its results to.
my_stream_name = 'WordCountOutputStream'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Read from the first shard only — fine for a single-shard demo stream;
# a multi-shard stream would need one iterator per shard.
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# LATEST: start reading only records produced after this script starts.
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=my_stream_name,
    ShardId=my_shard_id,
    ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                             Limit=2)

# Each get_records response hands back the iterator for the next batch;
# loop until Kinesis stops returning one (end of shard).
while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(
        ShardIterator=record_response['NextShardIterator'],
        Limit=2)
    if len(record_response['Records']) > 0:
        result = record_response['Records'][0]['Data']
        # Python 3 print function (original used Python 2 print statements).
        print("----------------------------------------------------------")
        print(result)
        print("----------------------------------------------------------")
    else:
        print(record_response)
    # wait for 5 seconds before polling again
    time.sleep(5)